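"""Gradio demo for phoneme and phonological-attribute transcription.

The app lets a user read a prompt, transcribes the recording into phonemes and
per-phoneme attribute patterns with the `transcriber.transcribe_SA` model,
highlights substitutions/insertions/deletions against the expected phonemes,
and plots attribute probability contours over the spectrogram.
"""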
import json
import random

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from gradio.components import Audio, Dropdown, Textbox, Image
from Levenshtein import editops
from scipy.io import wavfile
from scipy.signal import spectrogram
from torch import nn

import Phonemize
import transcriber
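# Load the attribute transcription engine, the grapheme-to-phoneme front end,
# the ARPAbet-to-IPA mapping, and the list of practice prompts.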
engine = transcriber.transcribe_SA(model_path='models/SA', verbose=0)
phonemizer = Phonemize.phonemization()
arpa2ipa = pd.read_csv('data/arpa2ipa.csv', sep='\\s+', header=None, names=['arpa', 'ipa'])
prompts = np.loadtxt('data/prompts.txt', dtype=str)
Attributes = engine.att_list
df_output = None
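# Pick a random prompt from data/prompts.txt for the user to read.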
def select_prompt():
    return random.choice(prompts)
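# Convert a prompt to its expected phoneme sequence (lower-case ARPAbet,
# optionally mapped to IPA via the arpa2ipa table).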
def phonemize_prompt(prompt, is_ipa=False):
    phonemes = phonemizer.cmu_phonemize(prompt)
    phonemes = [ph.lower() for ph in phonemes]
    if is_ipa:
        phonemes = [arpa2ipa[arpa2ipa.arpa == ph].ipa.values[0] for ph in phonemes]
    return ' '.join(phonemes)
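# Static example of the (token, label) pairs consumed by gr.HighlightedText;
# appears to be kept only as a reference and is not wired into the interface.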
def diff_fn():
    return [('H', '+'), ('E', '-'), ('N', None), ('\n', None), ('F', '-'), ('Fgo', '-'), ('M', '+')]
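# Transcribe the recorded audio into phonemes and attribute patterns, cache the
# full table in the global df_output for the Assessment tab, and return an HTML
# table restricted to the attributes selected in the dropdown.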
def recognizeAudio(audio_file, attributes, is_ipa=False):
    global df_output
    if is_ipa:
        p2att_matrix = 'data/p2att_en_us-ipa.csv'
    else:
        p2att_matrix = 'data/p2att_en_us-arpa.csv'
    output = engine.transcribe(audio_file, attributes='all', phonological_matrix_file=p2att_matrix, human_readable=False)
    records = []
    d = json.loads(output)
    phonemes = d['Phoneme']['symbols']
    records.append(['Phoneme'] + phonemes)
    for att in d['Attributes']:
        records.append([att['Name']] + att['Pattern'])
    df = pd.DataFrame.from_records(records)
    df.fillna('', inplace=True)
    df_output = df
    return df[df[0].isin(['Phoneme'] + list(attributes))].to_html(header=False, index=False)
# Align the expected sequence with the recognized one and return the differences
# in a format that can be visualized by the gradio HighlightedText component.
def get_error(exp_list, rec_list):
    exp_list = list(exp_list)
    rec_list = list(rec_list)
    vocab = set(exp_list + rec_list)
    w2c = dict(zip(vocab, range(len(vocab))))
    exp_out = [[a, None] for a in exp_list]
    rec_out = [[a, None] for a in rec_list]
    # Encode each token as a single character so Levenshtein editops can align the sequences.
    exp_enc = ''.join([chr(w2c[c]) for c in exp_list])
    rec_enc = ''.join([chr(w2c[c]) for c in rec_list])
    for op, exp_i, rec_i in editops(exp_enc, rec_enc):
        if op == 'replace':
            exp_out[exp_i][1] = 'S'
            rec_out[rec_i][1] = 'S'
        elif op == 'insert':
            rec_out[rec_i][1] = 'I'
        elif op == 'delete':
            exp_out[exp_i][1] = 'D'
    diff_list = [['Expected:\t', None]] + exp_out + [['\n', None]] + [['Recognized:\t', None]] + rec_out
    return diff_list
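# Linearly rescale a vector to the range [new_min, new_max]
# (helper, not currently used by the interface).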
def scale_vector(vector, new_min, new_max):
    min_val = min(vector)
    max_val = max(vector)
    scaled_vector = []
    for val in vector:
        scaled_val = ((val - min_val) * (new_max - new_min) / (max_val - min_val)) + new_min
        scaled_vector.append(scaled_val)
    return scaled_vector
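# Plot the spectrogram and waveform of the recording with the attribute
# probability contour overlaid on a secondary axis.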
def create_spectrogram_with_att(wav_file, att_contour, att):
    # Read the WAV file
    sampling_rate, data = wavfile.read(wav_file)
    # Calculate the spectrogram
    f, t, Sxx = spectrogram(data, fs=sampling_rate)
    fig, axs = plt.subplots(2, 1, figsize=(10, 10), sharex=True)
    # Plot the spectrogram in dB
    axs[0].pcolormesh(t, f, 10 * np.log10(Sxx), shading='gouraud')
    # plt.colorbar(label='Intensity (dB)')
    axs[0].set_ylabel('Frequency (Hz)')
    axs[0].set_xlabel('Time (s)')
    axs[0].set_title(f'Spectrogram with {att} Contour')
    axs[0].set_ylim(0, 8000)  # Adjust the frequency range if necessary
    ax_att = axs[0].twinx()
    # Plot the attribute contour, assuming one value every 20 ms
    x_points = att_contour.shape[0]
    time_att = np.arange(0, x_points * 0.02, 0.02)[:x_points]
    ax_att.plot(time_att, att_contour, color='blue', label=f'{att} Contour')
    ax_att.set_ylim(0, 1)
    ax_att.legend()
    # Plot the waveform
    time = np.arange(0, len(data)) / sampling_rate
    axs[1].plot(time, data, color='blue')
    axs[1].set_ylabel('Amplitude')
    axs[1].set_xlabel('Time (s)')
    axs[1].set_title('Waveform')
    # plt.show()
    return fig
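# Build the frame-level contour of an attribute from the logits of the most
# recent call to engine.transcribe and overlay it on the spectrogram.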
def plot_contour(audio_file, att):
    # Ids of the negative/positive tokens for this attribute, plus the pad token.
    indx_n = engine.processor.tokenizer.convert_tokens_to_ids([f'n_{att}'])[0]
    indx_p = engine.processor.tokenizer.convert_tokens_to_ids([f'p_{att}'])[0]
    index_all = [engine.processor.tokenizer.pad_token_id, indx_n, indx_p]
    # Softmax over the three tokens; the last column is the probability of the positive attribute.
    prob = nn.functional.softmax(engine.logits.squeeze()[:, index_all], dim=-1)
    att_contour = prob[:, -1]
    fig = create_spectrogram_with_att(audio_file, att_contour, att)
    return fig
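# Gradio interface: a Main tab to get a prompt and transcribe a recording, an
# Assessment tab that highlights phoneme/attribute errors against the expected
# phonemes, and an Analysis tab that plots an attribute contour over the spectrogram.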
with gr.Blocks() as gui:
    with gr.Tab("Main"):
        prompt = gr.Textbox(label='Prompt', value=select_prompt)
        get_prompt = gr.Button("Get Prompt")
        get_prompt.click(fn=select_prompt, outputs=prompt)
        with gr.Row():
            with gr.Column(scale=3):
                prompt_phonemes = gr.Textbox(label="Expected Phonemes", interactive=False)
            with gr.Column(scale=1):
                is_ipa = gr.Checkbox(label="IPA")
                get_phoneme = gr.Button("Get Phonemes")
        get_phoneme.click(fn=phonemize_prompt, inputs=[prompt, is_ipa], outputs=prompt_phonemes)
        record_audio = gr.Audio(sources=["microphone", "upload"], type="filepath")
        att_list = gr.Dropdown(label="Select Attributes", choices=sorted(Attributes), value=['vowel', 'voiced', 'consonant'], multiselect=True)
        process = gr.Button("Process Audio")
        recognition = gr.HTML(label='Output')
        process.click(fn=recognizeAudio, inputs=[record_audio, att_list, is_ipa], outputs=recognition)

    with gr.Tab("Assessment"):
        assess = gr.Button("Assessment")
        diff = []
        for i in range(len(Attributes) + 1):
            diff.append(gr.HighlightedText(
                combine_adjacent=False,
                show_legend=True,
                color_map={"S": "red", "I": "green", "D": "blue"},
                visible=False))

        def get_assessment(prompt_phonemes):
            outputs = [gr.HighlightedText(visible=False)] * (df_output.shape[0])
            outputs[0] = gr.HighlightedText(label="Phoneme Assessment",
                                            value=get_error(prompt_phonemes.split(), df_output.iloc[0].values[1:]),
                                            visible=True)
            i = 1
            for j, r in df_output.iloc[1:].iterrows():
                att = r.iloc[0]
                convert = lambda ph: '-' if f'n_{att}' in engine.p2att_map[ph] else '+'
                exp_att = [convert(ph) for ph in prompt_phonemes.split()]
                rec_att = r.iloc[1:].values
                if ''.join(exp_att) != ''.join(rec_att):
                    outputs[i] = gr.HighlightedText(label=f"{att} Assessment",
                                                    value=get_error(exp_att, rec_att),
                                                    visible=True)
                i += 1
            return outputs

        assess.click(fn=get_assessment, inputs=[prompt_phonemes], outputs=diff)

    with gr.Tab("Analysis"):
        selected_att = gr.Dropdown(sorted(Attributes), label="Select an Attribute to plot", value='voiced', interactive=True)
        do_plot = gr.Button('Plot')
        plot_block = gr.Plot(label='Spectrogram with Attribute Contour')
        do_plot.click(plot_contour, inputs=[record_audio, selected_att], outputs=plot_block)

gui.launch()