# Spaces: Running on T4
import gradio as gr | |
from huggingface_hub import snapshot_download | |
from threading import Thread | |
import os | |
import time | |
import gradio as gr | |
import base64 | |
import numpy as np | |
import requests | |
import traceback | |
from server import serve | |
# Fetch the model snapshot from the Hugging Face Hub into ./checkpoint.
repo_id = "gpt-omni/mini-omni"
snapshot_download(repo_id, local_dir="./checkpoint", revision="main")

# Host and port used to build the chat endpoint below.
# NOTE(review): `serve` is started without arguments, so these values are only
# correct if they match the defaults of server.serve -- confirm.
IP = "0.0.0.0"
PORT = 60808

# Run the inference server in a daemon thread so it does not keep the
# interpreter alive once the UI process exits.
thread = Thread(target=serve, daemon=True)
thread.start()

# Built from IP/PORT so the endpoint cannot drift from the constants above;
# this evaluates to "http://0.0.0.0:60808/chat", the previously hard-coded value.
API_URL = f"http://{IP}:{PORT}/chat"
# recording parameters
IN_CHANNELS = 1
IN_RATE = 24000  # samples per second
IN_CHUNK = 1024
IN_SAMPLE_WIDTH = 2  # bytes per sample (16-bit PCM)
VAD_STRIDE = 0.5  # seconds of audio buffered before VAD is run

# playing parameters
OUT_CHANNELS = 1
OUT_RATE = 24000  # samples per second
OUT_SAMPLE_WIDTH = 2  # bytes per sample (16-bit PCM)
# An earlier assignment of 5760 was immediately overwritten by this value,
# so 4096 is the size that was actually in effect.
OUT_CHUNK = 4096
def run_vad(ori_audio, sr):
    """Run voice-activity detection over a buffer of 16-bit PCM audio.

    The audio is converted to float32, resampled to 16 kHz when needed, passed
    through ``get_speech_timestamps`` / ``collect_chunks``, and converted back
    to int16 PCM at the original sample rate.

    Args:
        ori_audio: Raw PCM bytes, read as int16 samples.
        sr: Sample rate of ``ori_audio`` in Hz.

    Returns:
        A tuple ``(duration_after_vad, vad_audio_bytes, elapsed)``:

        * ``duration_after_vad`` -- length in seconds of the audio returned by
          ``collect_chunks``, or ``-1`` if any step failed;
        * ``vad_audio_bytes`` -- that audio as int16 PCM bytes at ``sr``, or
          ``ori_audio`` unchanged if any step failed;
        * ``elapsed`` -- wall-clock seconds spent, rounded to 4 decimals.
    """
    _st = time.time()
    try:
        audio = np.frombuffer(ori_audio, dtype=np.int16)
        audio = audio.astype(np.float32) / 32768.0

        sampling_rate = 16000  # rate at which the VAD helpers are run
        if sr != sampling_rate:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)

        # NOTE(review): librosa, VadOptions, get_speech_timestamps and
        # collect_chunks are not imported in the visible part of this file;
        # without them every call ends up in the fallback branch below.
        speech_chunks = get_speech_timestamps(audio, VadOptions())
        audio = collect_chunks(audio, speech_chunks)
        duration_after_vad = audio.shape[0] / sampling_rate

        if sr != sampling_rate:
            # resample to original sampling rate
            vad_audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=sr)
        else:
            vad_audio = audio

        # Clip before the cast: resampling can overshoot [-1, 1), and an
        # unclipped cast to int16 would wrap around instead of saturating.
        vad_audio = np.clip(np.round(vad_audio * 32768.0), -32768, 32767)
        vad_audio_bytes = vad_audio.astype(np.int16).tobytes()
        return duration_after_vad, vad_audio_bytes, round(time.time() - _st, 4)
    except Exception:
        # Best effort: report the failure and hand the input back untouched.
        # Guard the division so the handler itself cannot raise when sr is 0.
        audio_len = len(ori_audio) / (sr * 2) if sr else 0.0
        msg = f"[asr vad error] audio_len: {audio_len:.3f} s, trace: {traceback.format_exc()}"
        print(msg)
        return -1, ori_audio, round(time.time() - _st, 4)
def warm_up():
    """Call ``run_vad`` once on a short buffer of silence and print the time taken."""
    # 2048 zero-valued 16-bit samples (4096 bytes), declared as 16 kHz audio.
    silence = b"\x00\x00" * 1024 * 2
    _, _, tcost = run_vad(silence, 16000)
    print(f"warm up done, time_cost: {tcost:.3f} s")


warm_up()
# Buffer handled by the previous VAD pass of determine_pause. It has to live
# outside the function so that it survives between calls.
_last_temp_audio = None


def determine_pause(stream: bytes, start_talking: bool) -> tuple[bytes, bool]:
    """Take in the stream, determine if a pause happened.

    VAD is only run once ``stream`` holds more than ``VAD_STRIDE`` seconds of
    input audio; shorter buffers are returned untouched so the caller can keep
    accumulating.

    Args:
        stream: Buffered input audio as raw PCM bytes at ``IN_RATE``.
        start_talking: Whether speech has already been detected.

    Returns:
        ``(remaining_stream, start_talking)``: the buffer to keep accumulating
        into (empty once a stride has been processed) and the updated flag.

    Side effects:
        Appends buffers to ``st.session_state.frames`` and sets
        ``st.session_state.recording = False`` when speech ends.
        NOTE(review): ``st`` is not defined in the visible part of this file
        (it looks like a session-state object from another UI framework) and
        must be provided before this function can run -- confirm.
    """
    global _last_temp_audio

    temp_audio = stream
    if len(temp_audio) > IN_SAMPLE_WIDTH * IN_RATE * IN_CHANNELS * VAD_STRIDE:
        dur_vad, _, time_vad = run_vad(temp_audio, IN_RATE)
        print(f"duration_after_vad: {dur_vad:.3f} s, time_vad: {time_vad:.3f} s")

        if dur_vad > 0.2 and not start_talking:
            # Speech just started: also keep the preceding buffer so the
            # onset of the utterance is not cut off.
            if _last_temp_audio is not None:
                st.session_state.frames.append(_last_temp_audio)
            start_talking = True
        if start_talking:
            st.session_state.frames.append(temp_audio)
        if dur_vad < 0.1 and start_talking:
            st.session_state.recording = False
            print(f"speech end detected. excit")

        _last_temp_audio = temp_audio
        temp_audio = b""
    return temp_audio, start_talking
def process_audio(audio):
    """Send a recorded audio file to the chat server and stream back the reply.

    Args:
        audio: Path of the audio file to send, or ``None`` when there is
            nothing to process.

    Yields:
        ``(OUT_RATE, samples)`` tuples, where ``samples`` is an int16 array of
        shape ``(n, OUT_CHANNELS)`` decoded from the streamed response body.
        Nothing is yielded when ``audio`` is ``None``. Errors raised while
        streaming are printed and end the stream.
    """
    filepath = audio
    print(f"filepath: {filepath}")
    if filepath is None:
        return

    # Read the file up front so it is not held open for the whole request.
    with open(filepath, "rb") as f:
        data = f.read()
    payload = {"audio": base64.b64encode(data).decode("utf-8")}

    # Bytes per complete int16 frame across all output channels.
    frame_bytes = np.dtype(np.int16).itemsize * OUT_CHANNELS
    leftover = b""
    first_chunk = True
    tik = time.time()
    with requests.post(API_URL, json=payload, stream=True) as response:
        try:
            # Fail loudly on an HTTP error instead of decoding the error body
            # as audio samples.
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=OUT_CHUNK):
                if not chunk:
                    continue
                if first_chunk:
                    print(f"first chunk time cost: {time.time() - tik:.3f}")
                    first_chunk = False

                # A network chunk may end in the middle of a sample; carry the
                # partial frame over to the next chunk rather than letting
                # np.frombuffer raise and abort playback.
                buffer = leftover + chunk
                usable = len(buffer) - len(buffer) % frame_bytes
                leftover = buffer[usable:]
                if not usable:
                    continue

                # Convert chunk to numpy array
                audio_data = np.frombuffer(buffer[:usable], dtype=np.int16)
                audio_data = audio_data.reshape(-1, OUT_CHANNELS)
                # frombuffer returns a read-only view; astype yields a
                # writable copy for the consumer.
                yield OUT_RATE, audio_data.astype(np.int16)
        except Exception as e:
            print(f"error: {e}")
def greet(name: str) -> str:
    """Return the greeting ``"Hello <name>!!"`` for *name*."""
    return "Hello " + name + "!!"
# Minimal text-in / text-out interface around `greet`.
# NOTE(review): `process_audio` is defined in this file but is not wired into
# the interface -- confirm whether that is intentional.
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()