Spaces:
Running
Running
# character_chat_db.py | |
# Database functions for managing character cards and chat histories. | |
# # | |
# Imports | |
import configparser | |
import sqlite3 | |
import json | |
import os | |
import sys | |
from typing import List, Dict, Optional, Tuple, Any, Union | |
from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path | |
from Tests.Chat_APIs.Chat_APIs_Integration_test import logging | |
# | |
####################################################################################################################### | |
# | |
# | |
def ensure_database_directory():
    """Create the directory returned by ``get_database_dir()`` if it is missing."""
    database_dir = get_database_dir()
    # exist_ok=True turns this into a no-op when the directory already exists.
    os.makedirs(database_dir, exist_ok=True)


# Run at import time so the directory exists before any database is opened.
ensure_database_directory()
# Locate and load the project configuration file.
config_path = get_project_relative_path('Config_Files/config.txt')
config = configparser.ConfigParser()
config.read(config_path)

# The [Database] chatDB_path setting wins; when it is absent, use whatever
# get_database_path('chatDB.db') returns.
chat_DB_PATH = config.get(
    'Database',
    'chatDB_path',
    fallback=get_database_path('chatDB.db'),
)
print(f"Chat Database path: {chat_DB_PATH}")
######################################################################################################## | |
# | |
# Functions | |
# FIXME - Setup properly and test/add documentation for its existence... | |
def _fts_sync_triggers_sql(table: str, columns: List[str]) -> str:
    """Build the trigger script that mirrors ``table`` into ``<table>_fts``.

    The FTS5 tables here are external-content tables, so index entries must be
    removed with the special ``'delete'`` command and the *old* column values.
    A plain ``DELETE``/``UPDATE`` on the FTS table inside an AFTER trigger runs
    when the content row has already changed, which leaves stale index entries.

    Args:
        table: Name of the content table (trusted, hard-coded by the caller).
        columns: Indexed column names, in the order declared on the FTS table.

    Returns:
        A SQL script suitable for ``cursor.executescript``.
    """
    fts = f"{table}_fts"
    cols = ", ".join(columns)
    new_values = ", ".join(f"new.{column}" for column in columns)
    old_values = ", ".join(f"old.{column}" for column in columns)
    # Drop first so databases created with the earlier, incorrect trigger
    # bodies pick up the corrected ones (CREATE ... IF NOT EXISTS would not).
    return f"""
        DROP TRIGGER IF EXISTS {table}_ai;
        DROP TRIGGER IF EXISTS {table}_ad;
        DROP TRIGGER IF EXISTS {table}_au;
        CREATE TRIGGER {table}_ai AFTER INSERT ON {table} BEGIN
            INSERT INTO {fts}(rowid, {cols}) VALUES (new.id, {new_values});
        END;
        CREATE TRIGGER {table}_ad AFTER DELETE ON {table} BEGIN
            INSERT INTO {fts}({fts}, rowid, {cols}) VALUES ('delete', old.id, {old_values});
        END;
        CREATE TRIGGER {table}_au AFTER UPDATE ON {table} BEGIN
            INSERT INTO {fts}({fts}, rowid, {cols}) VALUES ('delete', old.id, {old_values});
            INSERT INTO {fts}(rowid, {cols}) VALUES (new.id, {new_values});
        END;
    """


def initialize_database():
    """Initialize the SQLite database with required tables and FTS5 virtual tables.

    Creates (if missing) the CharacterCards, CharacterChats and ChatKeywords
    tables, the two FTS5 external-content indexes, the triggers that keep the
    indexes in sync, and the ChatKeywords lookup indexes.

    Raises:
        sqlite3.Error: If any schema statement fails (logged, then re-raised).
        Exception: Any other unexpected failure (logged, then re-raised).
    """
    conn = None
    try:
        conn = sqlite3.connect(chat_DB_PATH)
        cursor = conn.cursor()

        # Enable foreign key constraints (this setting is per connection).
        cursor.execute("PRAGMA foreign_keys = ON;")

        # Create CharacterCards table with V2 fields
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS CharacterCards (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                description TEXT,
                personality TEXT,
                scenario TEXT,
                image BLOB,
                post_history_instructions TEXT,
                first_mes TEXT,
                mes_example TEXT,
                creator_notes TEXT,
                system_prompt TEXT,
                alternate_greetings TEXT,
                tags TEXT,
                creator TEXT,
                character_version TEXT,
                extensions TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
        """)

        # Create FTS5 virtual table for CharacterCards
        cursor.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS CharacterCards_fts USING fts5(
                name,
                description,
                personality,
                scenario,
                system_prompt,
                content='CharacterCards',
                content_rowid='id'
            );
        """)

        # Create triggers to keep FTS5 table in sync with CharacterCards
        cursor.executescript(_fts_sync_triggers_sql(
            "CharacterCards",
            ["name", "description", "personality", "scenario", "system_prompt"],
        ))

        # Create CharacterChats table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS CharacterChats (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                character_id INTEGER NOT NULL,
                conversation_name TEXT,
                chat_history TEXT,
                is_snapshot BOOLEAN DEFAULT FALSE,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
            );
        """)

        # Create FTS5 virtual table for CharacterChats
        cursor.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
                conversation_name,
                chat_history,
                content='CharacterChats',
                content_rowid='id'
            );
        """)

        # Create triggers to keep FTS5 table in sync with CharacterChats
        cursor.executescript(_fts_sync_triggers_sql(
            "CharacterChats",
            ["conversation_name", "chat_history"],
        ))

        # Create ChatKeywords table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS ChatKeywords (
                chat_id INTEGER NOT NULL,
                keyword TEXT NOT NULL,
                FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
            );
        """)

        # Create indexes for faster searches
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
        """)

        conn.commit()
        logging.info("Database initialized successfully.")
    except sqlite3.Error as e:
        logging.error(f"SQLite error occurred during database initialization: {e}")
        if conn:
            conn.rollback()
        raise
    except Exception as e:
        logging.error(f"Unexpected error occurred during database initialization: {e}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()
# Call initialize_database() at the start of your application | |
def setup_chat_database():
    """Initialize the chat database; log and exit with status 1 on failure."""
    try:
        initialize_database()
    except Exception as init_error:
        logging.critical(f"Failed to initialize database: {init_error}")
        sys.exit(1)


# Runs at import time so the schema exists before any other function is used.
setup_chat_database()
######################################################################################################## | |
# | |
# Character Card handling | |
def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a raw character card into the V2 field layout.

    Args:
        card_data: Raw card dictionary; any field may be missing.

    Returns:
        A dictionary with every V2 text field present (missing ones default to
        ''), with 'alternate_greetings', 'tags' and 'extensions' serialized to
        JSON strings. 'image' is included only when the input contains it.
    """
    # (field name, default used when the field is missing), in output order.
    field_defaults = (
        ('name', ''),
        ('description', ''),
        ('personality', ''),
        ('scenario', ''),
        ('first_mes', ''),
        ('mes_example', ''),
        ('creator_notes', ''),
        ('system_prompt', ''),
        ('post_history_instructions', ''),
        ('alternate_greetings', []),
        ('tags', []),
        ('creator', ''),
        ('character_version', ''),
        ('extensions', {}),
    )
    # These fields hold structured data and are stored as JSON text.
    json_encoded = {'alternate_greetings', 'tags', 'extensions'}

    v2_data = {}
    for field_name, default in field_defaults:
        value = card_data.get(field_name, default)
        v2_data[field_name] = json.dumps(value) if field_name in json_encoded else value

    # The image may be binary data, so it is copied through untouched.
    if 'image' in card_data:
        v2_data['image'] = card_data['image']
    return v2_data
def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
    """Add or update a character card in the database.

    The card is matched by name: an existing row is updated, otherwise a new
    row is inserted.

    Args:
        card_data: Raw character card; it is normalized with
            ``parse_character_card`` before being stored.

    Returns:
        The ID of the inserted or updated character, or None on failure
        (the error is logged).
    """
    # Every stored column except 'name', which is the lookup key.
    data_columns = (
        'description', 'personality', 'scenario', 'image',
        'post_history_instructions', 'first_mes', 'mes_example',
        'creator_notes', 'system_prompt', 'alternate_greetings',
        'tags', 'creator', 'character_version', 'extensions',
    )
    conn = sqlite3.connect(chat_DB_PATH)
    try:
        cursor = conn.cursor()
        parsed_card = parse_character_card(card_data)

        # Check if character already exists
        cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
        row = cursor.fetchone()

        if row:
            # Update existing character. 'image' is optional in the parsed
            # card: when it is absent the stored image is left untouched
            # instead of raising KeyError (which used to abort the save).
            character_id = row[0]
            update_columns = [column for column in data_columns if column in parsed_card]
            assignments = ", ".join(f"{column} = ?" for column in update_columns)
            cursor.execute(
                f"UPDATE CharacterCards SET {assignments} WHERE id = ?",
                [parsed_card[column] for column in update_columns] + [character_id],
            )
        else:
            # Insert new character; a missing image is stored as NULL.
            insert_columns = ('name',) + data_columns
            placeholders = ", ".join("?" * len(insert_columns))
            cursor.execute(
                f"INSERT INTO CharacterCards ({', '.join(insert_columns)}) VALUES ({placeholders})",
                [parsed_card.get(column) for column in insert_columns],
            )
            character_id = cursor.lastrowid

        conn.commit()
        return character_id
    except sqlite3.IntegrityError as e:
        logging.error(f"Error adding character card: {e}")
        return None
    except Exception as e:
        logging.error(f"Unexpected error adding character card: {e}")
        return None
    finally:
        conn.close()
# def add_character_card(card_data: Dict) -> Optional[int]: | |
# """Add or update a character card in the database. | |
# | |
# Returns the ID of the inserted character or None if failed. | |
# """ | |
# conn = sqlite3.connect(chat_DB_PATH) | |
# cursor = conn.cursor() | |
# try: | |
# # Ensure all required fields are present | |
# required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message'] | |
# for field in required_fields: | |
# if field not in card_data: | |
# card_data[field] = '' # Assign empty string if field is missing | |
# | |
# # Check if character already exists | |
# cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],)) | |
# row = cursor.fetchone() | |
# | |
# if row: | |
# # Update existing character | |
# character_id = row[0] | |
# cursor.execute(""" | |
# UPDATE CharacterCards | |
# SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? | |
# WHERE id = ? | |
# """, ( | |
# card_data['description'], | |
# card_data['personality'], | |
# card_data['scenario'], | |
# card_data['image'], | |
# card_data['post_history_instructions'], | |
# card_data['first_message'], | |
# character_id | |
# )) | |
# else: | |
# # Insert new character | |
# cursor.execute(""" | |
# INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message) | |
# VALUES (?, ?, ?, ?, ?, ?, ?) | |
# """, ( | |
# card_data['name'], | |
# card_data['description'], | |
# card_data['personality'], | |
# card_data['scenario'], | |
# card_data['image'], | |
# card_data['post_history_instructions'], | |
# card_data['first_message'] | |
# )) | |
# character_id = cursor.lastrowid | |
# | |
# conn.commit() | |
# return cursor.lastrowid | |
# except sqlite3.IntegrityError as e: | |
# logging.error(f"Error adding character card: {e}") | |
# return None | |
# except Exception as e: | |
# logging.error(f"Unexpected error adding character card: {e}") | |
# return None | |
# finally: | |
# conn.close() | |
def get_character_cards() -> List[Dict]:
    """Retrieve all character cards from the database.

    Returns:
        One dictionary per CharacterCards row, keyed by column name.

    Raises:
        sqlite3.Error: If the query fails; the connection is still closed.
    """
    logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
    conn = sqlite3.connect(chat_DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM CharacterCards")
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(zip(columns, row)) for row in rows]
def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """
    Retrieve a single character card by its ID.

    Args:
        character_id: Either an integer ID or a dictionary that is already a
            character card (returned unchanged).

    Returns:
        A dictionary containing the character card data, or None if the card
        is not found, the argument type is unsupported, or the query fails.
    """
    # Resolve the inputs that need no database access before opening a
    # connection.
    if isinstance(character_id, dict):
        # If a dictionary is passed, assume it's already a character card
        return character_id
    if not isinstance(character_id, int):
        logging.warning(f"Invalid type for character_id: {type(character_id)}")
        return None

    conn = sqlite3.connect(chat_DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
        row = cursor.fetchone()
        if row is None:
            return None
        columns = [description[0] for description in cursor.description]
        return dict(zip(columns, row))
    except Exception as e:
        logging.error(f"Error in get_character_card_by_id: {e}")
        return None
    finally:
        conn.close()
def update_character_card(character_id: int, card_data: Dict) -> bool:
    """Update an existing character card.

    Args:
        character_id: ID of the CharacterCards row to update.
        card_data: New values. The greeting is read from 'first_mes' (the V2
            field and schema column name), falling back to the legacy
            'first_message' key and then to a default greeting.

    Returns:
        True if a row was updated; False if no row matched or a constraint
        (e.g. the unique/non-null name) was violated.
    """
    # The schema column is 'first_mes'; the former 'first_message' column name
    # does not exist and made every call fail with "no such column".
    first_message = card_data.get(
        'first_mes',
        card_data.get('first_message', "Hello! I'm ready to chat."),
    )
    conn = sqlite3.connect(chat_DB_PATH)
    cursor = conn.cursor()
    try:
        cursor.execute("""
            UPDATE CharacterCards
            SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_mes = ?
            WHERE id = ?
        """, (
            card_data.get('name'),
            card_data.get('description'),
            card_data.get('personality'),
            card_data.get('scenario'),
            card_data.get('image'),
            card_data.get('post_history_instructions', ''),
            first_message,
            character_id
        ))
        conn.commit()
        return cursor.rowcount > 0
    except sqlite3.IntegrityError as e:
        logging.error(f"Error updating character card: {e}")
        return False
    finally:
        conn.close()
def delete_character_card(character_id: int) -> bool:
    """Delete a character card together with its chats and their keywords.

    Args:
        character_id: ID of the character to delete.

    Returns:
        True if a character card row was deleted, False if none matched or a
        database error occurred (the error is logged).
    """
    conn = sqlite3.connect(chat_DB_PATH)
    cursor = conn.cursor()
    try:
        # SQLite enforces ON DELETE CASCADE only on connections that enable
        # foreign keys, and this one does not, so dependents are removed
        # explicitly, children first.
        cursor.execute(
            "DELETE FROM ChatKeywords WHERE chat_id IN "
            "(SELECT id FROM CharacterChats WHERE character_id = ?)",
            (character_id,),
        )
        cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
        # Keep this statement last: rowcount below must describe the card delete.
        cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
        conn.commit()
        return cursor.rowcount > 0
    except sqlite3.Error as e:
        logging.error(f"Error deleting character card: {e}")
        return False
    finally:
        conn.close()
def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
    """
    Store a new chat for a character and, optionally, its keywords.

    Args:
        character_id (int): The ID of the character.
        conversation_name (str): Name of the conversation.
        chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples;
            stored as a JSON string.
        keywords (Optional[List[str]]): Keywords to associate with this chat;
            each is stripped and lower-cased before being stored.
        is_snapshot (bool, optional): Whether this chat is a snapshot.

    Returns:
        Optional[int]: The ID of the inserted chat, or None if a database
        error occurred.
    """
    connection = sqlite3.connect(chat_DB_PATH)
    db_cursor = connection.cursor()
    try:
        serialized_history = json.dumps(chat_history)
        chat_row = (character_id, conversation_name, serialized_history, is_snapshot)
        db_cursor.execute("""
            INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
            VALUES (?, ?, ?, ?)
        """, chat_row)
        new_chat_id = db_cursor.lastrowid

        if keywords:
            # One (chat_id, keyword) row per normalized keyword.
            keyword_rows = []
            for raw_keyword in keywords:
                keyword_rows.append((new_chat_id, raw_keyword.strip().lower()))
            db_cursor.executemany("""
                INSERT INTO ChatKeywords (chat_id, keyword)
                VALUES (?, ?)
            """, keyword_rows)

        connection.commit()
        return new_chat_id
    except sqlite3.Error as db_error:
        logging.error(f"Error adding character chat: {db_error}")
        return None
    finally:
        connection.close()
def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
    """Retrieve all chats, or chats for a specific character if character_id is provided.

    Args:
        character_id: When not None, only chats of this character are returned.

    Returns:
        One dictionary per CharacterChats row, keyed by column name.
        'chat_history' is returned as stored (a JSON string), not decoded.

    Raises:
        sqlite3.Error: If the query fails; the connection is still closed.
    """
    conn = sqlite3.connect(chat_DB_PATH)
    try:
        cursor = conn.cursor()
        if character_id is not None:
            cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
        else:
            cursor.execute("SELECT * FROM CharacterChats")
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(zip(columns, row)) for row in rows]
def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
    """Retrieve a single chat by its ID.

    Args:
        chat_id: ID of the CharacterChats row.

    Returns:
        The chat as a dictionary keyed by column name, with 'chat_history'
        decoded from JSON (an empty list when the stored history is NULL or
        empty), or None if no chat has this ID.
    """
    conn = sqlite3.connect(chat_DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
        row = cursor.fetchone()
        if row is None:
            return None
        # Read the column metadata while the connection is still open.
        columns = [description[0] for description in cursor.description]
    finally:
        conn.close()

    chat = dict(zip(columns, row))
    stored_history = chat['chat_history']
    # The column is nullable; json.loads(None) would raise TypeError.
    chat['chat_history'] = json.loads(stored_history) if stored_history else []
    return chat
def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
    """
    Full-text search over character chats via FTS5, optionally for one character.

    Args:
        query (str): The search query (passed to FTS5 MATCH as-is).
        character_id (Optional[int]): When given, only that character's chats
            are searched.

    Returns:
        Tuple[List[Dict], str]: The matching chats (id, conversation_name,
        chat_history) and a human-readable status message.
    """
    if not query.strip():
        return [], "Please enter a search query."

    filter_by_character = character_id is not None
    connection = sqlite3.connect(chat_DB_PATH)
    db_cursor = connection.cursor()
    try:
        if filter_by_character:
            # Search with character_id filter
            db_cursor.execute("""
                SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
                FROM CharacterChats_fts
                JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
                WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
                ORDER BY rank
            """, (query, character_id))
        else:
            # Search without character_id filter
            db_cursor.execute("""
                SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
                FROM CharacterChats_fts
                JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
                WHERE CharacterChats_fts MATCH ?
                ORDER BY rank
            """, (query,))

        matched_rows = db_cursor.fetchall()
        column_names = [column[0] for column in db_cursor.description]
        results = []
        for matched_row in matched_rows:
            results.append(dict(zip(column_names, matched_row)))

        if filter_by_character:
            return results, f"Found {len(results)} chat(s) matching '{query}' for the selected character."
        return results, f"Found {len(results)} chat(s) matching '{query}' across all characters."
    except Exception as search_error:
        logging.error(f"Error searching chats with FTS5: {search_error}")
        return [], f"Error occurred during search: {search_error}"
    finally:
        connection.close()
def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
    """Replace the stored history of an existing chat.

    Args:
        chat_id: ID of the chat to update.
        chat_history: New history; stored as a JSON string.

    Returns:
        True if a row was updated, False if none matched or a database error
        occurred.
    """
    connection = sqlite3.connect(chat_DB_PATH)
    db_cursor = connection.cursor()
    try:
        serialized_history = json.dumps(chat_history)
        db_cursor.execute("""
            UPDATE CharacterChats
            SET chat_history = ?
            WHERE id = ?
        """, (serialized_history, chat_id))
        connection.commit()
        updated_any = db_cursor.rowcount > 0
        return updated_any
    except sqlite3.Error as db_error:
        logging.error(f"Error updating character chat: {db_error}")
        return False
    finally:
        connection.close()
def delete_character_chat(chat_id: int) -> bool:
    """Delete a specific chat together with its keywords.

    Args:
        chat_id: ID of the chat to delete.

    Returns:
        True if a chat row was deleted, False if none matched or a database
        error occurred (the error is logged).
    """
    conn = sqlite3.connect(chat_DB_PATH)
    cursor = conn.cursor()
    try:
        # ON DELETE CASCADE only fires on connections that enable foreign
        # keys, and this one does not, so the keyword rows are removed
        # explicitly to avoid leaving orphans.
        cursor.execute("DELETE FROM ChatKeywords WHERE chat_id = ?", (chat_id,))
        # Keep this statement last: rowcount below must describe the chat delete.
        cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
        conn.commit()
        return cursor.rowcount > 0
    except sqlite3.Error as e:
        logging.error(f"Error deleting character chat: {e}")
        return False
    finally:
        conn.close()
def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
    """
    Fetch chat IDs associated with any of the specified keywords.

    Keywords are stripped and lower-cased before the lookup, matching how
    add_character_chat stores them, so the search is effectively
    case-insensitive.

    Args:
        keywords (List[str]): List of keywords to search for.

    Returns:
        List[int]: Distinct chat IDs associated with the keywords; an empty
        list if there are no usable keywords or the query fails.
    """
    if not keywords:
        return []
    # Normalize, drop blanks, and de-duplicate while preserving order.
    normalized = list(dict.fromkeys(
        keyword.strip().lower() for keyword in keywords if keyword and keyword.strip()
    ))
    if not normalized:
        return []

    conn = sqlite3.connect(chat_DB_PATH)
    cursor = conn.cursor()
    try:
        placeholders = ", ".join("?" * len(normalized))
        sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE keyword IN ({placeholders})"
        cursor.execute(sql_query, normalized)
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logging.error(f"Error in fetch_keywords_for_chats: {e}")
        return []
    finally:
        conn.close()
def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
    """Save chat history to the CharacterChats table.

    Thin wrapper around add_character_chat that stores the chat without
    keywords and without the snapshot flag.

    Returns:
        The ID of the inserted chat, or None if the insert failed.
    """
    new_chat_id = add_character_chat(character_id, conversation_name, chat_history)
    return new_chat_id
def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5,
              where_params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
    """
    Perform a full-text search on specified fields with optional filtering and pagination.

    Args:
        query (str): The search query (FTS5 query syntax).
        fields (List[str]): CharacterChats_fts columns to search in. A match in
            any listed column counts; an empty list searches every column.
        where_clause (str, optional): Additional SQL condition ANDed onto the
            search. It is inserted into the statement verbatim, so it must come
            from trusted code; use ``?`` placeholders plus ``where_params`` for
            values.
        page (int, optional): 1-based page number for pagination.
        results_per_page (int, optional): Number of results per page.
        where_params (Optional[List[Any]]): Values bound to the ``?``
            placeholders inside ``where_clause``.

    Returns:
        List[Dict[str, Any]]: Matching chats with keys 'id',
        'conversation_name', 'chat_history' and 'character_id', best match
        first. An empty list on blank query or on any error (logged).
    """
    if not query.strip():
        return []

    conn = sqlite3.connect(chat_DB_PATH)
    cursor = conn.cursor()
    try:
        # Field names end up inside the FTS5 expression, so accept only plain
        # identifiers.
        invalid_fields = [field for field in fields if not (isinstance(field, str) and field.isidentifier())]
        if invalid_fields:
            raise ValueError(f"Invalid search field(s): {invalid_fields}")

        # Restrict the match with an FTS5 column filter. The earlier
        # "a AND b MATCH ?" form was not a valid full-text filter and the bare
        # column names were ambiguous between the joined tables.
        match_expression = f"{{{' '.join(fields)}}} : ({query})" if fields else query

        fts_query = """
            SELECT CharacterChats.id, CharacterChats.conversation_name,
                   CharacterChats.chat_history, CharacterChats.character_id
            FROM CharacterChats_fts
            JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
            WHERE CharacterChats_fts MATCH ?
        """
        params: List[Any] = [match_expression]
        if where_clause:
            fts_query += f" AND ({where_clause})"
            params.extend(where_params or [])
        fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
        offset = (page - 1) * results_per_page
        params.extend([results_per_page, offset])

        cursor.execute(fts_query, params)
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        return [dict(zip(columns, row)) for row in rows]
    except Exception as e:
        logging.error(f"Error in search_db: {e}")
        return []
    finally:
        conn.close()


def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
        List[Dict[str, Any]]:
    """
    Perform a full-text search within the specified chat IDs using FTS5.

    Args:
        query (str): The user's query.
        relevant_chat_ids (List[int]): Chat IDs to search within; when empty,
            the search is not restricted.
        page (int): Pagination page number.
        results_per_page (int): Number of results per page.

    Returns:
        List[Dict[str, Any]]: One entry per match, with the chat history as
        'content' and the chat ID under metadata 'media_id'. Empty on error.
    """
    try:
        chat_ids = [int(chat_id) for chat_id in (relevant_chat_ids or [])]

        # Limit the search to the relevant chat IDs; values are bound as
        # parameters rather than formatted into the SQL.
        where_clause = ""
        if chat_ids:
            placeholders = ", ".join("?" * len(chat_ids))
            where_clause = f"CharacterChats.id IN ({placeholders})"

        fts_results = search_db(
            query,
            ["conversation_name", "chat_history"],
            where_clause,
            page=page,
            results_per_page=results_per_page,
            where_params=chat_ids,
        )
        return [
            {
                "content": result['chat_history'],
                "metadata": {"media_id": result['id']}
            }
            for result in fts_results
        ]
    except Exception as e:
        logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
        return []
def fetch_all_chats() -> List[Dict[str, Any]]:
    """
    Return every stored chat by delegating to get_character_chats().

    Returns:
        List[Dict[str, Any]]: The chats as returned by get_character_chats(),
        or an empty list if that call raises (the error is logged).
    """
    try:
        return get_character_chats()  # Modify this function to retrieve all chats
    except Exception as fetch_error:
        logging.error(f"Error fetching all chats: {str(fetch_error)}")
        return []
def search_character_chat(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
    """
    Perform a full-text search on the Character Chat database.

    Args:
        query: Search query string (FTS5 query syntax).
        fts_top_k: Maximum number of results to return.
        relevant_media_ids: Optional list of character IDs to filter results.

    Returns:
        List of search results, best match first. Each entry has the chat
        history as 'content' and 'chat_id', 'conversation_name' and
        'character_id' under 'metadata'. Empty on blank query or error.
    """
    if not query.strip():
        return []

    conn = None
    try:
        conn = sqlite3.connect(chat_DB_PATH)
        cursor = conn.cursor()

        # character_id is selected explicitly because it is reported in the
        # result metadata.
        sql_query = """
            SELECT CharacterChats.id, CharacterChats.conversation_name,
                   CharacterChats.chat_history, CharacterChats.character_id
            FROM CharacterChats_fts
            JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
            WHERE CharacterChats_fts MATCH ?
        """
        params: List[Any] = [query]

        # Limit the search to the relevant character IDs; the IDs are bound as
        # parameters together with the placeholders that reference them.
        if relevant_media_ids:
            placeholders = ','.join(['?'] * len(relevant_media_ids))
            sql_query += f" AND CharacterChats.character_id IN ({placeholders})"
            params.extend(relevant_media_ids)

        sql_query += " ORDER BY rank LIMIT ?"
        params.append(fts_top_k)

        cursor.execute(sql_query, params)
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        results = [dict(zip(columns, row)) for row in rows]

        # Format results
        return [
            {
                "content": r['chat_history'],
                "metadata": {
                    "chat_id": r['id'],
                    "conversation_name": r['conversation_name'],
                    "character_id": r['character_id']
                }
            }
            for r in results
        ]
    except Exception as e:
        logging.error(f"Error in search_character_chat: {e}")
        return []
    finally:
        if conn is not None:
            conn.close()
def search_character_cards(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
    """
    Perform a full-text search on the Character Cards database.

    Args:
        query: Search query string (FTS5 query syntax).
        fts_top_k: Maximum number of results to return.
        relevant_media_ids: Optional list of character IDs to filter results.

    Returns:
        List of search results, best match first. Each entry has a text
        summary of the card as 'content' and 'character_id' and 'name' under
        'metadata'. Empty on blank query or error.
    """
    if not query.strip():
        return []

    # Initialized before the try block so the finally clause never touches an
    # unbound name when sqlite3.connect itself fails.
    conn = None
    try:
        conn = sqlite3.connect(chat_DB_PATH)
        cursor = conn.cursor()

        # Construct the query
        sql_query = """
            SELECT CharacterCards.id, CharacterCards.name, CharacterCards.description, CharacterCards.personality, CharacterCards.scenario
            FROM CharacterCards_fts
            JOIN CharacterCards ON CharacterCards_fts.rowid = CharacterCards.id
            WHERE CharacterCards_fts MATCH ?
        """
        params: List[Any] = [query]

        # Add filtering by character IDs if provided
        if relevant_media_ids:
            placeholders = ','.join(['?'] * len(relevant_media_ids))
            sql_query += f" AND CharacterCards.id IN ({placeholders})"
            params.extend(relevant_media_ids)

        # Rank before limiting so the top-k are the best matches rather than
        # an arbitrary subset.
        sql_query += " ORDER BY rank LIMIT ?"
        params.append(fts_top_k)

        cursor.execute(sql_query, params)
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        results = [dict(zip(columns, row)) for row in rows]

        # Format results
        formatted_results = []
        for r in results:
            content = f"Name: {r['name']}\nDescription: {r['description']}\nPersonality: {r['personality']}\nScenario: {r['scenario']}"
            formatted_results.append({
                "content": content,
                "metadata": {
                    "character_id": r['id'],
                    "name": r['name']
                }
            })
        return formatted_results
    except Exception as e:
        logging.error(f"Error in search_character_cards: {e}")
        return []
    finally:
        if conn is not None:
            conn.close()
def fetch_character_ids_by_keywords(keywords: List[str]) -> List[int]:
    """
    Fetch character IDs associated with any of the specified keywords.

    A character matches when its 'tags' column (a JSON array) contains at
    least one of the keywords. Rows whose tags are NULL or not valid JSON are
    treated as having no tags.

    Args:
        keywords (List[str]): List of keywords to search for.

    Returns:
        List[int]: List of character IDs associated with the keywords; empty
        if no keywords are given or the query fails.
    """
    if not keywords:
        return []

    conn = sqlite3.connect(chat_DB_PATH)
    cursor = conn.cursor()
    try:
        placeholders = ','.join(['?'] * len(keywords))
        # json_each raises on malformed JSON, which would abort the whole
        # query because of a single bad row; substitute an empty array.
        sql_query = f"""
            SELECT DISTINCT id FROM CharacterCards
            WHERE EXISTS (
                SELECT 1
                FROM json_each(CASE WHEN json_valid(tags) THEN tags ELSE '[]' END)
                WHERE json_each.value IN ({placeholders})
            )
        """
        cursor.execute(sql_query, keywords)
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logging.error(f"Error in fetch_character_ids_by_keywords: {e}")
        return []
    finally:
        conn.close()
################################################################### | |
# | |
# Character Keywords | |
def view_char_keywords():
    """List every distinct keyword found in the 'tags' of any character card.

    Returns:
        str: A markdown bullet list of the keywords in sorted order,
        "No keywords found." when there are none, or an error message if the
        query fails.
    """
    try:
        conn = sqlite3.connect(chat_DB_PATH)
        try:
            cursor = conn.cursor()
            # json_each exposes each array element in its 'value' column
            # (there is no 'keyword' column), so alias it. Rows with NULL or
            # malformed tags are read as an empty array instead of aborting
            # the query.
            cursor.execute("""
                SELECT DISTINCT json_each.value AS keyword
                FROM CharacterCards
                CROSS JOIN json_each(
                    CASE WHEN json_valid(CharacterCards.tags) THEN CharacterCards.tags ELSE '[]' END
                )
                ORDER BY keyword
            """)
            keywords = cursor.fetchall()
        finally:
            # "with sqlite3.connect(...)" only manages the transaction; the
            # connection has to be closed explicitly.
            conn.close()

        if keywords:
            keyword_list = [k[0] for k in keywords]
            return "### Current Character Keywords:\n" + "\n".join(
                [f"- {k}" for k in keyword_list])
        return "No keywords found."
    except Exception as e:
        return f"Error retrieving keywords: {str(e)}"
def add_char_keywords(name: str, keywords: str):
    """Add keywords to a character's tags.

    Args:
        name: Name of the character whose tags are extended.
        keywords: Comma-separated keywords; surrounding whitespace is stripped
            and blank entries are ignored.

    Returns:
        str: A success message, "Character not found.", or an error message.
    """
    try:
        keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
        # Use the configured chat database (previously a hard-coded
        # 'character_chat.db' in the working directory was opened instead).
        conn = sqlite3.connect(chat_DB_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT tags FROM CharacterCards WHERE name = ?",
                (name,)
            )
            result = cursor.fetchone()
            if not result:
                return "Character not found."

            # Tags are stored as a JSON array, so decode and encode them as
            # JSON rather than slicing the string and writing a Python repr.
            current_tags = json.loads(result[0]) if result[0] else []
            if not isinstance(current_tags, list):
                raise ValueError("stored tags are not a JSON array")

            # Merge while preserving order and dropping duplicates.
            updated_tags = list(dict.fromkeys([*current_tags, *keywords_list]))
            cursor.execute(
                "UPDATE CharacterCards SET tags = ? WHERE name = ?",
                (json.dumps(updated_tags), name)
            )
            conn.commit()
        finally:
            conn.close()
        return f"Successfully added keywords to character {name}"
    except Exception as e:
        return f"Error adding keywords: {str(e)}"
def delete_char_keyword(char_name: str, keyword: str) -> str:
    """
    Remove one keyword from the JSON tag list of a character.

    Args:
        char_name (str): The name of the character
        keyword (str): The keyword to delete

    Returns:
        str: A success message, a "not found" message for a missing character
        or keyword, or an error message if anything raises.
    """
    try:
        with sqlite3.connect(chat_DB_PATH) as conn:
            db_cursor = conn.cursor()

            # Look the character up by name; no row means it does not exist.
            db_cursor.execute("SELECT tags FROM CharacterCards WHERE name = ?", (char_name,))
            tag_row = db_cursor.fetchone()
            if not tag_row:
                return f"Character '{char_name}' not found."

            # Decode the stored JSON array (NULL/empty means no tags).
            stored_tags = tag_row[0]
            existing_tags = json.loads(stored_tags) if stored_tags else []
            if keyword not in existing_tags:
                return f"Keyword '{keyword}' not found in character '{char_name}' tags."

            # Drop every occurrence of the keyword and write the list back.
            remaining_tags = list(filter(lambda tag: tag != keyword, existing_tags))
            db_cursor.execute(
                "UPDATE CharacterCards SET tags = ? WHERE name = ?",
                (json.dumps(remaining_tags), char_name)
            )
            conn.commit()

            logging.info(f"Keyword '{keyword}' deleted from character '{char_name}'")
            return f"Successfully deleted keyword '{keyword}' from character '{char_name}'."
    except Exception as delete_error:
        error_msg = f"Error deleting keyword: {str(delete_error)}"
        logging.error(error_msg)
        return error_msg
def export_char_keywords_to_csv() -> Tuple[str, str]:
    """
    Export all character keywords to a CSV file with associated metadata.

    One CSV row is written per (keyword, character) pair, together with the
    number of chats stored for that character. The file is a temporary file
    that is not deleted automatically; the caller owns it.

    Returns:
        Tuple[str, str]: (status_message, file_path). On failure the message
        describes the error and file_path is an empty string.
    """
    import csv
    from tempfile import NamedTemporaryFile

    try:
        conn = sqlite3.connect(chat_DB_PATH)
        try:
            cursor = conn.cursor()
            # Get all characters and their tags
            cursor.execute("""
                SELECT
                    name,
                    tags,
                    (SELECT COUNT(*) FROM CharacterChats WHERE CharacterChats.character_id = CharacterCards.id) as chat_count
                FROM CharacterCards
                WHERE json_valid(tags)
                ORDER BY name
            """)
            results = cursor.fetchall()
        finally:
            conn.close()

        # Process the results to create rows for the CSV
        csv_rows = []
        for name, tags_json, chat_count in results:
            tags = json.loads(tags_json) if tags_json else []
            for tag in tags:
                csv_rows.append([tag, name, chat_count])

        # Create the file only once the data is ready, and let the context
        # manager close it even if writing fails. UTF-8 keeps non-ASCII
        # keywords writable regardless of the platform default encoding.
        with NamedTemporaryFile(mode='w', delete=False, suffix='.csv', newline='',
                                encoding='utf-8') as temp_file:
            writer = csv.writer(temp_file)
            writer.writerow(['Keyword', 'Character Name', 'Number of Chats'])
            writer.writerows(csv_rows)
            file_path = temp_file.name

        status_msg = f"Successfully exported {len(csv_rows)} character keyword entries to CSV."
        logging.info(status_msg)
        return status_msg, file_path
    except Exception as e:
        error_msg = f"Error exporting keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg, ""
# | |
# End of Character chat keyword functions | |
###################################################### | |
# | |
# End of Character_Chat_DB.py | |
####################################################################################################################### | |