Spaces:
Running
Running
from openai import AzureOpenAI | |
from langchain_openai import AzureChatOpenAI | |
import os | |
import ffmpeg | |
from typing import List | |
from moviepy.editor import VideoFileClip | |
import nltk | |
from huggingface_hub import InferenceClient | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from langchain import HuggingFaceHub, PromptTemplate, LLMChain | |
import gradio as gr | |
from pytube import YouTube | |
import requests | |
import logging | |
nltk.download('punkt') | |
nltk.download('stopwords') | |
class VideoAnalytics: | |
""" | |
Class for performing analytics on videos including transcription, summarization, topic generation, | |
and extraction of important sentences. | |
""" | |
def __init__(self): | |
""" | |
Initialize the VideoAnalytics object. | |
Args: | |
hf_token (str): Hugging Face API token. | |
""" | |
# Initialize AzureOpenAI client | |
self.client = AzureOpenAI() | |
self.mistral_client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") | |
# Initialize transcribed text variable | |
self.transcribed_text = "" | |
# API URL for accessing the Hugging Face model | |
self.API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large-v3" | |
# Placeholder for Hugging Face API token | |
self.hf_token = "HF_TOKEN" # Replace this with the actual Hugging Face API token | |
# Set headers for API requests with Hugging Face token | |
self.headers = {"Authorization": f"Bearer {self.hf_token}"} | |
# Initialize english text variable | |
self.english_text = "" | |
self.openai_llm = AzureChatOpenAI( | |
deployment_name="ChatGPT", | |
) | |
# Configure logging settings | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
def transcribe_video(self, vid: str) -> str: | |
""" | |
Transcribe the audio of the video. | |
Args: | |
vid (str): Path to the video file. | |
Returns: | |
str: Transcribed text. | |
""" | |
try: | |
# Load the video file and extract audio | |
video = VideoFileClip(vid) | |
audio = video.audio | |
# Write audio to a temporary file | |
audio.write_audiofile("output_audio.mp3") | |
audio_file = open("output_audio.mp3", "rb") | |
# Define a helper function to query the Hugging Face model | |
def query(data): | |
response = requests.post(self.API_URL, headers=self.headers, data=data) | |
return response.json() | |
# Send audio data to the Hugging Face model for transcription | |
output = query(audio_file) | |
# Update the transcribed_text attribute with the transcription result | |
self.transcribed_text = output["text"] | |
# Update the translation text into english_text | |
self.english_text = self.translation() | |
# Return the transcribed text | |
return output["text"] | |
except Exception as e: | |
logging.error(f"Error transcribing video: {e}") | |
return "" | |
def generate_video_summary(self) -> str: | |
""" | |
Generate a summary of the transcribed video. | |
Returns: | |
str: Generated summary. | |
""" | |
try: | |
# Define a conversation between system and user | |
conversation = [ | |
{"role": "system", "content": "You are a Summarizer"}, | |
{"role": "user", "content": f"""summarize the following text delimited by triple backticks.Output must in english. | |
In two format of Outputs given below: | |
Abstractive Summary: | |
Extractive Summary: | |
```{self.english_text}``` | |
"""} | |
] | |
# Generate completion using ChatGPT model | |
response = self.client.chat.completions.create( | |
model="ChatGPT", | |
messages=conversation, | |
temperature=0, | |
max_tokens=1000 | |
) | |
# Get the generated summary message | |
message = response.choices[0].message.content | |
return message | |
except Exception as e: | |
logging.error(f"Error generating video summary: {e}") | |
return "" | |
def generate_topics(self) -> str: | |
""" | |
Generate topics from the transcribed video. | |
Returns: | |
str: Generated topics. | |
""" | |
try: | |
# Define a conversation between system and user | |
conversation = [ | |
{"role": "system", "content": "You are a Topic Generator"}, | |
{"role": "user", "content": f"""generate single Topics from the following text don't make sentence for topic generation,delimited by triple backticks.Output must in english. | |
list out the topics: | |
Topics: | |
```{self.english_text}``` | |
"""} | |
] | |
# Generate completion using ChatGPT model | |
response = self.client.chat.completions.create( | |
model="ChatGPT", | |
messages=conversation, | |
temperature=0, | |
max_tokens=1000 | |
) | |
# Get the generated topics message | |
message = response.choices[0].message.content | |
return message | |
except Exception as e: | |
logging.error(f"Error generating topics: {e}") | |
return "" | |
def translation(self) -> str: | |
""" | |
translation from the transcribed video. | |
Returns: | |
str: translation. | |
""" | |
try: | |
# Define a conversation between system and user | |
conversation = [ | |
{"role": "system", "content": "You are a Multilingual Translator"}, | |
{"role": "user", "content": f""" Translate the following text in English ,delimited by triple backticks. | |
```{self.transcribed_text}``` | |
"""} | |
] | |
# Generate completion using ChatGPT model | |
response = self.client.chat.completions.create( | |
model="ChatGPT", | |
messages=conversation, | |
temperature=0, | |
max_tokens=1000 | |
) | |
# Get the generated topics message | |
message = response.choices[0].message.content | |
return message | |
except Exception as e: | |
logging.error(f"Error generating topics: {e}") | |
return "" | |
def format_prompt(self, question: str, data: str) -> str: | |
""" | |
Formats the prompt for the language model. | |
Args: | |
question (str): The user's question. | |
data (str): The data to be analyzed. | |
Returns: | |
str: Formatted prompt. | |
""" | |
prompt = "<s>" | |
prompt = f"""[INST] you are the german language and universal language expert .your task is analyze the given data and user ask any question about given data answer to the user question.your returning answer must in user's language.otherwise reply i don't know. | |
data:{data} | |
question:{question}[/INST]""" | |
prompt1 = f"[INST] {question} [/INST]" | |
return prompt+prompt1 | |
def generate(self, prompt: str, transcribed_text: str, temperature=0.9, max_new_tokens=5000, top_p=0.95, | |
repetition_penalty=1.0) -> str: | |
""" | |
Generates text based on the prompt and transcribed text. | |
Args: | |
prompt (str): The prompt for generating text. | |
transcribed_text (str): The transcribed text for analysis. | |
temperature (float): Controls the randomness of the sampling. Default is 0.9. | |
max_new_tokens (int): Maximum number of tokens to generate. Default is 5000. | |
top_p (float): Nucleus sampling parameter. Default is 0.95. | |
repetition_penalty (float): Penalty for repeating the same token. Default is 1.0. | |
Returns: | |
str: Generated text. | |
""" | |
try: | |
temperature = float(temperature) | |
if temperature < 1e-2: | |
temperature = 1e-2 | |
top_p = float(top_p) | |
generate_kwargs = dict( | |
temperature=temperature, | |
max_new_tokens=max_new_tokens, | |
top_p=top_p, | |
repetition_penalty=repetition_penalty, | |
do_sample=True, | |
seed=42, | |
) | |
# Format the prompt | |
formatted_prompt = self.format_prompt(prompt,transcribed_text) | |
# Generate text using the mistral client | |
stream = self.mistral_client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) | |
output = "" | |
# Concatenate generated text | |
for response in stream: | |
output += response.token.text | |
return output.replace("</s>","") | |
except Exception as e: | |
logging.error(f"Error in text generation: {e}") | |
return "An error occurred during text generation." | |
def video_qa(self, question: str, model: str) -> str: | |
""" | |
Performs video question answering. | |
Args: | |
question (str): The question asked by the user. | |
model (str): The language model to be used ("OpenAI" or "Mixtral"). | |
Returns: | |
str: Answer to the user's question. | |
""" | |
try: | |
if model == "OpenAI": | |
template = """you are the universal language expert .your task is analyze the given text and user ask any question about given text answer to the user question.otherwise reply i don't know. | |
extracted_text:{text} | |
user_question:{question}""" | |
prompt = PromptTemplate(template=template, input_variables=["text","question"]) | |
llm_chain = LLMChain(prompt=prompt, verbose=True, llm=self.openai_llm) | |
# Run the language model chain | |
result = llm_chain.run({"text":self.english_text,"question":question}) | |
return result | |
elif model == "Mixtral": | |
# Generate answer using Mixtral model | |
result = self.generate(question,self.english_text) | |
return result | |
except Exception as e: | |
logging.error(f"Error in video question answering: {e}") | |
return "An error occurred during video question answering." | |
def extract_video_important_sentence(self) -> str: | |
""" | |
Extract important sentences from the transcribed video. | |
Returns: | |
str: Extracted important sentences. | |
""" | |
try: | |
# Tokenize the sentences | |
sentences = nltk.sent_tokenize(self.english_text) | |
# Initialize TF-IDF vectorizer | |
tfidf_vectorizer = TfidfVectorizer() | |
# Fit the vectorizer on the summary sentences | |
tfidf_matrix = tfidf_vectorizer.fit_transform(sentences) | |
# Calculate sentence scores based on TF-IDF values | |
sentence_scores = tfidf_matrix.sum(axis=1) | |
# Create a list of (score, sentence) tuples | |
sentence_rankings = [(score, sentence) for score, sentence in zip(sentence_scores, sentences)] | |
# Sort sentences by score in descending order | |
sentence_rankings.sort(reverse=True) | |
# Set a threshold for selecting sentences | |
threshold = 2.5 # Adjust as needed | |
# Select sentences with scores above the threshold | |
selected_sentences = [sentence for score, sentence in sentence_rankings if score >= threshold] | |
# Join selected sentences to form the summary | |
summary = '\n\n'.join(selected_sentences) | |
return summary | |
except Exception as e: | |
logging.error(f"Error extracting important sentences: {e}") | |
return "" | |
def write_text_files(self, text: str, filename: str) -> None: | |
""" | |
Write text to a file. | |
Args: | |
text (str): Text to be written to the file. | |
filename (str): Name of the file. | |
""" | |
try: | |
file_path = f"{filename}.txt" | |
with open(file_path, 'w') as file: | |
# Write content to the file | |
file.write(text) | |
except Exception as e: | |
logging.error(f"Error writing text to file: {e}") | |
def Download(self, link: str) -> str: | |
""" | |
Download a video from YouTube. | |
Args: | |
link (str): YouTube video link. | |
Returns: | |
str: Path to the downloaded video file. | |
""" | |
try: | |
# Initialize YouTube object with the provided link | |
youtubeObject = YouTube(link) | |
# Get the highest resolution stream | |
youtubeObject = youtubeObject.streams.get_highest_resolution() | |
try: | |
# Attempt to download the video | |
file_name = youtubeObject.download() | |
return file_name | |
except: | |
# Log any errors that occur during video download | |
logging.info("An error has occurred") | |
logging.info("Download is completed successfully") | |
except Exception as e: | |
# Log any errors that occur during initialization of YouTube object | |
logging.error(f"Error downloading video: {e}") | |
return "" | |
def main(self, video: str = None, input_path: str = None) -> tuple: | |
""" | |
Perform video analytics. | |
Args: | |
video (str): Path to the video file. | |
input_path (str): Input path for the video. | |
Returns: | |
tuple: Summary, important sentences, and topics. | |
""" | |
try: | |
video = VideoFileClip(input_path) | |
duration = video.duration | |
video.close() | |
if round(duration) < 600: | |
# Download the video if input_path is provided, otherwise use the provided video path | |
if input_path: | |
input_path = self.Download(input_path) | |
text = self.transcribe_video(input_path) | |
elif video: | |
text = self.transcribe_video(video) | |
input_path = video | |
# Generate summary, important sentences, and topics | |
summary = self.generate_video_summary() | |
self.write_text_files(summary,"Summary") | |
important_sentences = self.extract_video_important_sentence() | |
self.write_text_files(important_sentences,"Important_Sentence") | |
topics = self.generate_topics() | |
self.write_text_files(topics,"Topics") | |
# Return the generated summary, important sentences, and topics | |
return summary,important_sentences,topics | |
else: | |
return "Video Duration Above 10 Minutes,Try Below 10 Minutes Video","","" | |
except Exception as e: | |
# Log any errors that occur during video analytics | |
logging.error(f"Error in main function: {e}") | |
return "", "", "" | |
def gradio_interface(self): | |
with gr.Blocks(css="style.css",theme=gr.themes.Soft()) as demo: | |
gr.HTML("""<center><h1>Video Analytics</h1></center>""") | |
with gr.Row(): | |
yt_link = gr.Textbox(label= "Youtube Link",placeholder="https://www.youtube.com/watch?v=") | |
with gr.Row(): | |
video = gr.Video(sources="upload",height=200,width=300) | |
with gr.Row(): | |
submit_btn = gr.Button(value="Submit") | |
with gr.Tab("Summary"): | |
with gr.Row(): | |
summary = gr.Textbox(show_label=False,lines=10) | |
with gr.Row(): | |
summary_download = gr.DownloadButton(label="Download",value="Summary.txt",visible=True,size='lg',elem_classes="download_button") | |
with gr.Tab("Important Sentences"): | |
with gr.Row(): | |
Important_Sentences = gr.Textbox(show_label=False,lines=10) | |
with gr.Row(): | |
sentence_download = gr.DownloadButton(label="Download",value="Important_Sentence.txt",visible=True,size='lg',elem_classes="download_button") | |
with gr.Tab("Topics"): | |
with gr.Row(): | |
Topics = gr.Textbox(show_label=False,lines=10) | |
with gr.Row(): | |
topics_download = gr.DownloadButton(label="Download",value="Topics.txt",visible=True,size='lg',elem_classes="download_button") | |
with gr.Tab("Video QA"): | |
with gr.Row(): | |
with gr.Column(scale=0.70): | |
question = gr.Textbox(show_label=False,placeholder="Ask Your Questions...") | |
with gr.Column(scale=0.30): | |
model = gr.Dropdown(["OpenAI", "Mixtral"], label="Models") | |
with gr.Row(): | |
result = gr.Textbox(label='Answer',lines=10) | |
submit_btn.click(self.main,[video,yt_link],[summary,Important_Sentences,Topics]) | |
question.submit(self.video_qa,[question,model],result) | |
demo.launch() | |
if __name__ == "__main__": | |
video_analytics = VideoAnalytics() | |
video_analytics.gradio_interface() |