oceansweep's picture
Upload 169 files
c5b0bb7 verified
raw
history blame
64.3 kB
# RAG_QA_Chat_tab.py
# Description: Gradio UI for RAG QA Chat
#
# Imports
import csv
import logging
import json
import os
import re
from datetime import datetime
#
# External Imports
import docx2txt
import gradio as gr
#
# Local Imports
from App_Function_Libraries.Books.Book_Ingestion_Lib import read_epub
from App_Function_Libraries.DB.Character_Chat_DB import search_character_chat, search_character_cards
from App_Function_Libraries.DB.DB_Manager import DatabaseError, get_paginated_files, add_media_with_keywords, \
get_all_conversations, get_note_by_id, get_notes_by_keywords, start_new_conversation, update_note, save_notes, \
clear_keywords_from_note, add_keywords_to_note, load_chat_history, save_message, add_keywords_to_conversation, \
get_keywords_for_note, delete_note, search_conversations_by_keywords, get_conversation_title, delete_conversation, \
update_conversation_title, fetch_all_conversations, fetch_all_notes, fetch_conversations_by_ids, fetch_notes_by_ids, \
search_media_db, search_notes_titles, list_prompts
from App_Function_Libraries.DB.RAG_QA_Chat_DB import get_notes, delete_messages_in_conversation, search_rag_notes, \
search_rag_chat, get_conversation_rating, set_conversation_rating
from App_Function_Libraries.Gradio_UI.Gradio_Shared import update_user_prompt
from App_Function_Libraries.PDF.PDF_Ingestion_Lib import extract_text_and_format_from_pdf
from App_Function_Libraries.RAG.RAG_Library_2 import generate_answer, enhanced_rag_pipeline
from App_Function_Libraries.RAG.RAG_QA_Chat import search_database, rag_qa_chat
from App_Function_Libraries.Utils.Utils import default_api_endpoint, global_api_endpoints, format_api_name, \
load_comprehensive_config
#
########################################################################################################################
#
# Functions:
def create_rag_qa_chat_tab():
try:
default_value = None
if default_api_endpoint:
if default_api_endpoint in global_api_endpoints:
default_value = format_api_name(default_api_endpoint)
else:
logging.warning(f"Default API endpoint '{default_api_endpoint}' not found in global_api_endpoints")
except Exception as e:
logging.error(f"Error setting default API endpoint: {str(e)}")
default_value = None
with gr.TabItem("RAG QA Chat", visible=True):
gr.Markdown("# RAG QA Chat")
state = gr.State({
"page": 1,
"context_source": "Entire Media Database",
"conversation_messages": [],
"conversation_id": None
})
note_state = gr.State({"note_id": None})
def auto_save_conversation(message, response, state_value, auto_save_enabled):
"""Automatically save the conversation if auto-save is enabled"""
try:
if not auto_save_enabled:
return state_value
conversation_id = state_value.get("conversation_id")
if not conversation_id:
# Create new conversation with default title
title = "Auto-saved Conversation " + datetime.now().strftime("%Y-%m-%d %H:%M:%S")
conversation_id = start_new_conversation(title=title)
state_value = state_value.copy()
state_value["conversation_id"] = conversation_id
# Save the messages
save_message(conversation_id, "user", message)
save_message(conversation_id, "assistant", response)
return state_value
except Exception as e:
logging.error(f"Error in auto-save: {str(e)}")
return state_value
# Update the conversation list function
def update_conversation_list():
conversations, total_pages, total_count = get_all_conversations()
choices = [
f"{conversation['title']} (ID: {conversation['conversation_id']}) - Rating: {conversation['rating'] or 'Not Rated'}"
for conversation in conversations
]
return choices
with gr.Row():
with gr.Column(scale=1):
# FIXME - Offer the user to search 2+ databases at once
database_types = ["Media DB", "RAG Chat", "RAG Notes", "Character Chat", "Character Cards"]
db_choice = gr.CheckboxGroup(
label="Select Database(s)",
choices=database_types,
value=["Media DB"],
interactive=True
)
context_source = gr.Radio(
["All Files in the Database", "Search Database", "Upload File"],
label="Context Source",
value="All Files in the Database"
)
existing_file = gr.Dropdown(label="Select Existing File", choices=[], interactive=True)
file_page = gr.State(value=1)
with gr.Row():
prev_page_btn = gr.Button("Previous Page")
next_page_btn = gr.Button("Next Page")
page_info = gr.HTML("Page 1")
top_k_input = gr.Number(value=10, label="Maximum amount of results to use (Default: 10)", minimum=1, maximum=50, step=1, precision=0, interactive=True)
keywords_input = gr.Textbox(label="Keywords (comma-separated) to filter results by)", value="rag_qa_default_keyword" ,visible=True)
use_query_rewriting = gr.Checkbox(label="Use Query Rewriting", value=True)
use_re_ranking = gr.Checkbox(label="Use Re-ranking", value=True)
config = load_comprehensive_config()
auto_save_value = config.getboolean('auto-save', 'save_character_chats', fallback=False)
auto_save_checkbox = gr.Checkbox(
label="Save chats automatically",
value=auto_save_value,
info="When enabled, conversations will be saved automatically after each message"
)
initial_prompts, total_pages, current_page = list_prompts(page=1, per_page=10)
preset_prompt_checkbox = gr.Checkbox(
label="View Custom Prompts(have to copy/paste them)",
value=False,
visible=True
)
with gr.Row(visible=False) as preset_prompt_controls:
prev_prompt_page = gr.Button("Previous")
current_prompt_page_text = gr.Text(f"Page {current_page} of {total_pages}")
next_prompt_page = gr.Button("Next")
current_prompt_page_state = gr.State(value=1)
preset_prompt = gr.Dropdown(
label="Select Preset Prompt",
choices=initial_prompts,
visible=False
)
user_prompt = gr.Textbox(
label="Custom Prompt",
placeholder="Enter custom prompt here",
lines=3,
visible=False
)
system_prompt_input = gr.Textbox(
label="System Prompt",
lines=3,
visible=False
)
search_query = gr.Textbox(label="Search Query", visible=False)
search_button = gr.Button("Search", visible=False)
search_results = gr.Dropdown(label="Search Results", choices=[], visible=False)
file_upload = gr.File(
label="Upload File",
visible=False,
file_types=["txt", "pdf", "epub", "md", "rtf", "json", "csv", "docx"]
)
convert_to_text = gr.Checkbox(label="Convert to plain text", visible=False)
with gr.Column(scale=1):
load_conversation = gr.Dropdown(
label="Load Conversation",
choices=update_conversation_list()
)
new_conversation = gr.Button("New Conversation")
save_conversation_button = gr.Button("Save Conversation")
conversation_title = gr.Textbox(
label="Conversation Title",
placeholder="Enter a title for the new conversation"
)
keywords = gr.Textbox(label="Keywords (comma-separated)", visible=True)
# Add the rating display and input
rating_display = gr.Markdown(value="", visible=False)
rating_input = gr.Radio(
choices=["1", "2", "3"],
label="Rate this Conversation (1-3 stars)",
visible=False
)
# Refactored API selection dropdown
api_choice = gr.Dropdown(
choices=["None"] + [format_api_name(api) for api in global_api_endpoints],
value=default_value,
label="API for Chat Response (Optional)"
)
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(height=700)
msg = gr.Textbox(label="Enter your message")
submit = gr.Button("Submit")
clear_chat = gr.Button("Clear Chat History")
with gr.Column(scale=1):
# Adding UI elements for notes
note_title = gr.Textbox(label="Note Title", placeholder="Enter a title for the note")
notes = gr.TextArea(label="Notes", placeholder="Enter your notes here...", lines=25)
keywords_for_notes = gr.Textbox(
label="Keywords for Notes (comma-separated)",
placeholder="Enter keywords for the note",
visible=True,
)
save_notes_btn = gr.Button("Save Note")
clear_notes_btn = gr.Button("Clear Current Note text")
new_note_btn = gr.Button("New Note")
# FIXME - Change from only keywords to generalized search
search_notes_title = gr.Textbox(label="Search Notes by Title")
search_notes_by_keyword = gr.Textbox(label="Search Notes by Keyword")
search_notes_button = gr.Button("Search Notes")
note_results = gr.Dropdown(label="Notes", choices=[])
load_note = gr.Dropdown(label="Load Note", choices=[])
loading_indicator = gr.HTML("Loading...", visible=False)
status_message = gr.HTML()
auto_save_status = gr.HTML()
# Function Definitions
def update_prompt_page(direction, current_page_val):
new_page = max(1, min(total_pages, current_page_val + direction))
prompts, _, _ = list_prompts(page=new_page, per_page=10)
return (
gr.update(choices=prompts),
gr.update(value=f"Page {new_page} of {total_pages}"),
new_page
)
def update_prompts(preset_name):
prompts = update_user_prompt(preset_name)
return (
gr.update(value=prompts["user_prompt"], visible=True),
gr.update(value=prompts["system_prompt"], visible=True)
)
def toggle_preset_prompt(checkbox_value):
return (
gr.update(visible=checkbox_value),
gr.update(visible=checkbox_value),
gr.update(visible=False),
gr.update(visible=False)
)
prev_prompt_page.click(
lambda x: update_prompt_page(-1, x),
inputs=[current_prompt_page_state],
outputs=[preset_prompt, current_prompt_page_text, current_prompt_page_state]
)
next_prompt_page.click(
lambda x: update_prompt_page(1, x),
inputs=[current_prompt_page_state],
outputs=[preset_prompt, current_prompt_page_text, current_prompt_page_state]
)
preset_prompt.change(
update_prompts,
inputs=preset_prompt,
outputs=[user_prompt, system_prompt_input]
)
preset_prompt_checkbox.change(
toggle_preset_prompt,
inputs=[preset_prompt_checkbox],
outputs=[preset_prompt, preset_prompt_controls, user_prompt, system_prompt_input]
)
def update_state(state, **kwargs):
new_state = state.copy()
new_state.update(kwargs)
return new_state
def create_new_note():
return gr.update(value='un-named note'), gr.update(value=''), {"note_id": None}
new_note_btn.click(
create_new_note,
outputs=[note_title, notes, note_state]
)
def search_notes(search_notes_title, keywords):
if keywords:
keywords_list = [kw.strip() for kw in keywords.split(',')]
notes_data, total_pages, total_count = get_notes_by_keywords(keywords_list)
choices = [f"Note {note_id} - {title} ({timestamp})" for
note_id, title, content, timestamp, conversation_id in notes_data]
return gr.update(choices=choices, label=f"Found {total_count} notes")
elif search_notes_title:
notes_data, total_pages, total_count = search_notes_titles(search_notes_title)
choices = [f"Note {note_id} - {title} ({timestamp})" for
note_id, title, content, timestamp, conversation_id in notes_data]
return gr.update(choices=choices, label=f"Found {total_count} notes")
else:
# This will now return all notes, ordered by timestamp
notes_data, total_pages, total_count = search_notes_titles("")
choices = [f"Note {note_id} - {title} ({timestamp})" for
note_id, title, content, timestamp, conversation_id in notes_data]
return gr.update(choices=choices, label=f"All notes ({total_count} total)")
search_notes_button.click(
search_notes,
inputs=[search_notes_title, search_notes_by_keyword],
outputs=[note_results]
)
def load_selected_note(note_selection):
if note_selection:
note_id = int(note_selection.split(' ')[1])
note_data = get_note_by_id(note_id)
if note_data:
note_id, title, content = note_data[0]
updated_note_state = {"note_id": note_id}
return gr.update(value=title), gr.update(value=content), updated_note_state
return gr.update(value=''), gr.update(value=''), {"note_id": None}
note_results.change(
load_selected_note,
inputs=[note_results],
outputs=[note_title, notes, note_state]
)
def save_notes_function(note_title_text, notes_content, keywords_content, note_state_value, state_value):
"""Save the notes and associated keywords to the database."""
logging.info(f"Starting save_notes_function with state: {state_value}")
logging.info(f"Note title: {note_title_text}")
logging.info(f"Notes content length: {len(notes_content) if notes_content else 0}")
try:
# Check current state
conversation_id = state_value.get("conversation_id")
logging.info(f"Current conversation_id: {conversation_id}")
# Create new conversation if none exists
if not conversation_id:
logging.info("No conversation ID found, creating new conversation")
conversation_title = note_title_text if note_title_text else "Untitled Conversation"
conversation_id = start_new_conversation(title=conversation_title)
state_value = state_value.copy() # Create a new copy of the state
state_value["conversation_id"] = conversation_id
logging.info(f"Created new conversation with ID: {conversation_id}")
if not notes_content:
logging.warning("No notes content provided")
return notes_content, note_state_value, state_value, gr.update(
value="<p style='color:red;'>Cannot save empty notes.</p>")
# Save or update note
note_id = note_state_value.get("note_id")
if note_id:
logging.info(f"Updating existing note with ID: {note_id}")
update_note(note_id, note_title_text, notes_content)
else:
logging.info(f"Creating new note for conversation: {conversation_id}")
note_id = save_notes(conversation_id, note_title_text or "Untitled Note", notes_content)
note_state_value = {"note_id": note_id}
logging.info(f"Created new note with ID: {note_id}")
# Handle keywords
if keywords_content:
logging.info("Processing keywords")
clear_keywords_from_note(note_id)
keywords = [kw.strip() for kw in keywords_content.split(',')]
add_keywords_to_note(note_id, keywords)
logging.info(f"Added keywords: {keywords}")
logging.info("Notes saved successfully")
return (
notes_content,
note_state_value,
state_value,
gr.update(value="<p style='color:green;'>Notes saved successfully!</p>")
)
except Exception as e:
logging.error(f"Error in save_notes_function: {str(e)}", exc_info=True)
return (
notes_content,
note_state_value,
state_value,
gr.update(value=f"<p style='color:red;'>Error saving notes: {str(e)}</p>")
)
save_notes_btn.click(
save_notes_function,
inputs=[note_title, notes, keywords_for_notes, note_state, state],
outputs=[notes, note_state, state, status_message]
)
def clear_notes_function():
"""Clear notes for the current note."""
return gr.update(value=''), {"note_id": None}
clear_notes_btn.click(
clear_notes_function,
outputs=[notes, note_state]
)
# Initialize the conversation list
load_conversation.choices = update_conversation_list()
def load_conversation_history(selected_conversation, state_value):
try:
if not selected_conversation:
return [], state_value, "", gr.update(value="", visible=False), gr.update(visible=False)
# Extract conversation ID
match = re.search(r'\(ID: ([0-9a-fA-F\-]+)\)', selected_conversation)
if not match:
logging.error(f"Invalid conversation format: {selected_conversation}")
return [], state_value, "", gr.update(value="", visible=False), gr.update(visible=False)
conversation_id = match.group(1)
chat_data, total_pages_val, _ = load_chat_history(conversation_id, 1, 50)
# Update state with valid conversation id
updated_state = state_value.copy()
updated_state["conversation_id"] = conversation_id
updated_state["conversation_messages"] = chat_data
# Format chat history
history = []
for role, content in chat_data:
if role == 'user':
history.append((content, ''))
elif history:
history[-1] = (history[-1][0], content)
# Fetch and display the conversation rating
rating = get_conversation_rating(conversation_id)
if rating is not None:
rating_text = f"**Current Rating:** {rating} star(s)"
rating_display_update = gr.update(value=rating_text, visible=True)
rating_input_update = gr.update(value=str(rating), visible=True)
else:
rating_display_update = gr.update(value="**Current Rating:** Not Rated", visible=True)
rating_input_update = gr.update(value=None, visible=True)
notes_content = get_notes(conversation_id)
return history, updated_state, "\n".join(
notes_content) if notes_content else "", rating_display_update, rating_input_update
except Exception as e:
logging.error(f"Error loading conversation: {str(e)}")
return [], state_value, "", gr.update(value="", visible=False), gr.update(visible=False)
load_conversation.change(
load_conversation_history,
inputs=[load_conversation, state],
outputs=[chatbot, state, notes, rating_display, rating_input]
)
# Modify save_conversation_function to use gr.update()
def save_conversation_function(conversation_title_text, keywords_text, rating_value, state_value):
conversation_messages = state_value.get("conversation_messages", [])
conversation_id = state_value.get("conversation_id")
if not conversation_messages:
return gr.update(
value="<p style='color:red;'>No conversation to save.</p>"
), state_value, gr.update(), gr.update(value="", visible=False), gr.update(visible=False)
# Start a new conversation in the database if not existing
if not conversation_id:
conversation_id = start_new_conversation(
conversation_title_text if conversation_title_text else "Untitled Conversation"
)
else:
# Update the conversation title if it has changed
update_conversation_title(conversation_id, conversation_title_text)
# Save the messages
for role, content in conversation_messages:
save_message(conversation_id, role, content)
# Save keywords if provided
if keywords_text:
add_keywords_to_conversation(conversation_id, [kw.strip() for kw in keywords_text.split(',')])
# Save the rating if provided
try:
if rating_value:
set_conversation_rating(conversation_id, int(rating_value))
except ValueError as ve:
logging.error(f"Invalid rating value: {ve}")
return gr.update(
value=f"<p style='color:red;'>Invalid rating: {ve}</p>"
), state_value, gr.update(), gr.update(value="", visible=False), gr.update(visible=False)
# Update state
updated_state = update_state(state_value, conversation_id=conversation_id)
# Update the conversation list
conversation_choices = update_conversation_list()
# Reset rating display and input
rating_display_update = gr.update(value=f"**Current Rating:** {rating_value} star(s)", visible=True)
rating_input_update = gr.update(value=rating_value, visible=True)
return gr.update(
value="<p style='color:green;'>Conversation saved successfully.</p>"
), updated_state, gr.update(choices=conversation_choices), rating_display_update, rating_input_update
save_conversation_button.click(
save_conversation_function,
inputs=[conversation_title, keywords, rating_input, state],
outputs=[status_message, state, load_conversation, rating_display, rating_input]
)
def start_new_conversation_wrapper(title, state_value):
# Reset the state with no conversation_id and empty conversation messages
updated_state = update_state(state_value, conversation_id=None, page=1, conversation_messages=[])
# Clear the chat history and reset rating components
return [], updated_state, gr.update(value="", visible=False), gr.update(value=None, visible=False)
new_conversation.click(
start_new_conversation_wrapper,
inputs=[conversation_title, state],
outputs=[chatbot, state, rating_display, rating_input]
)
def update_file_list(page):
files, total_pages, current_page = get_paginated_files(page)
choices = [f"{title} (ID: {id})" for id, title in files]
return gr.update(choices=choices), gr.update(value=f"Page {current_page} of {total_pages}"), current_page
def next_page_fn(current_page):
return update_file_list(current_page + 1)
def prev_page_fn(current_page):
return update_file_list(max(1, current_page - 1))
def update_context_source(choice):
# Update visibility based on context source choice
return {
existing_file: gr.update(visible=choice == "Existing File"),
prev_page_btn: gr.update(visible=choice == "Search Database"),
next_page_btn: gr.update(visible=choice == "Search Database"),
page_info: gr.update(visible=choice == "Search Database"),
search_query: gr.update(visible=choice == "Search Database"),
search_button: gr.update(visible=choice == "Search Database"),
search_results: gr.update(visible=choice == "Search Database"),
file_upload: gr.update(visible=choice == "Upload File"),
convert_to_text: gr.update(visible=choice == "Upload File"),
keywords: gr.update(visible=choice == "Upload File")
}
context_source.change(update_context_source, context_source,
[existing_file, prev_page_btn, next_page_btn, page_info, search_query, search_button,
search_results, file_upload, convert_to_text, keywords])
next_page_btn.click(next_page_fn, inputs=[file_page], outputs=[existing_file, page_info, file_page])
prev_page_btn.click(prev_page_fn, inputs=[file_page], outputs=[existing_file, page_info, file_page])
# Initialize the file list when context source is changed to "Existing File"
context_source.change(lambda choice: update_file_list(1) if choice == "Existing File" else (gr.update(), gr.update(), 1),
inputs=[context_source], outputs=[existing_file, page_info, file_page])
def perform_search(query, selected_databases, keywords):
try:
results = []
# Iterate over selected database types and perform searches accordingly
for database_type in selected_databases:
if database_type == "Media DB":
# FIXME - check for existence of keywords before setting as search field
search_fields = ["title", "content", "keywords"]
results += search_media_db(query, search_fields, keywords, page=1, results_per_page=25)
elif database_type == "RAG Chat":
results += search_rag_chat(query)
elif database_type == "RAG Notes":
results += search_rag_notes(query)
elif database_type == "Character Chat":
results += search_character_chat(query)
elif database_type == "Character Cards":
results += search_character_cards(query)
# Remove duplicate results if necessary
results = list(set(results))
return gr.update(choices=results)
except Exception as e:
gr.Error(f"Error performing search: {str(e)}")
return gr.update(choices=[])
# Click Event for the DB Search Button
search_button.click(
perform_search,
inputs=[search_query, db_choice, keywords_input],
outputs=[search_results]
)
def rephrase_question(history, latest_question, api_choice):
logging.info("RAG QnA: Rephrasing question")
conversation_history = "\n".join([f"User: {h[0]}\nAssistant: {h[1]}" for h in history[:-1]])
prompt = f"""You are a helpful assistant. Given the conversation history and the latest question, resolve any ambiguous references in the latest question.
Conversation History:
{conversation_history}
Latest Question:
{latest_question}
Rewritten Question:"""
# Use the selected API to generate the rephrased question
rephrased_question = generate_answer(api_choice, prompt, "")
logging.info(f"Rephrased question: {rephrased_question}")
return rephrased_question.strip()
# FIXME - RAG DB selection
def rag_qa_chat_wrapper(
message, history, context_source, existing_file, search_results, file_upload,
convert_to_text, keywords, api_choice, use_query_rewriting, state_value,
keywords_input, top_k_input, use_re_ranking, db_choices, auto_save_enabled
):
try:
logging.info(f"Starting rag_qa_chat_wrapper with message: {message}")
logging.info(f"Context source: {context_source}")
logging.info(f"API choice: {api_choice}")
logging.info(f"Query rewriting: {'enabled' if use_query_rewriting else 'disabled'}")
logging.info(f"Selected DB Choices: {db_choices}")
# Show loading indicator
yield history, "", gr.update(visible=True), state_value, gr.update(visible=False), gr.update(
visible=False)
conversation_id = state_value.get("conversation_id")
conversation_messages = state_value.get("conversation_messages", [])
# Save the user's message
if conversation_id:
save_message(conversation_id, "user", message)
else:
# Append to in-memory messages
conversation_messages.append(("user", message))
state_value["conversation_messages"] = conversation_messages
# Ensure api_choice is a string
api_choice_str = api_choice.value if isinstance(api_choice, gr.components.Dropdown) else api_choice
logging.info(f"Resolved API choice: {api_choice_str}")
# Only rephrase the question if it's not the first query and query rewriting is enabled
if len(history) > 0 and use_query_rewriting:
rephrased_question = rephrase_question(history, message, api_choice_str)
logging.info(f"Original question: {message}")
logging.info(f"Rephrased question: {rephrased_question}")
else:
rephrased_question = message
logging.info(f"Using original question: {message}")
if context_source == "All Files in the Database":
# Use the enhanced_rag_pipeline to search the selected databases
context = enhanced_rag_pipeline(
rephrased_question, api_choice_str, keywords_input, top_k_input, use_re_ranking,
database_types=db_choices # Pass the list of selected databases
)
logging.info(f"Using enhanced_rag_pipeline for database search")
elif context_source == "Search Database":
context = f"media_id:{search_results.split('(ID: ')[1][:-1]}"
logging.info(f"Using search result with context: {context}")
else:
# Upload File
logging.info("Processing uploaded file")
if file_upload is None:
raise ValueError("No file uploaded")
# Process the uploaded file
file_path = file_upload.name
file_name = os.path.basename(file_path)
logging.info(f"Uploaded file: {file_name}")
if convert_to_text:
logging.info("Converting file to plain text")
content = convert_file_to_text(file_path)
else:
logging.info("Reading file content")
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
logging.info(f"File content length: {len(content)} characters")
# Process keywords
if not keywords:
keywords = "default,rag-file-upload"
logging.info(f"Keywords: {keywords}")
# Add the content to the database and get the media_id
logging.info("Adding content to database")
result = add_media_with_keywords(
url=file_name,
title=file_name,
media_type='document',
content=content,
keywords=keywords,
prompt='No prompt for uploaded files',
summary='No summary for uploaded files',
transcription_model='None',
author='Unknown',
ingestion_date=datetime.now().strftime('%Y-%m-%d')
)
logging.info(f"Result from add_media_with_keywords: {result}")
if isinstance(result, tuple):
media_id, _ = result
else:
media_id = result
context = f"media_id:{media_id}"
logging.info(f"Context for uploaded file: {context}")
logging.info("Calling rag_qa_chat function")
new_history, response = rag_qa_chat(rephrased_question, history, context, api_choice_str)
# Log first 100 chars of response
logging.info(f"Response received from rag_qa_chat: {response[:100]}...")
# Save assistant's response
if conversation_id:
save_message(conversation_id, "assistant", response)
else:
conversation_messages.append(("assistant", response))
state_value["conversation_messages"] = conversation_messages
# Update the state
updated_state = auto_save_conversation(message, response, state_value, auto_save_enabled)
updated_state["conversation_messages"] = conversation_messages
# Safely update history
if new_history:
new_history[-1] = (message, response)
else:
new_history = [(message, response)]
# Get the current rating and update display
conversation_id = updated_state.get("conversation_id")
if conversation_id:
rating = get_conversation_rating(conversation_id)
if rating is not None:
rating_display_update = gr.update(value=f"**Current Rating:** {rating} star(s)", visible=True)
rating_input_update = gr.update(value=str(rating), visible=True)
else:
rating_display_update = gr.update(value="**Current Rating:** Not Rated", visible=True)
rating_input_update = gr.update(value=None, visible=True)
else:
rating_display_update = gr.update(value="", visible=False)
rating_input_update = gr.update(value=None, visible=False)
gr.Info("Response generated successfully")
logging.info("rag_qa_chat_wrapper completed successfully")
yield new_history, "", gr.update(
visible=False), updated_state, rating_display_update, rating_input_update
except ValueError as e:
logging.error(f"Input error in rag_qa_chat_wrapper: {str(e)}")
gr.Error(f"Input error: {str(e)}")
yield history, "", gr.update(visible=False), state_value, gr.update(visible=False), gr.update(
visible=False)
except DatabaseError as e:
logging.error(f"Database error in rag_qa_chat_wrapper: {str(e)}")
gr.Error(f"Database error: {str(e)}")
yield history, "", gr.update(visible=False), state_value, gr.update(visible=False), gr.update(
visible=False)
except Exception as e:
logging.error(f"Unexpected error in rag_qa_chat_wrapper: {e}", exc_info=True)
gr.Error("An unexpected error occurred. Please try again later.")
yield history, "", gr.update(visible=False), state_value, gr.update(visible=False), gr.update(
visible=False)
def clear_chat_history():
return [], "", gr.update(value="", visible=False), gr.update(value=None, visible=False)
submit.click(
rag_qa_chat_wrapper,
inputs=[
msg,
chatbot,
context_source,
existing_file,
search_results,
file_upload,
convert_to_text,
keywords,
api_choice,
use_query_rewriting,
state,
keywords_input,
top_k_input,
use_re_ranking,
db_choice,
auto_save_checkbox
],
outputs=[chatbot, msg, loading_indicator, state, rating_display, rating_input],
)
clear_chat.click(
clear_chat_history,
outputs=[chatbot, msg, rating_display, rating_input]
)
return (
context_source,
existing_file,
search_query,
search_button,
search_results,
file_upload,
convert_to_text,
keywords,
api_choice,
use_query_rewriting,
chatbot,
msg,
submit,
clear_chat,
)
def create_rag_qa_notes_management_tab():
# New Management Tab
with gr.TabItem("Notes Management", visible=True):
gr.Markdown("# RAG QA Notes Management")
management_state = gr.State({
"selected_conversation_id": None,
"selected_note_id": None,
})
with gr.Row():
with gr.Column(scale=1):
# Search Notes
search_notes_title = gr.Textbox(label="Search Notes by Title")
search_notes_by_keyword = gr.Textbox(label="Search Notes by Keywords")
search_notes_button = gr.Button("Search Notes")
notes_list = gr.Dropdown(label="Notes", choices=[])
# Manage Notes
load_note_button = gr.Button("Load Note")
delete_note_button = gr.Button("Delete Note")
note_title_input = gr.Textbox(label="Note Title")
note_content_input = gr.TextArea(label="Note Content", lines=20)
note_keywords_input = gr.Textbox(label="Note Keywords (comma-separated)", value="default_note_keyword")
save_note_button = gr.Button("Save Note")
create_new_note_button = gr.Button("Create New Note")
status_message = gr.HTML()
# Function Definitions
def search_notes(search_notes_title, keywords):
if keywords:
keywords_list = [kw.strip() for kw in keywords.split(',')]
notes_data, total_pages, total_count = get_notes_by_keywords(keywords_list)
choices = [f"Note {note_id} - {title} ({timestamp})" for
note_id, title, content, timestamp, conversation_id in notes_data]
return gr.update(choices=choices, label=f"Found {total_count} notes")
elif search_notes_title:
notes_data, total_pages, total_count = search_notes_titles(search_notes_title)
choices = [f"Note {note_id} - {title} ({timestamp})" for
note_id, title, content, timestamp, conversation_id in notes_data]
return gr.update(choices=choices, label=f"Found {total_count} notes")
else:
# This will now return all notes, ordered by timestamp
notes_data, total_pages, total_count = search_notes_titles("")
choices = [f"Note {note_id} - {title} ({timestamp})" for
note_id, title, content, timestamp, conversation_id in notes_data]
return gr.update(choices=choices, label=f"All notes ({total_count} total)")
search_notes_button.click(
search_notes,
inputs=[search_notes_title, search_notes_by_keyword],
outputs=[notes_list]
)
def load_selected_note(selected_note, state_value):
if selected_note:
note_id = int(selected_note.split('(ID: ')[1][:-1])
note_data = get_note_by_id(note_id)
if note_data:
note_id, title, content = note_data[0]
state_value["selected_note_id"] = note_id
# Get keywords for the note
keywords = get_keywords_for_note(note_id)
keywords_str = ', '.join(keywords)
return (
gr.update(value=title),
gr.update(value=content),
gr.update(value=keywords_str),
state_value
)
return gr.update(value=''), gr.update(value=''), gr.update(value=''), state_value
load_note_button.click(
load_selected_note,
inputs=[notes_list, management_state],
outputs=[note_title_input, note_content_input, note_keywords_input, management_state]
)
def save_note_function(title, content, keywords_str, state_value):
note_id = state_value["selected_note_id"]
if note_id:
update_note(note_id, title, content)
if keywords_str:
# Clear existing keywords and add new ones
clear_keywords_from_note(note_id)
keywords_list = [kw.strip() for kw in keywords_str.split(',')]
add_keywords_to_note(note_id, keywords_list)
return gr.Info("Note updated successfully.")
else:
# Create new note
conversation_id = state_value.get("selected_conversation_id")
if conversation_id:
note_id = save_notes(conversation_id, title, content)
state_value["selected_note_id"] = note_id
if keywords_str:
keywords_list = [kw.strip() for kw in keywords_str.split(',')]
add_keywords_to_note(note_id, keywords_list)
return gr.Info("New note created successfully.")
else:
return gr.Error("No conversation selected. Cannot create a new note.")
save_note_button.click(
save_note_function,
inputs=[note_title_input, note_content_input, note_keywords_input, management_state],
outputs=[]
)
def delete_selected_note(state_value):
note_id = state_value["selected_note_id"]
if note_id:
delete_note(note_id)
# Reset state
state_value["selected_note_id"] = None
# Update notes list
updated_notes = search_notes("", "")
return updated_notes, gr.update(value="Note deleted successfully."), state_value
else:
return gr.update(), gr.update(value="No note selected."), state_value
delete_note_button.click(
delete_selected_note,
inputs=[management_state],
outputs=[notes_list, status_message, management_state]
)
def create_new_note_function(state_value):
state_value["selected_note_id"] = None
return gr.update(value=''), gr.update(value=''), gr.update(value=''), state_value
create_new_note_button.click(
create_new_note_function,
inputs=[management_state],
outputs=[note_title_input, note_content_input, note_keywords_input, management_state]
)
def create_rag_qa_chat_management_tab():
# New Management Tab
with gr.TabItem("Chat Management", visible=True):
gr.Markdown("# RAG QA Chat Conversation Management")
management_state = gr.State({
"selected_conversation_id": None,
"selected_note_id": None,
})
# State to store the mapping between titles and IDs
conversation_mapping = gr.State({})
with gr.Row():
with gr.Column(scale=1):
# Search Conversations
with gr.Group():
gr.Markdown("## Search Conversations")
title_search = gr.Textbox(
label="Search by Title",
placeholder="Enter title to search..."
)
content_search = gr.Textbox(
label="Search in Chat Content",
placeholder="Enter text to search in messages..."
)
keyword_search = gr.Textbox(
label="Filter by Keywords (comma-separated)",
placeholder="keyword1, keyword2, ..."
)
search_conversations_button = gr.Button("Search Conversations")
conversations_list = gr.Dropdown(label="Conversations", choices=[])
new_conversation_button = gr.Button("New Conversation")
# Manage Conversations
load_conversation_button = gr.Button("Load Conversation")
delete_conversation_button = gr.Button("Delete Conversation")
conversation_title_input = gr.Textbox(label="Conversation Title")
conversation_content_input = gr.TextArea(label="Conversation Content", lines=20)
save_conversation_button = gr.Button("Save Conversation")
status_message = gr.HTML()
# Function Definitions
def search_conversations(title_query, content_query, keywords):
try:
# Parse keywords if provided
keywords_list = None
if keywords and keywords.strip():
keywords_list = [kw.strip() for kw in keywords.split(',')]
# Search using existing search_conversations_by_keywords function with all criteria
results, total_pages, total_count = search_conversations_by_keywords(
keywords=keywords_list,
title_query=title_query if title_query.strip() else None,
content_query=content_query if content_query.strip() else None
)
# Build choices as list of titles (ensure uniqueness)
choices = []
mapping = {}
for conv in results:
conversation_id = conv['conversation_id']
title = conv['title']
display_title = f"{title} (ID: {conversation_id[:8]})"
choices.append(display_title)
mapping[display_title] = conversation_id
return gr.update(choices=choices), mapping
except Exception as e:
logging.error(f"Error in search_conversations: {str(e)}")
return gr.update(choices=[]), {}
# Update the search button click event
search_conversations_button.click(
search_conversations,
inputs=[title_search, content_search, keyword_search],
outputs=[conversations_list, conversation_mapping]
)
def load_selected_conversation(selected_title, state_value, mapping):
conversation_id = mapping.get(selected_title)
if conversation_id:
# Load conversation title
conversation_title = get_conversation_title(conversation_id)
# Load conversation messages
messages, total_pages, total_count = load_chat_history(conversation_id)
# Concatenate messages into a single string
conversation_content = ""
for role, content in messages:
conversation_content += f"{role}: {content}\n\n"
# Update state
new_state = state_value.copy()
new_state["selected_conversation_id"] = conversation_id
return (
gr.update(value=conversation_title),
gr.update(value=conversation_content.strip()),
new_state
)
return gr.update(value=''), gr.update(value=''), state_value
load_conversation_button.click(
load_selected_conversation,
inputs=[conversations_list, management_state, conversation_mapping],
outputs=[conversation_title_input, conversation_content_input, management_state]
)
def save_conversation(title, content, state_value):
conversation_id = state_value["selected_conversation_id"]
if conversation_id:
# Update conversation title
update_conversation_title(conversation_id, title)
# Clear existing messages
delete_messages_in_conversation(conversation_id)
# Parse the content back into messages
messages = []
for line in content.strip().split('\n\n'):
if ': ' in line:
role, message_content = line.split(': ', 1)
messages.append((role.strip(), message_content.strip()))
else:
# If the format is incorrect, skip or handle accordingly
continue
# Save new messages
for role, message_content in messages:
save_message(conversation_id, role, message_content)
return (
gr.HTML("<p style='color: green;'>Conversation updated successfully.</p>"),
gr.update(value=title),
gr.update(value=content),
state_value
)
else:
return (
gr.HTML("<p style='color: red;'>No conversation selected to save.</p>"),
gr.update(value=title),
gr.update(value=content),
state_value
)
save_conversation_button.click(
save_conversation,
inputs=[conversation_title_input, conversation_content_input, management_state],
outputs=[status_message, conversation_title_input, conversation_content_input, management_state]
)
def delete_selected_conversation(state_value, mapping):
conversation_id = state_value["selected_conversation_id"]
if conversation_id:
delete_conversation(conversation_id)
# Reset state
new_state = state_value.copy()
new_state["selected_conversation_id"] = None
# Update conversations list and mapping
conversations, _, _ = get_all_conversations()
choices = []
new_mapping = {}
for conv_id, title in conversations:
display_title = f"{title} (ID: {conv_id[:8]})"
choices.append(display_title)
new_mapping[display_title] = conv_id
return (
gr.update(choices=choices, value=None),
gr.HTML("<p style='color: green;'>Conversation deleted successfully.</p>"),
new_state,
gr.update(value=''),
gr.update(value=''),
new_mapping
)
else:
return (
gr.update(),
gr.HTML("<p style='color: red;'>No conversation selected.</p>"),
state_value,
gr.update(),
gr.update(),
mapping
)
delete_conversation_button.click(
delete_selected_conversation,
inputs=[management_state, conversation_mapping],
outputs=[
conversations_list,
status_message,
management_state,
conversation_title_input,
conversation_content_input,
conversation_mapping
]
)
def create_new_conversation(state_value, mapping):
conversation_id = start_new_conversation()
# Update state
new_state = state_value.copy()
new_state["selected_conversation_id"] = conversation_id
# Update conversations list and mapping
conversations, _, _ = get_all_conversations()
choices = []
new_mapping = {}
for conv_id, title in conversations:
display_title = f"{title} (ID: {conv_id[:8]})"
choices.append(display_title)
new_mapping[display_title] = conv_id
# Set the new conversation as selected
selected_title = f"Untitled Conversation (ID: {conversation_id[:8]})"
return (
gr.update(choices=choices, value=selected_title),
gr.update(value='Untitled Conversation'),
gr.update(value=''),
gr.HTML("<p style='color: green;'>New conversation created.</p>"),
new_state,
new_mapping
)
new_conversation_button.click(
create_new_conversation,
inputs=[management_state, conversation_mapping],
outputs=[
conversations_list,
conversation_title_input,
conversation_content_input,
status_message,
management_state,
conversation_mapping
]
)
def delete_messages_in_conversation_wrapper(conversation_id):
"""Wrapper function to delete all messages in a conversation."""
try:
delete_messages_in_conversation(conversation_id)
logging.info(f"Messages in conversation '{conversation_id}' deleted successfully.")
except Exception as e:
logging.error(f"Error deleting messages in conversation '{conversation_id}': {e}")
raise
def get_conversation_title_wrapper(conversation_id):
"""Helper function to get the conversation title."""
result = get_conversation_title(conversation_id)
if result:
return result[0][0]
else:
return "Untitled Conversation"
def create_export_data_tab():
with gr.TabItem("Export Data"):
gr.Markdown("# Export Data")
export_option = gr.Radio(
["Export All", "Export Selected"],
label="Export Option",
value="Export All"
)
conversations_checklist = gr.CheckboxGroup(
choices=[],
label="Select Conversations",
visible=False
)
notes_checklist = gr.CheckboxGroup(
choices=[],
label="Select Notes",
visible=False
)
export_button = gr.Button("Export")
download_link = gr.File(label="Download Exported Data", visible=False)
status_message = gr.HTML()
# Function to update visibility and populate checklists
def update_visibility(export_option_value):
if export_option_value == "Export Selected":
# Fetch conversations and notes to populate the checklists
conversations = fetch_all_conversations()
notes = fetch_all_notes()
conversation_choices = [f"{title} (ID: {conversation_id})" for conversation_id, title, _ in conversations]
note_choices = [f"{title} (ID: {note_id})" for note_id, title, _ in notes]
return (
gr.update(visible=True, choices=conversation_choices),
gr.update(visible=True, choices=note_choices)
)
else:
return (
gr.update(visible=False),
gr.update(visible=False)
)
export_option.change(
update_visibility,
inputs=[export_option],
outputs=[conversations_checklist, notes_checklist]
)
import zipfile
import io
def update_visibility(export_option_value):
if export_option_value == "Export Selected":
# Fetch conversations and notes to populate the checklists
conversations = fetch_all_conversations()
notes = fetch_all_notes()
conversation_choices = [f"{title} (ID: {conversation_id})" for conversation_id, title, _ in
conversations]
note_choices = [f"{title} (ID: {note_id})" for note_id, title, _ in notes]
return (
gr.update(visible=True, choices=conversation_choices),
gr.update(visible=True, choices=note_choices)
)
else:
return (
gr.update(visible=False),
gr.update(visible=False)
)
export_option.change(
update_visibility,
inputs=[export_option],
outputs=[conversations_checklist, notes_checklist]
)
def export_data_function(export_option, selected_conversations, selected_notes):
try:
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
if export_option == "Export All":
# Fetch all conversations and notes
conversations = fetch_all_conversations()
notes = fetch_all_notes()
else:
# Fetch selected conversations and notes
conversation_ids = [int(item.split(' (ID: ')[1][:-1]) for item in selected_conversations]
note_ids = [int(item.split(' (ID: ')[1][:-1]) for item in selected_notes]
conversations = fetch_conversations_by_ids(conversation_ids)
notes = fetch_notes_by_ids(note_ids)
# Export conversations
for conversation in conversations:
conversation_id, title, _ = conversation
filename = f"conversation_{conversation_id}_{title.replace(' ', '_')}.md"
zip_file.writestr(filename, conversation)
# Export notes
for note in notes:
note_id, title, _ = note
filename = f"note_{note_id}_{title.replace(' ', '_')}.md"
zip_file.writestr(filename, note)
zip_buffer.seek(0)
return zip_buffer, gr.update(visible=True), gr.update(
value="<p style='color:green;'>Export successful!</p>")
except Exception as e:
logging.error(f"Error exporting data: {str(e)}")
return None, gr.update(visible=False), gr.update(value=f"<p style='color:red;'>Error: {str(e)}</p>")
export_button.click(
export_data_function,
inputs=[export_option, conversations_checklist, notes_checklist],
outputs=[download_link, download_link, status_message]
)
def convert_file_to_text(file_path):
"""Convert various file types to plain text."""
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension == '.pdf':
return extract_text_and_format_from_pdf(file_path)
elif file_extension == '.epub':
return read_epub(file_path)
elif file_extension in ['.json', '.csv']:
return read_structured_file(file_path)
elif file_extension == '.docx':
return docx2txt.process(file_path)
elif file_extension in ['.txt', '.md', '.rtf']:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
else:
raise ValueError(f"Unsupported file type: {file_extension}")
def read_structured_file(file_path):
"""Read and convert JSON or CSV files to text."""
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension == '.json':
with open(file_path, 'r') as file:
data = json.load(file)
return json.dumps(data, indent=2)
elif file_extension == '.csv':
with open(file_path, 'r', newline='') as file:
csv_reader = csv.reader(file)
return '\n'.join([','.join(row) for row in csv_reader])
else:
raise ValueError(f"Unsupported file type: {file_extension}")
#
# End of RAG_QA_Chat_tab.py
########################################################################################################################