# character_chat_db.py # Database functions for managing character cards and chat histories. # # # Imports import configparser import sqlite3 import json import os import sys from typing import List, Dict, Optional, Tuple, Any, Union from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path from Tests.Chat_APIs.Chat_APIs_Integration_test import logging # ####################################################################################################################### # # def ensure_database_directory(): os.makedirs(get_database_dir(), exist_ok=True) ensure_database_directory() # Construct the path to the config file config_path = get_project_relative_path('Config_Files/config.txt') # Read the config file config = configparser.ConfigParser() config.read(config_path) # Get the chat db path from the config, or use the default if not specified chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db')) print(f"Chat Database path: {chat_DB_PATH}") ######################################################################################################## # # Functions # FIXME - Setup properly and test/add documentation for its existence... def initialize_database(): """Initialize the SQLite database with required tables and FTS5 virtual tables.""" conn = None try: conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() # Enable foreign key constraints cursor.execute("PRAGMA foreign_keys = ON;") # Create CharacterCards table with V2 fields cursor.execute(""" CREATE TABLE IF NOT EXISTS CharacterCards ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, description TEXT, personality TEXT, scenario TEXT, image BLOB, post_history_instructions TEXT, first_mes TEXT, mes_example TEXT, creator_notes TEXT, system_prompt TEXT, alternate_greetings TEXT, tags TEXT, creator TEXT, character_version TEXT, extensions TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); """) # Create FTS5 virtual table for CharacterCards cursor.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS CharacterCards_fts USING fts5( name, description, personality, scenario, system_prompt, content='CharacterCards', content_rowid='id' ); """) # Create triggers to keep FTS5 table in sync with CharacterCards cursor.executescript(""" CREATE TRIGGER IF NOT EXISTS CharacterCards_ai AFTER INSERT ON CharacterCards BEGIN INSERT INTO CharacterCards_fts( rowid, name, description, personality, scenario, system_prompt ) VALUES ( new.id, new.name, new.description, new.personality, new.scenario, new.system_prompt ); END; CREATE TRIGGER IF NOT EXISTS CharacterCards_ad AFTER DELETE ON CharacterCards BEGIN DELETE FROM CharacterCards_fts WHERE rowid = old.id; END; CREATE TRIGGER IF NOT EXISTS CharacterCards_au AFTER UPDATE ON CharacterCards BEGIN UPDATE CharacterCards_fts SET name = new.name, description = new.description, personality = new.personality, scenario = new.scenario, system_prompt = new.system_prompt WHERE rowid = new.id; END; """) # Create CharacterChats table cursor.execute(""" CREATE TABLE IF NOT EXISTS CharacterChats ( id INTEGER PRIMARY KEY AUTOINCREMENT, character_id INTEGER NOT NULL, conversation_name TEXT, chat_history TEXT, is_snapshot BOOLEAN DEFAULT FALSE, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE ); """) # Create FTS5 virtual table for CharacterChats cursor.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5( conversation_name, chat_history, content='CharacterChats', content_rowid='id' ); """) # Create triggers to keep FTS5 table in sync with CharacterChats cursor.executescript(""" CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history) VALUES (new.id, new.conversation_name, new.chat_history); END; CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN DELETE FROM CharacterChats_fts WHERE rowid = old.id; END; CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history WHERE rowid = new.id; END; """) # Create ChatKeywords table cursor.execute(""" CREATE TABLE IF NOT EXISTS ChatKeywords ( chat_id INTEGER NOT NULL, keyword TEXT NOT NULL, FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE ); """) # Create indexes for faster searches cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword); """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id); """) conn.commit() logging.info("Database initialized successfully.") except sqlite3.Error as e: logging.error(f"SQLite error occurred during database initialization: {e}") if conn: conn.rollback() raise except Exception as e: logging.error(f"Unexpected error occurred during database initialization: {e}") if conn: conn.rollback() raise finally: if conn: conn.close() # Call initialize_database() at the start of your application def setup_chat_database(): try: initialize_database() except Exception as e: logging.critical(f"Failed to initialize database: {e}") sys.exit(1) setup_chat_database() ######################################################################################################## # # Character Card handling def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]: """Parse and validate a character card according to V2 specification.""" v2_data = { 'name': card_data.get('name', ''), 'description': card_data.get('description', ''), 'personality': card_data.get('personality', ''), 'scenario': card_data.get('scenario', ''), 'first_mes': card_data.get('first_mes', ''), 'mes_example': card_data.get('mes_example', ''), 'creator_notes': card_data.get('creator_notes', ''), 'system_prompt': card_data.get('system_prompt', ''), 'post_history_instructions': card_data.get('post_history_instructions', ''), 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])), 'tags': json.dumps(card_data.get('tags', [])), 'creator': card_data.get('creator', ''), 'character_version': card_data.get('character_version', ''), 'extensions': json.dumps(card_data.get('extensions', {})) } # Handle 'image' separately as it might be binary data if 'image' in card_data: v2_data['image'] = card_data['image'] return v2_data def add_character_card(card_data: Dict[str, Any]) -> Optional[int]: """Add or update a character card in the database.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: parsed_card = parse_character_card(card_data) # Check if character already exists cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],)) row = cursor.fetchone() if row: # Update existing character character_id = row[0] update_query = """ UPDATE CharacterCards SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_mes = ?, mes_example = ?, creator_notes = ?, system_prompt = ?, alternate_greetings = ?, tags = ?, creator = ?, character_version = ?, extensions = ? WHERE id = ? """ cursor.execute(update_query, ( parsed_card['description'], parsed_card['personality'], parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions'], character_id )) else: # Insert new character insert_query = """ INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_mes, mes_example, creator_notes, system_prompt, alternate_greetings, tags, creator, character_version, extensions) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ cursor.execute(insert_query, ( parsed_card['name'], parsed_card['description'], parsed_card['personality'], parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions'] )) character_id = cursor.lastrowid conn.commit() return character_id except sqlite3.IntegrityError as e: logging.error(f"Error adding character card: {e}") return None except Exception as e: logging.error(f"Unexpected error adding character card: {e}") return None finally: conn.close() # def add_character_card(card_data: Dict) -> Optional[int]: # """Add or update a character card in the database. # # Returns the ID of the inserted character or None if failed. # """ # conn = sqlite3.connect(chat_DB_PATH) # cursor = conn.cursor() # try: # # Ensure all required fields are present # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message'] # for field in required_fields: # if field not in card_data: # card_data[field] = '' # Assign empty string if field is missing # # # Check if character already exists # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],)) # row = cursor.fetchone() # # if row: # # Update existing character # character_id = row[0] # cursor.execute(""" # UPDATE CharacterCards # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? # WHERE id = ? # """, ( # card_data['description'], # card_data['personality'], # card_data['scenario'], # card_data['image'], # card_data['post_history_instructions'], # card_data['first_message'], # character_id # )) # else: # # Insert new character # cursor.execute(""" # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message) # VALUES (?, ?, ?, ?, ?, ?, ?) # """, ( # card_data['name'], # card_data['description'], # card_data['personality'], # card_data['scenario'], # card_data['image'], # card_data['post_history_instructions'], # card_data['first_message'] # )) # character_id = cursor.lastrowid # # conn.commit() # return cursor.lastrowid # except sqlite3.IntegrityError as e: # logging.error(f"Error adding character card: {e}") # return None # except Exception as e: # logging.error(f"Unexpected error adding character card: {e}") # return None # finally: # conn.close() def get_character_cards() -> List[Dict]: """Retrieve all character cards from the database.""" logging.debug(f"Fetching characters from DB: {chat_DB_PATH}") conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() cursor.execute("SELECT * FROM CharacterCards") rows = cursor.fetchall() columns = [description[0] for description in cursor.description] conn.close() characters = [dict(zip(columns, row)) for row in rows] #logging.debug(f"Characters fetched from DB: {characters}") return characters def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ Retrieve a single character card by its ID. Args: character_id: Can be either an integer ID or a dictionary containing character data. Returns: A dictionary containing the character card data, or None if not found. """ conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: if isinstance(character_id, dict): # If a dictionary is passed, assume it's already a character card return character_id elif isinstance(character_id, int): # If an integer is passed, fetch the character from the database cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,)) row = cursor.fetchone() if row: columns = [description[0] for description in cursor.description] return dict(zip(columns, row)) else: logging.warning(f"Invalid type for character_id: {type(character_id)}") return None except Exception as e: logging.error(f"Error in get_character_card_by_id: {e}") return None finally: conn.close() def update_character_card(character_id: int, card_data: Dict) -> bool: """Update an existing character card.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: cursor.execute(""" UPDATE CharacterCards SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? WHERE id = ? """, ( card_data.get('name'), card_data.get('description'), card_data.get('personality'), card_data.get('scenario'), card_data.get('image'), card_data.get('post_history_instructions', ''), card_data.get('first_message', "Hello! I'm ready to chat."), character_id )) conn.commit() return cursor.rowcount > 0 except sqlite3.IntegrityError as e: logging.error(f"Error updating character card: {e}") return False finally: conn.close() def delete_character_card(character_id: int) -> bool: """Delete a character card and its associated chats.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: # Delete associated chats first due to foreign key constraint cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,)) cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,)) conn.commit() return cursor.rowcount > 0 except sqlite3.Error as e: logging.error(f"Error deleting character card: {e}") return False finally: conn.close() def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]: """ Add a new chat history for a character, optionally associating keywords. Args: character_id (int): The ID of the character. conversation_name (str): Name of the conversation. chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples. keywords (Optional[List[str]]): List of keywords to associate with this chat. is_snapshot (bool, optional): Whether this chat is a snapshot. Returns: Optional[int]: The ID of the inserted chat or None if failed. """ conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: chat_history_json = json.dumps(chat_history) cursor.execute(""" INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot) VALUES (?, ?, ?, ?) """, ( character_id, conversation_name, chat_history_json, is_snapshot )) chat_id = cursor.lastrowid if keywords: # Insert keywords into ChatKeywords table keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords] cursor.executemany(""" INSERT INTO ChatKeywords (chat_id, keyword) VALUES (?, ?) """, keyword_records) conn.commit() return chat_id except sqlite3.Error as e: logging.error(f"Error adding character chat: {e}") return None finally: conn.close() def get_character_chats(character_id: Optional[int] = None) -> List[Dict]: """Retrieve all chats, or chats for a specific character if character_id is provided.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() if character_id is not None: cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,)) else: cursor.execute("SELECT * FROM CharacterChats") rows = cursor.fetchall() columns = [description[0] for description in cursor.description] conn.close() return [dict(zip(columns, row)) for row in rows] def get_character_chat_by_id(chat_id: int) -> Optional[Dict]: """Retrieve a single chat by its ID.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,)) row = cursor.fetchone() conn.close() if row: columns = [description[0] for description in cursor.description] chat = dict(zip(columns, row)) chat['chat_history'] = json.loads(chat['chat_history']) return chat return None def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]: """ Search for character chats using FTS5, optionally filtered by character_id. Args: query (str): The search query. character_id (Optional[int]): The ID of the character to filter chats by. Returns: Tuple[List[Dict], str]: A list of matching chats and a status message. """ if not query.strip(): return [], "Please enter a search query." conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: if character_id is not None: # Search with character_id filter cursor.execute(""" SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history FROM CharacterChats_fts JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ? ORDER BY rank """, (query, character_id)) else: # Search without character_id filter cursor.execute(""" SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history FROM CharacterChats_fts JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id WHERE CharacterChats_fts MATCH ? ORDER BY rank """, (query,)) rows = cursor.fetchall() columns = [description[0] for description in cursor.description] results = [dict(zip(columns, row)) for row in rows] if character_id is not None: status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character." else: status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters." return results, status_message except Exception as e: logging.error(f"Error searching chats with FTS5: {e}") return [], f"Error occurred during search: {e}" finally: conn.close() def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool: """Update an existing chat history.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: chat_history_json = json.dumps(chat_history) cursor.execute(""" UPDATE CharacterChats SET chat_history = ? WHERE id = ? """, ( chat_history_json, chat_id )) conn.commit() return cursor.rowcount > 0 except sqlite3.Error as e: logging.error(f"Error updating character chat: {e}") return False finally: conn.close() def delete_character_chat(chat_id: int) -> bool: """Delete a specific chat.""" conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,)) conn.commit() return cursor.rowcount > 0 except sqlite3.Error as e: logging.error(f"Error deleting character chat: {e}") return False finally: conn.close() def fetch_keywords_for_chats(keywords: List[str]) -> List[int]: """ Fetch chat IDs associated with any of the specified keywords. Args: keywords (List[str]): List of keywords to search for. Returns: List[int]: List of chat IDs associated with the keywords. """ if not keywords: return [] conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: # Construct the WHERE clause to search for each keyword keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords)) sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}" cursor.execute(sql_query, keywords) rows = cursor.fetchall() chat_ids = [row[0] for row in rows] return chat_ids except Exception as e: logging.error(f"Error in fetch_keywords_for_chats: {e}") return [] finally: conn.close() def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]: """Save chat history to the CharacterChats table. Returns the ID of the inserted chat or None if failed. """ return add_character_chat(character_id, conversation_name, chat_history) def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]: """ Perform a full-text search on specified fields with optional filtering and pagination. Args: query (str): The search query. fields (List[str]): List of fields to search in. where_clause (str, optional): Additional SQL WHERE clause to filter results. page (int, optional): Page number for pagination. results_per_page (int, optional): Number of results per page. Returns: List[Dict[str, Any]]: List of matching chat records with content and metadata. """ if not query.strip(): return [] conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: # Construct the MATCH query for FTS5 match_query = " AND ".join(fields) + f" MATCH ?" # Adjust the query with the fields fts_query = f""" SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history FROM CharacterChats_fts JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id WHERE {match_query} """ if where_clause: fts_query += f" AND ({where_clause})" fts_query += " ORDER BY rank LIMIT ? OFFSET ?" offset = (page - 1) * results_per_page cursor.execute(fts_query, (query, results_per_page, offset)) rows = cursor.fetchall() columns = [description[0] for description in cursor.description] results = [dict(zip(columns, row)) for row in rows] return results except Exception as e: logging.error(f"Error in search_db: {e}") return [] finally: conn.close() def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \ List[Dict[str, Any]]: """ Perform a full-text search within the specified chat IDs using FTS5. Args: query (str): The user's query. relevant_chat_ids (List[int]): List of chat IDs to search within. page (int): Pagination page number. results_per_page (int): Number of results per page. Returns: List[Dict[str, Any]]: List of search results with content and metadata. """ try: # Construct a WHERE clause to limit the search to relevant chat IDs where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids]) if not where_clause: where_clause = "1" # No restriction if no chat IDs # Perform full-text search using FTS5 fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page) filtered_fts_results = [ { "content": result['content'], "metadata": {"media_id": result['id']} } for result in fts_results if result['id'] in relevant_chat_ids ] return filtered_fts_results except Exception as e: logging.error(f"Error in perform_full_text_search_chat: {str(e)}") return [] def fetch_all_chats() -> List[Dict[str, Any]]: """ Fetch all chat messages from the database. Returns: List[Dict[str, Any]]: List of chat messages with relevant metadata. """ try: chats = get_character_chats() # Modify this function to retrieve all chats return chats except Exception as e: logging.error(f"Error fetching all chats: {str(e)}") return [] def search_character_chat(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]: """ Perform a full-text search on the Character Chat database. Args: query: Search query string. fts_top_k: Maximum number of results to return. relevant_media_ids: Optional list of character IDs to filter results. Returns: List of search results with content and metadata. """ if not query.strip(): return [] try: # Construct a WHERE clause to limit the search to relevant character IDs where_clause = "" if relevant_media_ids: placeholders = ','.join(['?'] * len(relevant_media_ids)) where_clause = f"CharacterChats.character_id IN ({placeholders})" # Perform full-text search using existing search_db function results = search_db(query, ["conversation_name", "chat_history"], where_clause, results_per_page=fts_top_k) # Format results formatted_results = [] for r in results: formatted_results.append({ "content": r['chat_history'], "metadata": { "chat_id": r['id'], "conversation_name": r['conversation_name'], "character_id": r['character_id'] } }) return formatted_results except Exception as e: logging.error(f"Error in search_character_chat: {e}") return [] def search_character_cards(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]: """ Perform a full-text search on the Character Cards database. Args: query: Search query string. fts_top_k: Maximum number of results to return. relevant_media_ids: Optional list of character IDs to filter results. Returns: List of search results with content and metadata. """ if not query.strip(): return [] try: conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() # Construct the query sql_query = """ SELECT CharacterCards.id, CharacterCards.name, CharacterCards.description, CharacterCards.personality, CharacterCards.scenario FROM CharacterCards_fts JOIN CharacterCards ON CharacterCards_fts.rowid = CharacterCards.id WHERE CharacterCards_fts MATCH ? """ params = [query] # Add filtering by character IDs if provided if relevant_media_ids: placeholders = ','.join(['?'] * len(relevant_media_ids)) sql_query += f" AND CharacterCards.id IN ({placeholders})" params.extend(relevant_media_ids) sql_query += " LIMIT ?" params.append(fts_top_k) cursor.execute(sql_query, params) rows = cursor.fetchall() columns = [description[0] for description in cursor.description] results = [dict(zip(columns, row)) for row in rows] # Format results formatted_results = [] for r in results: content = f"Name: {r['name']}\nDescription: {r['description']}\nPersonality: {r['personality']}\nScenario: {r['scenario']}" formatted_results.append({ "content": content, "metadata": { "character_id": r['id'], "name": r['name'] } }) return formatted_results except Exception as e: logging.error(f"Error in search_character_cards: {e}") return [] finally: conn.close() def fetch_character_ids_by_keywords(keywords: List[str]) -> List[int]: """ Fetch character IDs associated with any of the specified keywords. Args: keywords (List[str]): List of keywords to search for. Returns: List[int]: List of character IDs associated with the keywords. """ if not keywords: return [] conn = sqlite3.connect(chat_DB_PATH) cursor = conn.cursor() try: # Assuming 'tags' column in CharacterCards table stores tags as JSON array placeholders = ','.join(['?'] * len(keywords)) sql_query = f""" SELECT DISTINCT id FROM CharacterCards WHERE EXISTS ( SELECT 1 FROM json_each(tags) WHERE json_each.value IN ({placeholders}) ) """ cursor.execute(sql_query, keywords) rows = cursor.fetchall() character_ids = [row[0] for row in rows] return character_ids except Exception as e: logging.error(f"Error in fetch_character_ids_by_keywords: {e}") return [] finally: conn.close() ################################################################### # # Character Keywords def view_char_keywords(): try: with sqlite3.connect(chat_DB_PATH) as conn: cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT keyword FROM CharacterCards CROSS JOIN json_each(tags) WHERE json_valid(tags) ORDER BY keyword """) keywords = cursor.fetchall() if keywords: keyword_list = [k[0] for k in keywords] return "### Current Character Keywords:\n" + "\n".join( [f"- {k}" for k in keyword_list]) return "No keywords found." except Exception as e: return f"Error retrieving keywords: {str(e)}" def add_char_keywords(name: str, keywords: str): try: keywords_list = [k.strip() for k in keywords.split(",") if k.strip()] with sqlite3.connect('character_chat.db') as conn: cursor = conn.cursor() cursor.execute( "SELECT tags FROM CharacterCards WHERE name = ?", (name,) ) result = cursor.fetchone() if not result: return "Character not found." current_tags = result[0] if result[0] else "[]" current_keywords = set(current_tags[1:-1].split(',')) if current_tags != "[]" else set() updated_keywords = current_keywords.union(set(keywords_list)) cursor.execute( "UPDATE CharacterCards SET tags = ? WHERE name = ?", (str(list(updated_keywords)), name) ) conn.commit() return f"Successfully added keywords to character {name}" except Exception as e: return f"Error adding keywords: {str(e)}" def delete_char_keyword(char_name: str, keyword: str) -> str: """ Delete a keyword from a character's tags. Args: char_name (str): The name of the character keyword (str): The keyword to delete Returns: str: Success/failure message """ try: with sqlite3.connect(chat_DB_PATH) as conn: cursor = conn.cursor() # First, check if the character exists cursor.execute("SELECT tags FROM CharacterCards WHERE name = ?", (char_name,)) result = cursor.fetchone() if not result: return f"Character '{char_name}' not found." # Parse existing tags current_tags = json.loads(result[0]) if result[0] else [] if keyword not in current_tags: return f"Keyword '{keyword}' not found in character '{char_name}' tags." # Remove the keyword updated_tags = [tag for tag in current_tags if tag != keyword] # Update the character's tags cursor.execute( "UPDATE CharacterCards SET tags = ? WHERE name = ?", (json.dumps(updated_tags), char_name) ) conn.commit() logging.info(f"Keyword '{keyword}' deleted from character '{char_name}'") return f"Successfully deleted keyword '{keyword}' from character '{char_name}'." except Exception as e: error_msg = f"Error deleting keyword: {str(e)}" logging.error(error_msg) return error_msg def export_char_keywords_to_csv() -> Tuple[str, str]: """ Export all character keywords to a CSV file with associated metadata. Returns: Tuple[str, str]: (status_message, file_path) """ import csv from tempfile import NamedTemporaryFile from datetime import datetime try: # Create a temporary CSV file temp_file = NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', newline='') with sqlite3.connect(chat_DB_PATH) as conn: cursor = conn.cursor() # Get all characters and their tags cursor.execute(""" SELECT name, tags, (SELECT COUNT(*) FROM CharacterChats WHERE CharacterChats.character_id = CharacterCards.id) as chat_count FROM CharacterCards WHERE json_valid(tags) ORDER BY name """) results = cursor.fetchall() # Process the results to create rows for the CSV csv_rows = [] for name, tags_json, chat_count in results: tags = json.loads(tags_json) if tags_json else [] for tag in tags: csv_rows.append([ tag, # keyword name, # character name chat_count # number of chats ]) # Write to CSV writer = csv.writer(temp_file) writer.writerow(['Keyword', 'Character Name', 'Number of Chats']) writer.writerows(csv_rows) temp_file.close() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") status_msg = f"Successfully exported {len(csv_rows)} character keyword entries to CSV." logging.info(status_msg) return status_msg, temp_file.name except Exception as e: error_msg = f"Error exporting keywords: {str(e)}" logging.error(error_msg) return error_msg, "" # # End of Character chat keyword functions ###################################################### # # End of Character_Chat_DB.py #######################################################################################################################