#!/usr/bin/env python3
"""
NeuCodec Test - Gradio App

Equivalent to nemo and snac test spaces, but for NeuCodec used in NeuTTS-Air models.
Allows testing encode/decode cycles with the neuphonic/neucodec model.
"""

import time
import traceback
from typing import Optional, Tuple

import gradio as gr
import librosa
import numpy as np
import torch

# Attempt to import NeuCodec
try:
    from neucodec import NeuCodec, DistillNeuCodec
    print("NeuCodec modules imported successfully.")
except ImportError as e:
    print(f"Error importing NeuCodec: {e}")
    raise ImportError("Could not import NeuCodec. Make sure 'neucodec' is installed correctly.") from e

# --- Configuration ---
TARGET_SR = 16000  # NeuCodec operates at 16kHz for encoding
OUTPUT_SR = 24000  # NeuCodec outputs at 24kHz
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_NAME = "neuphonic/neucodec"  # Options: neuphonic/neucodec, neuphonic/distill-neucodec

# (sample_rate, samples) pair, the form handed to the gr.Audio outputs below.
AudioPlayback = Tuple[int, np.ndarray]
ProcessResult = Tuple[
    Optional[AudioPlayback],
    Optional[AudioPlayback],
    Optional[AudioPlayback],
    str,
]

print(f"Using device: {DEVICE}")

# --- Load Model (Load once globally) ---
neucodec = None
try:
    print(f"Loading NeuCodec model: {MODEL_NAME}...")
    start_time = time.perf_counter()
    if MODEL_NAME == "neuphonic/distill-neucodec":
        neucodec = DistillNeuCodec.from_pretrained(MODEL_NAME)
    else:
        neucodec = NeuCodec.from_pretrained(MODEL_NAME)
    neucodec = neucodec.to(DEVICE)
    neucodec.eval()  # Set model to evaluation mode
    end_time = time.perf_counter()
    print(f"NeuCodec loaded successfully to {DEVICE}. Time taken: {end_time - start_time:.2f} seconds.")
except Exception as e:
    print(f"FATAL: Error loading NeuCodec: {e}")
    print(traceback.format_exc())
    # A failure after from_pretrained() (e.g. in .to(DEVICE)) would otherwise
    # leave a half-initialised model bound to the global; reset it so the
    # "neucodec is None" guards below reflect the failed load.
    neucodec = None


# --- Main Processing Function ---
def process_audio(audio_filepath: Optional[str]) -> ProcessResult:
    """
    Loads, resamples, encodes, decodes audio using NeuCodec, and returns results.

    Args:
        audio_filepath: Path of the uploaded audio file, or None when nothing
            was uploaded.

    Returns:
        A 4-tuple ``(original, resampled_16k, reconstructed_24k, log_text)``.
        The first three entries are ``(sample_rate, samples)`` pairs for
        playback; they are all ``None`` when the model is unavailable, no file
        was given, or processing failed. ``log_text`` is the newline-joined
        processing log (or the error report, including the traceback).

    Any exception raised during processing is caught and reported through
    ``log_text`` rather than propagated to the caller.
    """
    if neucodec is None:
        return None, None, None, "Error: NeuCodec could not be loaded. Cannot process audio."
    if audio_filepath is None:
        return None, None, None, "Please upload an audio file."

    logs = ["--- Starting Audio Processing with NeuCodec ---"]

    try:
        # 1. Load Audio
        logs.append(f"Loading audio file: {audio_filepath}")
        load_start = time.perf_counter()

        # Load original audio (for playback reference)
        original_waveform, original_sr = librosa.load(audio_filepath, sr=None, mono=False)
        logs.append(f"Audio loaded. Original SR: {original_sr} Hz, Shape: {original_waveform.shape}")

        # Convert to mono if stereo
        if original_waveform.ndim > 1:
            logs.append(f"Warning: Input audio has {original_waveform.shape[0]} channels. Converting to mono.")
            original_waveform = librosa.to_mono(original_waveform)

        # Reject empty input up front instead of failing somewhere inside the
        # resampler or the codec with a less helpful message.
        if original_waveform.size == 0:
            raise ValueError("Input audio contains no samples.")

        load_end = time.perf_counter()
        logs.append(f"Loading time: {load_end - load_start:.2f}s")

        # --- Prepare Original for Playback ---
        original_audio_playback = (original_sr, original_waveform)
        logs.append("Prepared original audio for playback.")

        # 2. Resample to 16kHz for encoding (NeuCodec expects 16kHz input)
        resample_start = time.perf_counter()
        logs.append(f"Resampling waveform to {TARGET_SR} Hz for encoding...")
        waveform_16k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=TARGET_SR)
        logs.append(f"Resampling complete. New Shape: {waveform_16k.shape}")
        resample_end = time.perf_counter()
        logs.append(f"Resampling time: {resample_end - resample_start:.2f}s")

        # --- Prepare 16kHz version for Playback ---
        resampled_audio_playback = (TARGET_SR, waveform_16k)
        logs.append("Prepared 16kHz audio for playback.")

        # 3. Prepare for NeuCodec Encoding
        # NeuCodec expects [batch, channels, samples] format
        waveform_tensor = torch.from_numpy(waveform_16k).float().unsqueeze(0).unsqueeze(0)  # [1, 1, samples]
        waveform_tensor = waveform_tensor.to(DEVICE)
        logs.append(f"Waveform prepared for encoding. Shape: {waveform_tensor.shape}, Device: {DEVICE}")

        # 4. Encode Audio using NeuCodec
        logs.append("Encoding audio with NeuCodec...")
        encode_start = time.perf_counter()
        with torch.no_grad():
            # NOTE(review): the tensor is handed to encode_code on CPU even
            # though it was moved to DEVICE above -- presumably encode_code
            # does its own device placement; confirm against the library.
            encoded_codes = neucodec.encode_code(audio_or_path=waveform_tensor.cpu())
        encode_end = time.perf_counter()

        if encoded_codes is None:
            log_msg = "Encoding failed: encoded_codes is None"
            logs.append(log_msg)
            raise ValueError(log_msg)

        logs.append(f"Encoding complete. Time: {encode_end - encode_start:.2f}s")
        logs.append(f"Encoded codes shape: {encoded_codes.shape}")
        logs.append(f"Encoded codes device: {encoded_codes.device}")

        # Log some statistics about the codes
        logs.append(f"Code sequence length: {encoded_codes.shape[-1]}")
        logs.append(f"Code range: [{encoded_codes.min().item():.0f}, {encoded_codes.max().item():.0f}]")

        # Calculate compression ratio
        original_samples = waveform_16k.shape[0]
        code_elements = encoded_codes.numel()
        compression_ratio = original_samples / code_elements if code_elements > 0 else 0
        logs.append(f"Compression ratio: ~{compression_ratio:.1f}:1 ({original_samples} samples -> {code_elements} codes)")

        # 5. Decode the Codes using NeuCodec
        logs.append("Decoding the generated codes with NeuCodec...")
        decode_start = time.perf_counter()
        with torch.no_grad():
            reconstructed_waveform = neucodec.decode_code(encoded_codes)
        decode_end = time.perf_counter()
        logs.append(
            f"Decoding complete. Reconstructed waveform shape: {reconstructed_waveform.shape}, "
            f"Device: {reconstructed_waveform.device}. Time: {decode_end - decode_start:.2f}s"
        )

        # 6. Prepare Reconstructed Audio for Playback
        # Output is at 24kHz. Move to CPU, remove batch and channel dims, convert to NumPy.
        reconstructed_audio_np = reconstructed_waveform.cpu().squeeze().numpy()
        logs.append(f"Reconstructed audio prepared for playback at {OUTPUT_SR} Hz. Shape: {reconstructed_audio_np.shape}")
        reconstructed_audio_playback = (OUTPUT_SR, reconstructed_audio_np)

        # 7. Calculate quality metrics
        # For comparison, we need to resample original to 24kHz to match reconstructed output
        logs.append("Calculating quality metrics...")
        original_24k = librosa.resample(original_waveform, orig_sr=original_sr, target_sr=OUTPUT_SR)

        # Handle length differences (common with codecs)
        min_len = min(len(original_24k), len(reconstructed_audio_np))
        original_trimmed = original_24k[:min_len]
        reconstructed_trimmed = reconstructed_audio_np[:min_len]

        # Simple MSE calculation
        mse = np.mean((original_trimmed - reconstructed_trimmed) ** 2)

        if len(original_24k) != len(reconstructed_audio_np):
            logs.append(
                f"Audio length difference: Original {len(original_24k)} samples, "
                f"Reconstructed {len(reconstructed_audio_np)} samples"
            )
        logs.append(f"MSE (first {min_len} samples at 24kHz): {mse:.6f}")

        # Calculate Signal-to-Noise Ratio (SNR)
        signal_power = np.mean(original_trimmed ** 2)
        noise_power = mse
        # Both powers must be strictly positive: zero noise would divide by
        # zero and a silent reference would evaluate log10(0).
        if noise_power > 0 and signal_power > 0:
            snr_db = 10 * np.log10(signal_power / noise_power)
            logs.append(f"SNR: {snr_db:.2f} dB")
        else:
            logs.append("SNR: not computed (signal or noise power is zero)")

        logs.append("\n--- Audio Processing Completed Successfully ---")

        # Summary statistics
        total_time = (
            (load_end - load_start)
            + (resample_end - resample_start)
            + (encode_end - encode_start)
            + (decode_end - decode_start)
        )
        audio_duration = len(original_waveform) / original_sr
        logs.append(f"Total processing time: {total_time:.2f}s")
        logs.append(f"Audio duration: {audio_duration:.2f}s")
        if total_time > 0:
            logs.append(f"Real-time factor: {audio_duration / total_time:.2f}x")
        else:
            # Timer resolution can make the measured total zero.
            logs.append("Real-time factor: n/a (processing time too small to measure)")

        return original_audio_playback, resampled_audio_playback, reconstructed_audio_playback, "\n".join(logs)

    except Exception as e:
        # UI boundary: report every failure in the log panel instead of
        # letting it escape to the caller.
        logs.append("\n--- An Error Occurred ---")
        logs.append(f"Error Type: {type(e).__name__}")
        logs.append(f"Error Details: {e}")
        logs.append("\n--- Traceback ---")
        logs.append(traceback.format_exc())
        return None, None, None, "\n".join(logs)


# --- Gradio Interface ---
DESCRIPTION = """
This app demonstrates the **NeuCodec** model (`neuphonic/neucodec`) used in NeuTTS-Air.

**How it works:**
1. Upload an audio file (wav, mp3, flac, etc.).
2. The audio will be automatically resampled to 16kHz for encoding.
3. The 16kHz audio is encoded into discrete codes by NeuCodec.
4. These codes are then decoded back into 24kHz audio by NeuCodec.
5. You can listen to the original, the 16kHz version, and the final reconstructed 24kHz audio.

**Technical details:**
- Input sample rate: 16kHz (for encoding)
- Output sample rate: 24kHz (after decoding)
- Architecture: 50Hz neural audio codec with single codebook
- Hop length: 480 samples

**Note:** If the input is stereo, it will be converted to mono.
"""

iface = gr.Interface(
    fn=process_audio,
    inputs=gr.Audio(type="filepath", label="Upload Audio File"),
    outputs=[
        gr.Audio(label="Original Audio"),
        gr.Audio(label="16kHz Audio (Input to NeuCodec)"),
        gr.Audio(label="Reconstructed Audio (24kHz Output from NeuCodec)"),
        gr.Textbox(label="Log Output", lines=20),
    ],
    title="NeuCodec Demo (16kHz -> 24kHz)",
    description=DESCRIPTION,
    examples=[
        # TODO
        # ["examples/example1.wav"],
    ],
    cache_examples=False,
)

if __name__ == "__main__":
    if neucodec is None:
        print("Cannot launch Gradio interface because NeuCodec failed to load.")
    else:
        print("Launching Gradio Interface...")
        print(f"Model: {MODEL_NAME}")
        print(f"Input sample rate: {TARGET_SR} Hz")
        print(f"Output sample rate: {OUTPUT_SR} Hz")
        print(f"Device: {DEVICE}")
        iface.launch(share=True)