Spaces:
Runtime error
Runtime error
File size: 5,668 Bytes
39e845d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain import OpenAI, Cohere
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from embeddings import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.docstore.document import Document
from langchain.vectorstores import FAISS, VectorStore
import docx2txt
from typing import List, Dict, Any
import re
import numpy as np
from io import StringIO
from io import BytesIO
import streamlit as st
from prompts import STUFF_PROMPT
from pypdf import PdfReader
from openai.error import AuthenticationError
import pptx
@st.experimental_memo()
def parse_docx(file: BytesIO) -> str:
text = docx2txt.process(file)
# Remove multiple newlines
text = re.sub(r"\n\s*\n", "\n\n", text)
return text
@st.experimental_memo()
def parse_pdf(file: BytesIO) -> List[str]:
pdf = PdfReader(file)
output = []
for page in pdf.pages:
text = page.extract_text()
# Merge hyphenated words
text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)
# Fix newlines in the middle of sentences
text = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", text.strip())
# Remove multiple newlines
text = re.sub(r"\n\s*\n", "\n\n", text)
output.append(text)
return output
@st.experimental_memo()
def parse_txt(file: BytesIO) -> str:
text = file.read().decode("utf-8")
# Remove multiple newlines
text = re.sub(r"\n\s*\n", "\n\n", text)
return text
@st.experimental_memo()
def parse_pptx(file: BytesIO) -> str:
ppt_file = pptx.Presentation(file)
string_data = ""
for slide in ppt_file.slides:
for shape in slide.shapes:
if shape.has_text_frame:
string_data += shape.text_frame.text + '\n'
return string_data
@st.experimental_memo()
def parse_csv(uploaded_file):
# To read file as bytes:
#bytes_data = uploaded_file.getvalue()
#st.write(bytes_data)
# To convert to a string based IO:
stringio = StringIO(uploaded_file.getvalue().decode("utf-8"))
#st.write(stringio)
# To read file as string:
string_data = stringio.read()
#st.write(string_data)
# Can be used wherever a "file-like" object is accepted:
# dataframe = pd.read_csv(uploaded_file)
return string_data
@st.cache(allow_output_mutation=True)
def text_to_docs(text: str) -> List[Document]:
"""Converts a string or list of strings to a list of Documents
with metadata."""
if isinstance(text, str):
# Take a single string as one page
text = [text]
page_docs = [Document(page_content=page) for page in text]
# Add page numbers as metadata
for i, doc in enumerate(page_docs):
doc.metadata["page"] = i + 1
# Split pages into chunks
doc_chunks = []
for doc in page_docs:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
chunk_overlap=0,
)
chunks = text_splitter.split_text(doc.page_content)
for i, chunk in enumerate(chunks):
doc = Document(
page_content=chunk, metadata={"page": doc.metadata["page"], "chunk": i}
)
# Add sources a metadata
doc.metadata["source"] = f"{doc.metadata['page']}-{doc.metadata['chunk']}"
doc_chunks.append(doc)
return doc_chunks
@st.cache(allow_output_mutation=True, show_spinner=False)
def embed_docs(docs: List[Document]) -> VectorStore:
"""Embeds a list of Documents and returns a FAISS index"""
if not st.session_state.get("OPENAI_API_KEY"):
raise AuthenticationError(
"Enter your OpenAI API key in the sidebar. You can get a key at https://platform.openai.com/account/api-keys."
)
else:
# Embed the chunks
embeddings = OpenAIEmbeddings(openai_api_key=st.session_state.get("OPENAI_API_KEY")) # type: ignore
index = FAISS.from_documents(docs, embeddings)
return index
@st.cache(allow_output_mutation=True)
def search_docs(index: VectorStore, query: str) -> List[Document]:
"""Searches a FAISS index for similar chunks to the query
and returns a list of Documents."""
# Search for similar chunks
docs = index.similarity_search(query, k=5)
return docs
@st.cache(allow_output_mutation=True)
def get_answer(docs: List[Document], query: str) -> Dict[str, Any]:
"""Gets an answer to a question from a list of Documents."""
# Get the answer
chain = load_qa_with_sources_chain(OpenAI(temperature=0, openai_api_key=st.session_state.get("OPENAI_API_KEY")), chain_type="stuff", prompt=STUFF_PROMPT) # type: ignore
answer = chain(
{"input_documents": docs, "question": query}, return_only_outputs=True
)
return answer
@st.cache(allow_output_mutation=True)
def get_sources(answer: Dict[str, Any], docs: List[Document]) -> List[Document]:
"""Gets the source documents for an answer."""
# Get sources for the answer
source_keys = [s for s in answer["output_text"].split("SOURCES: ")[-1].split(", ")]
source_docs = []
for doc in docs:
if doc.metadata["source"] in source_keys:
source_docs.append(doc)
return source_docs
def wrap_text_in_html(text: str) -> str:
"""Wraps each text block separated by newlines in <p> tags"""
if isinstance(text, list):
# Add horizontal rules between pages
text = "\n<hr/>\n".join(text)
return "".join([f"<p>{line}</p>" for line in text.split("\n")]) |