|
import re |
|
import torch |
|
import torchaudio |
|
import numpy as np |
|
import tempfile |
|
from einops import rearrange |
|
from vocos import Vocos |
|
from pydub import AudioSegment, silence |
|
from model import CFM, UNetT, DiT, MMDiT |
|
from cached_path import cached_path |
|
from model.utils import ( |
|
load_checkpoint, |
|
get_tokenizer, |
|
convert_char_to_pinyin, |
|
save_spectrogram, |
|
) |
|
from transformers import pipeline |
|
import soundfile as sf |
|
import tomli |
|
import argparse |
|
import tqdm |
|
from pathlib import Path |
|
|
|
def _str_to_bool(value):
    """Interpret a command-line string as a boolean.

    Only spellings that clearly mean "off" map to ``False``; every other
    value enables the option.
    """
    return value.strip().lower() not in {"", "0", "false", "no", "n", "off"}


# Command-line interface. Every option except --config is an override for a
# key of the same name in the TOML configuration file.
parser = argparse.ArgumentParser(
    prog="python3 inference-cli.py",
    description="Commandline interface for E2/F5 TTS with Advanced Batch Processing.",
    epilog="Specify options above to override one or more settings from config.",
)
parser.add_argument(
    "-c",
    "--config",
    # The help text names the same file as the actual default below.
    help="Configuration file. Default=inference-cli.toml",
    default="inference-cli.toml",
)
parser.add_argument(
    "-m",
    "--model",
    help="F5-TTS | E2-TTS",
)
parser.add_argument(
    "-r",
    "--ref_audio",
    type=str,
    help="Reference audio file < 15 seconds.",
)
parser.add_argument(
    "-s",
    "--ref_text",
    type=str,
    # Sentinel default: lets an explicitly passed empty string be told apart
    # from "option not given".
    default="666",
    help="Subtitle for the reference audio.",
)
parser.add_argument(
    "-t",
    "--gen_text",
    type=str,
    help="Text to generate.",
)
parser.add_argument(
    "-o",
    "--output_dir",
    type=str,
    help="Path to output folder.",
)
parser.add_argument(
    "--remove_silence",
    # Accept both a bare "--remove_silence" and "--remove_silence <value>".
    # The value is parsed so that "false"/"0"/"no" really disable the option
    # instead of being a truthy non-empty string.
    nargs="?",
    const=True,
    default=None,
    type=_str_to_bool,
    help="Remove silence.",
)
|
args = parser.parse_args()

# Read the TOML configuration inside a context manager so the file handle is
# closed deterministically instead of being left to the garbage collector.
with open(args.config, "rb") as config_file:
    config = tomli.load(config_file)

# Each setting comes from the command line when given, otherwise from the
# configuration file. The config key is only looked up when it is needed.
ref_audio = args.ref_audio if args.ref_audio else config["ref_audio"]
# "666" is the parser's sentinel default for --ref_text; comparing against it
# (rather than testing truthiness) lets an explicit empty string through.
ref_text = args.ref_text if args.ref_text != "666" else config["ref_text"]
gen_text = args.gen_text if args.gen_text else config["gen_text"]
output_dir = args.output_dir if args.output_dir else config["output_dir"]
model = args.model if args.model else config["model"]
# Any value given on the command line wins, including a falsy one, so the
# option can also be used to switch silence removal off.
remove_silence = (
    args.remove_silence
    if args.remove_silence is not None
    else config["remove_silence"]
)

# Output artefacts written by infer_batch.
wave_path = Path(output_dir) / "out.wav"
spectrogram_path = Path(output_dir) / "out.png"
|
|
|
# Words and phrases at which an over-long run of text may be broken when it
# has to be split word by word (see split_text_into_batches).
SPLIT_WORDS = [
    # contrast
    "but",
    "however",
    "nevertheless",
    "yet",
    "still",
    # consequence
    "therefore",
    "thus",
    "hence",
    "consequently",
    # addition
    "moreover",
    "furthermore",
    "additionally",
    # alternatives
    "meanwhile",
    "alternatively",
    "otherwise",
    # elaboration
    "namely",
    "specifically",
    "for example",
    "such as",
    # emphasis
    "in fact",
    "indeed",
    "notably",
    # opposition
    "in contrast",
    "on the other hand",
    "conversely",
    # conclusion
    "in conclusion",
    "to summarize",
    "finally",
]
|
|
|
# Pick the compute device: CUDA when available, then Apple MPS, else CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# Pretrained Vocos vocoder; infer_batch uses it to decode generated mel
# spectrograms into waveforms.
vocos = Vocos.from_pretrained("charactr/vocos-mel-24khz")

print(f"Using {device} device")
|
|
|
|
|
|
|
# --- Audio / mel-spectrogram settings -------------------------------------
target_sample_rate = 24_000  # reference audio is resampled to this rate
n_mel_channels = 100
hop_length = 256  # samples per mel frame
target_rms = 0.1  # quieter reference audio is scaled up to this RMS

# --- Sampling settings ----------------------------------------------------
nfe_step = 32  # passed as ``steps`` to the model's sample()
cfg_strength = 2.0
ode_method = "euler"  # passed as the odeint ``method``
sway_sampling_coef = -1.0
speed = 1.0  # divides the estimated length of the generated audio

# Not referenced by the code visible in this file.
fix_duration = None
|
|
|
def load_model(repo_name, exp_name, model_cls, model_cfg, ckpt_step):
    """Build a CFM model around ``model_cls`` and load its checkpoint.

    Args:
        repo_name: Repository name under the ``SWivid`` namespace.
        exp_name: Experiment folder inside that repository.
        model_cls: Backbone class, instantiated with ``model_cfg`` plus the
            tokenizer's vocabulary size and the mel dimension.
        model_cfg: Keyword arguments for ``model_cls``.
        ckpt_step: Step number that selects ``model_<ckpt_step>.safetensors``.

    Returns:
        Whatever ``load_checkpoint`` returns for the constructed model
        (called with ``use_ema=True``).
    """
    # Resolve the checkpoint location first, exactly as a path string.
    checkpoint_url = f"hf://SWivid/{repo_name}/{exp_name}/model_{ckpt_step}.safetensors"
    ckpt_path = str(cached_path(checkpoint_url))

    vocab_char_map, vocab_size = get_tokenizer("Emilia_ZH_EN", "pinyin")

    backbone = model_cls(
        **model_cfg,
        text_num_embeds=vocab_size,
        mel_dim=n_mel_channels,
    )
    mel_settings = {
        "target_sample_rate": target_sample_rate,
        "n_mel_channels": n_mel_channels,
        "hop_length": hop_length,
    }
    ode_settings = {"method": ode_method}

    cfm = CFM(
        transformer=backbone,
        mel_spec_kwargs=mel_settings,
        odeint_kwargs=ode_settings,
        vocab_char_map=vocab_char_map,
    )
    cfm = cfm.to(device)

    return load_checkpoint(cfm, ckpt_path, device, use_ema=True)
|
|
|
|
|
|
|
# Backbone hyper-parameters handed to load_model as ``model_cfg``:
# F5-TTS is built with the DiT class, E2-TTS with UNetT (see infer_batch).
F5TTS_model_cfg = {
    "dim": 1024,
    "depth": 22,
    "heads": 16,
    "ff_mult": 2,
    "text_dim": 512,
    "conv_layers": 4,
}
E2TTS_model_cfg = {
    "dim": 1024,
    "depth": 24,
    "heads": 16,
    "ff_mult": 4,
}
|
|
|
# Sentence terminators: ideographic full stop, ASCII . ! ? and the
# full-width ! (U+FF01) and ? (U+FF1F). Written as escapes so the full-width
# characters cannot be silently folded to their ASCII look-alikes.
_SENTENCE_ENDINGS = "\u3002.!?\uff01\uff1f"
_SENTENCE_SPLIT_RE = re.compile("([\u3002.!?\uff01\uff1f])")
# ASCII and full-width colon / comma.
_COLON_SPLIT_RE = re.compile("[:\uff1a]")
_COMMA_SPLIT_RE = re.compile("[,\uff0c]")


def _utf8_len(s):
    """Return the length of *s* in UTF-8 bytes, the unit of ``max_chars``."""
    return len(s.encode("utf-8"))


def _split_by_words(text, max_chars, split_words):
    """Pack the words of *text* into chunks, breaking before a split word when possible."""
    chunks = []
    current = ""
    for word in text.split():
        # "+ 1" accounts for the space appended after every word.
        if _utf8_len(current) + _utf8_len(word) + 1 <= max_chars:
            current += word + " "
            continue
        if current:
            for split_word in split_words:
                # Prefer to cut at the last occurrence of a split word so the
                # next chunk starts with it.
                split_index = current.rfind(" " + split_word + " ")
                if split_index != -1:
                    chunks.append(current[:split_index].strip())
                    current = current[split_index:].strip() + " "
                    break
            else:
                # No split word present: flush the chunk as it is.
                chunks.append(current.strip())
                current = ""
        current += word + " "
    if current:
        chunks.append(current.strip())
    return chunks


def _split_by_commas(text, max_chars, split_words):
    """Pack comma-separated pieces of *text* into chunks; fall back to words if it has no comma."""
    pieces = _COMMA_SPLIT_RE.split(text)
    if len(pieces) == 1:
        return _split_by_words(text, max_chars, split_words)

    chunks = []
    current = ""
    for piece in pieces:
        if _utf8_len(current) + _utf8_len(piece) <= max_chars:
            current += piece + ","
        else:
            if current:
                chunks.append(current.rstrip(","))
            current = piece + ","
    if current:
        chunks.append(current.rstrip(","))
    return chunks


def split_text_into_batches(text, max_chars=200, split_words=None):
    """Split *text* into batches of roughly at most ``max_chars`` UTF-8 bytes.

    Whole sentences are packed together while they fit. A sentence that is
    too long on its own is split at colons, then at commas, and finally word
    by word, preferring to break before one of ``split_words``.

    Args:
        text: Text to split.
        max_chars: Size limit of a batch, measured in UTF-8 bytes.
        split_words: Words/phrases at which a word-level split may be made.
            ``None`` (the default) uses the module-level ``SPLIT_WORDS`` as
            it is at call time, so reassigning that global takes effect.

    Returns:
        A list of text batches; ``[text]`` when the text already fits.
    """
    # The emptiness check also protects the ``text[-1]`` lookup below when
    # ``max_chars`` is negative.
    if not text or _utf8_len(text) <= max_chars:
        return [text]
    if split_words is None:
        split_words = SPLIT_WORDS

    # Guarantee a final terminator so the last sentence pairs up below.
    if text[-1] not in _SENTENCE_ENDINGS:
        text += "."

    # The capturing group keeps each terminator; re-attach it to its sentence.
    parts = _SENTENCE_SPLIT_RE.split(text)
    sentences = [body + end for body, end in zip(parts[0::2], parts[1::2])]

    batches = []
    current_batch = ""
    for sentence in sentences:
        if _utf8_len(current_batch) + _utf8_len(sentence) <= max_chars:
            current_batch += sentence
            continue

        # The sentence does not fit: flush what has been collected so far.
        if current_batch:
            batches.append(current_batch)
            current_batch = ""

        if _utf8_len(sentence) <= max_chars:
            current_batch = sentence
            continue

        # The sentence alone is too long. A sentence without a colon yields
        # a single part (itself), which then goes straight to comma splitting.
        for part in _COLON_SPLIT_RE.split(sentence):
            if _utf8_len(part) <= max_chars:
                batches.append(part)
            else:
                batches.extend(_split_by_commas(part, max_chars, split_words))

    if current_batch:
        batches.append(current_batch)

    return batches
|
|
|
def infer_batch(ref_audio, ref_text, gen_text_batches, model, remove_silence):
    """Synthesize every text batch and write the combined wave and spectrogram.

    Args:
        ref_audio: ``(audio, sample_rate)`` pair for the reference clip.
            The audio is averaged over its first dimension when that
            dimension is larger than 1 (presumably channels - confirm).
        ref_text: Transcript of the reference clip.
        gen_text_batches: Iterable of text chunks to synthesize, in order.
        model: ``"F5-TTS"`` or ``"E2-TTS"``.
        remove_silence: When truthy, long silences are removed from the
            written wave file.

    Raises:
        ValueError: If ``model`` is not one of the supported names.

    Side effects:
        Writes ``wave_path`` and ``spectrogram_path`` (creating the output
        directory if needed) and prints both paths.
    """
    if model == "F5-TTS":
        ema_model = load_model(model, "F5TTS_Base", DiT, F5TTS_model_cfg, 1200000)
    elif model == "E2-TTS":
        ema_model = load_model(model, "E2TTS_Base", UNetT, E2TTS_model_cfg, 1200000)
    else:
        # Fail early with a clear message instead of hitting an unbound
        # ``ema_model`` once sampling starts.
        raise ValueError(f"Unknown model {model!r}; expected 'F5-TTS' or 'E2-TTS'.")

    audio, sr = ref_audio
    if audio.shape[0] > 1:
        audio = torch.mean(audio, dim=0, keepdim=True)

    # Boost a quiet reference up to the target RMS; the generated audio is
    # scaled back down by the same factor further below.
    rms = torch.sqrt(torch.mean(torch.square(audio)))
    if rms < target_rms:
        audio = audio * target_rms / rms
    if sr != target_sample_rate:
        resampler = torchaudio.transforms.Resample(sr, target_sample_rate)
        audio = resampler(audio)
    audio = audio.to(device)

    # Separate the reference text from the generated text with a space when
    # it ends in a single-byte character. Done once, before the loop: inside
    # the loop the freshly appended space would itself qualify and the
    # reference text would grow by one space per batch.
    if not ref_text or len(ref_text[-1].encode("utf-8")) == 1:
        ref_text = ref_text + " "

    # Full-width (Chinese) pause punctuation, each occurrence weighted as
    # three extra bytes. It has to be a character class: as a bare sequence
    # the pattern only matches that exact run of characters.
    zh_pause_punc = "[\u3002\uff0c\u3001\uff1b\uff1a\uff1f\uff01]"

    def weighted_len(text):
        return len(text.encode("utf-8")) + 3 * len(re.findall(zh_pause_punc, text))

    # Loop invariants.
    ref_audio_len = audio.shape[-1] // hop_length
    ref_text_len = weighted_len(ref_text)

    generated_waves = []
    spectrograms = []

    for gen_text in tqdm.tqdm(gen_text_batches):
        final_text_list = convert_char_to_pinyin([ref_text + gen_text])

        # Total length = reference frames + generated frames, the latter
        # estimated from the reference's frames-per-text-unit ratio.
        gen_text_len = weighted_len(gen_text)
        duration = ref_audio_len + int(ref_audio_len / ref_text_len * gen_text_len / speed)

        with torch.inference_mode():
            generated, _ = ema_model.sample(
                cond=audio,
                text=final_text_list,
                duration=duration,
                steps=nfe_step,
                cfg_strength=cfg_strength,
                sway_sampling_coef=sway_sampling_coef,
            )

        # Drop the leading frames that correspond to the reference audio.
        generated = generated[:, ref_audio_len:, :]
        generated_mel_spec = rearrange(generated, "1 n d -> 1 d n")
        generated_wave = vocos.decode(generated_mel_spec.cpu())
        if rms < target_rms:
            generated_wave = generated_wave * rms / target_rms

        generated_waves.append(generated_wave.squeeze().cpu().numpy())
        spectrograms.append(generated_mel_spec[0].cpu().numpy())

    final_wave = np.concatenate(generated_waves)

    # Write by path. Holding a second, independently opened handle on the
    # same file while soundfile/pydub write to it is unnecessary.
    wave_path.parent.mkdir(parents=True, exist_ok=True)
    wave_file = str(wave_path)
    sf.write(wave_file, final_wave, target_sample_rate)

    if remove_silence:
        aseg = AudioSegment.from_file(wave_file)
        non_silent_segs = silence.split_on_silence(
            aseg, min_silence_len=1000, silence_thresh=-50, keep_silence=500
        )
        non_silent_wave = AudioSegment.silent(duration=0)
        for non_silent_seg in non_silent_segs:
            non_silent_wave += non_silent_seg
        aseg = non_silent_wave
        aseg.export(wave_file, format="wav")
    print(wave_file)

    combined_spectrogram = np.concatenate(spectrograms, axis=1)
    spectrogram_path.parent.mkdir(parents=True, exist_ok=True)
    save_spectrogram(combined_spectrogram, spectrogram_path)
    print(spectrogram_path)
|
|
|
|
|
def infer(ref_audio_orig, ref_text, gen_text, model, remove_silence, custom_split_words):
    """Prepare the reference clip, split the text and run batched inference.

    Args:
        ref_audio_orig: Path of the reference audio file.
        ref_text: Transcript of the reference audio; when blank the audio is
            transcribed with Whisper.
        gen_text: Text to synthesize.
        model: Model name passed on to ``infer_batch``.
        remove_silence: Passed on to ``infer_batch``.
        custom_split_words: Comma-separated words that replace the
            module-level ``SPLIT_WORDS``; blank keeps the current list.

    Returns:
        The return value of ``infer_batch``.

    Raises:
        ValueError: If the reference audio is empty after silence removal.
    """
    global SPLIT_WORDS
    # Replace the split words only when some were actually supplied; blank
    # entries are dropped so an empty string can never become a split word.
    custom_words = [word.strip() for word in custom_split_words.split(",") if word.strip()]
    if custom_words:
        SPLIT_WORDS = custom_words

    print(gen_text)

    print("Converting audio...")
    # Reserve a temporary file name and close the handle right away, so the
    # exporter and the readers below each open the file on their own.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
        ref_audio = f.name

    try:
        aseg = AudioSegment.from_file(ref_audio_orig)

        non_silent_segs = silence.split_on_silence(
            aseg, min_silence_len=1000, silence_thresh=-50, keep_silence=500
        )
        non_silent_wave = AudioSegment.silent(duration=0)
        for non_silent_seg in non_silent_segs:
            non_silent_wave += non_silent_seg
        aseg = non_silent_wave

        # len() of an AudioSegment is its duration in milliseconds.
        audio_duration = len(aseg)
        if audio_duration > 15000:
            print("Audio is over 15s, clipping to only first 15s.")
            aseg = aseg[:15000]
        aseg.export(ref_audio, format="wav")

        if not ref_text.strip():
            print("No reference text provided, transcribing reference audio...")
            pipe = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-large-v3-turbo",
                torch_dtype=torch.float16,
                device=device,
            )
            ref_text = pipe(
                ref_audio,
                chunk_length_s=30,
                batch_size=128,
                generate_kwargs={"task": "transcribe"},
                return_timestamps=False,
            )["text"].strip()
            print("Finished transcription")
        else:
            print("Using custom reference text...")

        audio, sr = torchaudio.load(ref_audio)
    finally:
        # The converted clip is only needed until it has been loaded.
        try:
            Path(ref_audio).unlink()
        except OSError:
            pass  # best-effort cleanup; a leftover temp file is harmless

    ref_seconds = audio.shape[-1] / sr
    if ref_seconds <= 0:
        raise ValueError("Reference audio is empty after silence removal.")

    # Budget per batch: the reference's bytes-per-second rate times the time
    # left in a 30 second window after the reference clip.
    max_chars = int(len(ref_text.encode('utf-8')) / ref_seconds * (30 - ref_seconds))
    # Pass the split words explicitly so the list chosen above is the one
    # the splitter really uses.
    gen_text_batches = split_text_into_batches(
        gen_text, max_chars=max_chars, split_words=SPLIT_WORDS
    )
    print('ref_text', ref_text)
    for i, batch_text in enumerate(gen_text_batches):
        print(f'gen_text {i}', batch_text)

    print(f"Generating audio using {model} in {len(gen_text_batches)} batches, loading models...")
    return infer_batch((audio, sr), ref_text, gen_text_batches, model, remove_silence)
|
|
|
|
|
# Script entry point: run inference with the settings resolved from the
# command line and the configuration file.
infer(
    ref_audio_orig=ref_audio,
    ref_text=ref_text,
    gen_text=gen_text,
    model=model,
    remove_silence=remove_silence,
    custom_split_words=",".join(SPLIT_WORDS),
)