from script.vector_db import IndexManager
from script.document_uploader import Uploader
from db.save_data import InsertDatabase
from db.get_data import GetDatabase
from db.delete_data import DeleteDatabase
from db.update_data import UpdateDatabase
from typing import Any, Optional, List
from fastapi import UploadFile
from fastapi import HTTPException
from service.dto import ChatMessage
from core.chat.engine import Engine
from core.chat.chatstore import ChatStore
from core.parser import clean_text, update_response, renumber_sources, seperate_to_list
from llama_index.core.llms import MessageRole
from service.dto import BotResponseStreaming
from service.aws_loader import Loader

import logging
import re
import json

# Configure logging
logging.basicConfig(level=logging.INFO)


# async def data_ingestion(
#     db_conn, reference, file: UploadFile, content_table: UploadFile
# ) -> Any:
async def data_ingestion(db_conn, reference, file: UploadFile) -> Any:
    """Upload the reference file to S3 and return an ingestion status payload."""
    try:
        # insert_database = InsertDatabase(db_conn)

        file_name = f"{reference['title']}"

        aws_loader = Loader()
        aws_loader.upload_to_s3(file, file_name)
        logging.info("Upload to S3 succeeded")

        response = json.dumps(
            {"status": "success", "message": "Vector Index loaded successfully."}
        )

        # NOTE: the database insert and index build below are currently disabled.
        # Insert data into the database
        # await insert_database.insert_data(reference)

        # # uploader = Uploader(reference, file, content_table)
        # uploader = Uploader(reference, file)
        # print("uploader : ", uploader)

        # nodes_with_metadata = await uploader.process_documents()

        # # Build indexes using IndexManager
        # index = IndexManager()
        # response = index.build_indexes(nodes_with_metadata)

        return response

    except Exception as e:
        # Log the error and raise HTTPException for FastAPI
        logging.error(f"An error occurred in data ingestion: {e}")
        raise HTTPException(
            status_code=500,
            detail="An internal server error occurred in data ingestion.",
        )


async def get_data(db_conn, title="", fetch_all_data=True):
    """Fetch all references, or a single reference by title, from the database."""
    get_database = GetDatabase(db_conn)
    try:
        if fetch_all_data:
            results = await get_database.get_all_data()
            logging.info("Database fetched all data")
            logging.debug("Fetched results: %s", results)
            return results
        else:
            results = await get_database.get_data(title)
            logging.info("Database fetched one record")
            return results
    except Exception as e:
        # Log the error and raise HTTPException for FastAPI
        logging.error(f"An error occurred in get data: {e}")
        raise HTTPException(
            status_code=500, detail="An internal server error occurred in get data."
        )


async def update_data(id: int, reference, db_conn):
    """Update an existing reference record by id."""
    update_database = UpdateDatabase(db_conn)
    try:
        reference = reference.model_dump()
        reference.update({"id": id})
        logging.debug("Updating record with payload: %s", reference)
        await update_database.update_record(reference)
        return {"status": "Update Success"}
    except Exception as e:
        # Log the error and raise HTTPException for FastAPI
        logging.error(f"An error occurred in update data: {e}")
        raise HTTPException(
            status_code=500, detail="An internal server error occurred in update data."
        )


async def delete_data(id: int, db_conn):
    """Delete a reference record by id."""
    delete_database = DeleteDatabase(db_conn)
    try:
        params = {"id": id}
        await delete_database.delete_record(params)
        return {"status": "Delete Success"}
    except Exception as e:
        # Log the error and raise HTTPException for FastAPI
        logging.error(f"An error occurred in delete data: {e}")
        raise HTTPException(
            status_code=500, detail="An internal server error occurred in delete data."
        )


def generate_completion_non_streaming(
    session_id, user_request, chat_engine, title=None, category=None, type="general"
):
    """Run a chat completion, resolve [n] citations to their source nodes, and
    persist the assistant message to the chat store."""
    try:
        engine = Engine()
        index_manager = IndexManager()
        chatstore = ChatStore()

        # Load existing indexes
        index = index_manager.load_existing_indexes()

        if type == "general":
            # Retrieve the chat engine with the loaded index
            chat_engine = engine.get_chat_engine(session_id, index)
        else:
            # Retrieve the chat engine with the loaded index, filtered by title/category
            chat_engine = engine.get_chat_engine(
                session_id, index, title=title, category=category
            )

        # Generate completion response
        response = chat_engine.chat(user_request)
        sources = response.sources
        logging.debug("Response sources: %s", sources)

        # Collect the citation numbers ([1], [2], ...) used in the response text
        number_reference = list(set(re.findall(r"\[(\d+)\]", str(response))))
        number_reference_sorted = sorted(number_reference, key=int)

        contents = []
        raw_contents = []
        metadata_collection = []
        scores = []

        if number_reference_sorted:
            for number in number_reference_sorted:
                # Convert the citation number to an integer so it can be used as an index
                number = int(number)

                # Make sure sources is not empty and has the required elements
                if sources and len(sources) > 0:
                    node = dict(sources[0])["raw_output"].source_nodes

                    # Make sure the citation number is a valid index
                    if 0 <= number - 1 < len(node):
                        raw_content = seperate_to_list(node[number - 1].node.get_text())
                        raw_contents.append(raw_content)

                        content = clean_text(node[number - 1].node.get_text())
                        contents.append(content)

                        metadata = dict(node[number - 1].node.metadata)
                        metadata_collection.append(metadata)

                        score = node[number - 1].score
                        scores.append(score)
                    else:
                        logging.warning(f"Invalid reference number: {number}")
                else:
                    logging.warning("No sources available")
        else:
            logging.info("There are no references")

        response = update_response(str(response))
        contents = renumber_sources(contents)

        # Check the lengths of content and metadata
        num_content = len(contents)
        num_metadata = len(metadata_collection)

        # Add content to metadata
        for i in range(min(num_content, num_metadata)):
            metadata_collection[i]["content"] = re.sub(r"source \d+:", "", contents[i])

        message = ChatMessage(
            role=MessageRole.ASSISTANT, content=response, metadata=metadata_collection
        )

        chatstore.delete_last_message(session_id)
        chatstore.add_message(session_id, message)
        chatstore.clean_message(session_id)

        return str(response), raw_contents, contents, metadata_collection, scores

    except Exception as e:
        # Log the error and raise HTTPException for FastAPI
        logging.error(f"An error occurred in generate text: {e}")
        raise HTTPException(
            status_code=500,
            detail="An internal server error occurred in generate text.",
        )


async def generate_streaming_completion(user_request, chat_engine):
    """Stream a chat completion, yielding partial content chunks followed by
    the source references of the final response."""
    try:
        engine = Engine()
        index_manager = IndexManager()

        # Load existing indexes
        index = index_manager.load_existing_indexes()

        # Retrieve the chat engine with the loaded index
        chat_engine = engine.get_chat_engine(index)

        # Generate completion response
        response = chat_engine.stream_chat(user_request)

        completed_response = ""
        for gen in response.response_gen:
            completed_response += gen  # Concatenate the new chunk
            yield BotResponseStreaming(
                content=gen, completed_content=completed_response
            )

        # After streaming finishes, emit each source node as a reference payload
        nodes = response.source_nodes
        for node in nodes:
            reference = str(clean_text(node.node.get_text()))
            metadata = dict(node.node.metadata)
            score = float(node.score)
            yield BotResponseStreaming(
                completed_content=completed_response,
                reference=reference,
                metadata=metadata,
                score=score,
            )
    except Exception as e:
        # Log the error; since this is a streaming generator, surface it to the
        # caller as a final error payload instead of raising mid-stream.
        logging.error(f"An error occurred in generate text: {e}")
        yield {"error": str(e)}
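

# ---------------------------------------------------------------------------
# Illustrative wiring sketch (an assumption, not part of the original service
# layer): one possible way these handlers could be exposed as FastAPI routes.
# The `get_db_conn` dependency and the route paths are placeholders invented
# for this example; swap in the project's real database dependency and router
# before using it.
# ---------------------------------------------------------------------------
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

example_router = APIRouter()


async def get_db_conn():
    # Placeholder dependency assumed for the sketch; replace with the real
    # connection factory used elsewhere in the project.
    raise NotImplementedError("Provide the project's database connection here.")


@example_router.get("/references")
async def list_references(db_conn=Depends(get_db_conn)):
    # Fetch every stored reference through the service layer above.
    return await get_data(db_conn, fetch_all_data=True)


@example_router.post("/chat/stream")
async def stream_chat(user_request: str):
    # Relay chunks from generate_streaming_completion as a plain-text stream;
    # str() serialization is used here only to keep the sketch short.
    async def event_source():
        async for chunk in generate_streaming_completion(user_request, chat_engine=None):
            yield f"{chunk}\n"

    return StreamingResponse(event_source(), media_type="text/plain")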