oceansweep's picture
Upload 169 files
c5b0bb7 verified
raw
history blame
24.2 kB
# Prompts_DB.py
# Description: Functions to manage the prompts database.
#
# Imports
import sqlite3
import logging
#
# External Imports
import re
from typing import Tuple
#
# Local Imports
from App_Function_Libraries.Utils.Utils import get_database_path
#
#######################################################################################################################
#
# Functions to manage prompts DB
def create_prompts_db():
logging.debug("create_prompts_db: Creating prompts database.")
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.executescript('''
CREATE TABLE IF NOT EXISTS Prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
author TEXT,
details TEXT,
system TEXT,
user TEXT
);
CREATE TABLE IF NOT EXISTS Keywords (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL UNIQUE COLLATE NOCASE
);
CREATE TABLE IF NOT EXISTS PromptKeywords (
prompt_id INTEGER,
keyword_id INTEGER,
FOREIGN KEY (prompt_id) REFERENCES Prompts (id),
FOREIGN KEY (keyword_id) REFERENCES Keywords (id),
PRIMARY KEY (prompt_id, keyword_id)
);
CREATE INDEX IF NOT EXISTS idx_keywords_keyword ON Keywords(keyword);
CREATE INDEX IF NOT EXISTS idx_promptkeywords_prompt_id ON PromptKeywords(prompt_id);
CREATE INDEX IF NOT EXISTS idx_promptkeywords_keyword_id ON PromptKeywords(keyword_id);
''')
# FIXME - dirty hack that should be removed later...
# Migration function to add the 'author' column to the Prompts table
def add_author_column_to_prompts():
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
# Check if 'author' column already exists
cursor.execute("PRAGMA table_info(Prompts)")
columns = [col[1] for col in cursor.fetchall()]
if 'author' not in columns:
# Add the 'author' column
cursor.execute('ALTER TABLE Prompts ADD COLUMN author TEXT')
print("Author column added to Prompts table.")
else:
print("Author column already exists in Prompts table.")
add_author_column_to_prompts()
def normalize_keyword(keyword):
return re.sub(r'\s+', ' ', keyword.strip().lower())
# FIXME - update calls to this function to use the new args
def add_prompt(name, author, details, system=None, user=None, keywords=None):
logging.debug(f"add_prompt: Adding prompt with name: {name}, author: {author}, system: {system}, user: {user}, keywords: {keywords}")
if not name:
logging.error("add_prompt: A name is required.")
return "A name is required."
try:
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO Prompts (name, author, details, system, user)
VALUES (?, ?, ?, ?, ?)
''', (name, author, details, system, user))
prompt_id = cursor.lastrowid
if keywords:
normalized_keywords = [normalize_keyword(k) for k in keywords if k.strip()]
for keyword in set(normalized_keywords): # Use set to remove duplicates
cursor.execute('''
INSERT OR IGNORE INTO Keywords (keyword) VALUES (?)
''', (keyword,))
cursor.execute('SELECT id FROM Keywords WHERE keyword = ?', (keyword,))
keyword_id = cursor.fetchone()[0]
cursor.execute('''
INSERT OR IGNORE INTO PromptKeywords (prompt_id, keyword_id) VALUES (?, ?)
''', (prompt_id, keyword_id))
return "Prompt added successfully."
except sqlite3.IntegrityError:
return "Prompt with this name already exists."
except sqlite3.Error as e:
return f"Database error: {e}"
def fetch_prompt_details(name):
logging.debug(f"fetch_prompt_details: Fetching details for prompt: {name}")
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT p.name, p.author, p.details, p.system, p.user, GROUP_CONCAT(k.keyword, ', ') as keywords
FROM Prompts p
LEFT JOIN PromptKeywords pk ON p.id = pk.prompt_id
LEFT JOIN Keywords k ON pk.keyword_id = k.id
WHERE p.name = ?
GROUP BY p.id
''', (name,))
return cursor.fetchone()
def list_prompts(page=1, per_page=10):
logging.debug(f"list_prompts: Listing prompts for page {page} with {per_page} prompts per page.")
offset = (page - 1) * per_page
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute('SELECT name FROM Prompts LIMIT ? OFFSET ?', (per_page, offset))
prompts = [row[0] for row in cursor.fetchall()]
# Get total count of prompts
cursor.execute('SELECT COUNT(*) FROM Prompts')
total_count = cursor.fetchone()[0]
total_pages = (total_count + per_page - 1) // per_page
return prompts, total_pages, page
def insert_prompt_to_db(title, author, description, system_prompt, user_prompt, keywords=None):
return add_prompt(title, author, description, system_prompt, user_prompt, keywords)
def get_prompt_db_connection():
prompt_db_path = get_database_path('prompts.db')
return sqlite3.connect(prompt_db_path)
def search_prompts(query):
logging.debug(f"search_prompts: Searching prompts with query: {query}")
try:
with get_prompt_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT p.name, p.details, p.system, p.user, GROUP_CONCAT(k.keyword, ', ') as keywords
FROM Prompts p
LEFT JOIN PromptKeywords pk ON p.id = pk.prompt_id
LEFT JOIN Keywords k ON pk.keyword_id = k.id
WHERE p.name LIKE ? OR p.details LIKE ? OR p.system LIKE ? OR p.user LIKE ? OR k.keyword LIKE ?
GROUP BY p.id
ORDER BY p.name
""", (f'%{query}%', f'%{query}%', f'%{query}%', f'%{query}%', f'%{query}%'))
return cursor.fetchall()
except sqlite3.Error as e:
logging.error(f"Error searching prompts: {e}")
return []
def search_prompts_by_keyword(keyword, page=1, per_page=10):
logging.debug(f"search_prompts_by_keyword: Searching prompts by keyword: {keyword}")
normalized_keyword = normalize_keyword(keyword)
offset = (page - 1) * per_page
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT DISTINCT p.name
FROM Prompts p
JOIN PromptKeywords pk ON p.id = pk.prompt_id
JOIN Keywords k ON pk.keyword_id = k.id
WHERE k.keyword LIKE ?
LIMIT ? OFFSET ?
''', ('%' + normalized_keyword + '%', per_page, offset))
prompts = [row[0] for row in cursor.fetchall()]
# Get total count of matching prompts
cursor.execute('''
SELECT COUNT(DISTINCT p.id)
FROM Prompts p
JOIN PromptKeywords pk ON p.id = pk.prompt_id
JOIN Keywords k ON pk.keyword_id = k.id
WHERE k.keyword LIKE ?
''', ('%' + normalized_keyword + '%',))
total_count = cursor.fetchone()[0]
total_pages = (total_count + per_page - 1) // per_page
return prompts, total_pages, page
def update_prompt_keywords(prompt_name, new_keywords):
logging.debug(f"update_prompt_keywords: Updating keywords for prompt: {prompt_name}")
try:
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute('SELECT id FROM Prompts WHERE name = ?', (prompt_name,))
prompt_id = cursor.fetchone()
if not prompt_id:
return "Prompt not found."
prompt_id = prompt_id[0]
cursor.execute('DELETE FROM PromptKeywords WHERE prompt_id = ?', (prompt_id,))
normalized_keywords = [normalize_keyword(k) for k in new_keywords if k.strip()]
for keyword in set(normalized_keywords): # Use set to remove duplicates
cursor.execute('INSERT OR IGNORE INTO Keywords (keyword) VALUES (?)', (keyword,))
cursor.execute('SELECT id FROM Keywords WHERE keyword = ?', (keyword,))
keyword_id = cursor.fetchone()[0]
cursor.execute('INSERT INTO PromptKeywords (prompt_id, keyword_id) VALUES (?, ?)',
(prompt_id, keyword_id))
# Remove unused keywords
cursor.execute('''
DELETE FROM Keywords
WHERE id NOT IN (SELECT DISTINCT keyword_id FROM PromptKeywords)
''')
return "Keywords updated successfully."
except sqlite3.Error as e:
return f"Database error: {e}"
def add_or_update_prompt(title, author, description, system_prompt, user_prompt, keywords=None):
logging.debug(f"add_or_update_prompt: Adding or updating prompt: {title}")
if not title:
return "Error: Title is required."
existing_prompt = fetch_prompt_details(title)
if existing_prompt:
# Update existing prompt
result = update_prompt_in_db(title, author, description, system_prompt, user_prompt)
if "successfully" in result:
# Update keywords if the prompt update was successful
keyword_result = update_prompt_keywords(title, keywords or [])
result += f" {keyword_result}"
else:
# Insert new prompt
result = insert_prompt_to_db(title, author, description, system_prompt, user_prompt, keywords)
return result
def load_prompt_details(selected_prompt):
logging.debug(f"load_prompt_details: Loading prompt details for {selected_prompt}")
if selected_prompt:
details = fetch_prompt_details(selected_prompt)
if details:
return details[0], details[1], details[2], details[3], details[4], details[5]
return "", "", "", "", "", ""
def update_prompt_in_db(title, author, description, system_prompt, user_prompt):
logging.debug(f"update_prompt_in_db: Updating prompt: {title}")
try:
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE Prompts SET author = ?, details = ?, system = ?, user = ? WHERE name = ?",
(author, description, system_prompt, user_prompt, title)
)
if cursor.rowcount == 0:
return "No prompt found with the given title."
return "Prompt updated successfully!"
except sqlite3.Error as e:
return f"Error updating prompt: {e}"
def delete_prompt(prompt_id):
logging.debug(f"delete_prompt: Deleting prompt with ID: {prompt_id}")
try:
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
# Delete associated keywords
cursor.execute("DELETE FROM PromptKeywords WHERE prompt_id = ?", (prompt_id,))
# Delete the prompt
cursor.execute("DELETE FROM Prompts WHERE id = ?", (prompt_id,))
if cursor.rowcount == 0:
return f"No prompt found with ID {prompt_id}"
else:
conn.commit()
return f"Prompt with ID {prompt_id} has been successfully deleted"
except sqlite3.Error as e:
return f"An error occurred: {e}"
def delete_prompt_keyword(keyword: str) -> str:
"""
Delete a keyword and its associations from the prompts database.
Args:
keyword (str): The keyword to delete
Returns:
str: Success/failure message
"""
logging.debug(f"delete_prompt_keyword: Deleting keyword: {keyword}")
try:
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
# First normalize the keyword
normalized_keyword = normalize_keyword(keyword)
# Get the keyword ID
cursor.execute("SELECT id FROM Keywords WHERE keyword = ?", (normalized_keyword,))
result = cursor.fetchone()
if not result:
return f"Keyword '{keyword}' not found."
keyword_id = result[0]
# Delete keyword associations from PromptKeywords
cursor.execute("DELETE FROM PromptKeywords WHERE keyword_id = ?", (keyword_id,))
# Delete the keyword itself
cursor.execute("DELETE FROM Keywords WHERE id = ?", (keyword_id,))
# Get the number of affected prompts
affected_prompts = cursor.rowcount
conn.commit()
logging.info(f"Keyword '{keyword}' deleted successfully")
return f"Successfully deleted keyword '{keyword}' and removed it from {affected_prompts} prompts."
except sqlite3.Error as e:
error_msg = f"Database error deleting keyword: {str(e)}"
logging.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"Error deleting keyword: {str(e)}"
logging.error(error_msg)
return error_msg
def export_prompt_keywords_to_csv() -> Tuple[str, str]:
"""
Export all prompt keywords to a CSV file with associated metadata.
Returns:
Tuple[str, str]: (status_message, file_path)
"""
import csv
import tempfile
import os
from datetime import datetime
logging.debug("export_prompt_keywords_to_csv: Starting export")
try:
# Create a temporary file with a specific name in the system's temp directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_dir = tempfile.gettempdir()
file_path = os.path.join(temp_dir, f'prompt_keywords_export_{timestamp}.csv')
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
# Get keywords with related prompt information
query = '''
SELECT
k.keyword,
GROUP_CONCAT(p.name, ' | ') as prompt_names,
COUNT(DISTINCT p.id) as num_prompts,
GROUP_CONCAT(DISTINCT p.author, ' | ') as authors
FROM Keywords k
LEFT JOIN PromptKeywords pk ON k.id = pk.keyword_id
LEFT JOIN Prompts p ON pk.prompt_id = p.id
GROUP BY k.id, k.keyword
ORDER BY k.keyword
'''
cursor.execute(query)
results = cursor.fetchall()
# Write to CSV
with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([
'Keyword',
'Associated Prompts',
'Number of Prompts',
'Authors'
])
for row in results:
writer.writerow([
row[0], # keyword
row[1] if row[1] else '', # prompt_names (may be None)
row[2], # num_prompts
row[3] if row[3] else '' # authors (may be None)
])
status_msg = f"Successfully exported {len(results)} prompt keywords to CSV."
logging.info(status_msg)
return status_msg, file_path
except sqlite3.Error as e:
error_msg = f"Database error exporting keywords: {str(e)}"
logging.error(error_msg)
return error_msg, "None"
except Exception as e:
error_msg = f"Error exporting keywords: {str(e)}"
logging.error(error_msg)
return error_msg, "None"
def view_prompt_keywords() -> str:
"""
View all keywords currently in the prompts database.
Returns:
str: Markdown formatted string of all keywords
"""
logging.debug("view_prompt_keywords: Retrieving all keywords")
try:
with sqlite3.connect(get_database_path('prompts.db')) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT k.keyword, COUNT(DISTINCT pk.prompt_id) as prompt_count
FROM Keywords k
LEFT JOIN PromptKeywords pk ON k.id = pk.keyword_id
GROUP BY k.id, k.keyword
ORDER BY k.keyword
""")
keywords = cursor.fetchall()
if keywords:
keyword_list = [f"- {k[0]} ({k[1]} prompts)" for k in keywords]
return "### Current Prompt Keywords:\n" + "\n".join(keyword_list)
return "No keywords found."
except Exception as e:
error_msg = f"Error retrieving keywords: {str(e)}"
logging.error(error_msg)
return error_msg
def export_prompts(
export_format='csv',
filter_keywords=None,
include_system=True,
include_user=True,
include_details=True,
include_author=True,
include_keywords=True,
markdown_template=None
) -> Tuple[str, str]:
"""
Export prompts to CSV or Markdown with configurable options.
Args:
export_format (str): 'csv' or 'markdown'
filter_keywords (List[str], optional): Keywords to filter prompts by
include_system (bool): Include system prompts in export
include_user (bool): Include user prompts in export
include_details (bool): Include prompt details/descriptions
include_author (bool): Include author information
include_keywords (bool): Include associated keywords
markdown_template (str, optional): Template for markdown export
Returns:
Tuple[str, str]: (status_message, file_path)
"""
import csv
import tempfile
import os
import zipfile
from datetime import datetime
try:
# Get prompts data
with get_prompt_db_connection() as conn:
cursor = conn.cursor()
# Build query based on included fields
select_fields = ['p.name']
if include_author:
select_fields.append('p.author')
if include_details:
select_fields.append('p.details')
if include_system:
select_fields.append('p.system')
if include_user:
select_fields.append('p.user')
query = f"""
SELECT DISTINCT {', '.join(select_fields)}
FROM Prompts p
"""
# Add keyword filtering if specified
if filter_keywords:
placeholders = ','.join(['?' for _ in filter_keywords])
query += f"""
JOIN PromptKeywords pk ON p.id = pk.prompt_id
JOIN Keywords k ON pk.keyword_id = k.id
WHERE k.keyword IN ({placeholders})
"""
cursor.execute(query, filter_keywords if filter_keywords else ())
prompts = cursor.fetchall()
# Get keywords for each prompt if needed
if include_keywords:
prompt_keywords = {}
for prompt in prompts:
cursor.execute("""
SELECT k.keyword
FROM Keywords k
JOIN PromptKeywords pk ON k.id = pk.keyword_id
JOIN Prompts p ON pk.prompt_id = p.id
WHERE p.name = ?
""", (prompt[0],))
prompt_keywords[prompt[0]] = [row[0] for row in cursor.fetchall()]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if export_format == 'csv':
# Export as CSV
temp_file = os.path.join(tempfile.gettempdir(), f'prompts_export_{timestamp}.csv')
with open(temp_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# Write header
header = ['Name']
if include_author:
header.append('Author')
if include_details:
header.append('Details')
if include_system:
header.append('System Prompt')
if include_user:
header.append('User Prompt')
if include_keywords:
header.append('Keywords')
writer.writerow(header)
# Write data
for prompt in prompts:
row = list(prompt)
if include_keywords:
row.append(', '.join(prompt_keywords.get(prompt[0], [])))
writer.writerow(row)
return f"Successfully exported {len(prompts)} prompts to CSV.", temp_file
else:
# Export as Markdown files in ZIP
temp_dir = tempfile.mkdtemp()
zip_path = os.path.join(tempfile.gettempdir(), f'prompts_export_{timestamp}.zip')
# Define markdown templates
templates = {
"Basic Template": """# {title}
{author_section}
{details_section}
{system_section}
{user_section}
{keywords_section}
""",
"Detailed Template": """# {title}
## Author
{author_section}
## Description
{details_section}
## System Prompt
{system_section}
## User Prompt
{user_section}
## Keywords
{keywords_section}
"""
}
template = templates.get(markdown_template, markdown_template or templates["Basic Template"])
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for prompt in prompts:
# Create markdown content
md_content = template.format(
title=prompt[0],
author_section=f"Author: {prompt[1]}" if include_author else "",
details_section=prompt[2] if include_details else "",
system_section=prompt[3] if include_system else "",
user_section=prompt[4] if include_user else "",
keywords_section=', '.join(prompt_keywords.get(prompt[0], [])) if include_keywords else ""
)
# Create safe filename
safe_filename = re.sub(r'[^\w\-_\. ]', '_', prompt[0])
md_path = os.path.join(temp_dir, f"{safe_filename}.md")
# Write markdown file
with open(md_path, 'w', encoding='utf-8') as f:
f.write(md_content)
# Add to ZIP
zipf.write(md_path, os.path.basename(md_path))
return f"Successfully exported {len(prompts)} prompts to Markdown files.", zip_path
except Exception as e:
error_msg = f"Error exporting prompts: {str(e)}"
logging.error(error_msg)
return error_msg, "None"
create_prompts_db()
#
# End of Propmts_DB.py
#######################################################################################################################