awinml's picture
Upload 2 files
9c49e99
raw
history blame
14.2 kB
import re
import openai
import pandas as pd
import pinecone
import spacy
import streamlit_scrollable_textbox as stx
import torch
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from transformers import (
AutoModelForMaskedLM,
AutoModelForSeq2SeqLM,
AutoTokenizer,
pipeline,
)
import streamlit as st
@st.experimental_singleton
def get_data():
    """Read the earnings-call metadata table.

    Returns:
        The ``pandas.DataFrame`` parsed from
        ``earnings_calls_cleaned_metadata.csv`` (path relative to the
        current working directory).
    """
    return pd.read_csv("earnings_calls_cleaned_metadata.csv")
# Initialize Spacy Model
@st.experimental_singleton
def get_spacy_model():
    """Load the spaCy ``en_core_web_sm`` pipeline.

    Returns:
        The object returned by ``spacy.load("en_core_web_sm")``.
    """
    nlp = spacy.load("en_core_web_sm")
    return nlp
# Initialize models from HuggingFace
@st.experimental_singleton
def get_t5_model():
    """Create a HuggingFace summarization pipeline backed by ``t5-small``.

    Returns:
        The pipeline object built with ``t5-small`` as both model and
        tokenizer.
    """
    checkpoint = "t5-small"
    return pipeline("summarization", model=checkpoint, tokenizer=checkpoint)
@st.experimental_singleton
def get_flan_t5_model():
    """Create a HuggingFace summarization pipeline backed by Flan-T5 XL.

    Returns:
        The pipeline object built with ``google/flan-t5-xl`` as both model
        and tokenizer, configured with ``max_length=512``.
    """
    checkpoint = "google/flan-t5-xl"
    summarizer = pipeline(
        "summarization",
        model=checkpoint,
        tokenizer=checkpoint,
        max_length=512,
    )
    return summarizer
@st.experimental_singleton
def get_mpnet_embedding_model():
    """Load the ``all-mpnet-base-v2`` sentence-transformer.

    The model is placed on CUDA when ``torch.cuda.is_available()`` is true,
    otherwise on the CPU, and its ``max_seq_length`` is set to 512.

    Returns:
        The configured ``SentenceTransformer`` instance.
    """
    if torch.cuda.is_available():
        target_device = "cuda"
    else:
        target_device = "cpu"
    encoder = SentenceTransformer(
        "sentence-transformers/all-mpnet-base-v2", device=target_device
    )
    encoder.max_seq_length = 512
    return encoder
@st.experimental_singleton
def get_splade_sparse_embedding_model():
    """Load the SPLADE masked-LM checkpoint and its tokenizer.

    The model is moved to CUDA when ``torch.cuda.is_available()`` is true,
    otherwise it stays on the CPU.

    Returns:
        A ``(model, tokenizer)`` tuple for
        ``naver/splade-cocondenser-ensembledistil``.
    """
    checkpoint = "naver/splade-cocondenser-ensembledistil"
    target_device = "cuda" if torch.cuda.is_available() else "cpu"
    splade_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    splade_model = AutoModelForMaskedLM.from_pretrained(checkpoint)
    splade_model.to(target_device)
    return splade_model, splade_tokenizer
@st.experimental_singleton
def get_sgpt_embedding_model():
    """Load the SGPT-125M weighted-mean sentence-transformer.

    The model is placed on CUDA when ``torch.cuda.is_available()`` is true,
    otherwise on the CPU, and its ``max_seq_length`` is set to 512.

    Returns:
        The configured ``SentenceTransformer`` instance.
    """
    if torch.cuda.is_available():
        target_device = "cuda"
    else:
        target_device = "cpu"
    encoder = SentenceTransformer(
        "Muennighoff/SGPT-125M-weightedmean-nli-bitfit", device=target_device
    )
    encoder.max_seq_length = 512
    return encoder
@st.experimental_memo
def save_key(api_key):
    """Return ``api_key`` unchanged.

    The function body is an identity; any persistence comes solely from the
    ``st.experimental_memo`` decorator applied to it.

    Args:
        api_key: Value to hand back to the caller.

    Returns:
        The same ``api_key`` that was passed in.
    """
    return api_key
def create_dense_embeddings(query, model):
    """Encode one query with a dense embedding model.

    Args:
        query: Text to embed; it is wrapped in a one-element list before
            being handed to the model.
        model: Object exposing ``encode(list)`` whose result provides
            ``tolist()``.

    Returns:
        ``model.encode([query]).tolist()``.
    """
    batch = [query]
    encoded = model.encode(batch)
    return encoded.tolist()
def create_sparse_embeddings(query, model, tokenizer):
    """Build a sparse index/weight representation of a query from LM logits.

    Args:
        query: Text to encode.
        model: Callable accepting the tokenizer output as keyword arguments
            and returning an object with a ``logits`` tensor.
        tokenizer: Callable producing the model inputs; its result must
            support ``.to(device)``.

    Returns:
        A dict with ``"indices"`` (positions whose pooled weight is
        positive) and ``"values"`` (the matching weights), both ordered by
        descending weight.
    """
    target_device = "cuda" if torch.cuda.is_available() else "cpu"
    encoded = tokenizer(query, return_tensors="pt").to(target_device)
    with torch.no_grad():
        logits = model(**encoded).logits

    # log(1 + relu(x)) on the first item of the batch.
    activations = torch.log1p(torch.relu(logits[0]))
    # Max (not sum) over dimension 0, presumably the input-token axis.
    pooled = torch.max(activations, dim=0).values

    # Keep only strictly positive weights, then order them high-to-low.
    positive_ids = torch.where(pooled > 0)[0]
    positive_weights = pooled[positive_ids]
    sorted_weights, permutation = torch.sort(positive_weights, descending=True)
    sorted_ids = positive_ids[permutation]

    return {
        "indices": sorted_ids.cpu().numpy().tolist(),
        "values": sorted_weights.cpu().numpy().tolist(),
    }
def hybrid_score_norm(dense, sparse, alpha: float):
    """Weight a dense and a sparse vector for a convex hybrid score.

    The dense values are scaled by ``alpha`` and the sparse values by
    ``1 - alpha``, i.e. ``alpha * dense + (1 - alpha) * sparse``.

    Args:
        dense: Iterable of floats representing the dense vector.
        sparse: A dict with ``indices`` and ``values`` entries.
        alpha: Scale between 0 and 1 (inclusive).

    Returns:
        A ``(scaled_dense, scaled_sparse)`` tuple: a list of scaled dense
        values and a new dict holding the original ``indices`` with the
        scaled ``values``.

    Raises:
        ValueError: If ``alpha`` is below 0 or above 1.
    """
    if alpha > 1 or alpha < 0:
        raise ValueError("Alpha must be between 0 and 1")

    sparse_weight = 1 - alpha
    scaled_sparse_values = []
    for value in sparse["values"]:
        scaled_sparse_values.append(value * sparse_weight)

    scaled_dense = [value * alpha for value in dense]
    scaled_sparse = {
        "indices": sparse["indices"],
        "values": scaled_sparse_values,
    }
    return scaled_dense, scaled_sparse
def query_pinecone_sparse(
    dense_vec,
    sparse_vec,
    top_k,
    index,
    year,
    quarter,
    ticker,
    participant_type,
    threshold=0.25,
):
    """Run a hybrid (dense + sparse) index query with metadata filters.

    Args:
        dense_vec: Dense query vector, passed as ``vector``.
        sparse_vec: Sparse query vector, passed as ``sparse_vector``.
        top_k: Number of matches requested from the index.
        index: Object exposing ``query(...)``; the response must support
            reading and assigning ``["matches"]``.
        year: ``"All"`` or a value convertible with ``int``.
        quarter: ``"All"`` or a specific quarter label such as ``"Q1"``.
        ticker: Ticker symbol matched exactly.
        participant_type: ``"Company Speaker"`` selects ``QA_Flag ==
            "Answer"``; any other value selects ``"Question"``.
        threshold: Minimum score a match needs in order to be kept.

    Returns:
        The query response with ``["matches"]`` reduced to the entries
        whose ``score`` is at least ``threshold``.
    """
    participant = (
        "Answer" if participant_type == "Company Speaker" else "Question"
    )

    if year == "All":
        year_filter = {"$in": [2020, 2019, 2018, 2017, 2016]}
    else:
        year_filter = int(year)

    # "All" is a selector sentinel, so it must expand to every quarter no
    # matter which year was chosen; filtering on the literal string "All"
    # would only match records whose Quarter is "All" (presumably none).
    if quarter == "All":
        quarter_filter = {"$in": ["Q1", "Q2", "Q3", "Q4"]}
    else:
        quarter_filter = {"$eq": quarter}

    xc = index.query(
        vector=dense_vec,
        sparse_vector=sparse_vec,
        top_k=top_k,
        filter={
            "Year": year_filter,
            "Quarter": quarter_filter,
            "Ticker": {"$eq": ticker},
            "QA_Flag": {"$eq": participant},
        },
        include_metadata=True,
    )

    # Keep only the context passages that reach the score threshold.
    xc["matches"] = [
        match for match in xc["matches"] if match["score"] >= threshold
    ]
    return xc
def query_pinecone(
    dense_vec,
    top_k,
    index,
    year,
    quarter,
    ticker,
    participant_type,
    threshold=0.25,
):
    """Run a dense-vector index query with metadata filters.

    Args:
        dense_vec: Dense query vector, passed as ``vector``.
        top_k: Number of matches requested from the index.
        index: Object exposing ``query(...)``; the response must support
            reading and assigning ``["matches"]``.
        year: ``"All"`` or a value convertible with ``int``.
        quarter: ``"All"`` or a specific quarter label such as ``"Q1"``.
        ticker: Ticker symbol matched exactly.
        participant_type: ``"Company Speaker"`` selects ``QA_Flag ==
            "Answer"``; any other value selects ``"Question"``.
        threshold: Minimum score a match needs in order to be kept.

    Returns:
        The query response with ``["matches"]`` reduced to the entries
        whose ``score`` is at least ``threshold``.
    """
    participant = (
        "Answer" if participant_type == "Company Speaker" else "Question"
    )

    if year == "All":
        year_filter = {"$in": [2020, 2019, 2018, 2017, 2016]}
    else:
        year_filter = int(year)

    # "All" is a selector sentinel, so it must expand to every quarter no
    # matter which year was chosen; filtering on the literal string "All"
    # would only match records whose Quarter is "All" (presumably none).
    if quarter == "All":
        quarter_filter = {"$in": ["Q1", "Q2", "Q3", "Q4"]}
    else:
        quarter_filter = {"$eq": quarter}

    xc = index.query(
        vector=dense_vec,
        top_k=top_k,
        filter={
            "Year": year_filter,
            "Quarter": quarter_filter,
            "Ticker": {"$eq": ticker},
            "QA_Flag": {"$eq": participant},
        },
        include_metadata=True,
    )

    # Keep only the context passages that reach the score threshold.
    xc["matches"] = [
        match for match in xc["matches"] if match["score"] >= threshold
    ]
    return xc
def format_query(query_results):
    """Collect the passage text of every match in a query response.

    Args:
        query_results: Mapping with a ``"matches"`` sequence whose items
            carry ``["metadata"]["Text"]``.

    Returns:
        A list of the ``Text`` metadata values, in match order.
    """
    passages = []
    for match in query_results["matches"]:
        passages.append(match["metadata"]["Text"])
    return passages
def sentence_id_combine(data, query_results, lag=1):
    """Expand matched sentences with their neighbours and join the text.

    Each matched ``Sentence_id`` is widened to the ids within ``lag`` on
    either side. The de-duplicated, sorted ids are then cut into
    consecutive chunks of ``lag * 2 + 1`` ids, and the ``Text`` of the rows
    in each chunk is joined with single spaces.

    Args:
        data: DataFrame with ``Sentence_id`` and ``Text`` columns.
        query_results: Mapping with a ``"matches"`` sequence whose items
            carry ``["metadata"]["Sentence_id"]``.
        lag: Number of neighbouring ids to add on each side of a match.

    Returns:
        A list with one joined context string per chunk of ids.
    """
    matched_ids = [
        match["metadata"]["Sentence_id"] for match in query_results["matches"]
    ]

    # Widen every match by +/- lag, dropping duplicates as we go.
    window_ids = set()
    for base in matched_ids:
        for offset in range(-lag, lag + 1):
            window_ids.add(base + offset)
    ordered_ids = sorted(window_ids)

    # Chunk the ordered ids and look up the text for each chunk.
    chunk_size = lag * 2 + 1
    context_list = []
    for start in range(0, len(ordered_ids), chunk_size):
        chunk = ordered_ids[start : start + chunk_size]
        in_chunk = data["Sentence_id"].isin(chunk)
        sentences = data.loc[in_chunk, "Text"].to_list()
        context_list.append(" ".join(sentences))
    return context_list
def text_lookup(data, sentence_ids):
    """Join the entries at the given positions with ``". "``.

    Args:
        data: Object supporting ``.iloc[...]`` whose selection provides
            ``to_list()`` returning strings (e.g. a pandas Series of text).
        sentence_ids: Positional indices handed to ``.iloc``.

    Returns:
        The selected strings joined by ``". "``.
    """
    selected = data.iloc[sentence_ids]
    sentences = selected.to_list()
    return ". ".join(sentences)
def generate_gpt_prompt(query_text, context_list):
    """Build the six-point question-answering prompt.

    Args:
        query_text: The question to place after ``Question:``.
        context_list: Passages that are joined with single spaces and
            placed after ``Context:``.

    Returns:
        The prompt string, ending with ``Answer:``.
    """
    context = " ".join(context_list)
    return f"""Answer the question in 6 long detailed points as accurately as possible using the provided context. Include as many key details as possible.
Context: {context}
Question: {query_text}
Answer:"""
def generate_gpt_prompt_2(query_text, context_list):
    """Build the context-first prompt that asks for a pointwise answer.

    Args:
        query_text: The question to embed after the context section.
        context_list: Passages that are joined with single spaces and
            placed between the two dashed rules.

    Returns:
        The prompt string; it starts with a newline because the template
        opens on the line after the triple quote.
    """
    context = " ".join(context_list)
    return f"""
Context information is below:
---------------------
{context}
---------------------
Given the context information and prior knowledge, answer this question:
{query_text}
Try to include as many key details as possible and format the answer in points."""
def generate_flant5_prompt(query_text, context_list):
    """Build the question-first prompt used for Flan-T5.

    Args:
        query_text: The question, placed directly below the instruction.
        context_list: Passages that are joined with ``" \\n"`` and placed
            between the two dashed rules.

    Returns:
        The prompt string, ending with the closing dashed rule.
    """
    context = " \n".join(context_list)
    return f"""Given the context information and prior knowledge, answer this question:
{query_text}
Context information is below:
---------------------
{context}
---------------------"""
def get_context_list_prompt(prompt):
    """Recover the list of context passages from a dashed-rule prompt.

    The prompt is split on the dashed rule; the second-to-last piece is
    taken as the context section, stripped of surrounding whitespace and
    split on ``" \\n"``.

    Args:
        prompt: Prompt text containing at least one dashed rule.

    Returns:
        The list of passages found in the context section.
    """
    sections = prompt.split("---------------------")
    context_section = sections[-2].strip()
    return context_section.split(" \n")
def gpt_model(prompt):
    """Request a completion for ``prompt`` from ``text-davinci-003``.

    Args:
        prompt: Text sent as the completion prompt.

    Returns:
        The ``text`` of the first choice in the completion response.
    """
    completion = openai.Completion.create(
        model="text-davinci-003",
        prompt=prompt,
        temperature=0.1,
        max_tokens=1024,
        top_p=1.0,
        frequency_penalty=0.5,
        presence_penalty=1,
    )
    first_choice = completion.choices[0]
    return first_choice.text
# Entity Extraction
def extract_quarter_year(string):
    """Pull a quarter label and a four-digit year out of a string.

    Args:
        string: Text to search.

    Returns:
        ``(quarter, year)`` as strings (e.g. ``("Q3", "2019")``) when both
        a run of four digits and a ``Q<digit>`` token are present;
        ``(None, None)`` when either one is missing.
    """
    year_match = re.search(r"\d{4}", string)
    if year_match is None:
        return None, None

    quarter_match = re.search(r"Q\d", string)
    if quarter_match is None:
        return None, None

    # The match is exactly "Q" followed by one digit, so use it as-is.
    return quarter_match.group(), year_match.group()
def extract_entities(query, model):
    """Extract the company, quarter and year mentioned in a query.

    Args:
        query: Text to analyse.
        model: Callable returning a document whose ``ents`` items expose
            ``label_`` and ``text``.

    Returns:
        ``(company, quarter, year)``. ``company`` is the lower-cased text
        of the ``ORG`` entity, or ``None`` when there is none. ``quarter``
        and ``year`` come from ``extract_quarter_year`` applied to the
        ``DATE`` entity, or are ``None`` when there is none. If a label
        occurs more than once, the last occurrence is used.
    """
    doc = model(query)
    entities = {ent.label_: ent.text for ent in doc.ents}

    company = entities["ORG"].lower() if "ORG" in entities else None

    if "DATE" in entities:
        quarter, year = extract_quarter_year(entities["DATE"])
    else:
        quarter, year = None, None

    return company, quarter, year
def clean_entities(company, quarter, year):
    """Translate extracted entities into positions in the selector lists.

    Args:
        company: Lower-case company name, or ``None``.
        quarter: Quarter label such as ``"Q1"``, or ``None``.
        year: Year as a string such as ``"2019"``, or ``None``.

    Returns:
        ``(ticker_index, quarter_index, year_index)``. An unknown or
        missing company maps to index 0; an unknown or missing quarter or
        year maps to the last entry (``"All"``) of its list.
    """
    company_ticker_map = {
        "apple": "AAPL",
        "amd": "AMD",
        "amazon": "AMZN",
        "cisco": "CSCO",
        "google": "GOOGL",
        "microsoft": "MSFT",
        "nvidia": "NVDA",
        "asml": "ASML",
        "intel": "INTC",
        "micron": "MU",
    }
    ticker_choice = [
        "AAPL",
        "CSCO",
        "MSFT",
        "ASML",
        "NVDA",
        "GOOGL",
        "MU",
        "INTC",
        "AMZN",
        "AMD",
    ]
    year_choice = ["2020", "2019", "2018", "2017", "2016", "All"]
    quarter_choice = ["Q1", "Q2", "Q3", "Q4", "All"]

    def _index_or_last(value, choices):
        # Position of ``value`` in ``choices``; the final slot otherwise.
        if value is not None and value in choices:
            return choices.index(value)
        return len(choices) - 1

    ticker_index = 0
    if company is not None and company in company_ticker_map:
        ticker_index = ticker_choice.index(company_ticker_map[company])

    quarter_index = _index_or_last(quarter, quarter_choice)
    year_index = _index_or_last(year, year_choice)
    return ticker_index, quarter_index, year_index
# Transcript Retrieval
def retrieve_transcript(data, year, quarter, ticker):
    """Read the transcript file for a ticker and, optionally, a period.

    When either ``year`` or ``quarter`` is ``"All"``, the first distinct
    ``File_Name`` for the ticker is used; otherwise the row must also match
    the given year and quarter.

    Args:
        data: DataFrame with ``Ticker``, ``Year``, ``Quarter`` and
            ``File_Name`` columns.
        year: ``"All"`` or a value convertible with ``int``.
        quarter: ``"All"`` or a quarter label compared to ``Quarter``.
        ticker: Ticker symbol; also the sub-directory under
            ``Transcripts/``.

    Returns:
        The full text of ``Transcripts/<ticker>/<File_Name>``.

    Raises:
        IndexError: If no row matches the selection (``.iloc[0, 0]`` on an
            empty frame).
        OSError: If the transcript file cannot be opened.
    """
    if year == "All" or quarter == "All":
        selection = data.Ticker == ticker
    else:
        selection = (
            (data.Year == int(year))
            & (data.Quarter == quarter)
            & (data.Ticker == ticker)
        )

    file_name = data.loc[selection, ["File_Name"]].drop_duplicates().iloc[0, 0]

    # Context manager guarantees the handle is closed after reading.
    with open(f"Transcripts/{ticker}/{file_name}", "r") as transcript:
        return transcript.read()