"""Custom Qwen2 causal-LM wrapper with iterative web retrieval.

Pipeline: generate search keywords from the user's question, retrieve
snippets via DuckDuckGo, summarize and judge sufficiency, then answer
through a five-step (understand -> plan -> draft -> reflect -> finalize)
prompt chain; falls back to pure self-reasoning when no relevant
information is found.
"""

import logging
import re

import requests
import torch
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from transformers import Qwen2Config, Qwen2ForCausalLM

# Logging configuration
logging.basicConfig(level=logging.INFO)


class CustomQwen2Config(Qwen2Config):
    """Qwen2 configuration registered under a custom ``model_type``."""

    model_type = "custom_qwen2config"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, **kwargs):
        """Load the config dict and build the custom config from it."""
        config_dict, kwargs = cls.get_config_dict(pretrained_model_name_or_path, **kwargs)
        config = super().from_dict(config_dict, **kwargs)
        return config

    def to_dict(self):
        """Serialize, forcing ``model_type`` to the custom value."""
        output = super().to_dict()
        output["model_type"] = self.model_type
        return output


class CustomQwen2Model(Qwen2ForCausalLM):
    """Qwen2 causal LM augmented with DuckDuckGo retrieval and a
    multi-step self-reflective answer-generation loop.

    A tokenizer must be attached via :meth:`set_tokenizer` before
    calling :meth:`generate`.
    """

    config_class = CustomQwen2Config

    def __init__(self, config):
        super().__init__(config)
        self.tokenizer = None
        self.embedding_model = None
        self.max_iterations = 5       # Maximum number of times to recreate keywords
        self.use_search = True        # Whether to consult external search at all
        self.top_k = 3                # Number of documents to retrieve for each search
        self.max_search_attempts = 3  # Number of search attempts for each keyword set

    def set_tokenizer(self, tokenizer=None):
        self.tokenizer = tokenizer

    # Parameter-setting methods
    def set_max_iterations(self, max_iterations):
        self.max_iterations = max_iterations

    def set_use_search(self, use_search):
        self.use_search = use_search

    def set_top_k(self, top_k):
        self.top_k = top_k

    def generate_step(self, input_ids, max_new_tokens=150):
        """Run one raw generation pass and return the output token ids.

        Note: for a decoder-only model the returned ids include the
        prompt tokens followed by the newly generated tokens.
        """
        input_ids = input_ids.to(self.device)
        output_ids = super().generate(input_ids, max_new_tokens=max_new_tokens)
        return output_ids  # Return tokenized results

    def extract_response(self, output_ids, keyword):
        """Extract the text following *keyword* from the decoded output.

        Returns the text after the first occurrence of *keyword*; if the
        keyword is absent, returns the full decoded text prefixed with
        "[ALL]" so callers can detect the fallback.
        """
        # Decode generated output to text
        raw_response = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        # Extract text after the keyword (DOTALL so the match spans newlines)
        pattern = rf"{re.escape(keyword)}\s*(.*)"
        match = re.search(pattern, raw_response, re.DOTALL)
        if match:
            return match.group(1).strip()
        # Keyword not found: return the whole response, flagged
        return "[ALL]" + raw_response

    def generate(self, input_ids, max_new_tokens=150, **kwargs):
        """Top-level entry point: retrieval loop plus answer generation.

        Decodes the prompt, iteratively searches the web (regenerating
        keywords up to ``max_iterations`` times), then produces the final
        answer and returns it re-encoded as token ids.
        """
        logging.info(f"Maximum keyword regeneration attempts: {self.max_iterations}")
        logging.info(f"External URL reference: {'Enabled' if self.use_search else 'Disabled'}")
        logging.info(f"k_top value: {self.top_k}")
        org_instruction = self.tokenizer.decode(input_ids[0], skip_special_tokens=True)

        keyword_attempt = 0  # Number of attempts to re-create keywords
        sufficient_info = False
        summarized_info = ""

        while keyword_attempt < self.max_iterations and not sufficient_info:
            logging.info(f"Keyword regeneration attempt: {keyword_attempt + 1}/{self.max_iterations}")
            # When using external references
            if self.use_search:
                logging.info("Retrieving relevant information using external URL references...")
                for search_attempt in range(1, self.max_search_attempts + 1):
                    logging.info(f"Search attempt: {search_attempt}/{self.max_search_attempts}")
                    relevant_docs = self.retrieve_relevant_information(org_instruction, top_k=self.top_k)
                    summarized_info = self.summarize_documents(relevant_docs, org_instruction)
                    # Determine whether to accept or reject the answer.
                    sufficient_info = self.is_answer_sufficient(summarized_info, org_instruction)
                    if sufficient_info:
                        logging.info("Sufficient information found.")
                        break
                    logging.info("Insufficient information. Attempting next search.")
                if not sufficient_info:
                    # Regenerate keywords and retry with an updated instruction
                    new_keywords = self.generate_new_keywords(org_instruction)
                    if new_keywords:
                        org_instruction = self.update_instruction_with_new_keywords(org_instruction, new_keywords)
                        logging.info(f"Retrying search with new keywords: {new_keywords}")
                    else:
                        logging.warning("Failed to generate new keywords.")
                        break
            else:
                summarized_info = ""
                sufficient_info = False
            # Incremented at loop level so the loop always terminates
            # after max_iterations even if every search fails.
            keyword_attempt += 1

        if not sufficient_info:
            logging.info("Relevant data sources not found. Performing self-reasoning.")
            final_response = self.self_reasoning(org_instruction, max_new_tokens)
        else:
            # Perform normal answer generation process
            final_response = self.generate_answer(org_instruction, summarized_info, max_new_tokens)

        # Return final answer as token ids (callers expect tensor output)
        final_response_ids = self.tokenizer.encode(final_response, return_tensors="pt").to(self.device)
        return final_response_ids

    def retrieve_relevant_information(self, user_input, top_k=3):
        """Search DuckDuckGo for *user_input* and return up to *top_k*
        document snippets (or a placeholder list when nothing is found)."""
        search_query = self.generate_search_query(user_input)
        logging.info(f"Generated search query: {search_query}")
        if not search_query:
            logging.warning("Search query is empty.")
            return ["No relevant information found."]

        with DDGS() as ddgs:
            search_results = ddgs.text(
                keywords=search_query,
                region='wt-wt',
                safesearch='off',
                timelimit=None,
                max_results=20
            )
            search_results = list(search_results)

        if not search_results:
            return ["No relevant information found."]

        # Filtering search results: prefer 'body', fall back to 'snippet'
        documents = []
        for result in search_results:
            if 'body' in result and result['body']:
                documents.append(result['body'])
            elif 'snippet' in result and result['snippet']:
                documents.append(result['snippet'])

        # Select top k documents
        return documents[:top_k]

    def generate_search_query(self, user_input):
        """Generate a space-joined keyword search query via model inference.

        Returns an empty string when no keywords can be extracted.
        """
        # Create prompt
        prompt = f"""
User's question: {user_input}

Organize what you need to know to answer this problem and list three keywords to research.

Keywords:
-"""
        # Encode prompt and generate
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)
        output_ids = self.generate_step(input_ids, max_new_tokens=50)
        generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)

        # Extract the keyword section after "Keywords:"
        pattern = r"Keywords:\s*(.*)"
        match = re.search(pattern, generated_text, re.DOTALL)
        if match:
            keywords_text = match.group(1).strip()
            # Listify keywords: each "- keyword" bullet line
            keywords = re.findall(r"-\s*(.*)", keywords_text)
            search_query = ' '.join(keywords)
            logging.info(f"Generated search query: {search_query}")
            return search_query
        logging.warning("Failed to generate keywords.")
        return ""

    def generate_new_keywords(self, user_input):
        """Attempt to regenerate search keywords after a failed search.

        Returns the new space-joined query, or an empty string on failure.
        """
        prompt = f"""
User's question: {user_input}

Insufficient information was obtained. Please generate new keywords.
List three new keywords.

Keywords:
-"""
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)
        output_ids = self.generate_step(input_ids, max_new_tokens=50)
        generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)

        pattern = r"Keywords:\s*(.*)"
        match = re.search(pattern, generated_text, re.DOTALL)
        if match:
            keywords_text = match.group(1).strip()
            keywords = re.findall(r"-\s*(.*)", keywords_text)
            search_query = ' '.join(keywords)
            logging.info(f"Regenerated search query: {search_query}")
            return search_query
        logging.warning("Failed to extract regenerated keywords.")
        return ""

    def update_instruction_with_new_keywords(self, instruction, new_keywords):
        """Incorporate new keywords into the original instruction."""
        # Simply appends new keywords to the original instruction.
        return f"{instruction} Keywords: {new_keywords}"

    def is_answer_sufficient(self, summarized_info, user_input):
        """Ask the model whether *summarized_info* suffices to answer.

        Only the newly generated tokens are inspected: the prompt itself
        contains the word "Yes", so decoding the full output (prompt +
        completion) would make this check trivially true.
        """
        prompt = f"""
User's question: {user_input}

Retrieved information:
{summarized_info}

Based on this information, determine if you can answer the user's question.
If yes, respond with "Yes". If no, respond with "No" only.
"""
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)
        output_ids = self.generate_step(input_ids, max_new_tokens=10)
        # Strip the prompt tokens before decoding (see docstring).
        new_token_ids = output_ids[0][input_ids.shape[1]:]
        generated_text = self.tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
        return "Yes" in generated_text

    def generate_answer(self, user_input, summarized_info, max_new_tokens=150):
        """Generate the final answer through a five-step prompt chain:
        understand -> plan -> draft -> reflect -> finalize, grounding the
        draft and reflection steps in *summarized_info*."""
        # Step 1: Understanding the question and extracting key points
        step1_prompt = f"""
#User's question: {user_input}

#Step 1: Understanding the question and extracting key points
Accurately understand the user's question or instructions.
Output the rules for answering and the tasks to be performed in a bullet list.

#Rules for answering and tasks to be performed:
"""
        step1_input_ids = self.tokenizer.encode(step1_prompt, return_tensors="pt").to(self.device)
        outputs_step1 = self.generate_step(step1_input_ids, max_new_tokens=max_new_tokens)
        step1_response = self.extract_response(outputs_step1, "#Rules for answering and tasks to be performed:")
        logging.info("Understanding the question...\n======================\n" + step1_response)

        # Step 2: Considerations for problem-solving
        step2_prompt = f"""
#Step 2: Considerations for problem-solving
Based on the content of Step 1, consider approaches and necessary information for solving the problem.

#Step 2 response:
"""
        step2_input_ids = self.tokenizer.encode(step1_response + step2_prompt, return_tensors="pt").to(self.device)
        outputs_step2 = self.generate_step(step2_input_ids, max_new_tokens=max_new_tokens)
        step2_response = self.extract_response(outputs_step2, "#Step 2 response:")
        logging.info("Considering approaches for problem-solving...\n======================\n" + step2_response)

        # Step 3: Creating the initial answer
        step3_prompt = f"""
#Step 3: Creating the initial answer
Based on the content so far, create an initial answer to the user's question.
Your information may not be up-to-date. Fully consider information from the internet.

#Latest internet information: {summarized_info}

#Initial answer:
"""
        step3_input_ids = self.tokenizer.encode(step2_response + step3_prompt, return_tensors="pt").to(self.device)
        outputs_step3 = self.generate_step(step3_input_ids, max_new_tokens=max_new_tokens)
        step3_response = self.extract_response(outputs_step3, "#Initial answer:")
        logging.info("Creating the initial answer...\n======================\n" + step3_response)

        # Step 4: Reflection (Self-verification)
        reflection_prompt = f"""
#Step 4: Reflection (Self-verification)
Verify whether the initial answer accurately responds to the user's question or instructions, and point out any errors or areas for improvement.
Be cautious of overinterpreting the instructions and critically assess whether you have accurately understood them.
Your information may not be up-to-date. Fully consider information from the internet.
Reconfirm the user's question and provide an accurate answer to the question itself.
(Ensure that you provide an answer to the question itself)

#User's question: {user_input}

#Latest internet information: {summarized_info}

#Initial answer: {step3_response}

#Reflection result:
"""
        reflection_input_ids = self.tokenizer.encode(reflection_prompt, return_tensors="pt").to(self.device)
        outputs_reflection = self.generate_step(reflection_input_ids, max_new_tokens=max_new_tokens)
        reflection_response = self.extract_response(outputs_reflection, "#Reflection result:")
        logging.info("Performing reflection...\n======================\n" + reflection_response)

        # Step 5: Creating the final answer
        final_prompt = f"""
#Step 5: Creating the final answer
Based on the reflection results, modify the initial answer as needed.
Your knowledge may not be up-to-date. Fully consider information from the internet.
Reconfirm the user's question, and check for overinterpretation, misunderstandings, omissions, and careless mistakes. Create the final answer incorporating these.

#Initial answer: {step3_response}

#Reflection result: {reflection_response}

#Latest internet information: {summarized_info}

#User's question: {user_input}

Please provide the final answer to the user's question.
#Final answer:
"""
        final_input_ids = self.tokenizer.encode(final_prompt, return_tensors="pt").to(self.device)
        outputs_final = self.generate_step(final_input_ids, max_new_tokens=max_new_tokens)
        final_response = self.extract_response(outputs_final, "#Final answer:").strip()
        return final_response

    def self_reasoning(self, user_input, max_new_tokens=150):
        """Generate an answer from the model's own knowledge when no
        relevant web information was found."""
        prompt = f"""
User's question: {user_input}

No relevant information was found on the internet.
Please use your own knowledge and reasoning to answer.

#Answer based on self-reasoning:
"""
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)
        output_ids = self.generate_step(input_ids, max_new_tokens=max_new_tokens)
        generated_text = self.extract_response(output_ids, "#Answer based on self-reasoning:").strip()
        logging.info("Answer based on self-reasoning:\n======================\n" + generated_text)
        return generated_text

    def process_document(self, doc, user_input):
        """Judge a document's relevance to the question and produce a
        document-grounded answer; returns "" for low-relevance documents."""
        # Truncate overly long documents before building the prompt
        # (previously the '# Truncate' comment leaked into the prompt text).
        truncated_doc = doc[:2000]
        prompt = f"""
User's question: {user_input}

Content of the document:
{truncated_doc}

Do not think of the question superficially. Use paradoxes and rephrasing to organize.
Create an answer to the question based on the content of this document.
Understand the points of disagreement between your own thoughts and the answer you would create based on this document, and prioritize the answer based on the document.
Answer:
"""
        input_ids = self.tokenizer.encode(prompt, return_tensors='pt').to(self.device)
        output_ids = self.generate_step(input_ids, max_new_tokens=500)
        generated_text = self.extract_response(output_ids, "Answer:")
        logging.info("Document processing result: " + generated_text)

        # Return empty string if deemed low relevance
        if "low relevance" in generated_text:
            return ""
        return generated_text.strip()

    def summarize_documents(self, documents, user_input):
        """Process each document and join the relevant extractions."""
        summaries = []
        for doc in documents:
            processed_text = self.process_document(doc, user_input)
            if processed_text:
                summaries.append(processed_text)
        return "\n\n".join(summaries)