|
import os |
|
import csv |
|
import json |
|
import docx |
|
import pptx |
|
import re |
|
import nltk |
|
import time |
|
import PyPDF2 |
|
import tempfile |
|
import openpyxl |
|
import requests |
|
import gradio as gr |
|
from bs4 import BeautifulSoup |
|
import xml.etree.ElementTree as ET |
|
from nltk.tokenize import word_tokenize |
|
from langchain_community.vectorstores import FAISS |
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
from langchain.schema import SystemMessage, HumanMessage, AIMessage |
|
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace |
|
from langchain_community.embeddings import SentenceTransformerEmbeddings |
|
from youtube_transcript_api._errors import NoTranscriptFound, TranscriptsDisabled, VideoUnavailable |
|
# Fetch the NLTK resources this module relies on at import time.
# 'punkt' backs word_tokenize() on older NLTK releases; recent releases load
# the 'punkt_tab' tables instead, so both are requested. Requesting a package
# that is already installed (or unknown to the index) does not raise.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('omw-1.4')
nltk.download('wordnet')
|
|
|
def read_csv(file_path):
    """Flatten a CSV file into one space-separated string.

    The file is decoded as UTF-8 (undecodable bytes are dropped) and parsed
    with ``csv.reader``. The cells of each row are joined with a space, and
    the resulting rows are joined with a space as well.
    """
    with open(file_path, 'r', encoding='utf-8', errors='ignore', newline='') as handle:
        flattened_rows = [' '.join(cells) for cells in csv.reader(handle)]
    return ' '.join(flattened_rows)
|
|
|
def read_text(file_path):
    """Return the whole content of a text file as a string.

    The file is decoded as UTF-8 with undecodable bytes dropped; line endings
    are left untranslated because the file is opened with ``newline=''``.
    """
    with open(file_path, 'r', encoding='utf-8', errors='ignore', newline='') as handle:
        contents = handle.read()
    return contents
|
|
|
def read_pdf(file_path):
    """Extract the text of every page of a PDF, one page per line group.

    Pages are read in document order with ``PyPDF2.PdfReader`` and their
    extracted text is joined with newlines.
    """
    with open(file_path, 'rb') as stream:
        page_texts = [page.extract_text() for page in PyPDF2.PdfReader(stream).pages]
    return '\n'.join(page_texts)
|
|
|
def read_docx(file_path):
    """Return the text of all paragraphs of a .docx file, newline-separated."""
    document = docx.Document(file_path)
    paragraph_texts = (para.text for para in document.paragraphs)
    return '\n'.join(paragraph_texts)
|
|
|
def read_pptx(file_path):
    """Collect the text of every shape on every slide of a .pptx file.

    Shapes without a ``text`` attribute are skipped. Each shape's text is
    followed by a newline, in slide order and then shape order.
    """
    presentation = pptx.Presentation(file_path)
    pieces = []
    for slide in presentation.slides:
        for shape in slide.shapes:
            if not hasattr(shape, "text"):
                continue
            pieces.append(shape.text + '\n')
    return ''.join(pieces)
|
|
|
def read_xlsx(file_path):
    """Render the active worksheet of an .xlsx workbook as plain text.

    Each row becomes one line: non-empty cell values are converted with
    ``str`` and joined by spaces. Only the workbook's active sheet is read.
    """
    active_sheet = openpyxl.load_workbook(file_path).active
    lines = []
    for row_values in active_sheet.iter_rows(values_only=True):
        cells = [str(value) for value in row_values if value is not None]
        lines.append(' '.join(cells) + '\n')
    return ''.join(lines)
|
|
|
def read_json(file_path):
    """Load a JSON file and return it re-serialised as a compact string.

    The file is decoded explicitly as UTF-8 (matching the other readers in
    this module) instead of the platform default encoding. Non-ASCII text is
    kept as real characters rather than ``\\uXXXX`` escapes, so downstream
    text clean-up that strips backslashes does not turn it into garbage.

    Raises:
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as json_file:
        parsed = json.load(json_file)
    return json.dumps(parsed, ensure_ascii=False)
|
|
|
def read_html(file_path):
    """Return the visible text of a local HTML file.

    The previous implementation returned the parsed ``BeautifulSoup`` object
    itself, which is not a string and cannot be processed by the text
    normalisation step that follows every reader. This version returns plain
    text, like the other readers.

    The file is decoded as UTF-8 with undecodable bytes dropped. ``<script>``
    and ``<style>`` elements are removed before extraction so that code and
    CSS do not end up in the text.
    """
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as html_file:
        html_content = html_file.read()
    soup = BeautifulSoup(html_content, 'html.parser')
    for non_text_tag in soup(['script', 'style']):
        non_text_tag.decompose()
    return soup.get_text(separator='\n', strip=True)
|
|
|
def read_xml(file_path):
    """Parse an XML file and return its root element serialised as a string."""
    root_element = ET.parse(file_path).getroot()
    return ET.tostring(root_element, encoding='unicode')
|
|
|
def process_youtube_video(url, languages=None):
    """Return the concatenated transcripts of a YouTube video.

    Args:
        url: A ``youtube.com/watch`` or ``youtu.be/`` link.
        languages: Language codes to try, in order. Defaults to
            ``['en', 'ar']`` when omitted or ``None``.

    Returns:
        The transcripts found for the requested languages joined by spaces,
        or one of the fixed, human-readable error strings below. Callers
        compare against these strings, so they are kept verbatim.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if languages is None:
        languages = ['en', 'ar']

    if 'youtube.com/watch' not in url and 'youtu.be/' not in url:
        return "Invalid YouTube URL. Please provide a valid YouTube link."

    try:
        if "v=" in url:
            video_id = url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            video_id = url.split("youtu.be/")[1].split("?")[0]
        else:
            return "Invalid YouTube video URL. Please provide a valid YouTube video link."

        # The thumbnail request is used as an existence probe for the video.
        # The timeout keeps a stalled connection from blocking the handler
        # forever; a timeout error is reported by the outer except below.
        response = requests.get(
            f"http://img.youtube.com/vi/{video_id}/mqdefault.jpg",
            timeout=10,
        )
        if response.status_code != 200:
            return "Video doesn't exist."

        transcript_data = []
        for lang in languages:
            try:
                transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang])
            except (NoTranscriptFound, TranscriptsDisabled, VideoUnavailable):
                # No transcript in this language; try the next one.
                continue
            transcript_data.append(' '.join(entry['text'] for entry in transcript))

        if not transcript_data:
            return "Please choose a YouTube video with available English or Arabic transcripts."
        return ' '.join(transcript_data)

    except Exception as e:
        # Any other failure is reported as text instead of being raised.
        return f"An error occurred: {e}"
|
|
|
def read_web_page(url):
    """Download a web page and return the text of its ``<p>`` elements.

    Each paragraph's text is followed by a newline. If the page cannot be
    fetched (malformed URL, connection failure, timeout, or a non-200
    status), the fixed message "Please provide a valid webpage link" is
    returned instead of raising, so the caller can show it to the user.
    """
    invalid_link_message = "Please provide a valid webpage link"
    try:
        # The timeout keeps an unresponsive host from blocking the handler.
        result = requests.get(url, timeout=10)
    except requests.RequestException:
        return invalid_link_message

    if result.status_code != 200:
        return invalid_link_message

    soup = BeautifulSoup(result.content, 'html.parser')
    return ''.join(p.get_text() + '\n' for p in soup.find_all('p'))
|
|
|
def read_data(file_path_or_url, languages=None):
    """Dispatch a file path or URL to the matching reader and return its text.

    Args:
        file_path_or_url: Local file path, web page URL, or YouTube link.
        languages: Transcript languages for YouTube links. Defaults to
            ``['en', 'ar']`` when omitted or ``None``.

    Returns:
        Whatever the selected reader returns, or the fixed string
        "Unsupported type or format." when nothing matches.

    Routing rules:
        * Strings starting with ``http://`` or ``https://`` are always
          treated as URLs, even when they end in a file extension such as
          ``.html``. Such a URL is not a local file and cannot be opened by
          the file readers.
        * File extensions are matched case-insensitively, so ``REPORT.PDF``
          is handled like ``report.pdf``.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if languages is None:
        languages = ['en', 'ar']

    lowered = file_path_or_url.lower()
    is_youtube = 'youtube.com/watch' in file_path_or_url or 'youtu.be/' in file_path_or_url

    if lowered.startswith(('http://', 'https://')):
        if is_youtube:
            return process_youtube_video(file_path_or_url, languages)
        return read_web_page(file_path_or_url)

    if lowered.endswith('.csv'):
        return read_csv(file_path_or_url)
    if lowered.endswith('.txt'):
        return read_text(file_path_or_url)
    if lowered.endswith('.pdf'):
        return read_pdf(file_path_or_url)
    if lowered.endswith('.docx'):
        return read_docx(file_path_or_url)
    if lowered.endswith('.pptx'):
        return read_pptx(file_path_or_url)
    if lowered.endswith('.xlsx'):
        return read_xlsx(file_path_or_url)
    if lowered.endswith('.json'):
        return read_json(file_path_or_url)
    if lowered.endswith('.html'):
        return read_html(file_path_or_url)
    if lowered.endswith('.xml'):
        return read_xml(file_path_or_url)

    # Scheme-less YouTube links (e.g. "youtu.be/<id>") and the original loose
    # "starts with http" fallback are kept for backward compatibility.
    if is_youtube:
        return process_youtube_video(file_path_or_url, languages)
    if file_path_or_url.startswith('http'):
        return read_web_page(file_path_or_url)
    return "Unsupported type or format."
|
|
|
def normalize_text(text):
    """Lower-case and clean ``text`` and return its tokens joined by spaces.

    Steps, in order: remove asterisks, lower-case, strip surrounding
    whitespace, delete a fixed set of punctuation characters, remove
    e-mail-like tokens, then tokenise with ``word_tokenize`` and re-join the
    tokens with single spaces.

    The output is the same as before. The changes are raw strings in place of
    the invalid ``\\*`` and ``\\<`` escape sequences, and one ``str.translate``
    pass in place of the per-character ``replace`` loop.
    """
    text = text.replace('*', '')
    text = text.lower()
    text = text.strip()
    # Raw string: the backslash is one of the characters to delete.
    punctuation = r'''!()[]{};:'"\<>/?$%^&*_`~='''
    text = text.translate(str.maketrans('', '', punctuation))
    # Drop e-mail-like tokens (runs of alphanumerics around an '@').
    text = re.sub(r'[A-Za-z0-9]*@[A-Za-z]*\.?[A-Za-z0-9]*', "", text)
    words = word_tokenize(text)
    return ' '.join(words)
|
|
|
# Text-generation endpoint used by every chat mode in this module.
# The keyword arguments below are the sampling/generation settings passed to
# HuggingFaceEndpoint.
llm = HuggingFaceEndpoint(
    repo_id="HuggingFaceH4/starchat2-15b-v0.1",
    task="text-generation",
    max_new_tokens=4096,
    temperature=0.6,
    top_p=0.9,
    top_k=40,
    repetition_penalty=1.2,
    do_sample=True,
)
# Chat wrapper so the endpoint can be invoked with System/Human/AI messages.
chat_model = ChatHuggingFace(llm=llm)

# Embedding model handed to the FAISS index loaded below.
# NOTE(review): presumably the same model the index was built with — confirm.
model_name = "sentence-transformers/all-mpnet-base-v2"
embedding_llm = SentenceTransformerEmbeddings(model_name=model_name)
# Loaded once at import time from the local "faiss_index" path.
# NOTE(review): allow_dangerous_deserialization=True opts in to loading
# serialized (pickle-based) data; only safe if "faiss_index" is trusted.
db = FAISS.load_local("faiss_index", embedding_llm, allow_dangerous_deserialization=True)
|
|
|
def print_like_dislike(x: gr.LikeData):
    """Print the index, value and liked flag of a like/dislike event."""
    feedback = (x.index, x.value, x.liked)
    print(*feedback)
|
|
|
def user(user_message, history):
    """Append the user's message to the chat history and clear the textbox.

    Returns a tuple of the new textbox value (always an empty string) and a
    new history list ending with ``[user_message, None]``. The input history
    list is not modified.

    Raises:
        gr.Error: if ``user_message`` is empty.
    """
    if len(user_message) == 0:
        raise gr.Error("Chat messages cannot be empty")
    updated_history = history + [[user_message, None]]
    return "", updated_history
|
|
|
def user2(user_message, history, link):
    """Append a link-plus-question entry to the chat history.

    The history entry is the link and the message separated by a newline.
    Returns the cleared textbox value, the new history list and the link
    unchanged. The input history list is not modified.

    Raises:
        gr.Error: if the message or the link is empty.
    """
    if len(user_message) == 0 or len(link) == 0:
        raise gr.Error("Chat messages or links cannot be empty")
    entry = [f"{link}\n{user_message}", None]
    return "", history + [entry], link
|
|
|
def user3(user_message, history, file_path):
    """Append a file-plus-question entry to the chat history.

    The history entry is ``file_path`` and the message separated by a
    newline. Returns the cleared textbox value, the new history list and
    ``file_path`` unchanged. The input history list is not modified.

    Raises:
        gr.Error: if the message is empty or ``file_path`` is falsy.
    """
    if not len(user_message) or not file_path:
        # Fixed typo in the user-facing message ("flies" -> "files").
        raise gr.Error("Chat messages or files cannot be empty")
    combined_message = f"{file_path}\n{user_message}"
    return "", history + [[combined_message, None]], file_path
|
|
|
def Chat_Message(history):
    """Answer the latest user message and stream the reply into ``history``.

    Only the most recent user message (``history[-1][0]``) is sent to the
    model, after a fixed three-message preamble; earlier turns in
    ``history`` are not replayed. The reply is written into
    ``history[-1][1]`` one character at a time, and ``history`` is yielded
    after each character so the UI shows a typing effect.

    The previous version also appended the reply to the local message list
    and truncated it to eight entries. That list is discarded when the
    function returns, so the dead code was removed.
    """
    messages = [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content="Hi AI, how are you today?"),
        AIMessage(content="I'm great thank you. How can I help you?"),
        HumanMessage(content=history[-1][0]),
    ]
    response = chat_model.invoke(messages)

    history[-1][1] = ""
    if not response.content:
        # Still emit one update so the UI receives the (empty) reply.
        yield history
        return
    for character in response.content:
        history[-1][1] += character
        time.sleep(0.0025)
        yield history
|
|
|
def Web_Search(history):
    """Answer the latest user message using passages from the vector index.

    The three most similar documents are retrieved from ``db`` and their
    ``page_content`` is embedded in the prompt as "Search Content". The
    reply is written into ``history[-1][1]`` one character at a time, and
    ``history`` is yielded after each character so the UI shows a typing
    effect.

    The previous version also appended the reply to the local message list
    and truncated it to eight entries. That list is discarded when the
    function returns, so the dead code was removed.
    """
    message = history[-1][0]

    similar_docs = db.similarity_search(message, k=3)
    source_knowledge = "\n".join(doc.page_content for doc in similar_docs) if similar_docs else ""

    augmented_prompt = f"""
    If you can't answer the next Query using the Search Content, say 'No Answer Is Available' and then just give guidance for the query.
    Query: {message}
    Search Content:
    {source_knowledge}
    """

    messages = [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content="Hi AI, how are you today?"),
        AIMessage(content="I'm great thank you. How can I help you?"),
        HumanMessage(content=augmented_prompt),
    ]
    response = chat_model.invoke(messages)

    history[-1][1] = ""
    if not response.content:
        # Still emit one update so the UI receives the (empty) reply.
        yield history
        return
    for character in response.content:
        history[-1][1] += character
        time.sleep(0.0025)
        yield history
|
|
|
def Chart_Generator(history):
    """Render a chart for the latest user message and stream a description.

    The user's text is sent to the ``quickchart.io/natural/`` endpoint. When
    that request returns 200, an ``<img>`` tag pointing at the chart is
    combined with the model's answer to a "describe this chart" prompt.
    Otherwise a fixed error message is used. The combined content is written
    into ``history[-1][1]`` one character at a time, and ``history`` is
    yielded after each character.

    Fixes over the previous version:
      * The user text is percent-encoded before it is used as a URL path and
        as an HTML attribute value. Characters such as spaces, quotes,
        ``?``, ``#`` or ``<`` no longer break the request or the markup.
      * The HTTP request has a timeout, and request failures fall back to
        the error message instead of raising.
      * Dead code that appended to / truncated the local message list after
        the model call was removed.
    """
    from urllib.parse import quote

    message = history[-1][0]
    # Raw form, kept verbatim for the model prompt so the wording the model
    # sees is unchanged.
    chart_url = f"https://quickchart.io/natural/{message}"
    # Encoded form, used for the HTTP request and the <img> tag. After
    # quoting it contains no characters that need HTML escaping.
    encoded_chart_url = "https://quickchart.io/natural/" + quote(message)

    try:
        chart_response = requests.get(encoded_chart_url, timeout=15)
        chart_available = chart_response.status_code == 200
    except requests.RequestException:
        chart_available = False

    if chart_available:
        image_html = f'<img src="{encoded_chart_url}" alt="Generated Chart" style="display: block; margin: auto; max-width: 100%; max-height: 100%;">'
        message_with_description = f"Describe and analyse the content of this chart: {chart_url}"

        messages = [
            SystemMessage(content="You are a helpful assistant."),
            HumanMessage(content="Hi AI, how are you today?"),
            AIMessage(content="I'm great thank you. How can I help you?"),
            HumanMessage(content=message_with_description),
        ]
        response = chat_model.invoke(messages)
        combined_content = f'{image_html}<br>{response.content}'
    else:
        combined_content = "Can't generate this image. Please provide valid chart details."

    history[-1][1] = ""
    for character in combined_content:
        history[-1][1] += character
        time.sleep(0.0025)
        yield history
|
|
|
def Link_Scratch(history):
    """Answer a question about the content behind a link and stream the reply.

    ``history[-1][0]`` is expected to be "<link>\\n<question>". The link is
    read with ``read_data``. If reading yields one of the known error
    messages, that message is shown as the reply. Otherwise the content is
    normalised and embedded in a prompt for the model. The reply is written
    into ``history[-1][1]`` one character at a time, and ``history`` is
    yielded after each character.

    Fixes over the previous version:
      * "Video doesn't exist." and "An error occurred: ..." results are now
        treated as errors instead of being fed to the model as link content.
      * Exceptions raised while reading the link are reported to the user
        instead of aborting the handler.
      * Dead code that appended to / truncated the local message list after
        the model call was removed.
    """
    known_errors = (
        "Unsupported type or format.",
        "Please provide a valid webpage link",
        "Invalid YouTube URL. Please provide a valid YouTube link.",
        "Please choose a YouTube video with available English or Arabic transcripts.",
        "Invalid YouTube video URL. Please provide a valid YouTube video link.",
        "Video doesn't exist.",
    )

    combined_message = history[-1][0]

    link = ""
    user_message = ""
    if "\n" in combined_message:
        link, user_message = combined_message.split("\n", 1)
        link = link.strip()
        user_message = user_message.strip()

    try:
        result = read_data(link)
    except Exception as exc:
        # UI boundary: report the failure in the chat rather than crash.
        result = f"An error occurred: {exc}"

    # Readers are expected to return text; coerce anything else so the
    # checks and the normalisation below always operate on a str.
    if not isinstance(result, str):
        result = str(result)

    if result in known_errors or result.startswith("An error occurred:"):
        response_message = result
    else:
        content_data = normalize_text(result)
        if not content_data:
            response_message = "The provided link is empty or does not contain any meaningful words."
        else:
            augmented_prompt = f"""
            If you can't answer the next Query using the Link Content, say 'No Answer Is Available' and then just give guidance for the query.
            Query: {user_message}
            Link Content:
            {content_data}
            """
            messages = [
                SystemMessage(content="You are a helpful assistant."),
                HumanMessage(content="Hi AI, how are you today?"),
                AIMessage(content="I'm great thank you. How can I help you?"),
                HumanMessage(content=augmented_prompt),
            ]
            response = chat_model.invoke(messages)
            response_message = response.content

    history[-1][1] = ""
    if not response_message:
        # Still emit one update so the UI receives the (empty) reply.
        yield history
        return
    for character in response_message:
        history[-1][1] += character
        time.sleep(0.0025)
        yield history
|
|
|
def insert_line_breaks(text, every=8):
    """Split ``text`` into chunks of ``every`` characters joined by newlines.

    The last chunk may be shorter. An empty string yields an empty string.
    """
    chunks = []
    for start in range(0, len(text), every):
        chunks.append(text[start:start + every])
    return '\n'.join(chunks)
|
|
|
def display_file_name(file):
    """Return an HTML snippet showing an icon and the uploaded file's name.

    Args:
        file: Uploaded file object; its ``name`` attribute is used as the
            file path.

    Returns:
        HTML markup containing a file icon and the file's base name, with a
        newline inserted every eight characters by ``insert_line_breaks``.

    Raises:
        gr.Error: if the file extension (case-insensitive) is not one of
            the supported types.

    The file name comes from the uploader and is therefore HTML-escaped
    before being placed in the markup. Escaping happens after the line
    breaks are inserted so an escape sequence is never split in two.
    """
    from html import escape

    supported_extensions = ['.csv', '.txt', '.pdf', '.docx', '.pptx', '.xlsx', '.json', '.html', '.xml']
    file_extension = os.path.splitext(file.name)[1]
    if file_extension.lower() not in supported_extensions:
        raise gr.Error("( Supported File Types Only : PDF , CSV , TXT , DOCX , PPTX , XLSX , JSON , HTML , XML )")

    file_name = os.path.basename(file.name)
    file_name_with_breaks = escape(insert_line_breaks(file_name))
    icon_url = "https://img.icons8.com/ios-filled/50/0000FF/file.png"
    return f"<div style='display: flex; align-items: center;'><img src='{icon_url}' alt='file-icon' style='width: 20px; height: 20px; margin-right: 5px;'><b style='color:blue;'>{file_name_with_breaks}</b></div>"
|
|
|
def File_Interact(history, filepath):
    """Answer a question about an uploaded file and stream the reply.

    ``history[-1][0]`` is expected to be "<file label>\\n<question>"; only
    the question part is used here. The file itself is read from
    ``filepath`` with ``read_data``, normalised and embedded in a prompt for
    the model. The reply is written into ``history[-1][1]`` one character at
    a time, and ``history`` is yielded after each character.

    Fixes over the previous version:
      * A missing upload (``filepath`` is None or empty) produces a clear
        message instead of an exception.
      * Exceptions raised while reading the file are reported to the user
        instead of aborting the handler.
      * The unused ``link`` local and the dead code that appended to /
        truncated the local message list after the model call were removed.
    """
    combined_message = history[-1][0]

    user_message = ""
    if "\n" in combined_message:
        user_message = combined_message.split("\n", 1)[1].strip()

    if not filepath:
        result = None
        response_message = "Please upload a supported file before asking a question."
    else:
        try:
            result = read_data(filepath)
            response_message = None
        except Exception as exc:
            # UI boundary: report the failure in the chat rather than crash.
            result = None
            response_message = f"An error occurred while reading the file: {exc}"

    if response_message is None:
        # Readers are expected to return text; coerce anything else so the
        # normalisation below always receives a str.
        if not isinstance(result, str):
            result = str(result)

        if result == "Unsupported type or format.":
            response_message = result
        else:
            content_data = normalize_text(result)
            if not content_data:
                response_message = "The file is empty or does not contain any meaningful words."
            else:
                augmented_prompt = f"""
            If you can't answer the next Query using the File Content, say 'No Answer Is Available' and then just give guidance for the query.
            Query: {user_message}
            File Content:
            {content_data}
            """
                messages = [
                    SystemMessage(content="You are a helpful assistant."),
                    HumanMessage(content="Hi AI, how are you today?"),
                    AIMessage(content="I'm great thank you. How can I help you?"),
                    HumanMessage(content=augmented_prompt),
                ]
                response = chat_model.invoke(messages)
                response_message = response.content

    history[-1][1] = ""
    if not response_message:
        # Still emit one update so the UI receives the (empty) reply.
        yield history
        return
    for character in response_message:
        history[-1][1] += character
        time.sleep(0.0025)
        yield history
|
|
|
# Top-level Gradio layout: a header row followed by one tab per assistant mode.
# Each tab rebinds the names chatbot/msg/submit/clear; the event handlers are
# attached inside the tab right after its components are created, so the
# rebinding in later tabs does not affect earlier ones.
# NOTE(review): every Chatbot below uses the same elem_id="chatbot".
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    with gr.Row():
        gr.Markdown("""<span style='font-weight: bold; color: blue; font-size: large;'>Choose Your Mode</span>""")
        gr.Markdown("""<div style='margin-left: -120px;'><span style='font-weight: bold; color: blue; font-size: xx-large;'>IT ASSISTANT</span></div>""")

    # Plain chat: user() records the message, then Chat_Message streams the reply.
    with gr.Tab("Chat-Message"):
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            height=500,
            placeholder="<span style='font-weight: bold; color: blue; font-size: x-large;'>Feel Free To Ask Me Anything Or Start A Conversation On Any Topic...</span>"
        )
        with gr.Row():
            msg = gr.Textbox(show_label=False, placeholder="Type a message...", scale=10, container=False)
            submit = gr.Button("➡️Send", scale=1)

        clear = gr.ClearButton([msg, chatbot])

        # Enter key and Send button trigger the same two-step chain.
        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=True).then(Chat_Message, chatbot, chatbot)
        submit.click(user, [msg, chatbot], [msg, chatbot], queue=True).then(Chat_Message, chatbot, chatbot)
        chatbot.like(print_like_dislike, None, None)

    # Retrieval mode: same wiring as above, answered by Web_Search.
    with gr.Tab("Web-Search"):
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            height=500,
            placeholder="<span style='font-weight: bold; color: blue; font-size: x-large;'>Demand What You Seek, And I'll Search The Web For The Most Relevant Information...</span>"
        )
        with gr.Row():
            msg = gr.Textbox(show_label=False, placeholder="Type a message...", scale=10, container=False)
            submit = gr.Button("➡️Send", scale=1)

        clear = gr.ClearButton([msg, chatbot])

        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=True).then(Web_Search, chatbot, chatbot)
        submit.click(user, [msg, chatbot], [msg, chatbot], queue=True).then(Web_Search, chatbot, chatbot)
        chatbot.like(print_like_dislike, None, None)

    # Chart mode: same wiring as above, answered by Chart_Generator.
    with gr.Tab("Chart-Generator"):
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            height=500,
            placeholder="<span style='font-weight: bold; color: blue; font-size: x-large;'>Request Any Chart Or Graph By Giving The Data Or A Description, And I'll Create It...</span>"
        )

        with gr.Row():
            msg = gr.Textbox(show_label=False, placeholder="Type a message...", scale=10, container=False)
            submit = gr.Button("➡️Send", scale=1)

        clear = gr.ClearButton([msg, chatbot])

        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=True).then(Chart_Generator, chatbot, chatbot)
        submit.click(user, [msg, chatbot], [msg, chatbot], queue=True).then(Chart_Generator, chatbot, chatbot)
        chatbot.like(print_like_dislike, None, None)

    # Link mode: msg1 holds the link, msg2 the question; user2() combines
    # them into one history entry that Link_Scratch splits again.
    with gr.Tab("Link-Scratch"):
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            height=500,
            placeholder="<span style='font-weight: bold; color: blue; font-size: x-large;'>Provide A Link Of Web page Or YouTube Video And Inquire About Its Details...</span>"
        )

        with gr.Row():
            msg1 = gr.Textbox(show_label=False, placeholder="Paste your link...", scale=4, container=False)
            msg2 = gr.Textbox(show_label=False, placeholder="Type a message...", scale=7, container=False)
            submit = gr.Button("➡️Send", scale=1)

        clear = gr.ClearButton([msg2, chatbot, msg1])

        # Submitting from either textbox or the button runs the same chain.
        msg1.submit(user2, [msg2, chatbot, msg1], [msg2, chatbot, msg1], queue=True).then(Link_Scratch, chatbot, chatbot)
        msg2.submit(user2, [msg2, chatbot, msg1], [msg2, chatbot, msg1], queue=True).then(Link_Scratch, chatbot, chatbot)
        submit.click(user2, [msg2, chatbot, msg1], [msg2, chatbot, msg1], queue=True).then(Link_Scratch, chatbot, chatbot)
        chatbot.like(print_like_dislike, None, None)

    # File mode: the upload button stores the file, file_output shows its name.
    with gr.Tab("File-Interact"):
        chatbot = gr.Chatbot(
            [],
            elem_id="chatbot",
            bubble_full_width=False,
            height=500,
            placeholder="<span style='font-weight: bold; color: blue; font-size: x-large;'>Upload A File And Explore Questions Related To Its Content...</span><br>( Supported File Types Only : PDF , CSV , TXT , DOCX , PPTX , XLSX , JSON , HTML , XML )"
        )

        with gr.Column():
            with gr.Row():
                filepath = gr.UploadButton("Upload a file", file_count="single", scale=1)
                msg = gr.Textbox(show_label=False, placeholder="Type a message...", scale=7, container=False)
                submit = gr.Button("➡️Send", scale=1)
            with gr.Row():
                file_output = gr.HTML("<div style='height: 20px; width: 30px;'></div>")
                clear = gr.ClearButton([msg, filepath, chatbot,file_output],scale=6)

        # After an upload, render the file name (or raise for unsupported types).
        filepath.upload(display_file_name, inputs=filepath, outputs=file_output)

        # user3() receives file_output (the HTML label), not the uploaded
        # file; File_Interact then receives the uploaded file via filepath.
        # NOTE(review): file_output starts as a non-empty placeholder <div>,
        # so user3's emptiness check on it passes before any upload — confirm
        # whether that is intended.
        msg.submit(user3, [msg, chatbot, file_output], [msg, chatbot, file_output], queue=True).then(File_Interact, [chatbot, filepath],chatbot)
        submit.click(user3, [msg, chatbot, file_output], [msg, chatbot, file_output], queue=True).then(File_Interact, [chatbot, filepath],chatbot)
        chatbot.like(print_like_dislike, None, None)
|
|
|
# Enable the request queue with max_size=5, then start the app.
# Both calls run at import time (there is no __main__ guard in view).
demo.queue(max_size=5)
# max_file_size="5mb" is passed to launch(); show_api=False is passed as well.
demo.launch(max_file_size="5mb",show_api=False)