# Prompts_DB.py
# Description: Functions to manage the prompts database.
#
# Imports
import sqlite3
import logging
#
# External Imports
import re
from typing import Tuple
#
# Local Imports
from App_Function_Libraries.Utils.Utils import get_database_path
#
#######################################################################################################################
#
# Functions to manage prompts DB

def create_prompts_db():
    """Create the prompts database schema if it does not exist yet.

    Creates the ``Prompts``, ``Keywords`` and ``PromptKeywords`` tables and
    their supporting indexes. Every statement uses ``IF NOT EXISTS``, so
    calling this against an already-initialised database is a no-op.
    """
    logging.debug("create_prompts_db: Creating prompts database.")
    schema_sql = '''
        CREATE TABLE IF NOT EXISTS Prompts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            author TEXT,
            details TEXT,
            system TEXT,
            user TEXT
        );
        CREATE TABLE IF NOT EXISTS Keywords (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            keyword TEXT NOT NULL UNIQUE COLLATE NOCASE
        );
        CREATE TABLE IF NOT EXISTS PromptKeywords (
            prompt_id INTEGER,
            keyword_id INTEGER,
            FOREIGN KEY (prompt_id) REFERENCES Prompts (id),
            FOREIGN KEY (keyword_id) REFERENCES Keywords (id),
            PRIMARY KEY (prompt_id, keyword_id)
        );
        CREATE INDEX IF NOT EXISTS idx_keywords_keyword ON Keywords(keyword);
        CREATE INDEX IF NOT EXISTS idx_promptkeywords_prompt_id ON PromptKeywords(prompt_id);
        CREATE INDEX IF NOT EXISTS idx_promptkeywords_keyword_id ON PromptKeywords(keyword_id);
    '''
    db_path = get_database_path('prompts.db')
    with sqlite3.connect(db_path) as conn:
        # Connection.executescript() opens its own cursor for the script.
        conn.executescript(schema_sql)


# FIXME - dirty hack that should be removed later...
# Migration function to add the 'author' column to the Prompts table
def add_author_column_to_prompts():
    """Add the ``author`` column to an existing ``Prompts`` table if missing.

    This runs at import time, before ``create_prompts_db()`` is invoked at the
    bottom of the module. On a brand-new database the ``Prompts`` table does
    not exist yet, so ``PRAGMA table_info`` returns no rows; in that case the
    migration is skipped instead of issuing an ``ALTER TABLE`` that would fail
    with "no such table".
    """
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        # Column name is the second field of each PRAGMA table_info row.
        cursor.execute("PRAGMA table_info(Prompts)")
        columns = [col[1] for col in cursor.fetchall()]
        if not columns:
            logging.debug("add_author_column_to_prompts: Prompts table does not exist yet; skipping migration.")
            return
        if 'author' not in columns:
            cursor.execute('ALTER TABLE Prompts ADD COLUMN author TEXT')
            logging.info("Author column added to Prompts table.")
        else:
            logging.debug("Author column already exists in Prompts table.")


add_author_column_to_prompts()


def normalize_keyword(keyword):
    """Return *keyword* stripped, lower-cased and with whitespace runs collapsed to one space."""
    return re.sub(r'\s+', ' ', keyword.strip().lower())


def _link_keywords_to_prompt(cursor, prompt_id, keywords):
    """Attach *keywords* to the prompt *prompt_id* using an already-open cursor.

    Keywords are normalized with ``normalize_keyword``; ``None``/blank entries
    are dropped and duplicates collapsed. Missing ``Keywords`` rows are
    created on the fly. The caller owns the transaction.
    """
    normalized = {normalize_keyword(k) for k in keywords if k and k.strip()}
    for keyword in normalized:
        cursor.execute('INSERT OR IGNORE INTO Keywords (keyword) VALUES (?)', (keyword,))
        cursor.execute('SELECT id FROM Keywords WHERE keyword = ?', (keyword,))
        keyword_id = cursor.fetchone()[0]
        cursor.execute(
            'INSERT OR IGNORE INTO PromptKeywords (prompt_id, keyword_id) VALUES (?, ?)',
            (prompt_id, keyword_id)
        )


# FIXME - update calls to this function to use the new args
def add_prompt(name, author, details, system=None, user=None, keywords=None):
    """Insert a new prompt and link it to the given keywords.

    Args:
        name: Unique prompt name (required).
        author: Author of the prompt.
        details: Description of the prompt.
        system: System prompt text.
        user: User prompt text.
        keywords: Optional iterable of keyword strings.

    Returns:
        str: A status message; failures are reported in the message rather
        than raised.
    """
    logging.debug(f"add_prompt: Adding prompt with name: {name}, author: {author}, system: {system}, user: {user}, keywords: {keywords}")
    if not name:
        logging.error("add_prompt: A name is required.")
        return "A name is required."

    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO Prompts (name, author, details, system, user)
                VALUES (?, ?, ?, ?, ?)
            ''', (name, author, details, system, user))
            prompt_id = cursor.lastrowid

            if keywords:
                _link_keywords_to_prompt(cursor, prompt_id, keywords)
            return "Prompt added successfully."
    except sqlite3.IntegrityError:
        # Prompts.name is UNIQUE, so a duplicate name surfaces here.
        return "Prompt with this name already exists."
    except sqlite3.Error as e:
        return f"Database error: {e}"


def fetch_prompt_details(name):
    """Fetch one prompt by exact name.

    Returns:
        A ``(name, author, details, system, user, keywords)`` tuple where
        ``keywords`` is a ``', '``-joined string (``None`` when the prompt has
        no keywords), or ``None`` if no prompt has that name.
    """
    logging.debug(f"fetch_prompt_details: Fetching details for prompt: {name}")
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT p.name, p.author, p.details, p.system, p.user, GROUP_CONCAT(k.keyword, ', ') as keywords
            FROM Prompts p
            LEFT JOIN PromptKeywords pk ON p.id = pk.prompt_id
            LEFT JOIN Keywords k ON pk.keyword_id = k.id
            WHERE p.name = ?
            GROUP BY p.id
        ''', (name,))
        return cursor.fetchone()


def list_prompts(page=1, per_page=10):
    """Return one page of prompt names.

    ``page`` and ``per_page`` are clamped to a minimum of 1 so that a zero
    page size cannot cause a division by zero. Results are ordered by name so
    consecutive pages are stable.

    Returns:
        tuple: ``(prompt_names, total_pages, page)``.
    """
    logging.debug(f"list_prompts: Listing prompts for page {page} with {per_page} prompts per page.")
    page = max(1, page)
    per_page = max(1, per_page)
    offset = (page - 1) * per_page
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT name FROM Prompts ORDER BY name LIMIT ? OFFSET ?', (per_page, offset))
        prompts = [row[0] for row in cursor.fetchall()]

        # Get total count of prompts
        cursor.execute('SELECT COUNT(*) FROM Prompts')
        total_count = cursor.fetchone()[0]

    # Ceiling division.
    total_pages = (total_count + per_page - 1) // per_page
    return prompts, total_pages, page


def insert_prompt_to_db(title, author, description, system_prompt, user_prompt, keywords=None):
    """Thin alias for ``add_prompt`` using title/description parameter names."""
    return add_prompt(title, author, description, system_prompt, user_prompt, keywords)


def get_prompt_db_connection():
    """Open and return a new sqlite3 connection to the prompts database."""
    prompt_db_path = get_database_path('prompts.db')
    return sqlite3.connect(prompt_db_path)


def search_prompts(query):
    """Search prompts whose name, details, system/user text or any keyword contains *query*.

    The keyword list is computed in a correlated subquery so that every
    keyword of a matching prompt is returned, not only the keyword rows that
    happened to match the search term.

    Returns:
        list: ``(name, details, system, user, keywords)`` tuples ordered by
        name; an empty list on database error.
    """
    logging.debug(f"search_prompts: Searching prompts with query: {query}")
    pattern = f'%{query}%'
    try:
        with get_prompt_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT p.name, p.details, p.system, p.user,
                       (SELECT GROUP_CONCAT(k.keyword, ', ')
                        FROM PromptKeywords pk
                        JOIN Keywords k ON pk.keyword_id = k.id
                        WHERE pk.prompt_id = p.id) as keywords
                FROM Prompts p
                WHERE p.name LIKE ?
                   OR p.details LIKE ?
                   OR p.system LIKE ?
                   OR p.user LIKE ?
                   OR EXISTS (SELECT 1
                              FROM PromptKeywords pk2
                              JOIN Keywords k2 ON pk2.keyword_id = k2.id
                              WHERE pk2.prompt_id = p.id AND k2.keyword LIKE ?)
                ORDER BY p.name
            """, (pattern,) * 5)
            return cursor.fetchall()
    except sqlite3.Error as e:
        logging.error(f"Error searching prompts: {e}")
        return []


def search_prompts_by_keyword(keyword, page=1, per_page=10):
    """Return one page of prompt names having a keyword that contains *keyword*.

    The search term is normalized with ``normalize_keyword``. ``page`` and
    ``per_page`` are clamped to a minimum of 1, and results are ordered by
    name so pagination is stable.

    Returns:
        tuple: ``(prompt_names, total_pages, page)``.
    """
    logging.debug(f"search_prompts_by_keyword: Searching prompts by keyword: {keyword}")
    normalized_keyword = normalize_keyword(keyword)
    like_pattern = '%' + normalized_keyword + '%'
    page = max(1, page)
    per_page = max(1, per_page)
    offset = (page - 1) * per_page
    with sqlite3.connect(get_database_path('prompts.db')) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT DISTINCT p.name
            FROM Prompts p
            JOIN PromptKeywords pk ON p.id = pk.prompt_id
            JOIN Keywords k ON pk.keyword_id = k.id
            WHERE k.keyword LIKE ?
            ORDER BY p.name
            LIMIT ? OFFSET ?
        ''', (like_pattern, per_page, offset))
        prompts = [row[0] for row in cursor.fetchall()]

        # Get total count of matching prompts
        cursor.execute('''
            SELECT COUNT(DISTINCT p.id)
            FROM Prompts p
            JOIN PromptKeywords pk ON p.id = pk.prompt_id
            JOIN Keywords k ON pk.keyword_id = k.id
            WHERE k.keyword LIKE ?
        ''', (like_pattern,))
        total_count = cursor.fetchone()[0]

    total_pages = (total_count + per_page - 1) // per_page
    return prompts, total_pages, page


def update_prompt_keywords(prompt_name, new_keywords):
    """Replace the keyword set of the prompt named *prompt_name*.

    Existing associations are removed, the new keywords are linked, and
    keywords no longer referenced by any prompt are deleted. ``None`` is
    treated as an empty keyword list.

    Returns:
        str: A status message.
    """
    logging.debug(f"update_prompt_keywords: Updating keywords for prompt: {prompt_name}")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()

            cursor.execute('SELECT id FROM Prompts WHERE name = ?', (prompt_name,))
            row = cursor.fetchone()
            if not row:
                return "Prompt not found."
            prompt_id = row[0]

            cursor.execute('DELETE FROM PromptKeywords WHERE prompt_id = ?', (prompt_id,))
            _link_keywords_to_prompt(cursor, prompt_id, new_keywords or [])

            # Remove unused keywords
            cursor.execute('''
                DELETE FROM Keywords
                WHERE id NOT IN (SELECT DISTINCT keyword_id FROM PromptKeywords)
            ''')
            return "Keywords updated successfully."
    except sqlite3.Error as e:
        return f"Database error: {e}"


def add_or_update_prompt(title, author, description, system_prompt, user_prompt, keywords=None):
    """Insert the prompt if *title* is new, otherwise update it and its keywords.

    Returns:
        str: The status message of the insert, or of the update followed by
        the keyword-update message when the update succeeded.
    """
    logging.debug(f"add_or_update_prompt: Adding or updating prompt: {title}")
    if not title:
        return "Error: Title is required."

    existing_prompt = fetch_prompt_details(title)
    if existing_prompt:
        # Update existing prompt
        result = update_prompt_in_db(title, author, description, system_prompt, user_prompt)
        if "successfully" in result:
            # Update keywords if the prompt update was successful
            keyword_result = update_prompt_keywords(title, keywords or [])
            result += f" {keyword_result}"
    else:
        # Insert new prompt
        result = insert_prompt_to_db(title, author, description, system_prompt, user_prompt, keywords)

    return result


def load_prompt_details(selected_prompt):
    """Return ``(name, author, details, system, user, keywords)`` for a prompt.

    Six empty strings are returned when *selected_prompt* is falsy or no such
    prompt exists.
    """
    logging.debug(f"load_prompt_details: Loading prompt details for {selected_prompt}")
    if selected_prompt:
        details = fetch_prompt_details(selected_prompt)
        if details:
            return details[0], details[1], details[2], details[3], details[4], details[5]
    return "", "", "", "", "", ""


def update_prompt_in_db(title, author, description, system_prompt, user_prompt):
    """Update author, details, system and user text of the prompt named *title*.

    Returns:
        str: A status message.
    """
    logging.debug(f"update_prompt_in_db: Updating prompt: {title}")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE Prompts SET author = ?, details = ?, system = ?, user = ? WHERE name = ?",
                (author, description, system_prompt, user_prompt, title)
            )
            if cursor.rowcount == 0:
                return "No prompt found with the given title."
        return "Prompt updated successfully!"
    except sqlite3.Error as e:
        return f"Error updating prompt: {e}"


def delete_prompt(prompt_id):
    """Delete the prompt with the given ID together with its keyword links.

    Returns:
        str: A status message.
    """
    logging.debug(f"delete_prompt: Deleting prompt with ID: {prompt_id}")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()

            # Delete associated keywords
            cursor.execute("DELETE FROM PromptKeywords WHERE prompt_id = ?", (prompt_id,))

            # Delete the prompt
            cursor.execute("DELETE FROM Prompts WHERE id = ?", (prompt_id,))

            if cursor.rowcount == 0:
                return f"No prompt found with ID {prompt_id}"
            conn.commit()
            return f"Prompt with ID {prompt_id} has been successfully deleted"
    except sqlite3.Error as e:
        return f"An error occurred: {e}"


def delete_prompt_keyword(keyword: str) -> str:
    """
    Delete a keyword and its associations from the prompts database.

    Args:
        keyword (str): The keyword to delete

    Returns:
        str: Success/failure message
    """
    logging.debug(f"delete_prompt_keyword: Deleting keyword: {keyword}")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()

            # First normalize the keyword
            normalized_keyword = normalize_keyword(keyword)

            # Get the keyword ID
            cursor.execute("SELECT id FROM Keywords WHERE keyword = ?", (normalized_keyword,))
            result = cursor.fetchone()

            if not result:
                return f"Keyword '{keyword}' not found."

            keyword_id = result[0]

            # Delete keyword associations from PromptKeywords
            cursor.execute("DELETE FROM PromptKeywords WHERE keyword_id = ?", (keyword_id,))

            # Capture the count now: rowcount reflects the most recent
            # statement, and the next DELETE always affects exactly one row.
            affected_prompts = cursor.rowcount

            # Delete the keyword itself
            cursor.execute("DELETE FROM Keywords WHERE id = ?", (keyword_id,))

            conn.commit()

        logging.info(f"Keyword '{keyword}' deleted successfully")
        return f"Successfully deleted keyword '{keyword}' and removed it from {affected_prompts} prompts."

    except sqlite3.Error as e:
        error_msg = f"Database error deleting keyword: {str(e)}"
        logging.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Error deleting keyword: {str(e)}"
        logging.error(error_msg)
        return error_msg


def export_prompt_keywords_to_csv() -> Tuple[str, str]:
    """
    Export all prompt keywords to a CSV file with associated metadata.

    Each row holds the keyword, the names of the prompts using it, the number
    of such prompts, and the distinct non-empty authors of those prompts.
    Grouping is done in Python because SQLite does not accept DISTINCT on an
    aggregate call with more than one argument, which a
    ``GROUP_CONCAT(DISTINCT author, separator)`` would require.

    Returns:
        Tuple[str, str]: (status_message, file_path); file_path is the string
        "None" on failure.
    """
    import csv
    import tempfile
    import os
    from datetime import datetime

    logging.debug("export_prompt_keywords_to_csv: Starting export")
    try:
        # Create a temporary file with a specific name in the system's temp directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        temp_dir = tempfile.gettempdir()
        file_path = os.path.join(temp_dir, f'prompt_keywords_export_{timestamp}.csv')

        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()

            # One row per (keyword, prompt) pair; keywords without prompts
            # still appear once with NULL prompt columns.
            cursor.execute('''
                SELECT k.id, k.keyword, p.id, p.name, p.author
                FROM Keywords k
                LEFT JOIN PromptKeywords pk ON k.id = pk.keyword_id
                LEFT JOIN Prompts p ON pk.prompt_id = p.id
                ORDER BY k.keyword, p.name
            ''')
            rows = cursor.fetchall()

        # keyword id -> summary; dicts keep insertion order, which preserves
        # the ORDER BY of the query and de-duplicates prompts/authors.
        summaries = {}
        for keyword_id, keyword, prompt_id, prompt_name, author in rows:
            summary = summaries.setdefault(
                keyword_id, {'keyword': keyword, 'prompts': {}, 'authors': {}}
            )
            if prompt_id is None:
                continue
            summary['prompts'][prompt_id] = prompt_name
            if author:
                summary['authors'][author] = None

        # Write to CSV
        with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow([
                'Keyword',
                'Associated Prompts',
                'Number of Prompts',
                'Authors'
            ])

            for summary in summaries.values():
                writer.writerow([
                    summary['keyword'],
                    ' | '.join(summary['prompts'].values()),
                    len(summary['prompts']),
                    ' | '.join(summary['authors'])
                ])

        status_msg = f"Successfully exported {len(summaries)} prompt keywords to CSV."
        logging.info(status_msg)

        return status_msg, file_path

    except sqlite3.Error as e:
        error_msg = f"Database error exporting keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg, "None"
    except Exception as e:
        error_msg = f"Error exporting keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg, "None"


def view_prompt_keywords() -> str:
    """
    View all keywords currently in the prompts database.

    Returns:
        str: Markdown formatted string of all keywords
    """
    logging.debug("view_prompt_keywords: Retrieving all keywords")
    try:
        with sqlite3.connect(get_database_path('prompts.db')) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT k.keyword, COUNT(DISTINCT pk.prompt_id) as prompt_count
                FROM Keywords k
                LEFT JOIN PromptKeywords pk ON k.id = pk.keyword_id
                GROUP BY k.id, k.keyword
                ORDER BY k.keyword
            """)

            keywords = cursor.fetchall()
            if keywords:
                keyword_list = [f"- {k[0]} ({k[1]} prompts)" for k in keywords]
                return "### Current Prompt Keywords:\n" + "\n".join(keyword_list)
            return "No keywords found."

    except Exception as e:
        error_msg = f"Error retrieving keywords: {str(e)}"
        logging.error(error_msg)
        return error_msg


def export_prompts(
        export_format='csv',
        filter_keywords=None,
        include_system=True,
        include_user=True,
        include_details=True,
        include_author=True,
        include_keywords=True,
        markdown_template=None
) -> Tuple[str, str]:
    """
    Export prompts to CSV or Markdown with configurable options.

    Args:
        export_format (str): 'csv' or 'markdown' (any value other than 'csv'
            produces the Markdown ZIP)
        filter_keywords (List[str], optional): Keywords to filter prompts by;
            each is normalized with ``normalize_keyword`` before matching
        include_system (bool): Include system prompts in export
        include_user (bool): Include user prompts in export
        include_details (bool): Include prompt details/descriptions
        include_author (bool): Include author information
        include_keywords (bool): Include associated keywords
        markdown_template (str, optional): Name of a built-in template
            ("Basic Template" / "Detailed Template") or a custom
            ``str.format`` template using the placeholders ``{title}``,
            ``{author_section}``, ``{details_section}``, ``{system_section}``,
            ``{user_section}`` and ``{keywords_section}``

    Returns:
        Tuple[str, str]: (status_message, file_path); file_path is the string
        "None" on failure.
    """
    import csv
    import tempfile
    import os
    import zipfile
    from datetime import datetime

    try:
        # (enabled, column name, CSV header) for every optional field, in
        # the order they are selected and written.
        optional_fields = [
            (include_author, 'author', 'Author'),
            (include_details, 'details', 'Details'),
            (include_system, 'system', 'System Prompt'),
            (include_user, 'user', 'User Prompt'),
        ]
        columns = ['name'] + [column for enabled, column, _ in optional_fields if enabled]
        header = ['Name'] + [title for enabled, _, title in optional_fields if enabled]

        query = f"""
            SELECT DISTINCT {', '.join('p.' + column for column in columns)}
            FROM Prompts p
        """
        params = []

        # Add keyword filtering if specified
        if filter_keywords:
            # Stored keywords are normalized, so normalize the filter too.
            params = [normalize_keyword(k) for k in filter_keywords]
            placeholders = ','.join('?' for _ in params)
            query += f"""
                JOIN PromptKeywords pk ON p.id = pk.prompt_id
                JOIN Keywords k ON pk.keyword_id = k.id
                WHERE k.keyword IN ({placeholders})
            """

        prompt_keywords = {}
        with get_prompt_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            prompts = cursor.fetchall()

            # Get keywords for each prompt if needed
            if include_keywords:
                for prompt in prompts:
                    cursor.execute("""
                        SELECT k.keyword
                        FROM Keywords k
                        JOIN PromptKeywords pk ON k.id = pk.keyword_id
                        JOIN Prompts p ON pk.prompt_id = p.id
                        WHERE p.name = ?
                    """, (prompt[0],))
                    prompt_keywords[prompt[0]] = [row[0] for row in cursor.fetchall()]

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if export_format == 'csv':
            # Export as CSV
            temp_file = os.path.join(tempfile.gettempdir(), f'prompts_export_{timestamp}.csv')
            with open(temp_file, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)

                # Write header
                if include_keywords:
                    header.append('Keywords')
                writer.writerow(header)

                # Write data
                for prompt in prompts:
                    row = list(prompt)
                    if include_keywords:
                        row.append(', '.join(prompt_keywords.get(prompt[0], [])))
                    writer.writerow(row)

            return f"Successfully exported {len(prompts)} prompts to CSV.", temp_file

        # Export as Markdown files in ZIP
        zip_path = os.path.join(tempfile.gettempdir(), f'prompts_export_{timestamp}.zip')

        # Define markdown templates
        templates = {
            "Basic Template": """# {title}
{author_section}
{details_section}
{system_section}
{user_section}
{keywords_section}
""",
            "Detailed Template": """# {title}

## Author
{author_section}

## Description
{details_section}

## System Prompt
{system_section}

## User Prompt
{user_section}

## Keywords
{keywords_section}
"""
        }

        template = templates.get(markdown_template, markdown_template or templates["Basic Template"])

        used_entry_names = set()
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for prompt in prompts:
                # Map by column name: the row only contains the columns that
                # were actually selected, so positional indexes would shift
                # whenever an include_* flag is off.
                record = dict(zip(columns, prompt))
                title = record['name']
                author = record.get('author')

                # Create markdown content (excluded or NULL fields render empty)
                md_content = template.format(
                    title=title,
                    author_section=f"Author: {author}" if author else "",
                    details_section=record.get('details') or "",
                    system_section=record.get('system') or "",
                    user_section=record.get('user') or "",
                    keywords_section=', '.join(prompt_keywords.get(title, []))
                )

                # Create safe filename
                safe_filename = re.sub(r'[^\w\-_\. ]', '_', title)

                # Distinct prompt names can sanitize to the same filename;
                # add a numeric suffix so no archive entry shadows another.
                entry_name = f"{safe_filename}.md"
                suffix = 1
                while entry_name.lower() in used_entry_names:
                    suffix += 1
                    entry_name = f"{safe_filename}_{suffix}.md"
                used_entry_names.add(entry_name.lower())

                # Write straight into the archive; no temporary files needed.
                zipf.writestr(entry_name, md_content)

        return f"Successfully exported {len(prompts)} prompts to Markdown files.", zip_path

    except Exception as e:
        error_msg = f"Error exporting prompts: {str(e)}"
        logging.error(error_msg)
        return error_msg, "None"


create_prompts_db()

#
# End of Prompts_DB.py
#######################################################################################################################