Spaces:
Runtime error
Runtime error
import os | |
TOGETHER_API_KEY = os.environ.get("TOGETHER_API_KEY") | |
SEMANTIC_SCHOLAR_API_KEY = os.environ.get("SEMANTIC_SCHOLAR_API_KEY") | |
import re | |
import time | |
import json | |
import shutil | |
import requests | |
import spacy | |
#!python -m spacy download en_core_web_lg | |
from openai import OpenAI, APIError | |
from llama_index import ( | |
VectorStoreIndex, | |
SimpleDirectoryReader, | |
ServiceContext, | |
load_index_from_storage | |
) | |
from llama_index.embeddings import HuggingFaceEmbedding, TogetherEmbedding | |
from llama_index.storage.storage_context import StorageContext | |
# 处理原始.text数据抹去citation,或者直接从用户出获得没有citation的introduct | |
def remove_citation(text): | |
# Regular expression to match \cite{...} | |
pattern = r'\\cite\{[^}]*\}' | |
# Replace \cite{...} with an empty string | |
text = re.sub(pattern, '', text) | |
# Replace multiple spaces with a single space | |
text = re.sub(r' +', ' ', text) | |
# Replace spaces before punctuation marks with just the punctuation marks | |
text = re.sub(r"\s+([,.!?;:()\[\]{}])", r"\1", text) | |
return text | |
def get_chat_completion(client, prompt, llm_model, max_tokens): | |
messages = [ | |
{ | |
"role": "system", | |
"content": "You are an AI assistant", | |
}, | |
{ | |
"role": "user", | |
"content": prompt, | |
} | |
] | |
try: | |
chat_completion = client.chat.completions.create( | |
messages=messages, | |
model=llm_model, | |
max_tokens=max_tokens | |
) | |
return chat_completion.choices[0].message.content | |
except APIError as e: | |
# Handle specific API errors | |
print(f"API Error: {e}") | |
except Exception as e: | |
# Handle other exceptions | |
print(f"Error: {e}") | |
def get_relevant_papers(search_query, sort=True, count=10): | |
""" | |
search_query (str): the required query parameter and its value (in this case, the keyword we want to search for) | |
count (int): the number of relevant papers to return for each query | |
Semantic Scholar Rate limit: | |
1 request per second for the following endpoints: | |
/paper/batch | |
/paper/search | |
/recommendations | |
10 requests / second for all other calls | |
""" | |
# Define the paper search endpoint URL; All keywords in the search query are matched against the paper’s title and abstract. | |
url = 'https://api.semanticscholar.org/graph/v1/paper/search' | |
# Define headers with API key | |
headers = {'x-api-key': SEMANTIC_SCHOLAR_API_KEY} | |
query_params = { | |
'query': search_query, | |
'fields': 'url,title,year,abstract,authors.name,journal,citationStyles,tldr,referenceCount,citationCount', | |
'limit': 20, | |
} | |
# Send the API request | |
response = requests.get(url, params=query_params, headers=headers) | |
# Check response status | |
if response.status_code == 200: | |
json_response = response.json() | |
if json_response['total'] != 0: | |
papers = json_response['data'] | |
else: | |
papers = [] | |
# Sort the papers based on citationCount in descending order | |
if sort: | |
papers = sorted(papers, key=lambda x: x['citationCount'], reverse=True) | |
return papers[:count] | |
else: | |
print(f"Request failed with status code {response.status_code}: {response.text}") | |
def save_papers(unique_dir, papers): | |
os.makedirs(unique_dir, exist_ok=True) | |
# Save each dictionary to a separate JSON file | |
for i, dictionary in enumerate(papers): | |
filename = os.path.join(unique_dir, f"{dictionary['paperId']}.json") | |
with open(filename, 'w') as json_file: | |
json.dump(dictionary, json_file, indent=4) | |
print(f"{len(papers)} papers saved as JSON files successfully at {unique_dir}.") | |
def get_index(service_context, docs_dir, persist_dir): | |
documents = SimpleDirectoryReader(docs_dir, filename_as_id=True).load_data() | |
# check if storage already exists | |
PERSIST_DIR = persist_dir | |
if not os.path.exists(PERSIST_DIR): | |
print('create new index') | |
index = VectorStoreIndex.from_documents( | |
documents, service_context=service_context, show_progress=False | |
) | |
# store it for later | |
index.storage_context.persist(persist_dir=PERSIST_DIR) | |
else: | |
print('load the existing index') | |
# load the existing index | |
storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR) | |
index = load_index_from_storage(storage_context, service_context=service_context) | |
# refresh the index | |
refreshed_docs = index.refresh_ref_docs(documents, update_kwargs={"delete_kwargs": {"delete_from_docstore": True}}) | |
print(f'refreshed_docs:\n{refreshed_docs}') | |
return index | |
def get_paper_data(text): | |
"""text = node.text """ | |
dictionary_from_json = json.loads(text) | |
bibtex = dictionary_from_json['citationStyles']['bibtex'] | |
bibtex = bibtex.replace('&', 'and') | |
citation_label = re.findall(r"@(\w+){([\w'-]+)", bibtex)[0][1] | |
citationCount = dictionary_from_json['citationCount'] | |
if dictionary_from_json['tldr'] is not None: | |
tldr = dictionary_from_json['tldr']['text'] | |
else: | |
tldr = 'No tldr available' | |
url = dictionary_from_json['url'] | |
return citation_label, (bibtex, citationCount, tldr, url) | |
def move_cite_inside_sentence(sent, ez_citation): | |
if sent[-1]!='\n': | |
character = sent[-1] | |
sent_new = sent[:-1] + ' <ez_citation>' + character | |
else: | |
count = sent.count('\n') | |
sent = sent.strip() | |
character = sent[-1] | |
sent_new = sent[:-1] + ' <ez_citation>' + character + '\n'*count | |
return sent_new.replace('<ez_citation>', ez_citation) | |
def write_bib_file(bib_file_content, data): | |
bibtex, citationCount, tldr, url = data | |
bib_file_content = bib_file_content + f'\n%citationCount: {citationCount}\n%tldr: {tldr}\n%url: {url}\n' + bibtex | |
return bib_file_content | |
def write_citation(sent, bib_file_content, retrieved_nodes, sim_threshold=0.75): | |
labels = [] | |
for node in retrieved_nodes: | |
citation_label, data = get_paper_data(node.text) | |
print('relevant paper id (node.id_):', node.id_, 'match score (node.score):', node.score) | |
print('relevant paper data:', *data) | |
print('-'*30) | |
if node.score > sim_threshold and citation_label != "None": | |
labels.append(citation_label) | |
if not (citation_label in bib_file_content): | |
bib_file_content = write_bib_file(bib_file_content, data) | |
else: | |
continue | |
labels = ', '.join(labels) | |
if labels: | |
ez_citation = f'\cite{{{labels}}}' | |
sent_new = move_cite_inside_sentence(sent, ez_citation) | |
else: | |
sent_new = sent | |
return sent_new, bib_file_content | |
get_prompt = lambda sentence: f""" | |
I want to use semantic scholar paper search api to find the relevant papers, can you read the following text then suggest me an suitable search query for this task? | |
Here is an example for using the api: | |
<example> | |
```python | |
import requests | |
# Define the paper search endpoint URL | |
url = 'https://api.semanticscholar.org/graph/v1/paper/search' | |
# Define the required query parameter and its value (in this case, the keyword we want to search for) | |
query_params = {{ | |
'query': 'semantic scholar platform', | |
'limit': 3 | |
}} | |
# Make the GET request with the URL and query parameters | |
searchResponse = requests.get(url, params=query_params) | |
``` | |
</example> | |
Here is the text: | |
<text> | |
{sentence} | |
</text> | |
""" | |
# main block | |
def main(sentences, count, client, llm_model, max_tokens, service_context): | |
"""count (int): the number of relevant papers to return for each query""" | |
sentences_new = [] | |
bib_file_content = '' | |
for sentence in sentences: | |
prompt = get_prompt(sentence) | |
response = get_chat_completion(client, prompt, llm_model, max_tokens) | |
# Define a regular expression pattern to find the value of 'query' | |
pattern = r"'query': '(.*?)'" | |
matches = re.findall(pattern, response) | |
if matches: | |
search_query = matches[0] | |
else: | |
search_query = sentence[:2] # use the first two words as the search query | |
relevant_papers = get_relevant_papers(search_query, sort=True, count=count) | |
if relevant_papers: | |
# save papers to json files and build index | |
unique_dir = os.path.join("papers", f"{int(time.time())}") | |
persist_dir = os.path.join("index", f"{int(time.time())}") | |
save_papers(unique_dir, relevant_papers) | |
index = get_index(service_context, unique_dir, persist_dir) | |
# get sentence's most similar papers | |
retriever = index.as_retriever(service_context=service_context, similarity_top_k=5) | |
retrieved_nodes = retriever.retrieve(sentence) | |
sent_new, bib_file_content = write_citation(sentence, bib_file_content, retrieved_nodes, sim_threshold=0.7) | |
sentences_new.append(sent_new) | |
else: | |
sentences_new.append(sentence) | |
print('sentence:', sentence.strip()) | |
print('search_query:', search_query) | |
print('='*30) | |
return sentences_new, bib_file_content | |
def ez_cite(introduction, debug=False): | |
nlp = spacy.load("en_core_web_lg") | |
doc = nlp(introduction) | |
sentences = [sentence.text for sentence in doc.sents] | |
sentences = [ remove_citation(sentence) for sentence in sentences] | |
client = OpenAI(api_key=TOGETHER_API_KEY, | |
base_url='https://api.together.xyz', | |
) | |
llm_model = "Qwen/Qwen1.5-72B-Chat" | |
max_tokens = 1000 | |
embed_model = TogetherEmbedding(model_name="togethercomputer/m2-bert-80M-8k-retrieval", api_key=TOGETHER_API_KEY) | |
service_context = ServiceContext.from_defaults( | |
llm=None, embed_model=embed_model, chunk_size=8192, # chunk_size must be bigger than the whole .json so that all info is preserved, in this case, one doc is one node | |
) | |
if debug: | |
sentences = sentences[:2] | |
sentences_new, bib_file_content = main(sentences, count=10, | |
client=client, | |
llm_model=llm_model, | |
max_tokens=max_tokens, | |
service_context=service_context) | |
with open('intro.bib', 'w') as bib_file: | |
bib_file.write(bib_file_content) | |
final_intro = ' '.join(sentences_new) | |
print(final_intro) | |
print('='*30) | |
dir_path = "index" | |
try: | |
# Delete the directory and its contents | |
shutil.rmtree(dir_path) | |
print(f"Directory '{dir_path}' deleted successfully.") | |
except Exception as e: | |
print(f"Error deleting directory '{dir_path}': {e}") | |
dir_path = "papers" | |
try: | |
# Delete the directory and its contents | |
shutil.rmtree(dir_path) | |
print(f"Directory '{dir_path}' deleted successfully.") | |
except Exception as e: | |
print(f"Error deleting directory '{dir_path}': {e}") | |
return final_intro, bib_file_content | |
example1 = r"""In the current Noisy Intermediate-Scale Quantum (NISQ) era, a few methods have been proposed to construct useful quantum algorithms that are compatible with mild hardware restrictions. Most of these methods involve the specification of a quantum circuit Ansatz, optimized in a classical fashion to solve specific computational tasks. | |
Next to variational quantum eigensolvers in chemistry and variants of the quantum approximate optimization algorithm, machine learning approaches based on such parametrized quantum circuits stand as some of the most promising practical applications to yield quantum advantages.""" | |
if __name__ == "__main__": | |
final_intro, bib_file_content = ez_cite(example1, debug=True) | |