|
|
|
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
import subprocess as sp
|
|
import sys
|
|
from tempfile import NamedTemporaryFile
|
|
import time
|
|
import typing as tp
|
|
import warnings
|
|
|
|
import gradio as gr
|
|
|
|
from audiocraft.data.audio import audio_write
|
|
from audiocraft.models import MAGNeT
|
|
|
|
|
|
MODEL = None
|
|
SPACE_ID = os.environ.get('SPACE_ID', '')
|
|
MAX_BATCH_SIZE = 12
|
|
N_REPEATS = 2
|
|
INTERRUPTING = False
|
|
MBD = None
|
|
|
|
_old_call = sp.call
|
|
|
|
PROD_STRIDE_1 = "prod-stride1 (new!)"
|
|
|
|
|
|
def _call_nostderr(*args, **kwargs):
|
|
|
|
kwargs['stderr'] = sp.DEVNULL
|
|
kwargs['stdout'] = sp.DEVNULL
|
|
_old_call(*args, **kwargs)
|
|
|
|
|
|
sp.call = _call_nostderr
|
|
|
|
pool = ProcessPoolExecutor(4)
|
|
pool.__enter__()
|
|
|
|
|
|
def interrupt():
|
|
global INTERRUPTING
|
|
INTERRUPTING = True
|
|
|
|
|
|
class FileCleaner:
|
|
def __init__(self, file_lifetime: float = 3600):
|
|
self.file_lifetime = file_lifetime
|
|
self.files = []
|
|
|
|
def add(self, path: tp.Union[str, Path]):
|
|
self._cleanup()
|
|
self.files.append((time.time(), Path(path)))
|
|
|
|
def _cleanup(self):
|
|
now = time.time()
|
|
for time_added, path in list(self.files):
|
|
if now - time_added > self.file_lifetime:
|
|
if path.exists():
|
|
path.unlink()
|
|
self.files.pop(0)
|
|
else:
|
|
break
|
|
|
|
|
|
file_cleaner = FileCleaner()
|
|
|
|
|
|
def make_waveform(*args, **kwargs):
|
|
|
|
be = time.time()
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter('ignore')
|
|
out = gr.make_waveform(*args, **kwargs)
|
|
print("Make a video took", time.time() - be)
|
|
return out
|
|
|
|
|
|
def load_model(version='facebook/magnet-small-10secs'):
|
|
global MODEL
|
|
print("Loading model", version)
|
|
if MODEL is None or MODEL.name != version:
|
|
MODEL = None
|
|
MODEL = MAGNeT.get_pretrained(version)
|
|
|
|
|
|
def _do_predictions(texts, progress=False, gradio_progress=None, **gen_kwargs):
|
|
MODEL.set_generation_params(**gen_kwargs)
|
|
print("new batch", len(texts), texts)
|
|
be = time.time()
|
|
|
|
try:
|
|
outputs = MODEL.generate(texts, progress=progress, return_tokens=False)
|
|
except RuntimeError as e:
|
|
raise gr.Error("Error while generating " + e.args[0])
|
|
outputs = outputs.detach().cpu().float()
|
|
pending_videos = []
|
|
out_wavs = []
|
|
for i, output in enumerate(outputs):
|
|
with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
|
|
audio_write(
|
|
file.name, output, MODEL.sample_rate, strategy="loudness",
|
|
loudness_headroom_db=16, loudness_compressor=True, add_suffix=False)
|
|
if i == 0:
|
|
pending_videos.append(pool.submit(make_waveform, file.name))
|
|
out_wavs.append(file.name)
|
|
file_cleaner.add(file.name)
|
|
out_videos = [pending_video.result() for pending_video in pending_videos]
|
|
for video in out_videos:
|
|
file_cleaner.add(video)
|
|
print("batch finished", len(texts), time.time() - be)
|
|
print("Tempfiles currently stored: ", len(file_cleaner.files))
|
|
return out_videos, out_wavs
|
|
|
|
|
|
def predict_batched(texts, melodies):
|
|
max_text_length = 512
|
|
texts = [text[:max_text_length] for text in texts]
|
|
load_model('facebook/magnet-small-10secs')
|
|
res = _do_predictions(texts, melodies)
|
|
return res
|
|
|
|
|
|
def predict_full(model, model_path, text, temperature, topp,
|
|
max_cfg_coef, min_cfg_coef,
|
|
decoding_steps1, decoding_steps2, decoding_steps3, decoding_steps4,
|
|
span_score,
|
|
progress=gr.Progress()):
|
|
global INTERRUPTING
|
|
INTERRUPTING = False
|
|
progress(0, desc="Loading model...")
|
|
model_path = model_path.strip()
|
|
if model_path:
|
|
if not Path(model_path).exists():
|
|
raise gr.Error(f"Model path {model_path} doesn't exist.")
|
|
if not Path(model_path).is_dir():
|
|
raise gr.Error(f"Model path {model_path} must be a folder containing "
|
|
"state_dict.bin and compression_state_dict_.bin.")
|
|
model = model_path
|
|
if temperature < 0:
|
|
raise gr.Error("Temperature must be >= 0.")
|
|
|
|
load_model(model)
|
|
|
|
max_generated = 0
|
|
|
|
def _progress(generated, to_generate):
|
|
nonlocal max_generated
|
|
max_generated = max(generated, max_generated)
|
|
progress((min(max_generated, to_generate), to_generate))
|
|
if INTERRUPTING:
|
|
raise gr.Error("Interrupted.")
|
|
MODEL.set_custom_progress_callback(_progress)
|
|
|
|
videos, wavs = _do_predictions(
|
|
[text] * N_REPEATS, progress=True,
|
|
temperature=temperature, top_p=topp,
|
|
max_cfg_coef=max_cfg_coef, min_cfg_coef=min_cfg_coef,
|
|
decoding_steps=[decoding_steps1, decoding_steps2, decoding_steps3, decoding_steps4],
|
|
span_arrangement='stride1' if (span_score == PROD_STRIDE_1) else 'nonoverlap',
|
|
gradio_progress=progress)
|
|
|
|
outputs_ = [videos[0]] + [wav for wav in wavs]
|
|
return tuple(outputs_)
|
|
|
|
def ui_full(launch_kwargs):
|
|
with gr.Blocks() as interface:
|
|
gr.Markdown(
|
|
"""
|
|
# MAGNeT
|
|
This is your private demo for [MAGNeT](https://github.com/facebookresearch/audiocraft),
|
|
A fast text-to-music model, consists of a single, non-autoregressive transformer.
|
|
presented at: ["Masked Audio Generation using a Single Non-Autoregressive Transformer"] (https://huggingface.co/papers/2401.04577)
|
|
"""
|
|
)
|
|
with gr.Row():
|
|
with gr.Column():
|
|
with gr.Row():
|
|
text = gr.Text(label="Input Text", value="80s electronic track with melodic synthesizers, catchy beat and groovy bass", interactive=True)
|
|
with gr.Row():
|
|
submit = gr.Button("Submit")
|
|
|
|
_ = gr.Button("Interrupt").click(fn=interrupt, queue=False)
|
|
with gr.Row():
|
|
model = gr.Radio(['facebook/magnet-small-10secs', 'facebook/magnet-medium-10secs',
|
|
'facebook/magnet-small-30secs', 'facebook/magnet-medium-30secs',
|
|
'facebook/audio-magnet-small', 'facebook/audio-magnet-medium'],
|
|
label="Model", value='facebook/magnet-small-10secs', interactive=True)
|
|
model_path = gr.Text(label="Model Path (custom models)")
|
|
with gr.Row():
|
|
span_score = gr.Radio(["max-nonoverlap", PROD_STRIDE_1],
|
|
label="Span Scoring", value=PROD_STRIDE_1, interactive=True)
|
|
with gr.Row():
|
|
decoding_steps1 = gr.Number(label="Decoding Steps (stage 1)", value=20, interactive=True)
|
|
decoding_steps2 = gr.Number(label="Decoding Steps (stage 2)", value=10, interactive=True)
|
|
decoding_steps3 = gr.Number(label="Decoding Steps (stage 3)", value=10, interactive=True)
|
|
decoding_steps4 = gr.Number(label="Decoding Steps (stage 4)", value=10, interactive=True)
|
|
with gr.Row():
|
|
temperature = gr.Number(label="Temperature", value=3.0, step=0.25, minimum=0, interactive=True)
|
|
topp = gr.Number(label="Top-p", value=0.9, step=0.1, minimum=0, maximum=1, interactive=True)
|
|
max_cfg_coef = gr.Number(label="Max CFG coefficient", value=10.0, minimum=0, interactive=True)
|
|
min_cfg_coef = gr.Number(label="Min CFG coefficient", value=1.0, minimum=0, interactive=True)
|
|
with gr.Column():
|
|
output = gr.Video(label="Generated Audio - variation 1")
|
|
audio_outputs = [gr.Audio(label=f"Generated Audio - variation {i+1}", type='filepath') for i in range(N_REPEATS)]
|
|
submit.click(fn=predict_full,
|
|
inputs=[model, model_path, text,
|
|
temperature, topp,
|
|
max_cfg_coef, min_cfg_coef,
|
|
decoding_steps1, decoding_steps2, decoding_steps3, decoding_steps4,
|
|
span_score],
|
|
outputs=[output] + [o for o in audio_outputs])
|
|
gr.Examples(
|
|
fn=predict_full,
|
|
examples=[
|
|
[
|
|
"80s electronic track with melodic synthesizers, catchy beat and groovy bass",
|
|
'facebook/magnet-small-10secs',
|
|
20, 3.0, 0.9, 10.0,
|
|
],
|
|
[
|
|
"80s electronic track with melodic synthesizers, catchy beat and groovy bass. 170 bpm",
|
|
'facebook/magnet-small-10secs',
|
|
20, 3.0, 0.9, 10.0,
|
|
],
|
|
[
|
|
"Earthy tones, environmentally conscious, ukulele-infused, harmonic, breezy, easygoing, organic instrumentation, gentle grooves",
|
|
'facebook/magnet-medium-10secs',
|
|
20, 3.0, 0.9, 10.0,
|
|
],
|
|
[ "Funky groove with electric piano playing blue chords rhythmically",
|
|
'facebook/magnet-medium-10secs',
|
|
20, 3.0, 0.9, 10.0,
|
|
],
|
|
[
|
|
"Rock with saturated guitars, a heavy bass line and crazy drum break and fills.",
|
|
'facebook/magnet-small-30secs',
|
|
60, 3.0, 0.9, 10.0,
|
|
],
|
|
[ "A grand orchestral arrangement with thunderous percussion, epic brass fanfares, and soaring strings, creating a cinematic atmosphere fit for a heroic battle",
|
|
'facebook/magnet-medium-30secs',
|
|
60, 3.0, 0.9, 10.0,
|
|
],
|
|
[ "Seagulls squawking as ocean waves crash while wind blows heavily into a microphone.",
|
|
'facebook/audio-magnet-small',
|
|
20, 3.5, 0.8, 20.0,
|
|
],
|
|
[ "A toilet flushing as music is playing and a man is singing in the distance.",
|
|
'facebook/audio-magnet-medium',
|
|
20, 3.5, 0.8, 20.0,
|
|
],
|
|
],
|
|
|
|
inputs=[text, model, decoding_steps1, temperature, topp, max_cfg_coef],
|
|
outputs=[output]
|
|
)
|
|
|
|
gr.Markdown(
|
|
"""
|
|
### More details
|
|
|
|
#### Music Generation
|
|
"magnet" models will generate a short music extract based on the textual description you provided.
|
|
These models can generate either 10 seconds or 30 seconds of music.
|
|
These models were trained with descriptions from a stock music catalog. Descriptions that will work best
|
|
should include some level of details on the instruments present, along with some intended use case
|
|
(e.g. adding "perfect for a commercial" can somehow help).
|
|
|
|
We present 4 model variants:
|
|
1. facebook/magnet-small-10secs - a 300M non-autoregressive transformer capable of generating 10-second music conditioned
|
|
on text.
|
|
2. facebook/magnet-medium-10secs - 1.5B parameters, 10 seconds audio.
|
|
3. facebook/magnet-small-30secs - 300M parameters, 30 seconds audio.
|
|
4. facebook/magnet-medium-30secs - 1.5B parameters, 30 seconds audio.
|
|
|
|
#### Sound-Effect Generation
|
|
"audio-magnet" models will generate a 10-second sound effect based on the description you provide.
|
|
|
|
These models were trained on the following data sources: a subset of AudioSet (Gemmeke et al., 2017),
|
|
[BBC sound effects](https://sound-effects.bbcrewind.co.uk/), AudioCaps (Kim et al., 2019),
|
|
Clotho v2 (Drossos et al., 2020), VGG-Sound (Chen et al., 2020), FSD50K (Fonseca et al., 2021),
|
|
[Free To Use Sounds](https://www.freetousesounds.com/all-in-one-bundle/), [Sonniss Game Effects](https://sonniss.com/gameaudiogdc),
|
|
[WeSoundEffects](https://wesoundeffects.com/we-sound-effects-bundle-2020/),
|
|
[Paramount Motion - Odeon Cinematic Sound Effects](https://www.paramountmotion.com/odeon-sound-effects).
|
|
|
|
We present 2 model variants:
|
|
1. facebook/audio-magnet-small - 10 second sound effect generation, 300M parameters.
|
|
2. facebook/audio-magnet-medium - 10 second sound effect generation, 1.5B parameters.
|
|
|
|
See [github.com/facebookresearch/audiocraft](https://github.com/facebookresearch/audiocraft/blob/main/docs/MAGNET.md)
|
|
for more details.
|
|
"""
|
|
)
|
|
|
|
interface.queue().launch(**launch_kwargs)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
'--listen',
|
|
type=str,
|
|
default='0.0.0.0' if 'SPACE_ID' in os.environ else '127.0.0.1',
|
|
help='IP to listen on for connections to Gradio',
|
|
)
|
|
parser.add_argument(
|
|
'--username', type=str, default='', help='Username for authentication'
|
|
)
|
|
parser.add_argument(
|
|
'--password', type=str, default='', help='Password for authentication'
|
|
)
|
|
parser.add_argument(
|
|
'--server_port',
|
|
type=int,
|
|
default=0,
|
|
help='Port to run the server listener on',
|
|
)
|
|
parser.add_argument(
|
|
'--inbrowser', action='store_true', help='Open in browser'
|
|
)
|
|
parser.add_argument(
|
|
'--share', action='store_true', help='Share the gradio UI'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
launch_kwargs = {}
|
|
launch_kwargs['server_name'] = args.listen
|
|
|
|
if args.username and args.password:
|
|
launch_kwargs['auth'] = (args.username, args.password)
|
|
if args.server_port:
|
|
launch_kwargs['server_port'] = args.server_port
|
|
if args.inbrowser:
|
|
launch_kwargs['inbrowser'] = args.inbrowser
|
|
if args.share:
|
|
launch_kwargs['share'] = args.share
|
|
|
|
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
|
|
|
|
|
|
ui_full(launch_kwargs)
|
|
|