"""Gradio app: OCR/PDF text extraction, OpenAI-backed document Q&A, and TTS generation."""

import logging
import os
import subprocess
import tempfile

import cv2
import gradio as gr
import numpy as np
import PyPDF2
import pytesseract
from llama_index.core import Document, VectorStoreIndex, get_response_synthesizer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from pdf2image import convert_from_path
from PIL import Image
from sentence_transformers import SentenceTransformer, util

from openai_tts_tool import generate_audio_and_text

# Set up logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')

# Initialize global variables
vector_index = None
query_log = []
# NOTE(review): loaded at import time but not used by any function in this file.
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

# Display name -> language code handed to the TTS helper.
LANGUAGE_CODES = {
    "English": "en",
    "Arabic": "ar",
    "German": "de",
    "Marathi": "mr",
    "Kannada": "kn",
    "Filipino (Tagalog)": "tl",
    "French": "fr",
    "Gujarati": "gu",
    "Hindi": "hi",
    "Malayalam": "ml",
    "Tamil": "ta",
    "Telugu": "te",
    "Urdu": "ur",
    "Sinhala": "si",
}

# Define available languages for TTS (derived so the two can never drift apart)
AVAILABLE_LANGUAGES = list(LANGUAGE_CODES)

# Prefixes of the error strings the extract_* functions return instead of raising.
_PDF_ERROR_PREFIX = "Error processing PDF: "
_UNSUPPORTED_PREFIX = "Unsupported file type: "
_EXTRACTION_ERROR_PREFIXES = (_PDF_ERROR_PREFIX, _UNSUPPORTED_PREFIX)


def _detect_ocr_languages():
    """Return the language codes listed by ``tesseract --list-langs``.

    Falls back to ``['eng']`` when the executable cannot be run, exits with an
    error, or lists no languages.
    """
    try:
        result = subprocess.run(
            ['tesseract', '--list-langs'],
            capture_output=True,
            text=True,
            check=True,
        )
    except (OSError, subprocess.SubprocessError):
        return ['eng']
    # The first output line is skipped (a header rather than a language code).
    detected = [line.strip() for line in result.stdout.splitlines()[1:] if line.strip()]
    return detected or ['eng']


# Get available languages for OCR
langs = _detect_ocr_languages()


def _remove_quietly(path):
    """Delete *path*, logging (not raising) if the file cannot be removed."""
    try:
        os.remove(path)
    except OSError as exc:
        logging.warning("Could not remove temporary file %s: %s", path, exc)


def create_temp_dir():
    """Create the ``temp`` directory under the current working directory if needed.

    Returns:
        The path of the temporary directory.
    """
    temp_dir = os.path.join(os.getcwd(), 'temp')
    # exist_ok avoids a race between an existence check and the creation.
    os.makedirs(temp_dir, exist_ok=True)
    return temp_dir


def preprocess_image(image_path):
    """Preprocess the image for better OCR results.

    The image is converted to grayscale, histogram-equalized, blurred and
    adaptively thresholded, then written to a uniquely named PNG in the temp
    directory.

    Args:
        image_path: Path of the image to read.

    Returns:
        Path of the processed PNG; the caller is responsible for deleting it.

    Raises:
        ValueError: If OpenCV cannot read ``image_path``.
        OSError: If the processed image cannot be written.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise ValueError(f"Could not read image: {image_path}")

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.equalizeHist(gray)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)
    processed_image = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
    )

    # A unique file name keeps concurrent requests from overwriting each other.
    fd, temp_filename = tempfile.mkstemp(
        prefix="processed_image_", suffix=".png", dir=create_temp_dir()
    )
    os.close(fd)
    if not cv2.imwrite(temp_filename, processed_image):
        _remove_quietly(temp_filename)
        raise OSError(f"Could not write processed image: {temp_filename}")
    return temp_filename


def extract_text_from_image(image_path, lang='eng'):
    """Extract text from image using OCR.

    Args:
        image_path: Path of the image file.
        lang: Tesseract language code passed to pytesseract.

    Returns:
        The text recognized by pytesseract on the preprocessed image.
    """
    processed_image_path = preprocess_image(image_path)
    try:
        # Close the image before deleting it; an open handle blocks removal on Windows.
        with Image.open(processed_image_path) as processed:
            return pytesseract.image_to_string(processed, lang=lang)
    finally:
        _remove_quietly(processed_image_path)


def extract_text_from_pdf(pdf_path, lang='eng'):
    """Extract text from PDF file.

    Pages with an embedded text layer are read directly; pages without one are
    rendered to an image and run through OCR, followed by a marker line.

    Args:
        pdf_path: Path of the PDF file.
        lang: Tesseract language code used for OCR pages.

    Returns:
        The concatenated text, or a string starting with
        ``"Error processing PDF: "`` if anything fails.
    """
    parts = []
    temp_dir = create_temp_dir()

    try:
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page_num, page in enumerate(pdf_reader.pages):
                # Guard against extract_text() returning None on text-less pages.
                page_text = page.extract_text() or ""
                if page_text.strip():
                    parts.append(page_text)
                    continue

                images = convert_from_path(
                    pdf_path, first_page=page_num + 1, last_page=page_num + 1
                )
                for image in images:
                    fd, temp_image_path = tempfile.mkstemp(
                        prefix=f'temp_image_{page_num}_', suffix='.png', dir=temp_dir
                    )
                    os.close(fd)
                    try:
                        image.save(temp_image_path, 'PNG')
                        parts.append(extract_text_from_image(temp_image_path, lang=lang))
                    finally:
                        _remove_quietly(temp_image_path)
                    parts.append(f"\n[OCR applied on page {page_num + 1}]\n")
    except Exception as e:
        logging.exception("Error processing PDF %s", pdf_path)
        return f"{_PDF_ERROR_PREFIX}{e}"

    return "".join(parts)


def extract_text(file_path, lang='eng'):
    """Extract text from uploaded file.

    Args:
        file_path: Path of a PDF, PNG, JPG or JPEG file.
        lang: Tesseract language code used for OCR.

    Returns:
        The extracted text, or ``"Unsupported file type: <ext>"`` for any other
        extension.
    """
    # splitext looks only at the final path component, unlike splitting on '.'.
    file_ext = os.path.splitext(file_path)[1].lower().lstrip('.')

    if file_ext == 'pdf':
        return extract_text_from_pdf(file_path, lang)
    if file_ext in ('png', 'jpg', 'jpeg'):
        return extract_text_from_image(file_path, lang)
    return f"{_UNSUPPORTED_PREFIX}{file_ext}"


def process_upload(api_key, files, lang):
    """Process uploaded files and create vector index.

    On success the module-level ``vector_index`` is replaced with an index built
    from every file that yielded text.

    Args:
        api_key: OpenAI API key used for the embedding model.
        files: Paths of the uploaded files.
        lang: Tesseract language code used for OCR.

    Returns:
        A human-readable status message.
    """
    global vector_index

    if not api_key:
        return "Please provide a valid OpenAI API Key."
    if not files:
        return "No files uploaded."

    documents = []
    error_messages = []

    for file_path in files:
        try:
            text = extract_text(file_path, lang)
            if text.startswith(_EXTRACTION_ERROR_PREFIXES):
                # The extract_* helpers report failures as strings; do not index those.
                error_message = f"{os.path.basename(file_path)}: {text}"
                logging.error(error_message)
                error_messages.append(error_message)
            elif text.strip():  # Only add non-empty documents
                documents.append(Document(text=text))
            else:
                error_messages.append(f"No text extracted from {os.path.basename(file_path)}")
        except Exception as e:
            error_message = f"Error processing file {os.path.basename(file_path)}: {str(e)}"
            logging.error(error_message)
            error_messages.append(error_message)

    if not documents:
        return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}"

    try:
        embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key)
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
    except Exception as e:
        return f"Error creating index: {str(e)}"

    success_message = f"Successfully indexed {len(documents)} files."
    if error_messages:
        success_message += f"\nErrors: {'; '.join(error_messages)}"
    return success_message


def query_app(query, model_name, use_similarity_check, api_key):
    """Process query and return response.

    Args:
        query: The user's question.
        model_name: Name of the OpenAI chat model to use.
        use_similarity_check: Accepted for interface compatibility; not used by
            this function.
        api_key: OpenAI API key.

    Returns:
        The answer text, or a message describing why no answer was produced.
    """
    # query_log is declared global as in the original but is not modified here.
    global vector_index, query_log

    if vector_index is None:
        return "No documents indexed yet. Please upload documents first."
    if not api_key:
        return "Please provide a valid OpenAI API Key."
    if not query or not query.strip():
        # Avoid spending an API call on a blank question.
        return "Please enter a question."

    try:
        llm = OpenAI(model=model_name, api_key=api_key)
        response_synthesizer = get_response_synthesizer(llm=llm)
        query_engine = vector_index.as_query_engine(
            llm=llm, response_synthesizer=response_synthesizer
        )
        response = query_engine.query(query)
        return response.response
    except Exception as e:
        logging.exception("Error during query processing: %s", e)
        return f"Error during query processing: {str(e)}"


def create_gradio_interface():
    """Create and configure the Gradio interface.

    Returns:
        The ``gr.Blocks`` app with upload, question and audio-generation tabs
        wired to the functions in this module.
    """
    # 'eng' may not be installed; fall back to the first detected language.
    default_ocr_lang = 'eng' if 'eng' in langs else langs[0]

    with gr.Blocks(title="Document Processing and TTS App") as demo:
        gr.Markdown("# 📄 Document Processing, Text & Audio Generation App")

        with gr.Tab("📤 Upload Documents"):
            api_key_input = gr.Textbox(
                label="Enter OpenAI API Key",
                placeholder="Paste your OpenAI API Key here",
                type="password",
            )
            file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath")
            lang_dropdown = gr.Dropdown(
                choices=langs, label="Select OCR Language", value=default_ocr_lang
            )
            upload_button = gr.Button("Upload and Index")
            upload_status = gr.Textbox(label="Status", interactive=False)

        with gr.Tab("❓ Ask a Question"):
            query_input = gr.Textbox(label="Enter your question")
            model_dropdown = gr.Dropdown(
                choices=["gpt-4-0125-preview", "gpt-3.5-turbo-0125"],
                label="Select Model",
                value="gpt-3.5-turbo-0125",
            )
            similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False)
            query_button = gr.Button("Ask")
            answer_output = gr.Textbox(label="Answer", interactive=False)

        with gr.Tab("🗣️ Generate Audio and Text"):
            text_input = gr.Textbox(label="Enter text for generation")
            voice_type = gr.Dropdown(
                choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
                label="Voice Type",
                value="alloy",
            )
            voice_speed = gr.Slider(
                minimum=0.25,
                maximum=4.0,
                value=1.0,
                label="Voice Speed",
            )
            language = gr.Dropdown(
                choices=AVAILABLE_LANGUAGES,
                label="Language for Audio and Script",
                value="English",
            )
            output_option = gr.Radio(
                choices=["audio", "script_text", "both"],
                label="Output Option",
                value="both",
            )
            generate_button = gr.Button("Generate")
            audio_output = gr.Audio(label="Generated Audio")
            script_output = gr.File(label="Script Text File")
            status_output = gr.Textbox(label="Status", interactive=False)

        # Wire up the components
        upload_button.click(
            fn=process_upload,
            inputs=[api_key_input, file_upload, lang_dropdown],
            outputs=[upload_status],
        )

        query_button.click(
            fn=query_app,
            inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input],
            outputs=[answer_output],
        )

        # Copy each new answer into the text box of the generation tab.
        answer_output.change(
            fn=lambda ans: ans,
            inputs=[answer_output],
            outputs=[text_input],
        )

        def process_generation(api_key, text, model_name, voice, speed, language_name,
                               output_choice):
            """Translate the language name to its code and call the TTS helper."""
            # Convert language name to code; an unset dropdown falls back to English.
            language_code = LANGUAGE_CODES.get(language_name, LANGUAGE_CODES["English"])
            # NOTE(review): generate_audio_and_text is assumed to return three values
            # matching the outputs below; confirm against openai_tts_tool.
            return generate_audio_and_text(
                api_key, text, model_name, voice, speed, language_code, output_choice
            )

        generate_button.click(
            fn=process_generation,
            inputs=[
                api_key_input, text_input, model_dropdown, voice_type,
                voice_speed, language, output_option,
            ],
            outputs=[audio_output, script_output, status_output],
        )

    return demo


if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch()