Spaces:
Sleeping
Sleeping
# openai_tts_tool.py | |
import os | |
from langdetect import detect, DetectorFactory | |
import logging | |
from openai import OpenAI | |
# Ensure consistent results from langdetect
DetectorFactory.seed = 0

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')

# Shared OpenAI client used by the functions in this module.
# No API key is available at import time, so the client is NOT constructed
# here: referencing `api_key` at module scope raises NameError and prevents
# the module from being imported at all.
# To use a preconfigured client, replace None with your own instance, e.g.:
#     from your_custom_client_module import client
client = None  # Placeholder: Initialize your client appropriately

# Simple in-memory cache for translations
translation_cache = {}
def translate_text(api_key, text, target_language, length=1000):
    """
    Translate text to the target language using OpenAI's Chat Completion API with gpt-4o-mini model.

    The module-level `client` is used when one has been configured; otherwise
    a client is created from `api_key` for this call. Successful translations
    are stored in the module-level `translation_cache`.

    Args:
        api_key (str): OpenAI API key, used when no shared client is configured
        text (str): Text to translate
        target_language (str): Target language code (e.g., 'en' for English)
        length (int): Maximum number of tokens for the response

    Returns:
        str: Translated text, or a message starting with "Error" on failure
    """
    # `length` is part of the key: a response cut short by a small token limit
    # must not be served for a later request that allows a longer answer.
    cache_key = (text, target_language, length)
    if cache_key in translation_cache:
        logging.info("Fetching translation from cache.")
        return translation_cache[cache_key]

    try:
        logging.info("Starting translation process.")

        # Prefer the shared client; fall back to a per-call client built from
        # the caller's key so the function works without any module setup.
        active_client = client
        if active_client is None and api_key:
            active_client = OpenAI(api_key=api_key)
        if active_client is None:
            logging.error("OpenAI client is not initialized.")
            return "Error: OpenAI client is not initialized."

        prompt = f"Translate the following text to {target_language}:\n\n{text}"
        completion = active_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=length
        )

        content = completion.choices[0].message.content
        if content is None:
            # Report a readable error instead of an AttributeError on None.
            logging.error("Error in translation: empty response from model")
            return "Error in translation: empty response from model"

        translated_text = content.strip()
        logging.info("Translation successful.")

        # Cache the translation
        translation_cache[cache_key] = translated_text
        return translated_text
    except Exception as e:
        # Errors are returned as strings (not raised) because callers inspect
        # the "Error in translation:" prefix.
        logging.error(f"Error in translation: {str(e)}")
        return f"Error in translation: {str(e)}"
def text_to_speech_openai(text, audio_path, voice, speed, api_key=None):
    """
    Convert text to speech using OpenAI's TTS API and save the audio to a file.

    The module-level `client` is used when one has been configured; otherwise
    a client is created from `api_key` if it is given.

    Args:
        text (str): Text to convert to speech
        audio_path (str): Path to save the generated audio file
        voice (str): Voice type for TTS
        speed (float): Speed of speech
        api_key (str, optional): OpenAI API key, used only when no shared
            client is configured

    Returns:
        str: Status message; starts with "Error" on failure
    """
    try:
        logging.info("Starting text-to-speech generation.")

        # Prefer the shared client; fall back to a per-call client when the
        # caller supplies a key.
        active_client = client
        if active_client is None and api_key:
            active_client = OpenAI(api_key=api_key)
        if active_client is None:
            logging.error("OpenAI client is not initialized.")
            return "Error: OpenAI client is not initialized."

        # Use the streaming-response variant: calling stream_to_file() on the
        # plain (non-streaming) response is deprecated in the OpenAI SDK and
        # buffers the whole audio payload in memory before writing it.
        with active_client.audio.speech.with_streaming_response.create(
            model="tts-1-hd",
            voice=voice,
            input=text,
            speed=speed
        ) as response:
            response.stream_to_file(audio_path)

        logging.info(f"Audio file saved at {audio_path}.")
        return f"Audio generated and saved to {audio_path}."
    except Exception as e:
        # Errors are returned as strings (not raised) because callers inspect
        # the status message.
        logging.error(f"Error during audio generation: {str(e)}")
        return f"Error during audio generation: {str(e)}"
def generate_audio_and_text(api_key, input_text, model_name, voice_type, voice_speed, language, output_option):
    """
    Generate audio and/or script text from input text using translation and TTS.

    The input is translated to `language` when langdetect reports a different
    language. The resulting text is then synthesized to an MP3 file, written
    to a UTF-8 text file, or both, inside the directory returned by
    create_temp_dir().

    Args:
        api_key (str): OpenAI API key
        input_text (str): Text to translate and convert to speech
        model_name (str): OpenAI model name (unused in current implementation)
        voice_type (str): Voice type for TTS
        voice_speed (float): Speed of speech
        language (str): Target language code for translation and synthesis
        output_option (str): Output type ('audio', 'script_text', or 'both')

    Returns:
        tuple: (audio_file_path or None, script_file_path or None, status_message)
    """
    global client
    import hashlib  # Function-scope import: only needed for output file names.

    if not input_text:
        logging.warning("No input text provided.")
        return None, None, "No input text provided."
    if not api_key:
        logging.warning("No API key provided.")
        return None, None, "No API key provided."
    if output_option not in ("audio", "script_text", "both"):
        # Reject unknown options up front instead of paying for a translation
        # and then returning nothing with an empty status message.
        message = f"Invalid output option: {output_option!r}."
        logging.warning(message)
        return None, None, message

    # The translation and TTS helpers read the module-level `client`. When
    # none is configured, bind one built from this request's key for the
    # duration of the call and restore the previous value afterwards, so the
    # key does not outlive the request.
    # NOTE: rebinding a module global is not safe for concurrent calls.
    previous_client = client
    try:
        logging.info("Processing generation request.")
        if client is None:
            client = OpenAI(api_key=api_key)

        # Translate text if necessary
        detected_language = detect(input_text)
        logging.info(f"Detected language: {detected_language}")
        if detected_language != language:
            logging.info("Translation required.")
            translated_text = translate_text(api_key, input_text, language)
            # Match every error string translate_text can return; otherwise an
            # error message would be spoken or saved as if it were the text.
            if translated_text.startswith(
                ("Error in translation:", "Error: OpenAI client is not initialized.")
            ):
                return None, None, translated_text
        else:
            logging.info("No translation required.")
            translated_text = input_text

        # Built-in hash() of a str is salted per process and may be negative;
        # a content digest gives stable, filename-safe names.
        text_digest = hashlib.sha256(translated_text.encode("utf-8")).hexdigest()[:16]

        audio_file = None
        script_file = None
        status_messages = []

        # Generate audio
        if output_option in ("audio", "both"):
            temp_dir = create_temp_dir()
            audio_path = os.path.join(temp_dir, f"output_{text_digest}_{language}.mp3")
            audio_status = text_to_speech_openai(translated_text, audio_path, voice_type, voice_speed)
            # Prefix test, not substring test: the success message embeds the
            # output path, which may itself contain the word "Error".
            if audio_status.startswith("Error"):
                return None, None, audio_status
            audio_file = audio_path
            status_messages.append("Audio generated successfully.")

        # Generate script text
        if output_option in ("script_text", "both"):
            try:
                temp_dir = create_temp_dir()
                script_path = os.path.join(temp_dir, f"script_{text_digest}_{language}.txt")
                with open(script_path, "w", encoding='utf-8') as f:
                    f.write(translated_text)
                script_file = script_path
                status_messages.append("Script text generated successfully.")
            except Exception as e:
                logging.error(f"Error during script text generation: {str(e)}")
                # Keep any audio that was already produced.
                return audio_file, None, f"Error during script text generation: {str(e)}"

        status_message = " ".join(status_messages)
        logging.info(status_message)
        return audio_file, script_file, status_message
    except Exception as e:
        logging.error(f"Unexpected error: {str(e)}")
        return None, None, f"Error: {str(e)}"
    finally:
        client = previous_client
def create_temp_dir():
    """
    Return the path of the 'temp' directory under the current working directory.

    The directory is created if it does not exist yet.

    Returns:
        str: Absolute path of the 'temp' directory
    """
    temp_dir = os.path.join(os.getcwd(), 'temp')
    # exist_ok=True makes creation a single step, so two callers racing to
    # create the directory cannot fail between an existence check and makedirs.
    os.makedirs(temp_dir, exist_ok=True)
    return temp_dir