|
import os |
|
import sqlite3 |
|
|
|
|
|
class MemoryDB:
    """Persistent text memory backed by an SQLite FTS5 virtual table.

    Each stored block of text is tagged with a session number and a key that
    is unique within that session.  A new session number (one higher than the
    largest one already stored) is allocated every time an instance is
    created.
    """

    # Schema for the single full-text table used by this class.
    _CREATE_TABLE_SQL = (
        "CREATE VIRTUAL TABLE IF NOT EXISTS text USING FTS5 "
        "(session, key, block);"
    )

    def __init__(self, db=None):
        """Open (or create) the database and start a new session.

        Args:
            db: Path of the SQLite file.  When ``None``, ``mem.sqlite3`` in
                the current working directory is used.

        If the file cannot be opened or initialised, a message is printed and
        a non-persistent ``:memory:`` database is used instead.
        """
        self.db_file = db
        if db is None:
            self.db_file = os.path.join(os.getcwd(), "mem.sqlite3")

        self.cnx = None
        try:
            self.cnx = self._connect()
        except Exception as e:
            # Deliberately broad: any failure to open the file falls back to
            # an in-memory database rather than aborting construction.
            print("Exception connecting to memory database file:", e)
            self.db_file = ":memory:"
            self.cnx = self._connect()

        self.session_id = int(self.get_max_session_id()) + 1

    def _connect(self):
        """Return a new connection to ``self.db_file`` with the schema ensured."""
        cnx = sqlite3.connect(self.db_file)
        try:
            cnx.execute(self._CREATE_TABLE_SQL)
            cnx.commit()
        except Exception:
            # Do not leak the handle when the schema cannot be created.
            cnx.close()
            raise
        return cnx

    def get_cnx(self):
        """Return the open connection, reconnecting if it was closed by quit()."""
        if self.cnx is None:
            self.cnx = self._connect()
        return self.cnx

    def get_max_session_id(self):
        """Return the highest stored session number, or 0 for an empty table."""
        row = self.get_cnx().execute("SELECT MAX(session) FROM text;").fetchone()
        max_id = row[0]
        return 0 if max_id is None else max_id

    def get_next_key(self):
        """Return the next unused key in the current session (starting at 0)."""
        row = self.get_cnx().execute(
            "SELECT MAX(key) FROM text WHERE session = ?;",
            (self.session_id,),
        ).fetchone()
        last_key = row[0]
        return 0 if last_key is None else int(last_key) + 1

    def insert(self, text=None):
        """Store ``text`` under the next free key; do nothing if it is None."""
        if text is None:
            return
        key = self.get_next_key()
        cnx = self.get_cnx()
        cnx.execute(
            "REPLACE INTO text(session, key, block) VALUES (?, ?, ?);",
            (self.session_id, key, text),
        )
        cnx.commit()

    def overwrite(self, key, text):
        """Replace the block stored at ``key`` in the current session."""
        self.delete_memory(key)
        cnx = self.get_cnx()
        cnx.execute(
            "REPLACE INTO text(session, key, block) VALUES (?, ?, ?);",
            (self.session_id, key, text),
        )
        cnx.commit()

    def delete_memory(self, key, session_id=None):
        """Delete the block at ``key`` in ``session_id`` (default: current).

        Both values are coerced with ``int()`` before being bound, so numeric
        strings keep matching integer keys as they did when the values were
        interpolated into the SQL text.

        Raises:
            ValueError, TypeError: if ``key`` or ``session_id`` is not
                convertible to an integer.
        """
        session = self.session_id if session_id is None else session_id
        cnx = self.get_cnx()
        cnx.execute(
            "DELETE FROM text WHERE session = ? AND key = ?;",
            (int(session), int(key)),
        )
        cnx.commit()

    def search(self, text):
        """Return the blocks matching the FTS5 query ``text``.

        The query is passed as a bound parameter, so quotes in it cannot
        break or alter the SQL statement.  It is still interpreted with FTS5
        query syntax, and a malformed query raises ``sqlite3.OperationalError``.
        """
        rows = self.get_cnx().execute(
            "SELECT block FROM text WHERE text MATCH ?",
            (str(text),),
        ).fetchall()
        return [row[0] for row in rows]

    def get_session(self, id=None):
        """Return every block of session ``id`` (default: current session).

        Raises:
            ValueError, TypeError: if ``id`` is not convertible to an integer.
        """
        session = self.session_id if id is None else id
        rows = self.get_cnx().execute(
            "SELECT block FROM text WHERE session = ?",
            (int(session),),
        ).fetchall()
        return [row[0] for row in rows]

    def quit(self):
        """Commit and close the connection; safe to call more than once."""
        if self.cnx is None:
            return
        self.cnx.commit()
        self.cnx.close()
        # Forget the closed handle so get_cnx() reconnects instead of
        # returning a connection that raises on every use.
        self.cnx = None
|
|
|
|
|
# Shared module-level store, created at import time.  With no argument it
# uses mem.sqlite3 in the current working directory.
permanent_memory = MemoryDB()
|
|
|
|
|
|
|
|
|
|
|
|