Voice Denoising Model for Alif E7 NPU - Complete Development Guide

Overview

This guide provides step-by-step instructions for developing and deploying a voice denoising model on the Alif E7's Ethos-U55 NPU using the DTLN (Dual-signal Transformation LSTM Network) architecture.

Alif E7 NPU Specifications

  • NPU: Dual Arm Ethos-U55 (a 256-MAC instance paired with the HP core, a 128-MAC instance paired with the HE core)
  • CPU: Dual Cortex-M55 (High-Performance @ 400MHz, High-Efficiency @ 160MHz)
  • Memory: 256KB ITCM, 1MB DTCM (tightly coupled)
  • Supported: 8-bit and 16-bit quantized CNNs/RNNs
  • Performance: 250+ GOPS
  • Framework: TensorFlow Lite for Microcontrollers

Why DTLN Model?

  1. Compact: fewer than 1 million parameters, which fits the < 1MB model-size target
  2. TF-Lite support: native quantization and optimization
  3. Proven edge performance: runs in real time on a Raspberry Pi 3B+
  4. Low latency: the real-time budget is < 8ms per frame (one 128-sample hop at 16kHz); see the budget sketch after this list
  5. Efficient architecture: stacked dual-signal transformation LSTM stages
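
A quick back-of-the-envelope check of these two claims, assuming the frame parameters used throughout this guide (512-sample frames, 128-sample shift, 16kHz audio) and the two-stage, 128-unit LSTM configuration from Step 3:

# Real-time budget and rough size estimate for the configuration used in this guide
SAMPLE_RATE = 16000   # Hz
FRAME_LEN = 512       # samples per analysis frame (32 ms)
FRAME_SHIFT = 128     # hop size in samples

# A new frame is due every 8 ms, so FFT + NPU inference + iFFT must finish within that window
frame_budget_ms = FRAME_SHIFT / SAMPLE_RATE * 1000

# Parameter count for two LSTM stages (128 units, 257 input features) plus two mask layers
units, feats = 128, 257
lstm_params = 4 * (units * (feats + units) + units)   # per LSTM layer
dense_params = units * feats + feats                  # per sigmoid mask layer
total_params = 2 * (lstm_params + dense_params)

print(f"frame budget: {frame_budget_ms:.1f} ms")
print(f"approx. parameters: {total_params:,} (~{total_params / 1024:.0f} KB at int8)")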

Step 1: Environment Setup

Prerequisites

# Create conda environment
conda create -n alif_audio python=3.8
conda activate alif_audio

# Install core dependencies
pip install tensorflow==2.12.0
pip install tensorflow-model-optimization
pip install soundfile librosa numpy
pip install tflite-runtime

Step 2: Clone and Setup DTLN Repository

git clone https://github.com/breizhn/DTLN.git
cd DTLN

Step 3: Model Architecture Modifications for Ethos-U55

Create a new file dtln_ethos_u55.py:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import LSTM, Dense, Input, Multiply
from tensorflow.keras.models import Model
import numpy as np

class DTLN_Ethos_U55:
    """
    DTLN model optimized for Ethos-U55 NPU deployment
    - Reduced LSTM units for memory constraints
    - 8-bit quantization aware training
    - Optimized for < 1MB model size
    """
    
    def __init__(self, 
                 frame_len=512,
                 frame_shift=128,
                 lstm_units=128,  # Reduced from 256 for NPU
                 ):
        self.frame_len = frame_len
        self.frame_shift = frame_shift
        self.lstm_units = lstm_units
        
    def build_model(self):
        """Build DTLN model with Ethos-U55 optimizations"""
        
        # First stage - Magnitude processing
        mag_input = Input(shape=(None, self.frame_len//2 + 1))
        
        # LSTM layer - optimized size for NPU
        lstm_1 = LSTM(self.lstm_units, 
                     return_sequences=True,
                     unroll=False)(mag_input)
        
        # Fully connected layer for mask estimation
        mask = Dense(self.frame_len//2 + 1, 
                    activation='sigmoid',
                    name='magnitude_mask')(lstm_1)
        
        # Apply mask to magnitude spectrum
        estimated_mag = Multiply()([mag_input, mask])
        
        # Second stage - Enhanced magnitude processing
        lstm_2 = LSTM(self.lstm_units,
                     return_sequences=True,
                     unroll=False)(estimated_mag)
        
        # Final mask estimation
        final_mask = Dense(self.frame_len//2 + 1,
                          activation='sigmoid',
                          name='final_mask')(lstm_2)
        
        # Final enhanced magnitude
        enhanced_mag = Multiply()([estimated_mag, final_mask])
        
        model = Model(inputs=mag_input, outputs=enhanced_mag)
        
        return model
    
    def create_stateful_inference_models(self, model):
        """
        Create stateful models for real-time inference
        This splits the model for frame-by-frame processing
        """
        # Model 1: First stage
        mag_input = Input(batch_shape=(1, 1, self.frame_len//2 + 1))
        lstm_1_state_in_h = Input(batch_shape=(1, self.lstm_units))
        lstm_1_state_in_c = Input(batch_shape=(1, self.lstm_units))
        
        # States are passed in and out explicitly below, so the layer itself is not stateful
        lstm_1 = LSTM(self.lstm_units,
                     return_sequences=True,
                     return_state=True)
        
        lstm_out, state_h, state_c = lstm_1(
            mag_input, 
            initial_state=[lstm_1_state_in_h, lstm_1_state_in_c]
        )
        
        mask = Dense(self.frame_len//2 + 1, activation='sigmoid')(lstm_out)
        estimated_mag = Multiply()([mag_input, mask])
        
        model_1 = Model(
            inputs=[mag_input, lstm_1_state_in_h, lstm_1_state_in_c],
            outputs=[estimated_mag, state_h, state_c]
        )
        
        # Model 2: Second stage (similar structure)
        # ... (implement similar structure for stage 2)
        
        return model_1  # Return both models in practice
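
A minimal sketch of how the class above can be exercised (assuming it is saved as dtln_ethos_u55.py as suggested), useful for confirming that the graph builds and that the parameter count stays below the 1-million target:

from dtln_ethos_u55 import DTLN_Ethos_U55

dtln = DTLN_Ethos_U55(frame_len=512, frame_shift=128, lstm_units=128)
model = dtln.build_model()
model.summary()
print(f"Trainable parameters: {model.count_params():,}")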

Step 4: Quantization-Aware Training

Create train_quantized.py:

import tensorflow as tf
import tensorflow_model_optimization as tfmot
from dtln_ethos_u55 import DTLN_Ethos_U55

def apply_quantization_aware_training(model):
    """
    Apply quantization-aware training for 8-bit deployment on Ethos-U55

    Note: the default TF-MOT quantization scheme does not cover LSTM layers;
    they may need tfmot.quantization.keras.quantize_annotate_layer with a
    custom QuantizeConfig, or you can skip QAT and rely on the post-training
    integer quantization in Step 5.
    """
    quantize_model = tfmot.quantization.keras.quantize_model
    
    # Quantize the entire model
    q_aware_model = quantize_model(model)
    
    return q_aware_model

def train_model():
    """Training loop with QAT"""
    
    # Build model
    dtln = DTLN_Ethos_U55(lstm_units=128)
    model = dtln.build_model()
    
    # Apply quantization-aware training
    q_aware_model = apply_quantization_aware_training(model)
    
    # Compile with appropriate loss
    q_aware_model.compile(
        loss='mean_squared_error',
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        metrics=['mae']
    )
    
    # Load your training data
    # X_train, y_train = load_training_data()
    
    # Train
    # history = q_aware_model.fit(
    #     X_train, y_train,
    #     batch_size=32,
    #     epochs=50,
    #     validation_split=0.2
    # )
    
    return q_aware_model

if __name__ == "__main__":
    model = train_model()
    model.save('dtln_ethos_u55_qat.h5')
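
The training script above leaves data loading as a placeholder. One possible implementation is sketched below; the paired-directory layout, the load_training_data name, and the 100-frame sequence length are assumptions for illustration, not part of the original DTLN pipeline:

# Hypothetical loader for the commented-out load_training_data() call above.
# Assumes paired noisy/clean WAV files (mono, 16 kHz) with matching names in two directories.
import glob
import os

import numpy as np
import soundfile as sf

def load_training_data(noisy_dir, clean_dir, frame_len=512, frame_shift=128, seq_len=100):
    """Return (X, y) arrays shaped (num_sequences, seq_len, frame_len // 2 + 1)."""
    X, y = [], []
    for noisy_path in sorted(glob.glob(os.path.join(noisy_dir, "*.wav"))):
        clean_path = os.path.join(clean_dir, os.path.basename(noisy_path))
        noisy, _ = sf.read(noisy_path)
        clean, _ = sf.read(clean_path)

        def mag_frames(audio):
            # Magnitude STFT with the same rectangular framing the model expects
            frames = [np.abs(np.fft.rfft(audio[i:i + frame_len]))
                      for i in range(0, len(audio) - frame_len, frame_shift)]
            return np.array(frames, dtype=np.float32)

        noisy_mag, clean_mag = mag_frames(noisy), mag_frames(clean)
        num_seq = min(len(noisy_mag), len(clean_mag)) // seq_len
        for s in range(num_seq):
            X.append(noisy_mag[s * seq_len:(s + 1) * seq_len])
            y.append(clean_mag[s * seq_len:(s + 1) * seq_len])
    return np.array(X), np.array(y)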

Step 5: Convert to TensorFlow Lite with 8-bit Quantization

Create convert_to_tflite.py:

import tensorflow as tf
import tensorflow_model_optimization as tfmot
import numpy as np

def representative_dataset_gen():
    """
    Generator for representative dataset
    Used for full integer quantization
    """
    # Load sample audio data
    # This should be representative of your inference data
    for _ in range(100):
        # Generate or load real audio frames
        data = np.random.randn(1, 10, 257).astype(np.float32)
        yield [data]

def convert_to_tflite_int8(model_path, output_path):
    """
    Convert model to TFLite with INT8 quantization for Ethos-U55
    """
    # Load the quantization-aware trained model
    # (the TF-MOT custom layers must be in scope when deserializing a QAT model)
    with tfmot.quantization.keras.quantize_scope():
        model = tf.keras.models.load_model(model_path)
    
    # Create TFLite converter
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    
    # Enable full integer quantization
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    
    # Set representative dataset for calibration
    converter.representative_dataset = representative_dataset_gen
    
    # Ensure input/output are int8
    converter.target_spec.supported_ops = [
        tf.lite.OpsSet.TFLITE_BUILTINS_INT8
    ]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    
    # Convert
    tflite_model = converter.convert()
    
    # Save
    with open(output_path, 'wb') as f:
        f.write(tflite_model)
    
    # Print model size
    print(f"Model size: {len(tflite_model) / 1024:.2f} KB")
    
    return tflite_model

def analyze_model(tflite_path):
    """Analyze the converted model"""
    interpreter = tf.lite.Interpreter(model_path=tflite_path)
    interpreter.allocate_tensors()
    
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    
    print("\nInput Details:")
    for detail in input_details:
        print(f"  Name: {detail['name']}")
        print(f"  Shape: {detail['shape']}")
        print(f"  Type: {detail['dtype']}")
        print(f"  Quantization: {detail['quantization']}")
    
    print("\nOutput Details:")
    for detail in output_details:
        print(f"  Name: {detail['name']}")
        print(f"  Shape: {detail['shape']}")
        print(f"  Type: {detail['dtype']}")
        print(f"  Quantization: {detail['quantization']}")

if __name__ == "__main__":
    # Convert model
    tflite_model = convert_to_tflite_int8(
        'dtln_ethos_u55_qat.h5',
        'dtln_ethos_u55_int8.tflite'
    )
    
    # Analyze the converted model
    analyze_model('dtln_ethos_u55_int8.tflite')
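
representative_dataset_gen above yields random tensors, which makes the calibration ranges largely meaningless. For a real conversion you would feed magnitude spectra from actual recordings instead; a sketch follows, in which the calibration/*.wav glob and the 10-frame sequence length are assumptions:

# Calibration data from real audio instead of random noise
import glob

import numpy as np
import soundfile as sf

def representative_dataset_from_audio(wav_glob="calibration/*.wav",
                                      frame_len=512, frame_shift=128, seq_len=10):
    """Return a generator function yielding (1, seq_len, frame_len // 2 + 1) magnitude batches."""
    def gen():
        for path in sorted(glob.glob(wav_glob)):
            audio, _ = sf.read(path)  # assumes mono, 16 kHz files
            frames = np.array(
                [np.abs(np.fft.rfft(audio[i:i + frame_len]))
                 for i in range(0, len(audio) - frame_len, frame_shift)],
                dtype=np.float32)
            for s in range(len(frames) // seq_len):
                yield [frames[np.newaxis, s * seq_len:(s + 1) * seq_len, :]]
    return gen

# Usage: converter.representative_dataset = representative_dataset_from_audio()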

Step 6: Convert TFLite to Ethos-U55 Format

Create convert_for_ethos_u55.sh:

#!/bin/bash

# Install Vela compiler for Ethos-U55
pip install ethos-u-vela

# Convert TFLite model to Ethos-U55 format
vela \
    --accelerator-config ethos-u55-256 \
    --system-config Ethos_U55_High_End_Embedded \
    --memory-mode Shared_Sram \
    --output-dir ./ethos_u55_output \
    dtln_ethos_u55_int8.tflite

echo "Conversion complete! Output in ./ethos_u55_output/"

Step 7: Deployment on Alif E7

Create alif_e7_inference.cc (the TensorFlow Lite Micro API is C++, so the file must be compiled as C++):

#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/schema/schema_generated.h"

// Model data (generated from xxd or similar)
extern const unsigned char dtln_model_data[];
extern const int dtln_model_data_len;

// Audio buffer parameters
#define FRAME_LEN 512
#define FRAME_SHIFT 128
#define SAMPLE_RATE 16000

namespace {
    tflite::ErrorReporter* error_reporter = nullptr;
    const tflite::Model* model = nullptr;
    tflite::MicroInterpreter* interpreter = nullptr;
    TfLiteTensor* input = nullptr;
    TfLiteTensor* output = nullptr;
    
    // Arena size - adjust based on model requirements
    constexpr int kTensorArenaSize = 100 * 1024; // 100KB
    uint8_t tensor_arena[kTensorArenaSize];
}

void setup_model() {
    // Set up error reporter
    static tflite::MicroErrorReporter micro_error_reporter;
    error_reporter = &micro_error_reporter;
    
    // Load model
    model = tflite::GetModel(dtln_model_data);
    if (model->version() != TFLITE_SCHEMA_VERSION) {
        TF_LITE_REPORT_ERROR(error_reporter,
            "Model version mismatch!");
        return;
    }
    
    // Set up ops resolver
    // Note: the Vela-compiled model contains the "ethos-u" custom operator; depending on
    // your TFLM distribution, a MicroMutableOpResolver with AddEthosU() registered may be
    // required instead of AllOpsResolver.
    static tflite::AllOpsResolver resolver;
    
    // Build interpreter
    static tflite::MicroInterpreter static_interpreter(
        model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
    interpreter = &static_interpreter;
    
    // Allocate tensors
    TfLiteStatus allocate_status = interpreter->AllocateTensors();
    if (allocate_status != kTfLiteOk) {
        TF_LITE_REPORT_ERROR(error_reporter, "AllocateTensors() failed");
        return;
    }
    
    // Get input and output tensors
    input = interpreter->input(0);
    output = interpreter->output(0);
}

void process_audio_frame(float* audio_frame, float* enhanced_frame) {
    // Preprocessing: compute magnitude spectrum
    // (Use CMSIS-DSP for FFT on Cortex-M55)
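
    // Sketch: one possible magnitude-spectrum computation with CMSIS-DSP's real FFT.
    // Assumes "arm_math.h" is included at the top of this file and CMSIS-DSP is linked.
    static arm_rfft_fast_instance_f32 fft_instance;
    static int fft_ready = 0;
    static float fft_in[FRAME_LEN];
    static float fft_out[FRAME_LEN];
    static float mag_spectrum[FRAME_LEN / 2 + 1];

    if (!fft_ready) {
        arm_rfft_fast_init_f32(&fft_instance, FRAME_LEN);
        fft_ready = 1;
    }
    // arm_rfft_fast_f32 uses its input buffer as scratch, so copy the frame first
    for (int i = 0; i < FRAME_LEN; i++) {
        fft_in[i] = audio_frame[i];
    }
    arm_rfft_fast_f32(&fft_instance, fft_in, fft_out, 0);
    // The packed output stores the DC and Nyquist real parts in the first two slots
    mag_spectrum[0] = fabsf(fft_out[0]);
    mag_spectrum[FRAME_LEN / 2] = fabsf(fft_out[1]);
    arm_cmplx_mag_f32(&fft_out[2], &mag_spectrum[1], FRAME_LEN / 2 - 1);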
    
    // Quantize the magnitude bins into the int8 input tensor using its scale/zero-point
    // (clamping omitted for brevity)
    const int num_bins = input->dims->data[input->dims->size - 1];
    for (int i = 0; i < num_bins; i++) {
        input->data.int8[i] = (int8_t)(mag_spectrum[i] / input->params.scale
                                       + input->params.zero_point);
    }
    
    // Run inference
    TfLiteStatus invoke_status = interpreter->Invoke();
    if (invoke_status != kTfLiteOk) {
        TF_LITE_REPORT_ERROR(error_reporter, "Invoke failed");
        return;
    }
    
    // Dequantize the enhanced magnitude spectrum from the int8 output tensor
    const int num_out = output->dims->data[output->dims->size - 1];
    for (int i = 0; i < num_out; i++) {
        enhanced_frame[i] = (output->data.int8[i] - output->params.zero_point)
                            * output->params.scale;
    }
    
    // Post-processing: combine with the noisy phase and run the inverse FFT
    // (with overlap-add) to get the time-domain signal
}

// Real-time processing loop
void real_time_denoising() {
    float audio_buffer[FRAME_LEN];
    float enhanced_buffer[FRAME_LEN];
    
    while (1) {
        // Read audio frame from microphone (I2S/PDM)
        // read_audio_frame(audio_buffer);
        
        // Process frame
        process_audio_frame(audio_buffer, enhanced_buffer);
        
        // Output enhanced audio
        // write_audio_frame(enhanced_buffer);
    }
}

int main() {
    // Initialize Alif E7 peripherals
    // init_audio_peripherals();
    
    // Setup model
    setup_model();
    
    // Start real-time processing
    real_time_denoising();
    
    return 0;
}

Step 8: Build Configuration for Alif E7

Create CMakeLists.txt:

cmake_minimum_required(VERSION 3.15)

# Toolchain for Arm Cortex-M55 (must be set before project(),
# or passed on the command line via -DCMAKE_TOOLCHAIN_FILE=...)
set(CMAKE_TOOLCHAIN_FILE arm-none-eabi.cmake)

project(alif_e7_voice_denoising)

# TensorFlow Lite Micro
add_subdirectory(tensorflow/lite/micro)

# CMSIS-DSP for FFT
find_package(CMSISDSP REQUIRED)

# Source files
set(SOURCES
    alif_e7_inference.cc
    model_data.c
)

# Create executable
add_executable(voice_denoising ${SOURCES})

# Link libraries
target_link_libraries(voice_denoising
    tensorflow-lite-micro
    CMSISDSP
)

# Compiler flags for Cortex-M55
target_compile_options(voice_denoising PRIVATE
    -mcpu=cortex-m55    # the FPU and Helium (MVE) extensions are implied by the CPU selection
    -mfloat-abi=hard
    -mthumb
    -O3
)

Performance Optimization Tips

1. Memory Optimization

  • Use stateful LSTM for streaming inference
  • Minimize arena size by quantizing all layers
  • Use DTCM for frequently accessed data

2. Computation Optimization

  • Leverage Ethos-U55's 256 MACs for convolutions
  • Use CMSIS-DSP for FFT operations on Cortex-M55
  • Pipeline audio processing with NPU inference

3. Latency Reduction

  • Target < 8ms frame processing time
  • Use frame_shift = 128 samples (8ms @ 16kHz); see the streaming sketch after this list
  • Optimize I/O with DMA
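
To make the 8ms budget concrete, the sketch below shows the frame-by-frame loop implied by frame_len = 512 / frame_shift = 128: shift one hop of new samples into the analysis buffer, enhance the magnitude spectrum, and overlap-add the reconstructed frame. The enhance_mag callable stands in for the NPU inference call and is an assumption of this sketch:

import numpy as np

def stream_denoise(audio, enhance_mag, frame_len=512, frame_shift=128):
    """Frame-by-frame enhancement with overlap-add reconstruction.

    enhance_mag: callable mapping a (frame_len // 2 + 1,) magnitude array
    to an enhanced magnitude array of the same shape.
    """
    in_buf = np.zeros(frame_len)
    out_buf = np.zeros(frame_len)
    out_blocks = []
    for start in range(0, len(audio) - frame_shift + 1, frame_shift):
        # Shift one hop of new samples into the analysis buffer
        in_buf = np.concatenate([in_buf[frame_shift:], audio[start:start + frame_shift]])
        spec = np.fft.rfft(in_buf)
        enhanced = enhance_mag(np.abs(spec)) * np.exp(1j * np.angle(spec))
        # Scale by the overlap factor so overlapping frames sum back to unity gain
        frame = np.fft.irfft(enhanced, n=frame_len) / (frame_len // frame_shift)
        # Overlap-add: emit the oldest hop, keep the rest for the following frames
        out_buf = np.concatenate([out_buf[frame_shift:], np.zeros(frame_shift)]) + frame
        out_blocks.append(out_buf[:frame_shift].copy())
    return np.concatenate(out_blocks)

# Example: an identity mask should approximately reproduce the (slightly delayed) input
noisy = np.random.randn(16000)
reconstructed = stream_denoise(noisy, lambda mag: mag)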

Testing and Validation

# test_model.py
import tensorflow as tf
import numpy as np
import soundfile as sf
import time

def test_latency(tflite_path, audio_file):
    """Test inference latency"""
    interpreter = tf.lite.Interpreter(model_path=tflite_path)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()[0]
    
    audio, sr = sf.read(audio_file)
    
    # Process frames
    frame_len = 512
    frame_shift = 128
    latencies = []
    
    for i in range(0, len(audio) - frame_len, frame_shift):
        frame = audio[i:i+frame_len]
        
        # Compute magnitude spectrum
        spec = np.abs(np.fft.rfft(frame))
        spec = spec[np.newaxis, np.newaxis, :]
        
        # Set input (quantize to int8 if the model expects integer input)
        if input_details['dtype'] == np.int8:
            scale, zero_point = input_details['quantization']
            spec = np.clip(spec / scale + zero_point, -128, 127).astype(np.int8)
        else:
            spec = spec.astype(np.float32)
        interpreter.set_tensor(input_details['index'], spec)
        
        # Time inference
        start = time.time()
        interpreter.invoke()
        latency = (time.time() - start) * 1000  # ms
        latencies.append(latency)
    
    print(f"Average latency: {np.mean(latencies):.2f} ms")
    print(f"Max latency: {np.max(latencies):.2f} ms")
    print(f"Real-time factor: {np.mean(latencies) / 8:.2f}x")

if __name__ == "__main__":
    test_latency('dtln_ethos_u55_int8.tflite', 'test_audio.wav')
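
Latency is only one side of validation. A simple objective check of enhancement quality is sketched below; it assumes a clean reference recording is available, and the file names are placeholders:

# Rough quality check: SNR against a clean reference before and after enhancement
import numpy as np
import soundfile as sf

def snr_db(reference, estimate):
    """SNR of estimate relative to reference, in dB."""
    n = min(len(reference), len(estimate))
    reference, estimate = reference[:n], estimate[:n]
    noise = reference - estimate
    return 10 * np.log10(np.sum(reference ** 2) / (np.sum(noise ** 2) + 1e-12))

clean, _ = sf.read("clean_reference.wav")
noisy, _ = sf.read("test_audio.wav")
enhanced, _ = sf.read("enhanced_output.wav")

print(f"SNR before enhancement: {snr_db(clean, noisy):.1f} dB")
print(f"SNR after enhancement:  {snr_db(clean, enhanced):.1f} dB")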

Alternative Models to Consider

If DTLN doesn't meet requirements, consider:

  1. RNNoise - Very lightweight, excellent for embedded
  2. Conv-TasNet - Better quality but more compute
  3. U-Net based - Good quality/performance balance
  4. CleanUNet - State-of-the-art quality but may need more optimization

Troubleshooting

Common Issues

  1. Model too large: Reduce LSTM units, use more aggressive quantization
  2. Latency too high: Reduce frame length, optimize FFT
  3. Poor quality: Increase training data, adjust quantization calibration
  4. Memory overflow: Tune the tensor arena size and place large buffers in on-chip SRAM or external RAM rather than TCM

Next Steps

  1. Train on your specific noise conditions
  2. Benchmark on actual hardware
  3. Optimize based on profiling results
  4. Fine-tune hyperparameters for your use case