Spaces:
Sleeping
Sleeping
import os | |
import cv2 | |
import numpy as np | |
from PIL import Image | |
import pytesseract | |
import gradio as gr | |
from pdf2image import convert_from_path | |
import PyPDF2 | |
from llama_index.core import VectorStoreIndex, Document | |
from llama_index.embeddings.openai import OpenAIEmbedding | |
from llama_index.llms.openai import OpenAI | |
from llama_index.core import get_response_synthesizer | |
from dotenv import load_dotenv | |
from sentence_transformers import SentenceTransformer, util | |
import logging | |
from openai_tts_tool import generate_audio_and_text # Importing from openai_tts_tool | |
# Set up logging configuration | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s') | |
# Load environment variables from .env file | |
load_dotenv() | |
# Initialize global variables | |
vector_index = None | |
query_log = [] | |
sentence_model = SentenceTransformer('all-MiniLM-L6-v2') | |
langs = os.popen('tesseract --list-langs').read().split('\n')[1:-1] | |
# Preprocessing function | |
def preprocess_image(image_path): | |
img = cv2.imread(image_path) | |
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
gray = cv2.equalizeHist(gray) | |
gray = cv2.GaussianBlur(gray, (5, 5), 0) | |
processed_image = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
cv2.THRESH_BINARY, 11, 2) | |
temp_filename = "processed_image.png" | |
cv2.imwrite(temp_filename, processed_image) | |
return temp_filename | |
# Function to extract text from images | |
def extract_text_from_image(image_path, lang='eng'): | |
processed_image_path = preprocess_image(image_path) | |
text = pytesseract.image_to_string(Image.open(processed_image_path), lang=lang) | |
return text | |
# Function to extract text from PDFs | |
def extract_text_from_pdf(pdf_path, lang='eng'): | |
text = "" | |
try: | |
with open(pdf_path, 'rb') as file: | |
pdf_reader = PyPDF2.PdfReader(file) | |
for page_num in range(len(pdf_reader.pages)): | |
page = pdf_reader.pages[page_num] | |
page_text = page.extract_text() | |
if page_text.strip(): | |
text += page_text | |
else: | |
images = convert_from_path(pdf_path, first_page=page_num + 1, last_page=page_num + 1) | |
for image in images: | |
image.save('temp_image.png', 'PNG') | |
text += extract_text_from_image('temp_image.png', lang=lang) | |
text += f"\n[OCR applied on page {page_num + 1}]\n" | |
except Exception as e: | |
return f"Error processing PDF: {str(e)}" | |
return text | |
# General function to handle different file types | |
def extract_text(file_path, lang='eng'): | |
file_ext = file_path.lower().split('.')[-1] | |
if file_ext in ['pdf']: | |
return extract_text_from_pdf(file_path, lang) | |
elif file_ext in ['png', 'jpg', 'jpeg']: | |
return extract_text_from_image(file_path, lang) | |
else: | |
return f"Unsupported file type: {file_ext}" | |
# Process uploaded documents and index them | |
def process_upload(api_key, files, lang): | |
global vector_index | |
if not api_key: | |
return "Please provide a valid OpenAI API Key.", None | |
if not files: | |
return "No files uploaded.", None | |
documents = [] | |
error_messages = [] | |
image_heavy_docs = [] | |
for file_path in files: | |
try: | |
text = extract_text(file_path, lang) | |
if "This document consists of" in text and "page(s) of images" in text: | |
image_heavy_docs.append(os.path.basename(file_path)) | |
documents.append(Document(text=text)) | |
except Exception as e: | |
error_message = f"Error processing file {file_path}: {str(e)}" | |
logging.error(error_message) | |
error_messages.append(error_message) | |
if documents: | |
try: | |
embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key) | |
vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model) | |
success_message = f"Successfully indexed {len(documents)} files." | |
if image_heavy_docs: | |
success_message += f"\nNote: The following documents consist mainly of images and may require manual review: {', '.join(image_heavy_docs)}" | |
if error_messages: | |
success_message += f"\nErrors: {'; '.join(error_messages)}" | |
return success_message, vector_index | |
except Exception as e: | |
return f"Error creating index: {str(e)}", None | |
else: | |
return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}", None | |
# Function to calculate similarity | |
def calculate_similarity(response, ground_truth): | |
response_embedding = sentence_model.encode(response, convert_to_tensor=True) | |
truth_embedding = sentence_model.encode(ground_truth, convert_to_tensor=True) | |
response_embedding = response_embedding / np.linalg.norm(response_embedding) | |
truth_embedding = truth_embedding / np.linalg.norm(truth_embedding) | |
similarity = np.dot(response_embedding, truth_embedding) | |
similarity_percentage = (similarity + 1) / 2 * 100 | |
return similarity_percentage | |
# Function to query documents | |
def query_app(query, model_name, use_similarity_check, openai_api_key): | |
global vector_index, query_log | |
if vector_index is None: | |
logging.error("No documents indexed yet. Please upload documents first.") | |
return "No documents indexed yet. Please upload documents first.", None | |
if not openai_api_key: | |
logging.error("No OpenAI API Key provided.") | |
return "Please provide a valid OpenAI API Key.", None | |
try: | |
llm = OpenAI(model=model_name, api_key=openai_api_key) | |
except Exception as e: | |
logging.error(f"Error initializing the OpenAI model: {e}") | |
return f"Error initializing the OpenAI model: {e}", None | |
response_synthesizer = get_response_synthesizer(llm=llm) | |
query_engine = vector_index.as_query_engine(llm=llm, response_synthesizer=response_synthesizer) | |
try: | |
response = query_engine.query(query) | |
except Exception as e: | |
logging.error(f"Error during query processing: {e}") | |
return f"Error during query processing: {e}", None | |
generated_response = response.response | |
query_log.append({ | |
"query_id": str(len(query_log) + 1), | |
"query": query, | |
"gt_answer": "Placeholder ground truth answer", | |
"response": generated_response, | |
"retrieved_context": [{"text": doc.text} for doc in response.source_nodes] | |
}) | |
metrics = {} | |
if use_similarity_check: | |
try: | |
logging.info("Similarity check is enabled. Calculating similarity.") | |
similarity = calculate_similarity(generated_response, "Placeholder ground truth answer") | |
metrics['similarity'] = similarity | |
logging.info(f"Similarity calculated: {similarity}") | |
except Exception as e: | |
logging.error(f"Error during similarity calculation: {e}") | |
metrics['error'] = f"Error during similarity calculation: {e}" | |
return generated_response, metrics if use_similarity_check else None | |
# Function to generate audio and text (integrating from openai_tts_tool.py) | |
def process_tts(api_key, input_text, model_name, voice_type, voice_speed, language, output_option, summary_length, additional_prompt): | |
try: | |
return generate_audio_and_text(api_key, input_text, model_name, voice_type, voice_speed, language, output_option, summary_length, additional_prompt) | |
except Exception as e: | |
logging.error(f"Error during TTS generation: {e}") | |
return f"Error during TTS generation: {e}", None | |
# Main function with Gradio interface | |
def main(): | |
with gr.Blocks(title="Document Processing and TTS App") as demo: | |
gr.Markdown("# π Document Processing, Text & Audio Generation App") | |
# Upload documents and chat functionality | |
with gr.Tab("π€ Upload Documents"): | |
api_key_input = gr.Textbox(label="Enter OpenAI API Key", placeholder="Paste your OpenAI API Key here") | |
file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath") | |
lang_dropdown = gr.Dropdown(choices=langs, label="Select OCR Language", value='eng') | |
upload_button = gr.Button("Upload and Index") | |
upload_status = gr.Textbox(label="Status", interactive=False) | |
upload_button.click(fn=process_upload, inputs=[api_key_input, file_upload, lang_dropdown], outputs=[upload_status]) | |
# Chat with document | |
with gr.Tab("β Ask a Question"): | |
query_input = gr.Textbox(label="Enter your question") | |
model_dropdown = gr.Dropdown(choices=["gpt-4o", "gpt-4o-mini"], label="Select Model", value="gpt-4o") | |
similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False) | |
query_button = gr.Button("Ask") | |
answer_output = gr.Textbox(label="Answer", interactive=False) | |
metrics_output = gr.JSON(label="Metrics") | |
query_button.click(fn=query_app, inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input], outputs=[answer_output, metrics_output]) | |
# Text-to-Speech generation | |
with gr.Tab("π£οΈ Generate Audio and Text"): | |
text_input = gr.Textbox(label="Enter text for generation") | |
voice_type = gr.Dropdown(choices=["alloy", "echo", "fable", "onyx"], label="Voice Type", value="alloy") | |
voice_speed = gr.Dropdown(choices=["normal", "slow", "fast"], label="Voice Speed", value="normal") | |
language = gr.Dropdown(choices=["en", "ar", "de", "hi"], label="Language", value="en") | |
output_option = gr.Radio(choices=["audio", "summary_text", "both"], label="Output Option", value="both") | |
summary_length = gr.Number(label="Summary Length", value=100) | |
additional_prompt = gr.Textbox(label="Additional Prompt (Optional)") | |
generate_button = gr.Button("Generate") | |
audio_output = gr.Audio(label="Generated Audio", interactive=False) | |
summary_output = gr.Textbox(label="Generated Summary Text", interactive=False) | |
generate_button.click(fn=process_tts, inputs=[api_key_input, text_input, model_dropdown, voice_type, voice_speed, language, output_option, summary_length, additional_prompt], outputs=[audio_output, summary_output]) | |
demo.launch() | |
if __name__ == "__main__": | |
main() | |