"""Upload service: read an uploaded file, enrich it with metadata, and split
it into semantically coherent nodes ready for vector-store ingestion."""

from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.extractors import PydanticProgramExtractor
from llama_index.embeddings.openai import OpenAIEmbedding
from config import PINECONE_CONFIG
from pinecone.grpc import PineconeGRPC as Pinecone
from service.reader import Reader
from script.get_metadata import Metadata
from fastapi import UploadFile, HTTPException, status
from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
)

import logging
import random

logger = logging.getLogger(__name__)

# Dimensionality of the OpenAI text-embedding vectors stored in the index;
# the probe vector used for metadata-only queries must match it.
EMBEDDING_DIM = 1536


class Uploader:
    """Coordinates ingestion of a single uploaded file.

    Pipeline: read the file -> apply reference metadata -> semantically
    split into nodes. ``filter_document`` can additionally drop documents
    whose title already exists in the Pinecone index.
    """

    def __init__(self, reference, file: UploadFile):
        """Store the upload and prepare reader/metadata helpers.

        Args:
            reference: Reference payload passed through to ``Metadata``.
            file: The uploaded file to be processed.
        """
        self.file = file
        self.reader = Reader()
        self.reference = reference
        self.metadata = Metadata(reference)

    async def ingest_documents(self, file: UploadFile):
        """Load documents from the uploaded file via the Reader service."""
        documents = await self.reader.read_from_uploadfile(file)
        logger.info("Documents successfully ingested")
        return documents

    def check_existing_metadata(self, pinecone_index, title, random_vector):
        """Return index matches whose metadata ``title`` equals *title*.

        Pinecone requires a query vector even for metadata-filtered lookups,
        so a throwaway ``random_vector`` is used as the probe. An empty list
        means no document with this title is indexed yet.

        Raises:
            HTTPException: 500 if the index query fails.
        """
        try:
            result = pinecone_index.query(
                vector=random_vector,
                top_k=1,
                filter={"title": {"$eq": title}},
            )
            return result["matches"]
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Error checking existing metadata: {str(e)}",
            ) from e

    async def process_documents(self):
        """Ingest the uploaded file, attach metadata, and split it into nodes.

        Returns:
            The list of nodes produced by the semantic splitter.

        Raises:
            HTTPException: 500 if the splitting step fails.
        """
        documents = await self.ingest_documents(self.file)

        embed_model = OpenAIEmbedding()

        documents_with_metadata = self.metadata.apply_metadata(documents)
        logger.debug("Metadata applied to %d documents", len(documents_with_metadata))

        # Split on semantic boundaries rather than fixed sizes; a buffer of 1
        # sentence and the 95th-percentile breakpoint are the library defaults
        # recommended for general text.
        splitter = SemanticSplitterNodeParser(
            buffer_size=1,
            breakpoint_percentile_threshold=95,
            embed_model=embed_model,
        )

        try:
            nodes_with_metadata = splitter.get_nodes_from_documents(
                documents_with_metadata
            )
            logger.info("Pipeline processing completed.")
            return nodes_with_metadata
        except Exception as e:
            logger.error("An error occurred in making pipeline: %s", e)
            raise HTTPException(
                status_code=500,
                detail="An internal server error occurred making pipeline.",
            ) from e

    def filter_document(self, documents):
        """Return only documents whose title is not already in the index.

        A random probe vector is generated once and reused for every
        per-title metadata query against the "test" index.
        """
        api_key = PINECONE_CONFIG.PINECONE_API_KEY
        client = Pinecone(api_key=api_key)
        pinecone_index = client.Index("test")
        random_vector = [random.uniform(0, 1) for _ in range(EMBEDDING_DIM)]

        return [
            doc
            for doc in documents
            if not self.check_existing_metadata(
                pinecone_index, doc.metadata["title"], random_vector
            )
        ]