vsrinivas's picture
Update app.py
55954ef verified
raw
history blame
6.87 kB
from pytubefix import YouTube
from moviepy.editor import VideoFileClip, AudioFileClip
from pydub import AudioSegment
import whisper
import pandas as pd
import nltk
from nltk.tokenize import sent_tokenize
nltk.download('punkt')
import gradio as gr
import ast
from IPython.display import Audio, display
import requests
nltk.download('punkt_tab')
import io
import ffmpeg
import subprocess
import tempfile
import os
from pytubefix.cli import on_progress
model = whisper.load_model("base")
def extract_yt_audio(it, video_url, video_file):
"""
Takes youtube url (youtobe_url) and path where audio clip will be stored (audio_path)
in string format as input arguments.
Returns the extracted video clip (video) and the path to audio clip (audio_path).
"""
if it == 'URL' and ("youtube.com" in video_url or "youtu.be" in video_url):
yt = YouTube(video_url, use_oauth=True, allow_oauth_cache=True, on_progress_callback = on_progress)
a = yt.streams.filter(only_audio=True).first()
audio_file = a.download()
sample = AudioSegment.from_file(audio_file, format="mp4")
elif it == 'URL' and ("https://www" in video_url or "https://" in video_url or "www." in video_url):
response = requests.get(video_url)
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
temp_file.write(response.content)
temp_file_path = temp_file.name
wav_file_path = temp_file_path.replace(".mp4", ".wav")
subprocess.run(["ffmpeg", "-i", temp_file_path, "-vn", "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2", wav_file_path], check=True)
sample = AudioSegment.from_wav(wav_file_path)
os.remove(temp_file_path)
os.remove(wav_file_path)
elif it == 'URL':
sample = AudioSegment.from_file(video_url)
else:
sample = AudioSegment.from_file(video_file)
audio_path = 'audio.wav'
# display(Audio(audio_path))
sample.export(audio_path, format="wav")
print("Transcription started \nTranscript:\n")
result = model.transcribe(audio_path)
print(result['text'], '\n')
return gr.update(visible=True, value=result['text']), gr.update(visible=True), result['segments'], gr.update(visible=True, value=audio_path)
def semantic_chunks(segs, max_chunk_length=15.0):
print("Trying to get symantically chunked segments:")
"""
Takes segments of transcribed audio and 15secs as maximum check duration and returns chunks of the audio as a list.
"""
segs = ast.literal_eval(segs)
chunks = []
current_chunk = []
chunk_start_time = None
chunk_end_time = None
chunk_duration = 0
# iterate over segments and create chunks out of each segment
for segment in segs:
start = segment['start']
end = segment['end']
text = segment['text']
# sentence tokenize each segment to capture more semantic context
sentences = sent_tokenize(text)
# iterate over the sentences and group them into chunks subject to the max_chunk_length is 15 secs
for sentence in sentences:
sentence_duration = (end - start) / len(sentences)
# Check if adding the sentence exceeds the max_chunk_length of 15 secs
if chunk_duration + sentence_duration <= max_chunk_length:
if not current_chunk:
chunk_start_time = start
current_chunk.append(sentence)
chunk_duration += sentence_duration
chunk_end_time = end
else:
# If the chunk would be too long, finalize the current chunk with required parameters
chunks.append({
'chunk_id': len(chunks) + 1,
'chunk_length (secs)': chunk_duration,
'semantic_chunk': ' '.join(current_chunk),
'start_time (secs)': chunk_start_time,
'end_time (secs)': chunk_end_time
})
# Start a new chunk with the current sentence
current_chunk = [sentence]
chunk_start_time = start
chunk_end_time = end
chunk_duration = sentence_duration
# Finalize the last chunk if it exists
if current_chunk:
chunks.append({
'chunk_id': len(chunks) + 1,
'chunk_length (secs)': chunk_duration,
'semantic_chunk': ' '.join(current_chunk),
'start_time (secs)': chunk_start_time,
'end_time (secs)': chunk_end_time
})
print(pd.DataFrame(chunks))
return gr.update(visible=True, value=pd.DataFrame(chunks))
def toggle_input_fields(input_type):
if input_type == "URL":
return gr.update(visible=True, value='sample.mp4'), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)
else:
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
def clear_all():
return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False))
with gr.Blocks() as demo:
gr.Markdown(
"""
# Extract audio from video, get the transcript and then get the semantic chunk information.
""")
# Radio button to choose between URL or upload
input_type = gr.Radio(choices=["URL", "Upload"], label="Select Video Input Type")
# input_url = gr.Textbox(label="Type-in the URL or File Location of the Video", value='https://www.youtube.com/watch?v=ug5e4JfC3oo')
input_url = gr.Textbox(label="Enter Video URL", visible=False)
video_file = gr.File(label="Upload Video", visible=False)
# input_url = gr.Textbox(label="Type-in the URL or File Location of the Video", value='sample.mp4')
segments = gr.Textbox(visible=False)
submit_btn_1 = gr.Button("Get the Transcript", visible=True)
audio = gr.Audio(visible=False, type="filepath", label='Play Audio')
transcript = gr.Textbox(visible=False, label='Transcript')
submit_btn_2 = gr.Button("Get the semantically Chuncked Segments", visible=False)
chunks = gr.Dataframe(visible=False, label = 'Semantic Chunks')
clear_btn = gr.Button("Clear")
input_type.change(fn=toggle_input_fields, inputs=input_type, outputs=[input_url, video_file, audio, transcript])
submit_btn_1.click(fn=extract_yt_audio, inputs=[input_type, input_url, video_file], outputs=[transcript, submit_btn_2, segments, audio])
# submit_btn_1.click(fn=extract_yt_audio, inputs=[input_url], outputs=[transcript, submit_btn_2, segments, audio])
submit_btn_2.click(fn=semantic_chunks, inputs=[segments], outputs=[chunks])
clear_btn.click(fn=clear_all, outputs=[input_url, video_file, transcript, submit_btn_2, chunks, audio])
demo.launch(debug=True)