Spaces:
Sleeping
Sleeping
# openai_tts_tool.py | |
from openai import OpenAI | |
import os | |
from langdetect import detect, DetectorFactory | |
import logging | |
# Set up logging configuration | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s') | |
# Ensure consistent results from langdetect | |
DetectorFactory.seed = 0 | |
# Simple in-memory cache for translations | |
translation_cache = {} | |
def translate_text(api_key, text, target_language): | |
""" | |
Translate text to the target language using OpenAI's API with gpt-4o-mini model. | |
Args: | |
api_key (str): OpenAI API key | |
text (str): Text to translate | |
target_language (str): Target language code (e.g., 'en' for English) | |
Returns: | |
str: Translated text or error message | |
""" | |
cache_key = (text, target_language) | |
if cache_key in translation_cache: | |
logging.info("Fetching translation from cache.") | |
return translation_cache[cache_key] | |
try: | |
logging.info("Starting translation process.") | |
client = OpenAI(api_key=api_key) | |
prompt = f"Translate the following text to {target_language}:\n\n{text}" | |
response = client.completions.create( | |
model="gpt-4o-mini", # Updated model name | |
prompt=prompt, | |
max_tokens=1000, | |
temperature=0.3 | |
) | |
translated_text = response.choices[0].text.strip() | |
logging.info("Translation successful.") | |
# Cache the translation | |
translation_cache[cache_key] = translated_text | |
return translated_text | |
except Exception as e: | |
logging.error(f"Error in translation: {str(e)}") | |
return f"Error in translation: {str(e)}" | |
def generate_audio_and_text(api_key, input_text, model_name, voice_type, voice_speed, language, output_option): | |
""" | |
Generate audio and text files from input text using OpenAI's TTS API. | |
Args: | |
api_key (str): OpenAI API key | |
input_text (str): Text to convert to speech | |
model_name (str): OpenAI model name | |
voice_type (str): Voice type for TTS | |
voice_speed (float): Speed of speech | |
language (str): Language code for synthesis | |
output_option (str): Output type ('audio', 'script_text', or 'both') | |
Returns: | |
tuple: (audio_file_path, script_file_path, status_message) | |
""" | |
if not input_text: | |
logging.warning("No input text provided.") | |
return None, None, "No input text provided" | |
if not api_key: | |
logging.warning("No API key provided.") | |
return None, None, "No API key provided" | |
try: | |
logging.info("Initializing OpenAI client.") | |
client = OpenAI(api_key=api_key) | |
# Create temp directory if it doesn't exist | |
temp_dir = os.path.join(os.getcwd(), 'temp') | |
if not os.path.exists(temp_dir): | |
os.makedirs(temp_dir) | |
logging.info(f"Created temporary directory at {temp_dir}.") | |
# Detect input language | |
try: | |
detected_language = detect(input_text) | |
logging.info(f"Detected input language: {detected_language}") | |
except Exception as e: | |
logging.error(f"Error detecting language: {str(e)}") | |
return None, None, f"Error detecting language: {str(e)}" | |
# Map language codes if necessary (langdetect uses ISO 639-1 codes) | |
target_language = language.lower()[:2] # e.g., 'en' for English | |
# If detected language is different from target, translate | |
if detected_language != target_language: | |
logging.info("Input language differs from target language. Proceeding to translate.") | |
translated_text = translate_text(api_key, input_text, target_language) | |
if translated_text.startswith("Error in translation:"): | |
return None, None, translated_text | |
else: | |
logging.info("Input language matches target language. No translation needed.") | |
translated_text = input_text | |
# Generate audio file | |
audio_file = None | |
if output_option in ["audio", "both"]: | |
try: | |
logging.info("Starting audio generation.") | |
speech_response = client.audio.speech.create( | |
model="tts-1", | |
voice=voice_type, | |
input=translated_text, | |
speed=float(voice_speed) | |
) | |
# Save the audio to a temporary file | |
audio_filename = f"output_{hash(translated_text)}_{target_language}.mp3" | |
audio_path = os.path.join(temp_dir, audio_filename) | |
with open(audio_path, "wb") as f: | |
for chunk in speech_response.iter_bytes(): | |
f.write(chunk) | |
logging.info(f"Audio file saved at {audio_path}.") | |
audio_file = audio_path | |
except Exception as e: | |
logging.error(f"Error during audio generation: {str(e)}") | |
return None, None, f"Error during audio generation: {str(e)}" | |
# Save the (translated) text as a script file | |
script_file = None | |
if output_option in ["script_text", "both"]: | |
try: | |
logging.info("Starting script text generation.") | |
script_text = translated_text | |
script_filename = f"script_{hash(script_text)}_{target_language}.txt" | |
script_path = os.path.join(temp_dir, script_filename) | |
with open(script_path, "w", encoding='utf-8') as f: | |
f.write(script_text) | |
logging.info(f"Script file saved at {script_path}.") | |
script_file = script_path | |
except Exception as e: | |
logging.error(f"Error during script text generation: {str(e)}") | |
return None, None, f"Error during script text generation: {str(e)}" | |
status_message = f"Generation completed successfully in {language}!" | |
logging.info(status_message) | |
return audio_file, script_file, status_message | |
except Exception as e: | |
logging.error(f"Unexpected error: {str(e)}") | |
return None, None, f"Error: {str(e)}" | |