# app.py — Hugging Face Space: ElevenLabs TTS + Replicate auto-captioning video generator
import os
import json
import logging
from datetime import datetime, timedelta
import requests
import replicate
import gradio as gr
from dotenv import load_dotenv
from flask import Flask, jsonify
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file
load_dotenv()

# Constants
CHUNK_SIZE = 1024  # bytes per chunk when streaming HTTP bodies to disk
XI_API_KEY = os.getenv("XI_API_KEY")  # ElevenLabs API key
VOICE_ID = os.getenv("VOICE_ID")  # ElevenLabs voice used for synthesis
REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")  # read by the replicate client
MESSAGE_LIMIT = 45  # max TTS messages allowed per usage window
TIME_LIMIT = timedelta(hours=2)  # length of the usage window

# Rolling usage quota shared by text_to_speech() and the /reset_usage route.
# NOTE(review): module-level mutable state — not safe under multi-worker serving; confirm single-process deployment.
usage_data = {
    'message_count': 0,
    'last_reset': datetime.now()
}

app = Flask(__name__)
@app.route('/reset_usage', methods=['POST'])
def reset_usage():
    """Flask endpoint: zero the message counter and restart the quota window."""
    global usage_data
    usage_data = {'message_count': 0, 'last_reset': datetime.now()}
    return jsonify({"success": "Uso reiniciado."}), 200
def text_to_speech(text):
    """Synthesize *text* to speech via the ElevenLabs streaming API.

    Enforces a rolling quota (MESSAGE_LIMIT messages per TIME_LIMIT window)
    tracked in the module-level ``usage_data`` dict.

    Returns:
        str: path to the generated MP3 ("output.mp3") on success, or a
        string starting with "Error" on quota exhaustion or request
        failure (callers check ``startswith("Error")``).
    """
    global usage_data
    current_time = datetime.now()
    # Start a fresh quota window once TIME_LIMIT has elapsed.
    if current_time - usage_data['last_reset'] > TIME_LIMIT:
        usage_data = {
            'message_count': 0,
            'last_reset': current_time
        }
    if usage_data['message_count'] >= MESSAGE_LIMIT:
        return "Error: Límite de mensajes alcanzado. Intente nuevamente en 2 horas."
    tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream"
    headers = {
        "Accept": "application/json",
        "xi-api-key": XI_API_KEY
    }
    data = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.8,
            "style": 0.0,
            "use_speaker_boost": True
        }
    }
    try:
        # Stream the audio to disk. The timeout prevents a hung connection
        # from blocking forever; the context manager releases the connection.
        with requests.post(tts_url, headers=headers, json=data,
                           stream=True, timeout=60) as response:
            response.raise_for_status()
            output_path = "output.mp3"
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        # Only count the message once the audio was fully written.
        usage_data['message_count'] += 1
        return output_path
    except requests.RequestException as e:
        # logger.exception records the full traceback, unlike logger.error.
        logger.exception(f"Error en text_to_speech: {e}")
        return f"Error: {str(e)}"
def save_transcript_to_json(text):
    """Persist *text* as a JSON transcript file and return the file path."""
    json_path = "transcript.json"
    with open(json_path, "w") as fh:
        json.dump({"text": text}, fh)
    return json_path
def upload_to_temp_storage(file_path):
    """Upload *file_path* to the file.io temporary store and return the share link.

    Raises:
        requests.RequestException: on network/HTTP failure (logged first).
        KeyError: if the service response lacks a 'link' field.
    """
    url = "https://file.io"
    try:
        with open(file_path, 'rb') as f:
            # Timeout keeps a stalled upload from hanging the pipeline.
            response = requests.post(url, files={'file': f}, timeout=60)
        response.raise_for_status()
        return response.json()['link']
    except requests.RequestException as e:
        logger.error(f"Error al subir archivo al almacenamiento temporal: {e}")
        raise
def split_video(video_file_path, segment_duration=30):
    """Split a video into consecutive segments of *segment_duration* seconds.

    Bug fix: the original iterated ``range(0, int(duration), …)``, which
    silently dropped the fractional tail of the video (e.g. the last 0.5 s
    of a 30.5 s clip). A while-loop over the float duration keeps it.
    The source clip is also closed to release its ffmpeg reader.

    Returns:
        list[str]: paths of the written segment files (segment_<start>_<end>.mp4).
    """
    video = VideoFileClip(video_file_path)
    try:
        duration = video.duration
        segments = []
        start = 0
        while start < duration:
            end = min(start + segment_duration, duration)
            segment = video.subclip(start, end)
            segment_path = f"segment_{start}_{end}.mp4"
            segment.write_videofile(segment_path, codec='libx264', audio_codec='aac')
            segments.append(segment_path)
            start += segment_duration
    finally:
        video.close()
    return segments
def download_video(uri, output_path):
    """Stream-download *uri* to *output_path* and return the path.

    Raises:
        requests.RequestException: on network/HTTP failure (logged first).
    """
    try:
        # Timeout prevents a dead server from blocking forever; the context
        # manager guarantees the connection is released.
        with requests.get(uri, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        return output_path
    except requests.RequestException as e:
        logger.error(f"Error al descargar video de {uri}: {e}")
        raise
def generate_video_with_subtitles(text, video_file_path):
    """Burn auto-generated subtitles into a video via the Replicate
    'autocaption' model, processing the video in 30-second segments.

    The transcript is saved as JSON and uploaded to temporary storage so
    the model can fetch it by URL. Each processed segment is downloaded
    and the results are concatenated into a single file.

    Returns:
        str: path of the concatenated, subtitled video.

    Raises:
        Whatever the upload, Replicate call, or download raises; segment
        errors are logged and re-raised.
    """
    transcript_json_path = save_transcript_to_json(text)
    transcript_url = upload_to_temp_storage(transcript_json_path)
    segments = split_video(video_file_path)
    processed_segments = []
    for segment_path in segments:
        try:
            with open(segment_path, "rb") as video_file:
                output = replicate.run(
                    "fictions-ai/autocaption:18a45ff0d95feb4449d192bbdc06b4a6df168fa33def76dfc51b78ae224b599b",
                    input={
                        "font": "Poppins/Poppins-ExtraBold.ttf",
                        "color": "white",
                        "kerning": -5,
                        "opacity": 0,
                        "MaxChars": 20,
                        "fontsize": 4,
                        "translate": False,
                        "output_video": True,
                        "stroke_color": "black",
                        "stroke_width": 2.6,
                        "right_to_left": False,
                        "subs_position": "bottom75",
                        "highlight_color": "yellow",
                        "video_file_input": video_file,
                        "output_transcript": True,
                        "transcript_file_input": transcript_url
                    }
                )
            logger.debug(f"Tipo de salida: {type(output)}")
            logger.debug(f"Contenido de salida: {output}")
            # The model's output shape varies by version: list of URLs,
            # dict with a 'url' key, or something unexpected.
            if isinstance(output, list):
                for item in output:
                    if isinstance(item, str):  # assuming each element is a URL
                        video_output_path = f"processed_{os.path.basename(segment_path)}"
                        processed_segments.append(download_video(item, video_output_path))
                    else:
                        logger.warning(f"Elemento inesperado en la salida: {item}")
            elif isinstance(output, dict):
                # Handle the case where the output is a dictionary
                if 'url' in output:
                    video_output_path = f"processed_{os.path.basename(segment_path)}"
                    processed_segments.append(download_video(output['url'], video_output_path))
                else:
                    logger.error(f"Formato de salida inesperado: {output}")
            else:
                raise ValueError(f"Formato de salida inesperado: {output}")
        except Exception as e:
            logger.error(f"Error procesando segmento {segment_path}: {e}")
            raise
    return concatenate_videos(processed_segments)
def concatenate_videos(video_paths):
    """Concatenate the given video files into one MP4 and return its path.

    Fix: the source clips are now closed after writing — the original
    leaked one ffmpeg reader per segment.
    """
    clips = [VideoFileClip(path) for path in video_paths]
    try:
        final_clip = concatenate_videoclips(clips)
        output_path = "final_combined_video.mp4"
        final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    finally:
        for clip in clips:
            clip.close()
    return output_path
def combine_audio_and_video(video_path, audio_path, output_path="final_output.mp4"):
    """Replace the audio track of *video_path* with *audio_path*.

    Fix: both clips are now closed after writing to release their ffmpeg
    readers (the original leaked them).

    Returns:
        str: path of the written file (default "final_output.mp4").
    """
    video_clip = VideoFileClip(video_path)
    audio_clip = AudioFileClip(audio_path)
    try:
        final_clip = video_clip.set_audio(audio_clip)
        final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    finally:
        audio_clip.close()
        video_clip.close()
    return output_path
def process_text_and_generate_video(text, video_file):
    """Gradio handler: synthesize speech for *text* and produce a
    captioned video from the uploaded *video_file*.

    Returns:
        tuple: (audio_path, final_video_path) on success;
        (None, None) when no video was uploaded;
        (error_message, None) when TTS fails (quota or request error);
        (None, error_message) on any other failure.
        NOTE(review): the error strings are fed into gr.Audio/gr.File
        outputs, which cannot render them — preserved for compatibility.
    """
    # Guard clause: nothing to do without an uploaded video.
    if video_file is None:
        return None, None
    try:
        audio_path = text_to_speech(text)
        # text_to_speech signals failure with a string starting "Error".
        if audio_path.startswith("Error"):
            return audio_path, None
        logger.debug(f"Generando video con subtítulos para el archivo: {video_file.name}")
        video_output = generate_video_with_subtitles(text, video_file.name)
        logger.debug(f"Video con subtítulos generado: {video_output}")
        final_output = combine_audio_and_video(video_output, audio_path)
        logger.debug(f"Video final generado: {final_output}")
        return audio_path, final_output
    except Exception as e:
        # logger.exception keeps the traceback, unlike the original logger.error.
        logger.exception(f"Error en process_text_and_generate_video: {e}")
        return None, f"Error: {str(e)}"
# Gradio UI: text input + video upload -> (generated audio, final captioned video)
iface = gr.Interface(
    fn=process_text_and_generate_video,
    inputs=["text", "file"],
    outputs=[
        gr.Audio(label="Audio generado"),
        gr.File(label="Video generado")
    ],
    title="Generación de Video con Subtítulos y Audio",
    description="Ingrese texto y suba un archivo de video para generar un video con subtítulos y audio."
)

if __name__ == "__main__":
    # NOTE(review): only the Gradio app is launched here; the Flask app
    # (and its /reset_usage route) is never served by this entry point.
    iface.launch()