|
import os |
|
import json |
|
import logging |
|
from datetime import datetime, timedelta |
|
import requests |
|
import replicate |
|
import gradio as gr |
|
from dotenv import load_dotenv |
|
from flask import Flask, jsonify |
|
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips |
|
|
|
|
|
# Verbose logging during development; lower to INFO for production use.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Pull API credentials and settings from a local .env file.
load_dotenv()

# Bytes per chunk when streaming HTTP responses to disk.
CHUNK_SIZE = 1024
# ElevenLabs API key and the voice used for synthesis (from environment).
XI_API_KEY = os.getenv("XI_API_KEY")
VOICE_ID = os.getenv("VOICE_ID")
# Replicate token; presumably read implicitly by the `replicate` client — TODO confirm.
REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")
# Rate limit: at most MESSAGE_LIMIT TTS requests per TIME_LIMIT window.
MESSAGE_LIMIT = 45
TIME_LIMIT = timedelta(hours=2)

# In-memory usage counter for the rate limit (process-local; lost on restart).
usage_data = {
    'message_count': 0,
    'last_reset': datetime.now()
}

# Flask app exposing the /reset_usage endpoint below.
app = Flask(__name__)
|
|
|
@app.route('/reset_usage', methods=['POST'])
def reset_usage():
    """Reset the TTS usage counter and restart the rate-limit window.

    Returns:
        A JSON success payload and HTTP status 200.
    """
    global usage_data
    usage_data = {'message_count': 0, 'last_reset': datetime.now()}
    return jsonify({"success": "Uso reiniciado."}), 200
|
|
|
def text_to_speech(text):
    """Synthesize *text* to speech with the ElevenLabs streaming API.

    Enforces a rolling usage window of MESSAGE_LIMIT messages per
    TIME_LIMIT. The audio stream is written to ``output.mp3``.

    Args:
        text: Text to convert to speech.

    Returns:
        Path of the written MP3 file, or an ``"Error: ..."`` string when
        the rate limit is hit or the HTTP request fails.
    """
    global usage_data
    now = datetime.now()

    # Open a fresh usage window once the previous one has expired.
    if now - usage_data['last_reset'] > TIME_LIMIT:
        usage_data = {'message_count': 0, 'last_reset': now}

    if usage_data['message_count'] >= MESSAGE_LIMIT:
        return "Error: Límite de mensajes alcanzado. Intente nuevamente en 2 horas."

    endpoint = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream"
    payload = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.8,
            "style": 0.0,
            "use_speaker_boost": True,
        },
    }

    try:
        resp = requests.post(
            endpoint,
            headers={"Accept": "application/json", "xi-api-key": XI_API_KEY},
            json=payload,
            stream=True,
        )
        resp.raise_for_status()

        output_path = "output.mp3"
        with open(output_path, "wb") as audio_file:
            for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):
                audio_file.write(chunk)

        # Count the message only after the audio was fully written.
        usage_data['message_count'] += 1
        return output_path
    except requests.RequestException as e:
        logger.error(f"Error en text_to_speech: {e}")
        return f"Error: {str(e)}"
|
|
|
def save_transcript_to_json(text):
    """Persist *text* as ``{"text": ...}`` in ``transcript.json``.

    Args:
        text: Transcript content to store.

    Returns:
        Path of the written JSON file.
    """
    json_path = "transcript.json"
    with open(json_path, "w") as json_file:
        json.dump({"text": text}, json_file)
    return json_path
|
|
|
def upload_to_temp_storage(file_path):
    """Upload *file_path* to the file.io temporary host.

    Args:
        file_path: Local path of the file to upload.

    Returns:
        The shareable download link reported by file.io.

    Raises:
        requests.RequestException: On upload failure (logged first).
    """
    try:
        with open(file_path, 'rb') as fh:
            resp = requests.post("https://file.io", files={'file': fh})
            resp.raise_for_status()
            return resp.json()['link']
    except requests.RequestException as e:
        logger.error(f"Error al subir archivo al almacenamiento temporal: {e}")
        raise
|
|
|
def split_video(video_file_path, segment_duration=30):
    """Split a video into consecutive segments of at most *segment_duration* seconds.

    Args:
        video_file_path: Path of the source video.
        segment_duration: Maximum segment length in seconds (default 30).

    Returns:
        List of paths of the written segment files (``segment_<start>_<end>.mp4``).
    """
    video = VideoFileClip(video_file_path)
    try:
        duration = video.duration
        segments = []

        # Bug fix: for clips shorter than one second, int(duration) == 0 and
        # the original range(0, 0, step) produced zero segments. max(..., 1)
        # guarantees at least one segment covering the whole clip.
        for start in range(0, max(int(duration), 1), segment_duration):
            end = min(start + segment_duration, duration)
            segment = video.subclip(start, end)
            segment_path = f"segment_{start}_{end}.mp4"
            segment.write_videofile(segment_path, codec='libx264', audio_codec='aac')
            segments.append(segment_path)

        return segments
    finally:
        # Bug fix: the source clip was never closed, leaking the ffmpeg
        # reader process / file handles held by moviepy.
        video.close()
|
|
|
def download_video(uri, output_path):
    """Stream the resource at *uri* to *output_path*.

    Args:
        uri: URL to download.
        output_path: Local destination path.

    Returns:
        *output_path* on success.

    Raises:
        requests.RequestException: On HTTP or stream failure (logged first).
    """
    try:
        resp = requests.get(uri, stream=True)
        resp.raise_for_status()
        with open(output_path, 'wb') as out:
            for piece in resp.iter_content(chunk_size=CHUNK_SIZE):
                out.write(piece)
        return output_path
    except requests.RequestException as e:
        logger.error(f"Error al descargar video de {uri}: {e}")
        raise
|
|
|
def generate_video_with_subtitles(text, video_file_path):
    """Caption a video using the Replicate "autocaption" model.

    Saves the transcript to JSON, uploads it to temporary storage, splits the
    source video into segments, runs each segment through the model, downloads
    the captioned results, and concatenates them into one output video.

    Args:
        text: Transcript text used for subtitling.
        video_file_path: Path of the source video.

    Returns:
        Path of the concatenated, captioned video.

    Raises:
        Exception: Re-raised after logging if any segment fails to process.
    """
    transcript_json_path = save_transcript_to_json(text)
    transcript_url = upload_to_temp_storage(transcript_json_path)

    segments = split_video(video_file_path)
    processed_segments = []

    for segment_path in segments:
        try:
            with open(segment_path, "rb") as video_file:
                output = replicate.run(
                    "fictions-ai/autocaption:18a45ff0d95feb4449d192bbdc06b4a6df168fa33def76dfc51b78ae224b599b",
                    input={
                        "font": "Poppins/Poppins-ExtraBold.ttf",
                        "color": "white",
                        "kerning": -5,
                        "opacity": 0,
                        "MaxChars": 20,
                        "fontsize": 4,
                        "translate": False,
                        "output_video": True,
                        "stroke_color": "black",
                        "stroke_width": 2.6,
                        "right_to_left": False,
                        "subs_position": "bottom75",
                        "highlight_color": "yellow",
                        "video_file_input": video_file,
                        "output_transcript": True,
                        "transcript_file_input": transcript_url
                    }
                )

                logger.debug(f"Tipo de salida: {type(output)}")
                logger.debug(f"Contenido de salida: {output}")

                # The model's output shape varies: a list of URL strings or a
                # dict with a 'url' key — handle both; anything else is fatal.
                if isinstance(output, list):
                    for item in output:
                        if isinstance(item, str):
                            video_output_path = f"processed_{os.path.basename(segment_path)}"
                            processed_segments.append(download_video(item, video_output_path))
                        else:
                            # Non-string list entries are skipped, not fatal.
                            logger.warning(f"Elemento inesperado en la salida: {item}")
                elif isinstance(output, dict):
                    if 'url' in output:
                        video_output_path = f"processed_{os.path.basename(segment_path)}"
                        processed_segments.append(download_video(output['url'], video_output_path))
                    else:
                        # NOTE(review): a dict without 'url' is only logged — the
                        # segment is silently dropped from the final video.
                        logger.error(f"Formato de salida inesperado: {output}")
                else:
                    raise ValueError(f"Formato de salida inesperado: {output}")

        except Exception as e:
            logger.error(f"Error procesando segmento {segment_path}: {e}")
            raise

    return concatenate_videos(processed_segments)
|
|
|
def concatenate_videos(video_paths):
    """Concatenate video files into ``final_combined_video.mp4``.

    Args:
        video_paths: Ordered list of video file paths to join.

    Returns:
        Path of the written combined video.
    """
    clips = [VideoFileClip(path) for path in video_paths]
    try:
        final_clip = concatenate_videoclips(clips)
        output_path = "final_combined_video.mp4"
        final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
        return output_path
    finally:
        # Bug fix: the original leaked one ffmpeg reader per input clip;
        # release them once the combined file has been written.
        for clip in clips:
            clip.close()
|
|
|
def combine_audio_and_video(video_path, audio_path, output_path="final_output.mp4"):
    """Replace the video's audio track with *audio_path* and write the result.

    Args:
        video_path: Path of the source video.
        audio_path: Path of the audio track to mux in.
        output_path: Destination path (default ``final_output.mp4``).

    Returns:
        *output_path* on success.
    """
    video_clip = VideoFileClip(video_path)
    audio_clip = AudioFileClip(audio_path)
    try:
        final_clip = video_clip.set_audio(audio_clip)
        final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
        return output_path
    finally:
        # Bug fix: neither clip was closed, leaking ffmpeg readers/handles.
        video_clip.close()
        audio_clip.close()
|
|
|
def process_text_and_generate_video(text, video_file):
    """Gradio callback: synthesize narration and build a captioned video.

    Args:
        text: Text to narrate and subtitle.
        video_file: Uploaded video file object (or None).

    Returns:
        Tuple ``(audio_path, video_path)``. ``(None, None)`` when no video
        was uploaded; ``(error_string, None)`` when TTS is rate-limited or
        fails; ``(None, error_string)`` on any other failure.
    """
    if video_file is None:
        return None, None

    try:
        audio_path = text_to_speech(text)
        # text_to_speech signals failure with an "Error: ..." string.
        if audio_path.startswith("Error"):
            return audio_path, None

        logger.debug(f"Generando video con subtítulos para el archivo: {video_file.name}")
        subtitled_video = generate_video_with_subtitles(text, video_file.name)
        logger.debug(f"Video con subtítulos generado: {subtitled_video}")

        final_video = combine_audio_and_video(subtitled_video, audio_path)
        logger.debug(f"Video final generado: {final_video}")
        return audio_path, final_video
    except Exception as e:
        logger.error(f"Error en process_text_and_generate_video: {e}")
        return None, f"Error: {str(e)}"
|
|
|
# Gradio UI: a text box and a video upload in; the synthesized audio and
# the final captioned video out.
iface = gr.Interface(
    fn=process_text_and_generate_video,
    inputs=["text", "file"],
    outputs=[
        gr.Audio(label="Audio generado"),
        gr.File(label="Video generado")
    ],
    title="Generación de Video con Subtítulos y Audio",
    description="Ingrese texto y suba un archivo de video para generar un video con subtítulos y audio."
)
|
|
|
if __name__ == "__main__":
    # Launches only the Gradio interface. NOTE(review): the Flask `app`
    # (and its /reset_usage route) is never served here — presumably it is
    # run separately via a WSGI server; confirm with the deployment setup.
    iface.launch()
|
|
|
|