# Source: DocChat_n_Talk / openai_tts_tool.py
# Commit: "Update openai_tts_tool.py" by capradeepgujaran (886e3e9, verified)
# File size: 7.15 kB
# openai_tts_tool.py
import os
from langdetect import detect, DetectorFactory
import logging
from openai import OpenAI
# Ensure consistent results from langdetect
DetectorFactory.seed = 0

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')

# Module-level OpenAI client read by the functions in this module.
# It is a placeholder: assign a configured client here, for example
#     client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# or import one from your own module:
#     from your_custom_client_module import client
# NOTE: do not build the client from a bare `api_key` name at import time.
# No module-level `api_key` exists, so that raises NameError before anything
# else in this module can run.
client = None

# Simple in-memory cache for translations
translation_cache = {}
def translate_text(api_key, text, target_language, length=1000):
    """
    Translate text to the target language using OpenAI's Chat Completion API with gpt-4o-mini model.

    The module-level ``client`` is used when it has been set. Otherwise a
    client is built from ``api_key`` for this call. Successful translations
    are stored in the module-level ``translation_cache``.

    Args:
        api_key (str): OpenAI API key; used only when the module-level client is None
        text (str): Text to translate
        target_language (str): Target language code (e.g., 'en' for English)
        length (int): Maximum number of tokens for the response

    Returns:
        str: Translated text, or an error message. Error messages are either
        "Error: OpenAI client is not initialized." or start with
        "Error in translation:". Empty or whitespace-only ``text`` is returned
        unchanged without calling the API.
    """
    # Nothing to translate: skip the cache and the API round trip.
    if not text or not text.strip():
        return text or ""

    # `length` caps the response size, so a result produced under a smaller
    # cap may be truncated and must not be reused for a larger one.
    cache_key = (text, target_language, length)
    if cache_key in translation_cache:
        logging.info("Fetching translation from cache.")
        return translation_cache[cache_key]

    try:
        logging.info("Starting translation process.")

        # Prefer the configured module-level client; fall back to the key
        # supplied by the caller so the `api_key` argument is actually used.
        active_client = client
        if active_client is None and api_key:
            active_client = OpenAI(api_key=api_key)
        if active_client is None:
            logging.error("OpenAI client is not initialized.")
            return "Error: OpenAI client is not initialized."

        prompt = f"Translate the following text to {target_language}:\n\n{text}"
        completion = active_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=length,
        )

        content = completion.choices[0].message.content
        if content is None:
            # The message content is optional in the response; report it
            # explicitly instead of failing on `.strip()`.
            logging.error("Error in translation: empty response from model.")
            return "Error in translation: empty response from model."

        translated_text = content.strip()
        logging.info("Translation successful.")

        # Cache only successful translations.
        translation_cache[cache_key] = translated_text
        return translated_text
    except Exception as e:
        # Callers detect failure via the "Error in translation:" prefix.
        logging.error("Error in translation: %s", e)
        return f"Error in translation: {str(e)}"
def text_to_speech_openai(text, audio_path, voice, speed, api_key=None):
    """
    Convert text to speech using OpenAI's TTS API and save the audio to a file.

    The module-level ``client`` is used when it has been set. Otherwise, if
    ``api_key`` is given, a client is built from it for this call.

    Args:
        text (str): Text to convert to speech
        audio_path (str): Path to save the generated audio file
        voice (str): Voice type for TTS
        speed (float): Speed of speech
        api_key (str, optional): OpenAI API key used to build a client when
            the module-level client is None

    Returns:
        str: Status message indicating success or error. Every error message
        starts with "Error".
    """
    try:
        # Reject empty input up front rather than sending an empty request.
        if not text or not text.strip():
            logging.error("No text provided for audio generation.")
            return "Error during audio generation: no text provided."

        logging.info("Starting text-to-speech generation.")

        # Prefer the configured module-level client; fall back to the key
        # supplied by the caller.
        active_client = client
        if active_client is None and api_key:
            active_client = OpenAI(api_key=api_key)
        if active_client is None:
            logging.error("OpenAI client is not initialized.")
            return "Error: OpenAI client is not initialized."

        # Use the streaming-response form: calling stream_to_file() on a
        # plain (non-streaming) response is deprecated in the OpenAI SDK.
        with active_client.audio.speech.with_streaming_response.create(
            model="tts-1-hd",
            voice=voice,
            input=text,
            speed=speed,
        ) as response:
            response.stream_to_file(audio_path)

        logging.info("Audio file saved at %s.", audio_path)
        return f"Audio generated and saved to {audio_path}."
    except Exception as e:
        logging.error("Error during audio generation: %s", e)
        return f"Error during audio generation: {str(e)}"


def generate_audio_and_text(api_key, input_text, model_name, voice_type, voice_speed, language, output_option):
    """
    Generate audio and/or script text from input text using translation and TTS.

    The input language is detected with langdetect. When it differs from
    ``language`` the text is translated first. Output files are written into
    the directory returned by ``create_temp_dir()``.

    Args:
        api_key (str): OpenAI API key
        input_text (str): Text to translate and convert to speech
        model_name (str): OpenAI model name (unused in current implementation)
        voice_type (str): Voice type for TTS
        voice_speed (float): Speed of speech
        language (str): Target language code for translation and synthesis
        output_option (str): Output type ('audio', 'script_text', or 'both')

    Returns:
        tuple: (audio_file_path or None, script_file_path or None, status_message)
    """
    # Function-scope import keeps this block self-contained.
    import hashlib

    # Whitespace-only text counts as missing input; langdetect cannot
    # classify it.
    if not input_text or (isinstance(input_text, str) and not input_text.strip()):
        logging.warning("No input text provided.")
        return None, None, "No input text provided."
    if not api_key:
        logging.warning("No API key provided.")
        return None, None, "No API key provided."
    # Validate before any API call so a bad option does not cost a
    # translation and then return an empty status.
    if output_option not in ("audio", "script_text", "both"):
        invalid_message = f"Invalid output option: {output_option!r}."
        logging.warning(invalid_message)
        return None, None, invalid_message

    try:
        logging.info("Processing generation request.")

        # Translate text if necessary
        detected_language = detect(input_text)
        logging.info("Detected language: %s", detected_language)
        if detected_language != language:
            logging.info("Translation required.")
            translated_text = translate_text(api_key, input_text, language)
            # translate_text reports failure through its return value, using
            # either of these two message forms. Both must be caught here,
            # otherwise the error text is treated as the translation.
            translation_errors = (
                "Error in translation:",
                "Error: OpenAI client is not initialized.",
            )
            if translated_text.startswith(translation_errors):
                return None, None, translated_text
        else:
            logging.info("No translation required.")
            translated_text = input_text

        # Built-in hash() of a str is salted per process and may be negative;
        # a content digest gives stable, filesystem-friendly names.
        text_digest = hashlib.sha256(translated_text.encode("utf-8")).hexdigest()[:16]

        audio_file = None
        script_file = None
        status_messages = []

        # Generate audio
        if output_option in ("audio", "both"):
            temp_dir = create_temp_dir()
            audio_path = os.path.join(temp_dir, f"output_{text_digest}_{language}.mp3")
            audio_status = text_to_speech_openai(
                translated_text, audio_path, voice_type, voice_speed, api_key=api_key
            )
            # Test the prefix only: the success message embeds the output
            # path, which may itself contain the word "Error".
            if audio_status.startswith("Error"):
                return None, None, audio_status
            audio_file = audio_path
            status_messages.append("Audio generated successfully.")

        # Generate script text
        if output_option in ("script_text", "both"):
            try:
                temp_dir = create_temp_dir()
                script_path = os.path.join(temp_dir, f"script_{text_digest}_{language}.txt")
                with open(script_path, "w", encoding='utf-8') as f:
                    f.write(translated_text)
                script_file = script_path
                status_messages.append("Script text generated successfully.")
            except Exception as e:
                # Keep any audio that was already produced.
                logging.error("Error during script text generation: %s", e)
                return audio_file, None, f"Error during script text generation: {str(e)}"

        status_message = " ".join(status_messages)
        logging.info(status_message)
        return audio_file, script_file, status_message
    except Exception as e:
        logging.error("Unexpected error: %s", e)
        return None, None, f"Error: {str(e)}"
def create_temp_dir():
    """Create the 'temp' directory under the current working directory if needed.

    Returns:
        str: Absolute path of the 'temp' directory.
    """
    temp_dir = os.path.join(os.getcwd(), 'temp')
    # exist_ok=True removes the check-then-create race: two callers can no
    # longer both see "missing" and have the second makedirs() fail.
    os.makedirs(temp_dir, exist_ok=True)
    return temp_dir