Update app.py
app.py CHANGED
@@ -1,5 +1,8 @@
-# app.py — Voice Clarity Booster with
-# Modes: MetricGAN+ (denoise)
+# app.py — Voice Clarity Booster with clear A/B comparison & loudness match
+# - Modes: MetricGAN+ (denoise), SepFormer (dereverb+denoise), Bypass
+# - Dry/Wet, Presence, Low-cut
+# - Loudness Match (optional)
+# - Outputs: Enhanced, A/B alternating (2s O/E flip), Delta (Original−Enhanced), Metrics
 
 import os
 import io
@@ -24,6 +27,13 @@ import soundfile as sf
 import torch
 import torchaudio
 
+# Optional: pyloudnorm for true LUFS matching; fallback to RMS if not available
+try:
+    import pyloudnorm as pyln
+    _HAVE_PYLN = True
+except Exception:
+    _HAVE_PYLN = False
+
 # Prefer new SpeechBrain API; fall back for older versions
 try:
     from speechbrain.inference import SpectralMaskEnhancement
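
Note: the new LUFS branch only activates when pyloudnorm imports successfully at runtime; on Spaces that means listing it as a dependency. A likely requirements.txt addition (assumed, not shown in this commit):

    pyloudnorm   # enables the LUFS path; without it the app silently falls back to RMS matching
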
@@ -31,7 +41,6 @@ except Exception: # < 1.0
     from speechbrain.pretrained import SpectralMaskEnhancement  # type: ignore
 
 try:
-    # SepFormer enhancement model (WHAMR) via separation interface
     from speechbrain.inference import SepformerSeparation
 except Exception:
     from speechbrain.pretrained import SepformerSeparation  # type: ignore
@@ -87,12 +96,11 @@ def _to_mono(wav: np.ndarray) -> np.ndarray:
     if t <= 8:  # [C, T]
         return wav.mean(axis=0).astype(np.float32)
     return wav.mean(axis=1).astype(np.float32)
-    # higher dims: fall back
     return wav.reshape(-1).astype(np.float32)
 
 
-def _sanitize(
-    return np.nan_to_num(
+def _sanitize(x: np.ndarray) -> np.ndarray:
+    return np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)
 
 
 def _resample_torch(wav: torch.Tensor, sr_in: int, sr_out: int) -> torch.Tensor:
@@ -124,36 +132,79 @@ def _limit_peak(wav: torch.Tensor, target_dbfs: float = -1.0) -> torch.Tensor:
 
 
 def _align_lengths(a: np.ndarray, b: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
-    """Pad/crop to same length so we can mix dry/wet safely."""
     n = min(len(a), len(b))
     return a[:n], b[:n]
 
 
+def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[np.ndarray, str]:
+    """
+    Match cand loudness to ref, returning adjusted signal and a short description.
+    Uses LUFS (pyloudnorm) if available, else RMS.
+    """
+    if len(ref) < sr // 10 or len(cand) < sr // 10:
+        return cand, "skipped (clip too short)"
+
+    if _HAVE_PYLN:
+        try:
+            meter = pyln.Meter(sr)  # EBUR128 meter
+            l_ref = meter.integrated_loudness(ref.astype(np.float64))
+            l_cand = meter.integrated_loudness(cand.astype(np.float64))
+            gain_db = l_ref - l_cand
+            cand_adj = cand * (10.0 ** (gain_db / 20.0))
+            return cand_adj.astype(np.float32), f"LUFS matched (Δ {gain_db:+.2f} dB)"
+        except Exception:
+            pass
+
+    # Fallback: RMS match
+    eps = 1e-9
+    rms_ref = np.sqrt(np.mean(ref**2) + eps)
+    rms_cand = np.sqrt(np.mean(cand**2) + eps)
+    gain = rms_ref / (rms_cand + eps)
+    cand_adj = cand * gain
+    gain_db = 20.0 * np.log10(gain + eps)
+    return cand_adj.astype(np.float32), f"RMS matched (Δ {gain_db:+.2f} dB)"
+
+
+def _make_ab_alternating(orig: np.ndarray, enh: np.ndarray, sr: int, seg_sec: float = 2.0) -> np.ndarray:
+    """
+    Build an A/B track that alternates: seg of Original, seg of Enhanced, repeated.
+    """
+    seg_n = max(1, int(seg_sec * sr))
+    orig, enh = _align_lengths(orig, enh)
+    n = len(orig)
+    out = []
+    pos = 0
+    flag = True  # True=orig, False=enh
+    while pos < n:
+        end = min(pos + seg_n, n)
+        out.append(orig[pos:end] if flag else enh[pos:end])
+        pos = end
+        flag = not flag
+    return np.concatenate(out, axis=0).astype(np.float32)
+
+
 # -----------------------------
 # Core pipeline
 # -----------------------------
-def _run_metricgan(
+def _run_metricgan(path_16k: str) -> torch.Tensor:
     enh = _get_metricgan()
     with torch.no_grad():
-        out = enh.enhance_file(
+        out = enh.enhance_file(path_16k)  # [1, T]
     return out
 
 
-def _run_sepformer(
+def _run_sepformer(path_16k: str) -> torch.Tensor:
     sep = _get_sepformer()
     with torch.no_grad():
-
-        out = sep.separate_file(path=clean_16k_path)
-    # Normalize shape to [1, T]
+        out = sep.separate_file(path=path_16k)
     if isinstance(out, torch.Tensor):
         if out.dim() == 1:
             out = out.unsqueeze(0)
         elif out.dim() == 2 and out.shape[0] > 1:
-            out = out[:1, :]
+            out = out[:1, :]
         return out
-    # If older API returns numpy or list, convert:
     if hasattr(out, "numpy"):
-        t = torch.from_numpy(out)
+        t = torch.from_numpy(out.numpy())
         if t.dim() == 1:
             t = t.unsqueeze(0)
         elif t.dim() == 2 and t.shape[0] > 1:
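
A quick sanity check of the two helpers added above; a minimal sketch, assuming the trailing launch call in app.py is guarded or commented out locally so the module can be imported:

    # sketch: exercise _loudness_match_to_ref / _make_ab_alternating on synthetic tones
    import numpy as np
    from app import _loudness_match_to_ref, _make_ab_alternating

    sr = 16000
    t = np.arange(sr * 4) / sr                                      # 4 s clip
    ref = (0.20 * np.sin(2 * np.pi * 220 * t)).astype(np.float32)   # stand-in "original"
    cand = (0.05 * np.sin(2 * np.pi * 220 * t)).astype(np.float32)  # quieter "enhanced"

    matched, how = _loudness_match_to_ref(ref, cand, sr)
    print(how)  # without pyloudnorm: "RMS matched (Δ +12.04 dB)"; with it, the LUFS variant
    print(np.sqrt(np.mean(matched**2)), np.sqrt(np.mean(ref**2)))   # RMS levels now ~equal

    ab = _make_ab_alternating(ref, matched, sr, seg_sec=2.0)
    assert len(ab) == len(ref)  # alternates 2 s of original with 2 s of enhanced
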
@@ -174,17 +225,22 @@ def _enhance_numpy_audio(
     presence_db: float = 0.0,  # default 0 for safer tone
     lowcut_hz: float = 0.0,  # default 0 (off)
     out_sr: Optional[int] = None,
-
+    loudness_match: bool = True,
+) -> Tuple[int, np.ndarray, np.ndarray, str]:
     """
     Input: (sr, np.float32 [T] or [T,C])
-    Returns: (sr_out,
+    Returns: (sr_out, enhanced, delta, metrics_text)
+      - enhanced: final output (after dry/wet, polish, loudness match)
+      - delta: original - enhanced (at output SR & length-matched)
     """
     sr_in, wav_np = audio
     wav_mono = _sanitize(_to_mono(wav_np))
 
     # Guard: tiny input
     if wav_mono.size < 32:
-
+        sr_out = sr_in if sr_in else 16000
+        silence = np.zeros(int(sr_out * 1.0), dtype=np.float32)
+        return sr_out, silence, silence, "Input too short; returned silence."
 
     dry_t = torch.from_numpy(wav_mono).unsqueeze(0)  # [1, T @ sr_in]
     # Prepare 16k mono file for models
@@ -216,19 +272,38 @@ def _enhance_numpy_audio(
     # Resample both to output rate for mixing & export
     sr_out = sr_in if (out_sr is None or out_sr <= 0) else int(out_sr)
     proc_out = _resample_torch(proc, 16000, sr_out).squeeze(0).numpy().astype(np.float32)
-    dry_out
+    dry_out = _resample_torch(dry_t, sr_in, sr_out).squeeze(0).numpy().astype(np.float32)
 
     # Align and mix
     proc_out, dry_out = _align_lengths(proc_out, dry_out)
     dry_wet = float(np.clip(dry_wet, 0.0, 1.0))
-
-
-
-
-    if
-
+    enhanced = proc_out * dry_wet + dry_out * (1.0 - dry_wet)
+
+    # Loudness match enhanced back to original (optional)
+    loud_text = "off"
+    if loudness_match:
+        enhanced, loud_text = _loudness_match_to_ref(dry_out, enhanced, sr_out)
+
+    enhanced = _sanitize(enhanced)
+
+    # Delta (what changed)
+    delta = dry_out - enhanced
+    delta = _sanitize(delta)
+
+    # Basic metrics
+    eps = 1e-9
+    rms_dry = np.sqrt(np.mean(dry_out**2) + eps)
+    rms_enh = np.sqrt(np.mean(enhanced**2) + eps)
+    rms_delta = np.sqrt(np.mean(delta**2) + eps)
+    change_db = 20 * np.log10((rms_dry + eps) / (rms_delta + eps))
+    metrics = (
+        f"Mode: {mode} | Dry/Wet: {dry_wet*100:.0f}% | Presence: {presence_db:+.1f} dB | "
+        f"Low-cut: {lowcut_hz:.0f} Hz | Loudness match: {loud_text}\n"
+        f"Dur: {len(enhanced)/sr_out:.2f}s | Δ (original−enhanced) RMS: {20*np.log10(rms_delta+eps):+.2f} dBFS | "
+        f'Approx. "noise removed" ratio: {change_db:.2f} dB'
+    )
 
-    return sr_out,
+    return sr_out, enhanced, delta, metrics
 
 
 # -----------------------------
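
The change_db reported above is a signal-to-difference ratio (original vs. what processing removed or altered), so smaller values mean a more audible change. A standalone check of the arithmetic, plain NumPy:

    # the metric is 20*log10(RMS(original) / RMS(original - enhanced))
    import numpy as np

    rng = np.random.default_rng(0)
    orig = (0.1 * rng.standard_normal(16000)).astype(np.float32)
    enhanced = 0.5 * orig                  # pretend processing halved the signal
    delta = orig - enhanced                # delta == 0.5 * orig
    ratio_db = 20 * np.log10(np.sqrt(np.mean(orig**2)) / np.sqrt(np.mean(delta**2)))
    print(f"{ratio_db:.2f} dB")            # ~6.02 dB; gentler processing gives larger values
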
@@ -241,25 +316,33 @@ def gradio_enhance(
     presence_db: float,
     lowcut_hz: float,
     output_sr: str,
+    loudness_match: bool,
 ):
     if audio is None:
-        return None
+        return None, None, None, "No audio provided."
     out_sr = None
     if output_sr in {"44100", "48000"}:
         out_sr = int(output_sr)
-    sr_out, enhanced = _enhance_numpy_audio(
+    sr_out, enhanced, delta, metrics = _enhance_numpy_audio(
         audio,
         mode=mode,
         dry_wet=dry_wet_pct / 100.0,
         presence_db=float(presence_db),
         lowcut_hz=float(lowcut_hz),
         out_sr=out_sr,
+        loudness_match=bool(loudness_match),
     )
-
+    # Build A/B alternating track
+    sr_in, wav_np = audio
+    orig_mono = _sanitize(_to_mono(wav_np))
+    orig_at_out = _resample_torch(torch.from_numpy(orig_mono).unsqueeze(0), sr_in, sr_out).squeeze(0).numpy().astype(np.float32)
+    orig_at_out, enhanced = _align_lengths(orig_at_out, enhanced)
+    ab_alt = _make_ab_alternating(orig_at_out, enhanced, sr_out, seg_sec=2.0)
+    return (sr_out, enhanced), (sr_out, ab_alt), (sr_out, delta), metrics
 
 
 with gr.Blocks(theme=gr.themes.Soft()) as demo:
-    gr.Markdown("## Voice Clarity Booster")
+    gr.Markdown("## Voice Clarity Booster — with A/B and Delta listening")
     with gr.Row():
         with gr.Column():
             in_audio = gr.Audio(
@@ -282,6 +365,7 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
             lowcut = gr.Slider(
                 minimum=0, maximum=200, value=0, step=5, label="Low-Cut (Hz)"
             )
+            loudmatch = gr.Checkbox(value=True, label="Loudness-match enhanced to original")
             out_sr = gr.Radio(
                 choices=["Original", "44100", "48000"],
                 value="Original",
@@ -289,12 +373,15 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
             )
             btn = gr.Button("Enhance")
         with gr.Column():
-            out_audio = gr.Audio(type="numpy", label="Enhanced", autoplay=True)
+            out_audio = gr.Audio(type="numpy", label="Enhanced (autoplay)", autoplay=True)
+            ab_audio = gr.Audio(type="numpy", label="A/B Alternating (2s O → 2s E)", autoplay=False)
+            delta_audio = gr.Audio(type="numpy", label="Delta: Original − Enhanced", autoplay=False)
+            metrics = gr.Markdown("")
 
     btn.click(
         gradio_enhance,
-        inputs=[in_audio, mode, dry_wet, presence, lowcut, out_sr],
-        outputs=[out_audio],
+        inputs=[in_audio, mode, dry_wet, presence, lowcut, out_sr, loudmatch],
+        outputs=[out_audio, ab_audio, delta_audio, metrics],
     )
 
 # Start server (Hugging Face Spaces expects this unguarded)
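
For an end-to-end check of the new four-output callback, a sketch under two assumptions: the mode Radio includes a "Bypass" choice (per the new header comment), and the unguarded launch call is disabled so app imports cleanly:

    # sketch: drive gradio_enhance directly with a synthetic clip
    import numpy as np
    from app import gradio_enhance

    sr = 16000
    t = np.arange(sr * 3) / sr
    tone = (0.1 * np.sin(2 * np.pi * 440 * t)).astype(np.float32)

    enhanced, ab, delta, metrics = gradio_enhance(
        (sr, tone),  # audio as (sample_rate, float32 samples)
        "Bypass",    # assumed mode label
        100.0,       # dry/wet %
        0.0,         # presence dB
        0.0,         # low-cut Hz
        "Original",  # keep the input sample rate
        True,        # loudness match
    )
    print(metrics)
    print(enhanced[0], ab[0], delta[0])  # the three (sr, array) outputs share one rate
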