import re
import warnings
from typing import Union

import PyPDF2
import pandas as pd
import torch
import gradio as gr
from datasets import Dataset
from transformers import DPRContextEncoder, DPRContextEncoderTokenizer
from transformers import DPRQuestionEncoder, DPRQuestionEncoderTokenizer

warnings.filterwarnings("ignore")
torch.set_grad_enabled(False)  # inference only, no gradients needed

# DPR uses two encoders: the context encoder embeds passages,
# the question encoder embeds queries.
ctx_encoder = DPRContextEncoder.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
ctx_tokenizer = DPRContextEncoderTokenizer.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
q_encoder = DPRQuestionEncoder.from_pretrained("facebook/dpr-question_encoder-single-nq-base")
q_tokenizer = DPRQuestionEncoderTokenizer.from_pretrained("facebook/dpr-question_encoder-single-nq-base")


def process_pdfs(file_paths: Union[str, list]) -> pd.DataFrame:
    """Processes the PDF files and returns a dataframe with the text of each page in a separate row."""
    if isinstance(file_paths, str):
        file_paths = [file_paths]
    rows = []
    for file_path in file_paths:
        file_name = file_path.split("/")[-1]
        with open(file_path, "rb") as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            for i, page in enumerate(pdf_reader.pages):
                txt = page.extract_text()
                txt = txt.replace("\n", "")    # strip line breaks
                txt = txt.replace("\t", "")    # strip tabs
                txt = re.sub(r" +", " ", txt)  # collapse extra spaces
                # 512 matches the maximum sequence length of the
                # "facebook/dpr-ctx_encoder-single-nq-base" model; chunking by
                # characters here is a rough proxy for that token limit.
                while len(txt) > 512:
                    rows.append({"title": f"{file_name}-page-{i}", "text": txt[:512]})
                    txt = txt[512:]
                # keep the final (<= 512 character) remainder as well
                rows.append({"title": f"{file_name}-page-{i}", "text": txt})
    return pd.DataFrame(rows, columns=["title", "text"])


def process(example):
    """Processes one example of the dataset and returns its embedding."""
    tokens = ctx_tokenizer(example["text"], truncation=True, max_length=512, return_tensors="pt")
    embed = ctx_encoder(**tokens)[0][0].numpy()
    return {"embeddings": embed}


def process_dataset(df):
    """Processes the dataframe and returns a dataset with a FAISS index over the embeddings."""
    ds = Dataset.from_pandas(df)
    ds = ds.map(process)
    ds.add_faiss_index(column="embeddings")  # enables nearest-neighbour search
    return ds


def search(query, ds, k=3):
    """Searches for the query in the dataset and returns the k most similar passages."""
    tokens = q_tokenizer(query, return_tensors="pt")
    query_embed = q_encoder(**tokens)[0][0].numpy()
    scores, retrieved_examples = ds.get_nearest_examples("embeddings", query_embed, k=k)
    out = (
        f"title: {retrieved_examples['title'][0]},\n"
        f"content: {retrieved_examples['text'][0]}\n"
        f"similar resources: {retrieved_examples['title']}"
    )
    return out


def predict(query, file_paths, k=3):
    """Runs the full pipeline: parse the PDFs, build the index, and search."""
    k = int(k)  # gr.Number yields a float; FAISS expects an integer k
    df = process_pdfs(file_paths)
    ds = process_dataset(df)
    return search(query, ds, k=k)


with gr.Blocks() as demo:
    with gr.Column():
        files = gr.Files(label="Upload PDFs", type="filepath", file_count="multiple")
        query = gr.Text(label="query")
        with gr.Accordion("advanced options", open=False):
            k = gr.Number(label="number of results", value=3)
        button = gr.Button("search")
    with gr.Column():
        output = gr.Textbox(label="output")
    button.click(predict, inputs=[query, files, k], outputs=output)

demo.launch()
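
# Optional sanity check of the pipeline without the Gradio UI. A minimal
# sketch: "sample.pdf" is a hypothetical file path, not part of the original
# script. Uncomment and run in place of demo.launch() above.
#
# df = process_pdfs(["sample.pdf"])
# ds = process_dataset(df)
# print(search("what is this document about?", ds, k=3))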