import json
from typing import List, Tuple

import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import pipeline

from app.db_local_storage.files_db import VECTOR_FILES_DIRECTORY
from app.db_local_storage.in_memory_db import query_response_storage


class QuerySearchFeature:
    """Hybrid lexical + semantic search over locally stored document
    embeddings, answered with an extractive QA pipeline."""

    def __init__(self, model: SentenceTransformer, qa_pipeline):
        self.model = model
        self.qa_pipeline = qa_pipeline

    async def query_search(self, query: str) -> dict:
        # Record the user's query in the in-memory conversation log.
        user_query = {
            "text": query,
            "isSender": True,
        }
        query_response_storage.append(user_query)

        database = await QuerySearchFeature.load_data()
        text_data, embeddings = await QuerySearchFeature.split_database(database)

        lexical_results = await QuerySearchFeature.lexical_search(query, text_data)
        semantic_results = await QuerySearchFeature.semantic_search(
            query, text_data, embeddings, self.model
        )

        # Merge both result sets, dropping duplicates while preserving order
        # (a plain set() would make the context ordering nondeterministic).
        combined_results = list(dict.fromkeys(lexical_results + semantic_results))
        context = await QuerySearchFeature.get_context(combined_results)

        response = self.qa_pipeline(question=query, context=context)

        # Record the model's answer alongside the query.
        response_query = {
            "text": response["answer"],
            "isSender": False,
        }
        query_response_storage.append(response_query)

        return {
            "message": response["answer"],
            "context_used": context,
            "chunks": context,
        }

    @staticmethod
    async def semantic_search(
        query: str, chunks: List[str], embeddings: np.ndarray, model
    ) -> List[str]:
        # Dot-product similarity between the query and every stored chunk.
        # Note: this equals cosine similarity only if the stored embeddings
        # are unit-normalized.
        query_embedding = model.encode([query])
        similarities = np.dot(embeddings, query_embedding.T).flatten()
        # Return the three highest-scoring chunks.
        top_indices = np.argsort(-similarities)[:3]
        return [chunks[i] for i in top_indices]

    @staticmethod
    async def lexical_search(query: str, chunks: List[str]) -> List[str]:
        # Simple case-insensitive substring match over every chunk.
        return [chunk for chunk in chunks if query.lower() in chunk.lower()]

    @staticmethod
    async def load_data() -> dict:
        with open(VECTOR_FILES_DIRECTORY, "r", encoding="utf-8") as file:
            database = json.load(file)
        return database

    @staticmethod
    async def split_database(db: dict) -> Tuple[List[str], np.ndarray]:
        text_data = []
        embeddings = []

        # Flatten every page of every document into parallel lists of
        # raw text and its stored embedding vector.
        for document in db.values():
            for page in document["data"]:
                text_data.append(page["metadata"]["original_text"])
                embeddings.append(page["embedding"])
        # Convert to an ndarray so the declared return type holds and
        # np.dot in semantic_search operates on a proper matrix.
        return text_data, np.array(embeddings)

    @staticmethod
    async def get_context(chunks: List[str]) -> str:
        # Join the retrieved chunks into a single context string for the QA model.
        return " ".join(chunks)