from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import numpy as np
import io
import soundfile as sf
import base64
import logging
import librosa
import magic  # For MIME type detection
from pydub import AudioSegment

# Import functions from other modules
from asr import transcribe, ASR_LANGUAGES, ASR_SAMPLING_RATE
from tts import synthesize, TTS_LANGUAGES
from lid import identify

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="MMS: Scaling Speech Technology to 1000+ languages")

# Define request models
class AudioRequest(BaseModel):
    audio: str  # Base64 encoded audio or video data
    language: str

class TTSRequest(BaseModel):
    text: str
    language: str
    speed: float

def detect_mime_type(input_bytes):
    mime = magic.Magic(mime=True)
    return mime.from_buffer(input_bytes)

def extract_audio(input_bytes):
    mime_type = detect_mime_type(input_bytes)

    if mime_type.endswith('webm'):
        # soundfile cannot decode WebM; use pydub/ffmpeg for audio/webm and video/webm
        audio = AudioSegment.from_file(io.BytesIO(input_bytes), format="webm")
        samples = np.array(audio.get_array_of_samples())
        if audio.channels > 1:
            samples = samples.reshape((-1, audio.channels))
        # Scale the integer samples to the [-1, 1] float range that sf.read returns
        audio_array = samples.astype(np.float32) / float(1 << (8 * audio.sample_width - 1))
        return audio_array, audio.frame_rate
    elif mime_type.startswith('audio/'):
        return sf.read(io.BytesIO(input_bytes))
    else:
        raise ValueError(f"Unsupported MIME type: {mime_type}")

@app.post("/transcribe")
async def transcribe_audio(request: AudioRequest):
    try:
        input_bytes = base64.b64decode(request.audio)
        audio_array, sample_rate = extract_audio(input_bytes)

        # Convert to mono if stereo
        if len(audio_array.shape) > 1:
            audio_array = audio_array.mean(axis=1)

        # Ensure audio_array is float32
        audio_array = audio_array.astype(np.float32)

        # Resample if necessary
        if sample_rate != ASR_SAMPLING_RATE:
            audio_array = librosa.resample(audio_array, orig_sr=sample_rate, target_sr=ASR_SAMPLING_RATE)

        result = transcribe(audio_array, request.language)
        return JSONResponse(content={"transcription": result})
    except Exception as e:
        logger.error(f"Error in transcribe_audio: {str(e)}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
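
# Illustrative client call for /transcribe (a sketch: assumes the server is reachable
# at http://localhost:7860 and that `sample.wav` exists; `requests` is used only here):
#   import base64, requests
#   payload = {
#       "audio": base64.b64encode(open("sample.wav", "rb").read()).decode("ascii"),
#       "language": "<value from /asr_languages>",
#   }
#   print(requests.post("http://localhost:7860/transcribe", json=payload).json())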

@app.post("/synthesize")
async def synthesize_speech(request: TTSRequest):
    try:
        logger.info(f"Synthesizing speech for text: {request.text}, language: {request.language}, speed: {request.speed}")
        result, filtered_text = synthesize(request.text, request.language, request.speed)
        logger.info(f"Synthesis complete. Filtered text: {filtered_text}")
        
        sample_rate, audio = result
        logger.info(f"Sample rate: {sample_rate}, Audio shape: {audio.shape}, Audio dtype: {audio.dtype}")

        # Ensure audio is a numpy array with the correct dtype
        audio = np.array(audio, dtype=np.float32)

        # Normalize audio to the [-1, 1] range (skip if the output is all silence)
        peak = np.max(np.abs(audio))
        if peak > 0:
            audio = audio / peak

        # Convert to int16 for the WAV file
        audio = (audio * 32767).astype(np.int16)

        # Write the WAV data to an in-memory buffer
        buffer = io.BytesIO()
        sf.write(buffer, audio, sample_rate, format='WAV')
        buffer.seek(0)

        # FileResponse needs a path on disk, so stream the in-memory buffer instead
        return StreamingResponse(
            buffer,
            media_type="audio/wav",
            headers={"Content-Disposition": "attachment; filename=synthesized_audio.wav"}
        )
    except Exception as e:
        logger.error(f"Error in synthesize_speech: {str(e)}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
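
# Illustrative client call for /synthesize (same assumptions as the /transcribe example;
# the response body is the WAV file itself):
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/synthesize",
#       json={"text": "Hello", "language": "<value from /tts_languages>", "speed": 1.0},
#   )
#   open("synthesized_audio.wav", "wb").write(resp.content)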

@app.post("/identify")
async def identify_language(request: AudioRequest):
    try:
        input_bytes = base64.b64decode(request.audio)
        audio_array, sample_rate = extract_audio(input_bytes)

        # Match the preprocessing used for transcription: mono, float32
        if len(audio_array.shape) > 1:
            audio_array = audio_array.mean(axis=1)
        audio_array = audio_array.astype(np.float32)

        result = identify(audio_array)
        return JSONResponse(content={"language_identification": result})
    except Exception as e:
        logger.error(f"Error in identify_language: {str(e)}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

@app.get("/asr_languages")
async def get_asr_languages():
    try:
        return JSONResponse(content=ASR_LANGUAGES)
    except Exception as e:
        logger.error(f"Error in get_asr_languages: {str(e)}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

@app.get("/tts_languages")
async def get_tts_languages():
    try:
        return JSONResponse(content=TTS_LANGUAGES)
    except Exception as e:
        logger.error(f"Error in get_tts_languages: {str(e)}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
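
# Local development entry point (a sketch: assumes uvicorn is installed and that the
# Space does not already launch the app some other way, e.g. via its own Dockerfile).
if __name__ == "__main__":
    import uvicorn

    # 7860 is the conventional Hugging Face Spaces port; change it if needed.
    uvicorn.run(app, host="0.0.0.0", port=7860)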