Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
|
@@ -1,12 +1,14 @@
|
|
| 1 |
-
# app.py — Voice Clarity Booster with Presets, Dual-Stage
|
| 2 |
-
# A/B alternating, Delta (Original−Enhanced), and
|
|
|
|
|
|
|
|
|
|
| 3 |
|
| 4 |
import os
|
| 5 |
-
import io
|
| 6 |
import tempfile
|
| 7 |
from typing import Tuple, Optional, Dict, Any
|
| 8 |
|
| 9 |
-
#
|
| 10 |
import warnings
|
| 11 |
warnings.filterwarnings(
|
| 12 |
"ignore",
|
|
@@ -24,7 +26,7 @@ import soundfile as sf
|
|
| 24 |
import torch
|
| 25 |
import torchaudio
|
| 26 |
|
| 27 |
-
# Optional
|
| 28 |
try:
|
| 29 |
import pyloudnorm as pyln
|
| 30 |
_HAVE_PYLN = True
|
|
@@ -44,9 +46,15 @@ except Exception:
|
|
| 44 |
|
| 45 |
|
| 46 |
# -----------------------------
|
| 47 |
-
#
|
| 48 |
# -----------------------------
|
| 49 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 50 |
_ENHANCER_METRICGAN: Optional[SpectralMaskEnhancement] = None
|
| 51 |
_ENHANCER_SEPFORMER: Optional[SepformerSeparation] = None
|
| 52 |
|
|
@@ -77,10 +85,7 @@ def _get_sepformer() -> SepformerSeparation:
|
|
| 77 |
# Audio helpers
|
| 78 |
# -----------------------------
|
| 79 |
def _to_mono(wav: np.ndarray) -> np.ndarray:
|
| 80 |
-
"""
|
| 81 |
-
Ensure mono [T] float32 robustly.
|
| 82 |
-
Accepts [T], [T,C], [C,T]; picks the 'channels' axis if <=8.
|
| 83 |
-
"""
|
| 84 |
wav = np.asarray(wav, dtype=np.float32)
|
| 85 |
if wav.ndim == 1:
|
| 86 |
return wav
|
|
@@ -93,7 +98,6 @@ def _to_mono(wav: np.ndarray) -> np.ndarray:
|
|
| 93 |
if t <= 8: # [C, T]
|
| 94 |
return wav.mean(axis=0).astype(np.float32)
|
| 95 |
return wav.mean(axis=1).astype(np.float32)
|
| 96 |
-
# Higher dims: flatten
|
| 97 |
return wav.reshape(-1).astype(np.float32)
|
| 98 |
|
| 99 |
|
|
@@ -135,16 +139,13 @@ def _align_lengths(a: np.ndarray, b: np.ndarray) -> Tuple[np.ndarray, np.ndarray
|
|
| 135 |
|
| 136 |
|
| 137 |
def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[np.ndarray, str]:
|
| 138 |
-
"""
|
| 139 |
-
Match cand loudness to ref, returning adjusted signal and a short description.
|
| 140 |
-
Uses LUFS (pyloudnorm) if available, else RMS.
|
| 141 |
-
"""
|
| 142 |
if len(ref) < sr // 10 or len(cand) < sr // 10:
|
| 143 |
return cand, "skipped (clip too short)"
|
| 144 |
|
| 145 |
if _HAVE_PYLN:
|
| 146 |
try:
|
| 147 |
-
meter = pyln.Meter(sr)
|
| 148 |
l_ref = meter.integrated_loudness(ref.astype(np.float64))
|
| 149 |
l_cand = meter.integrated_loudness(cand.astype(np.float64))
|
| 150 |
gain_db = l_ref - l_cand
|
|
@@ -153,7 +154,7 @@ def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[
|
|
| 153 |
except Exception:
|
| 154 |
pass
|
| 155 |
|
| 156 |
-
#
|
| 157 |
eps = 1e-9
|
| 158 |
rms_ref = np.sqrt(np.mean(ref**2) + eps)
|
| 159 |
rms_cand = np.sqrt(np.mean(cand**2) + eps)
|
|
@@ -164,15 +165,13 @@ def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[
|
|
| 164 |
|
| 165 |
|
| 166 |
def _make_ab_alternating(orig: np.ndarray, enh: np.ndarray, sr: int, seg_sec: float = 2.0) -> np.ndarray:
|
| 167 |
-
"""
|
| 168 |
-
Build an A/B track that alternates: seg of Original, seg of Enhanced, repeated.
|
| 169 |
-
"""
|
| 170 |
seg_n = max(1, int(seg_sec * sr))
|
| 171 |
orig, enh = _align_lengths(orig, enh)
|
| 172 |
n = len(orig)
|
| 173 |
out = []
|
| 174 |
pos = 0
|
| 175 |
-
flag = True
|
| 176 |
while pos < n:
|
| 177 |
end = min(pos + seg_n, n)
|
| 178 |
out.append(orig[pos:end] if flag else enh[pos:end])
|
|
@@ -182,7 +181,7 @@ def _make_ab_alternating(orig: np.ndarray, enh: np.ndarray, sr: int, seg_sec: fl
|
|
| 182 |
|
| 183 |
|
| 184 |
# -----------------------------
|
| 185 |
-
# Model runners
|
| 186 |
# -----------------------------
|
| 187 |
def _run_metricgan(path_16k: str) -> torch.Tensor:
|
| 188 |
enh = _get_metricgan()
|
|
@@ -191,51 +190,64 @@ def _run_metricgan(path_16k: str) -> torch.Tensor:
|
|
| 191 |
return out
|
| 192 |
|
| 193 |
|
| 194 |
-
def _run_sepformer(path_16k: str) -> torch.Tensor:
|
| 195 |
-
|
| 196 |
-
|
| 197 |
-
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
| 204 |
-
|
| 205 |
-
|
| 206 |
-
|
| 207 |
-
|
| 208 |
-
|
| 209 |
-
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
|
| 213 |
-
|
| 214 |
-
|
| 215 |
-
|
| 216 |
-
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
|
| 223 |
-
|
| 224 |
-
|
| 225 |
-
|
| 226 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 227 |
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_mid:
|
| 228 |
sf.write(tmp_mid.name, stage1.squeeze(0).numpy(), 16000, subtype="PCM_16")
|
| 229 |
tmp_mid.flush()
|
| 230 |
mid_path = tmp_mid.name
|
| 231 |
try:
|
| 232 |
-
stage2 = _run_metricgan(mid_path)
|
|
|
|
|
|
|
|
|
|
| 233 |
finally:
|
| 234 |
try:
|
| 235 |
os.remove(mid_path)
|
| 236 |
except Exception:
|
| 237 |
pass
|
| 238 |
-
return stage2
|
| 239 |
|
| 240 |
|
| 241 |
# -----------------------------
|
|
@@ -244,43 +256,52 @@ def _run_dual_stage(path_16k: str) -> torch.Tensor:
|
|
| 244 |
def _enhance_numpy_audio(
|
| 245 |
audio: Tuple[int, np.ndarray],
|
| 246 |
mode: str = "MetricGAN+ (denoise)",
|
| 247 |
-
dry_wet: float = 1.0, # 0..1
|
| 248 |
-
presence_db: float = 0.0,
|
| 249 |
-
lowcut_hz: float = 0.0,
|
| 250 |
out_sr: Optional[int] = None,
|
| 251 |
loudness_match: bool = True,
|
| 252 |
) -> Tuple[int, np.ndarray, np.ndarray, str]:
|
| 253 |
"""
|
| 254 |
-
Input: (sr, np.float32 [T] or [T,C])
|
| 255 |
Returns: (sr_out, enhanced, delta, metrics_text)
|
| 256 |
-
- enhanced: final output (after dry/wet, polish, loudness match)
|
| 257 |
-
- delta: original - enhanced (at output SR & length-matched)
|
| 258 |
"""
|
| 259 |
sr_in, wav_np = audio
|
| 260 |
wav_mono = _sanitize(_to_mono(wav_np))
|
| 261 |
|
| 262 |
-
# Guard: tiny input
|
| 263 |
if wav_mono.size < 32:
|
| 264 |
sr_out = sr_in if sr_in else 16000
|
| 265 |
silence = np.zeros(int(sr_out * 1.0), dtype=np.float32)
|
| 266 |
return sr_out, silence, silence, "Input too short; returned silence."
|
| 267 |
|
| 268 |
dry_t = torch.from_numpy(wav_mono).unsqueeze(0) # [1, T @ sr_in]
|
| 269 |
-
# Prepare 16k mono file for models
|
| 270 |
wav_16k = _resample_torch(dry_t, sr_in, 16000)
|
|
|
|
| 271 |
|
|
|
|
| 272 |
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_in:
|
| 273 |
sf.write(tmp_in.name, wav_16k.squeeze(0).numpy(), 16000, subtype="PCM_16")
|
| 274 |
tmp_in.flush()
|
| 275 |
path_16k = tmp_in.name
|
| 276 |
|
|
|
|
| 277 |
try:
|
| 278 |
if mode.startswith("MetricGAN"):
|
| 279 |
-
proc = _run_metricgan(path_16k)
|
| 280 |
elif mode.startswith("SepFormer"):
|
| 281 |
-
proc = _run_sepformer(path_16k
|
|
|
|
|
|
|
|
|
|
| 282 |
elif mode.startswith("Dual-Stage"):
|
| 283 |
-
proc = _run_dual_stage(path_16k
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 284 |
else: # Bypass (EQ only)
|
| 285 |
proc = wav_16k
|
| 286 |
finally:
|
|
@@ -289,7 +310,7 @@ def _enhance_numpy_audio(
|
|
| 289 |
except Exception:
|
| 290 |
pass
|
| 291 |
|
| 292 |
-
#
|
| 293 |
proc = _highpass(proc, 16000, lowcut_hz)
|
| 294 |
proc = _presence_boost(proc, 16000, presence_db)
|
| 295 |
proc = _limit_peak(proc, target_dbfs=-1.0)
|
|
@@ -299,34 +320,32 @@ def _enhance_numpy_audio(
|
|
| 299 |
proc_out = _resample_torch(proc, 16000, sr_out).squeeze(0).numpy().astype(np.float32)
|
| 300 |
dry_out = _resample_torch(dry_t, sr_in, sr_out).squeeze(0).numpy().astype(np.float32)
|
| 301 |
|
| 302 |
-
#
|
| 303 |
proc_out, dry_out = _align_lengths(proc_out, dry_out)
|
| 304 |
dry_wet = float(np.clip(dry_wet, 0.0, 1.0))
|
| 305 |
enhanced = proc_out * dry_wet + dry_out * (1.0 - dry_wet)
|
| 306 |
|
| 307 |
-
# Loudness match
|
| 308 |
loud_text = "off"
|
| 309 |
if loudness_match:
|
| 310 |
enhanced, loud_text = _loudness_match_to_ref(dry_out, enhanced, sr_out)
|
| 311 |
|
| 312 |
enhanced = _sanitize(enhanced)
|
| 313 |
|
| 314 |
-
# Delta
|
| 315 |
-
delta = dry_out - enhanced
|
| 316 |
-
delta = _sanitize(delta)
|
| 317 |
|
| 318 |
-
#
|
| 319 |
eps = 1e-9
|
| 320 |
-
rms_dry = np.sqrt(np.mean(dry_out**2) + eps)
|
| 321 |
-
rms_enh = np.sqrt(np.mean(enhanced**2) + eps)
|
| 322 |
rms_delta = np.sqrt(np.mean(delta**2) + eps)
|
| 323 |
-
change_db = 20 * np.log10((rms_dry + eps) / (rms_delta + eps))
|
| 324 |
metrics = (
|
| 325 |
f"Mode: {mode} | Dry/Wet: {dry_wet*100:.0f}% | Presence: {presence_db:+.1f} dB | "
|
| 326 |
-
f"Low-cut: {lowcut_hz:.0f} Hz | Loudness match: {loud_text}
|
| 327 |
-
f"
|
| 328 |
-
f'Approx. "noise removed" ratio: {change_db:.2f} dB'
|
| 329 |
)
|
|
|
|
|
|
|
|
|
|
| 330 |
|
| 331 |
return sr_out, enhanced, delta, metrics
|
| 332 |
|
|
@@ -335,7 +354,6 @@ def _enhance_numpy_audio(
|
|
| 335 |
# Presets
|
| 336 |
# -----------------------------
|
| 337 |
PRESETS: Dict[str, Dict[str, Any]] = {
|
| 338 |
-
# Maximum cleanup: dereverb + denoise chain, high dry/wet, subtle presence, mild HPF
|
| 339 |
"Ultimate Clean Voice": {
|
| 340 |
"mode": "Dual-Stage (SepFormer → MetricGAN+)",
|
| 341 |
"dry_wet": 0.92,
|
|
@@ -343,7 +361,6 @@ PRESETS: Dict[str, Dict[str, Any]] = {
|
|
| 343 |
"lowcut_hz": 80.0,
|
| 344 |
"loudness_match": True,
|
| 345 |
},
|
| 346 |
-
# Natural cleanup for most cases
|
| 347 |
"Natural Speech": {
|
| 348 |
"mode": "MetricGAN+ (denoise)",
|
| 349 |
"dry_wet": 0.85,
|
|
@@ -351,31 +368,27 @@ PRESETS: Dict[str, Dict[str, Any]] = {
|
|
| 351 |
"lowcut_hz": 50.0,
|
| 352 |
"loudness_match": True,
|
| 353 |
},
|
| 354 |
-
# Studio-ish clarity
|
| 355 |
"Podcast Studio": {
|
| 356 |
"mode": "MetricGAN+ (denoise)",
|
| 357 |
-
"dry_wet": 0.
|
| 358 |
"presence_db": 2.0,
|
| 359 |
"lowcut_hz": 75.0,
|
| 360 |
"loudness_match": True,
|
| 361 |
},
|
| 362 |
-
# Strong dereverb, blend to avoid artifacts
|
| 363 |
"Room Dereverb": {
|
| 364 |
"mode": "SepFormer (dereverb+denoise)",
|
| 365 |
-
"dry_wet": 0.
|
| 366 |
"presence_db": 0.5,
|
| 367 |
"lowcut_hz": 60.0,
|
| 368 |
"loudness_match": True,
|
| 369 |
},
|
| 370 |
-
# When music bed is under voice—be gentle
|
| 371 |
"Music + Voice Safe": {
|
| 372 |
"mode": "MetricGAN+ (denoise)",
|
| 373 |
-
"dry_wet": 0.
|
| 374 |
"presence_db": 0.0,
|
| 375 |
"lowcut_hz": 40.0,
|
| 376 |
"loudness_match": True,
|
| 377 |
},
|
| 378 |
-
# Harsh phone/zoom recordings
|
| 379 |
"Phone Call Rescue": {
|
| 380 |
"mode": "MetricGAN+ (denoise)",
|
| 381 |
"dry_wet": 0.88,
|
|
@@ -383,7 +396,6 @@ PRESETS: Dict[str, Dict[str, Any]] = {
|
|
| 383 |
"lowcut_hz": 100.0,
|
| 384 |
"loudness_match": True,
|
| 385 |
},
|
| 386 |
-
# Light touch
|
| 387 |
"Gentle Denoise": {
|
| 388 |
"mode": "MetricGAN+ (denoise)",
|
| 389 |
"dry_wet": 0.65,
|
|
@@ -391,13 +403,12 @@ PRESETS: Dict[str, Dict[str, Any]] = {
|
|
| 391 |
"lowcut_hz": 0.0,
|
| 392 |
"loudness_match": True,
|
| 393 |
},
|
| 394 |
-
"Custom": {}
|
| 395 |
}
|
| 396 |
|
| 397 |
|
| 398 |
def _apply_preset(preset_name: str):
|
| 399 |
cfg = PRESETS.get(preset_name, {})
|
| 400 |
-
# Return gr.update() for each adjustable control
|
| 401 |
def upd(val=None):
|
| 402 |
return gr.update(value=val) if val is not None else gr.update()
|
| 403 |
if not cfg or preset_name == "Custom":
|
|
@@ -447,7 +458,12 @@ def gradio_enhance(
|
|
| 447 |
|
| 448 |
|
| 449 |
with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
| 450 |
-
gr.Markdown(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 451 |
|
| 452 |
with gr.Row():
|
| 453 |
with gr.Column(scale=1):
|
|
@@ -462,7 +478,6 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
|
| 462 |
label="Preset",
|
| 463 |
)
|
| 464 |
|
| 465 |
-
# Controls that presets will adjust
|
| 466 |
mode = gr.Radio(
|
| 467 |
choices=[
|
| 468 |
"MetricGAN+ (denoise)",
|
|
@@ -490,7 +505,6 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
|
| 490 |
label="Output Sample Rate",
|
| 491 |
)
|
| 492 |
|
| 493 |
-
# Apply preset on change
|
| 494 |
preset.change(
|
| 495 |
_apply_preset,
|
| 496 |
inputs=[preset],
|
|
@@ -511,5 +525,5 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
|
| 511 |
outputs=[out_audio, ab_audio, delta_audio, metrics],
|
| 512 |
)
|
| 513 |
|
| 514 |
-
#
|
| 515 |
demo.launch()
|
|
|
|
| 1 |
+
# app.py — Voice Clarity Booster with Presets, Dual-Stage (smart CPU/GPU guard),
|
| 2 |
+
# A/B alternating, Delta (Original−Enhanced), Loudness Match, and safe fallbacks.
|
| 3 |
+
#
|
| 4 |
+
# Key change: On CPU or for long clips, Dual-Stage auto-falls back to MetricGAN+ (and SepFormer-only
|
| 5 |
+
# mode falls back to bypass) instead of hanging. Metrics show the fallback reason.
|
| 6 |
|
| 7 |
import os
|
|
|
|
| 8 |
import tempfile
|
| 9 |
from typing import Tuple, Optional, Dict, Any
|
| 10 |
|
| 11 |
+
# ---- Quiet noisy deprecation warnings (optional) ----
|
| 12 |
import warnings
|
| 13 |
warnings.filterwarnings(
|
| 14 |
"ignore",
|
|
|
|
| 26 |
import torch
|
| 27 |
import torchaudio
|
| 28 |
|
| 29 |
+
# Optional LUFS matching (falls back to RMS if unavailable)
|
| 30 |
try:
|
| 31 |
import pyloudnorm as pyln
|
| 32 |
_HAVE_PYLN = True
|
|
|
|
| 46 |
|
| 47 |
|
| 48 |
# -----------------------------
|
| 49 |
+
# Environment / runtime limits
|
| 50 |
# -----------------------------
|
| 51 |
+
USE_GPU = torch.cuda.is_available()
|
| 52 |
+
# On CPU, SepFormer is extremely slow; avoid for long clips (or disable).
|
| 53 |
+
MAX_SEPFORMER_SEC_CPU = float(os.getenv("MAX_SEPFORMER_SEC_CPU", 12)) # hard limit for CPU
|
| 54 |
+
MAX_SEPFORMER_SEC_GPU = float(os.getenv("MAX_SEPFORMER_SEC_GPU", 180)) # just in case
|
| 55 |
+
ALLOW_SEPFORMER_CPU = os.getenv("ALLOW_SEPFORMER_CPU", "0") == "1" # override at your risk
|
| 56 |
+
|
| 57 |
+
_DEVICE = "cuda" if USE_GPU else "cpu"
|
| 58 |
_ENHANCER_METRICGAN: Optional[SpectralMaskEnhancement] = None
|
| 59 |
_ENHANCER_SEPFORMER: Optional[SepformerSeparation] = None
|
| 60 |
|
|
|
|
| 85 |
# Audio helpers
|
| 86 |
# -----------------------------
|
| 87 |
def _to_mono(wav: np.ndarray) -> np.ndarray:
|
| 88 |
+
"""Robust mono: accepts [T], [T,C], [C,T]; treats dim<=8 as channels."""
|
|
|
|
|
|
|
|
|
|
| 89 |
wav = np.asarray(wav, dtype=np.float32)
|
| 90 |
if wav.ndim == 1:
|
| 91 |
return wav
|
|
|
|
| 98 |
if t <= 8: # [C, T]
|
| 99 |
return wav.mean(axis=0).astype(np.float32)
|
| 100 |
return wav.mean(axis=1).astype(np.float32)
|
|
|
|
| 101 |
return wav.reshape(-1).astype(np.float32)
|
| 102 |
|
| 103 |
|
|
|
|
| 139 |
|
| 140 |
|
| 141 |
def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[np.ndarray, str]:
|
| 142 |
+
"""Match cand loudness to ref (LUFS if available, else RMS)."""
|
|
|
|
|
|
|
|
|
|
| 143 |
if len(ref) < sr // 10 or len(cand) < sr // 10:
|
| 144 |
return cand, "skipped (clip too short)"
|
| 145 |
|
| 146 |
if _HAVE_PYLN:
|
| 147 |
try:
|
| 148 |
+
meter = pyln.Meter(sr)
|
| 149 |
l_ref = meter.integrated_loudness(ref.astype(np.float64))
|
| 150 |
l_cand = meter.integrated_loudness(cand.astype(np.float64))
|
| 151 |
gain_db = l_ref - l_cand
|
|
|
|
| 154 |
except Exception:
|
| 155 |
pass
|
| 156 |
|
| 157 |
+
# RMS fallback
|
| 158 |
eps = 1e-9
|
| 159 |
rms_ref = np.sqrt(np.mean(ref**2) + eps)
|
| 160 |
rms_cand = np.sqrt(np.mean(cand**2) + eps)
|
|
|
|
| 165 |
|
| 166 |
|
| 167 |
def _make_ab_alternating(orig: np.ndarray, enh: np.ndarray, sr: int, seg_sec: float = 2.0) -> np.ndarray:
|
| 168 |
+
"""A/B track flips Original→Enhanced every seg_sec."""
|
|
|
|
|
|
|
| 169 |
seg_n = max(1, int(seg_sec * sr))
|
| 170 |
orig, enh = _align_lengths(orig, enh)
|
| 171 |
n = len(orig)
|
| 172 |
out = []
|
| 173 |
pos = 0
|
| 174 |
+
flag = True
|
| 175 |
while pos < n:
|
| 176 |
end = min(pos + seg_n, n)
|
| 177 |
out.append(orig[pos:end] if flag else enh[pos:end])
|
|
|
|
| 181 |
|
| 182 |
|
| 183 |
# -----------------------------
|
| 184 |
+
# Model runners (with guards)
|
| 185 |
# -----------------------------
|
| 186 |
def _run_metricgan(path_16k: str) -> torch.Tensor:
|
| 187 |
enh = _get_metricgan()
|
|
|
|
| 190 |
return out
|
| 191 |
|
| 192 |
|
| 193 |
+
def _run_sepformer(path_16k: str, dur_sec: float) -> Tuple[Optional[torch.Tensor], Optional[str]]:
    """Run SepFormer on a 16 kHz file when the runtime guards allow it.

    Args:
        path_16k: Path of the audio file passed to ``separate_file``.
        dur_sec: Clip duration in seconds, checked against the CPU/GPU limits.

    Returns:
        ``(tensor, None)`` on success, where ``tensor`` is a detached CPU
        tensor shaped ``[1, T]``. ``(None, reason)`` when SepFormer is
        skipped by a guard, returns something unusable, or raises.
    """
    # Guards: refuse early rather than start a run that is too slow to finish.
    if USE_GPU:
        if dur_sec > MAX_SEPFORMER_SEC_GPU:
            return None, f"SepFormer skipped (GPU clip {dur_sec:.1f}s > {MAX_SEPFORMER_SEC_GPU:.0f}s limit)"
    elif not ALLOW_SEPFORMER_CPU:
        return None, "SepFormer disabled on CPU (set ALLOW_SEPFORMER_CPU=1 to force)"
    elif dur_sec > MAX_SEPFORMER_SEC_CPU:
        return None, f"SepFormer skipped (CPU clip {dur_sec:.1f}s > {MAX_SEPFORMER_SEC_CPU:.0f}s limit)"

    try:
        sep = _get_sepformer()
        with torch.no_grad():
            out = sep.separate_file(path=path_16k)

        # Coerce whatever the model returned into a tensor first, then
        # normalise the shape once instead of per branch.
        if isinstance(out, torch.Tensor):
            t = out
        elif hasattr(out, "numpy"):
            t = torch.from_numpy(out.numpy())
        elif isinstance(out, (list, tuple)):
            t = torch.tensor(out[0] if isinstance(out[0], (np.ndarray, list)) else out, dtype=torch.float32)
        else:
            return None, "SepFormer returned unexpected format; skipped"

        # Callers convert the result with .numpy(), which needs a CPU tensor.
        t = t.detach().cpu()

        if t.dim() == 3:
            # SpeechBrain's separation interface is documented as returning
            # [batch, time, sources]; keep the first source of the first item.
            t = t[0, :, 0]
        if t.dim() == 1:
            t = t.unsqueeze(0)
        elif t.dim() == 2:
            t = t[:1, :]
        else:
            return None, "SepFormer returned unexpected format; skipped"
        return t, None
    except Exception as e:
        # Best-effort boundary: any model failure becomes a fallback reason
        # so the caller can degrade instead of crashing the request.
        return None, f"SepFormer error: {e.__class__.__name__}"
|
| 229 |
+
|
| 230 |
+
|
| 231 |
+
def _run_dual_stage(path_16k: str, dur_sec: float) -> Tuple[Optional[torch.Tensor], Optional[str]]:
    """Run SepFormer, then MetricGAN+ on its output.

    The SepFormer stage applies the same guards as ``_run_sepformer``.

    Args:
        path_16k: Path of the 16 kHz input file.
        dur_sec: Clip duration in seconds, forwarded to the SepFormer guards.

    Returns:
        ``(tensor, None)`` with the MetricGAN+ output on success, or
        ``(None, reason)`` if either stage, or the intermediate write, fails.
    """
    stage1, msg = _run_sepformer(path_16k, dur_sec)
    if stage1 is None:
        return None, msg or "SepFormer unavailable"

    # Reserve a temp path and close the handle before writing by name:
    # reopening a still-open NamedTemporaryFile is not portable.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_mid:
        mid_path = tmp_mid.name

    # Everything that touches mid_path sits inside try/finally so the file is
    # removed even when the write itself fails.
    try:
        try:
            # .numpy() needs a CPU tensor that does not track gradients.
            samples = stage1.detach().cpu().squeeze(0).numpy()
            sf.write(mid_path, samples, 16000, subtype="PCM_16")
        except Exception as e:
            return None, f"Dual-Stage intermediate write failed: {e.__class__.__name__}"
        try:
            stage2 = _run_metricgan(mid_path)
        except Exception as e:
            return None, f"MetricGAN after SepFormer failed: {e.__class__.__name__}"
        return stage2, None
    finally:
        try:
            os.remove(mid_path)
        except OSError:
            # Cleanup is best-effort; a leftover temp file must not mask the result.
            pass
|
|
|
|
| 251 |
|
| 252 |
|
| 253 |
# -----------------------------
|
|
|
|
| 256 |
def _enhance_numpy_audio(
|
| 257 |
audio: Tuple[int, np.ndarray],
|
| 258 |
mode: str = "MetricGAN+ (denoise)",
|
| 259 |
+
dry_wet: float = 1.0, # 0..1
|
| 260 |
+
presence_db: float = 0.0,
|
| 261 |
+
lowcut_hz: float = 0.0,
|
| 262 |
out_sr: Optional[int] = None,
|
| 263 |
loudness_match: bool = True,
|
| 264 |
) -> Tuple[int, np.ndarray, np.ndarray, str]:
|
| 265 |
"""
|
|
|
|
| 266 |
Returns: (sr_out, enhanced, delta, metrics_text)
|
|
|
|
|
|
|
| 267 |
"""
|
| 268 |
sr_in, wav_np = audio
|
| 269 |
wav_mono = _sanitize(_to_mono(wav_np))
|
| 270 |
|
|
|
|
| 271 |
if wav_mono.size < 32:
|
| 272 |
sr_out = sr_in if sr_in else 16000
|
| 273 |
silence = np.zeros(int(sr_out * 1.0), dtype=np.float32)
|
| 274 |
return sr_out, silence, silence, "Input too short; returned silence."
|
| 275 |
|
| 276 |
dry_t = torch.from_numpy(wav_mono).unsqueeze(0) # [1, T @ sr_in]
|
|
|
|
| 277 |
wav_16k = _resample_torch(dry_t, sr_in, 16000)
|
| 278 |
+
dur_sec = float(wav_16k.shape[-1]) / 16000.0
|
| 279 |
|
| 280 |
+
# Write temp input for model runners
|
| 281 |
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_in:
|
| 282 |
sf.write(tmp_in.name, wav_16k.squeeze(0).numpy(), 16000, subtype="PCM_16")
|
| 283 |
tmp_in.flush()
|
| 284 |
path_16k = tmp_in.name
|
| 285 |
|
| 286 |
+
fallback_note = None
|
| 287 |
try:
|
| 288 |
if mode.startswith("MetricGAN"):
|
| 289 |
+
proc = _run_metricgan(path_16k)
|
| 290 |
elif mode.startswith("SepFormer"):
|
| 291 |
+
proc, msg = _run_sepformer(path_16k, dur_sec)
|
| 292 |
+
if proc is None:
|
| 293 |
+
proc = wav_16k # bypass
|
| 294 |
+
fallback_note = f"[Fallback→Bypass] {msg}"
|
| 295 |
elif mode.startswith("Dual-Stage"):
|
| 296 |
+
proc, msg = _run_dual_stage(path_16k, dur_sec)
|
| 297 |
+
if proc is None:
|
| 298 |
+
# fall back to MetricGAN if SepFormer not possible
|
| 299 |
+
try:
|
| 300 |
+
proc = _run_metricgan(path_16k)
|
| 301 |
+
fallback_note = f"[Fallback→MetricGAN+] {msg}"
|
| 302 |
+
except Exception as e:
|
| 303 |
+
proc = wav_16k # ultimate fallback: bypass
|
| 304 |
+
fallback_note = f"[Fallback→Bypass] {msg or ''} / MetricGAN error: {e.__class__.__name__}"
|
| 305 |
else: # Bypass (EQ only)
|
| 306 |
proc = wav_16k
|
| 307 |
finally:
|
|
|
|
| 310 |
except Exception:
|
| 311 |
pass
|
| 312 |
|
| 313 |
+
# Polish on processed only
|
| 314 |
proc = _highpass(proc, 16000, lowcut_hz)
|
| 315 |
proc = _presence_boost(proc, 16000, presence_db)
|
| 316 |
proc = _limit_peak(proc, target_dbfs=-1.0)
|
|
|
|
| 320 |
proc_out = _resample_torch(proc, 16000, sr_out).squeeze(0).numpy().astype(np.float32)
|
| 321 |
dry_out = _resample_torch(dry_t, sr_in, sr_out).squeeze(0).numpy().astype(np.float32)
|
| 322 |
|
| 323 |
+
# Mix dry/wet
|
| 324 |
proc_out, dry_out = _align_lengths(proc_out, dry_out)
|
| 325 |
dry_wet = float(np.clip(dry_wet, 0.0, 1.0))
|
| 326 |
enhanced = proc_out * dry_wet + dry_out * (1.0 - dry_wet)
|
| 327 |
|
| 328 |
+
# Loudness match
|
| 329 |
loud_text = "off"
|
| 330 |
if loudness_match:
|
| 331 |
enhanced, loud_text = _loudness_match_to_ref(dry_out, enhanced, sr_out)
|
| 332 |
|
| 333 |
enhanced = _sanitize(enhanced)
|
| 334 |
|
| 335 |
+
# Delta
|
| 336 |
+
delta = _sanitize(dry_out - enhanced)
|
|
|
|
| 337 |
|
| 338 |
+
# Metrics
|
| 339 |
eps = 1e-9
|
|
|
|
|
|
|
| 340 |
rms_delta = np.sqrt(np.mean(delta**2) + eps)
|
|
|
|
| 341 |
metrics = (
|
| 342 |
f"Mode: {mode} | Dry/Wet: {dry_wet*100:.0f}% | Presence: {presence_db:+.1f} dB | "
|
| 343 |
+
f"Low-cut: {lowcut_hz:.0f} Hz | Loudness match: {loud_text} | Device: {'GPU' if USE_GPU else 'CPU'} | "
|
| 344 |
+
f"Clip @16k: {dur_sec:.2f}s"
|
|
|
|
| 345 |
)
|
| 346 |
+
if fallback_note:
|
| 347 |
+
metrics += f"\n{fallback_note}"
|
| 348 |
+
metrics += f"\nΔ RMS: {20*np.log10(rms_delta+eps):+.2f} dBFS"
|
| 349 |
|
| 350 |
return sr_out, enhanced, delta, metrics
|
| 351 |
|
|
|
|
| 354 |
# Presets
|
| 355 |
# -----------------------------
|
| 356 |
PRESETS: Dict[str, Dict[str, Any]] = {
|
|
|
|
| 357 |
"Ultimate Clean Voice": {
|
| 358 |
"mode": "Dual-Stage (SepFormer → MetricGAN+)",
|
| 359 |
"dry_wet": 0.92,
|
|
|
|
| 361 |
"lowcut_hz": 80.0,
|
| 362 |
"loudness_match": True,
|
| 363 |
},
|
|
|
|
| 364 |
"Natural Speech": {
|
| 365 |
"mode": "MetricGAN+ (denoise)",
|
| 366 |
"dry_wet": 0.85,
|
|
|
|
| 368 |
"lowcut_hz": 50.0,
|
| 369 |
"loudness_match": True,
|
| 370 |
},
|
|
|
|
| 371 |
"Podcast Studio": {
|
| 372 |
"mode": "MetricGAN+ (denoise)",
|
| 373 |
+
"dry_wet": 0.90,
|
| 374 |
"presence_db": 2.0,
|
| 375 |
"lowcut_hz": 75.0,
|
| 376 |
"loudness_match": True,
|
| 377 |
},
|
|
|
|
| 378 |
"Room Dereverb": {
|
| 379 |
"mode": "SepFormer (dereverb+denoise)",
|
| 380 |
+
"dry_wet": 0.70,
|
| 381 |
"presence_db": 0.5,
|
| 382 |
"lowcut_hz": 60.0,
|
| 383 |
"loudness_match": True,
|
| 384 |
},
|
|
|
|
| 385 |
"Music + Voice Safe": {
|
| 386 |
"mode": "MetricGAN+ (denoise)",
|
| 387 |
+
"dry_wet": 0.60,
|
| 388 |
"presence_db": 0.0,
|
| 389 |
"lowcut_hz": 40.0,
|
| 390 |
"loudness_match": True,
|
| 391 |
},
|
|
|
|
| 392 |
"Phone Call Rescue": {
|
| 393 |
"mode": "MetricGAN+ (denoise)",
|
| 394 |
"dry_wet": 0.88,
|
|
|
|
| 396 |
"lowcut_hz": 100.0,
|
| 397 |
"loudness_match": True,
|
| 398 |
},
|
|
|
|
| 399 |
"Gentle Denoise": {
|
| 400 |
"mode": "MetricGAN+ (denoise)",
|
| 401 |
"dry_wet": 0.65,
|
|
|
|
| 403 |
"lowcut_hz": 0.0,
|
| 404 |
"loudness_match": True,
|
| 405 |
},
|
| 406 |
+
"Custom": {}
|
| 407 |
}
|
| 408 |
|
| 409 |
|
| 410 |
def _apply_preset(preset_name: str):
|
| 411 |
cfg = PRESETS.get(preset_name, {})
|
|
|
|
| 412 |
def upd(val=None):
|
| 413 |
return gr.update(value=val) if val is not None else gr.update()
|
| 414 |
if not cfg or preset_name == "Custom":
|
|
|
|
| 458 |
|
| 459 |
|
| 460 |
with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
| 461 |
+
gr.Markdown(
|
| 462 |
+
f"## Voice Clarity Booster — Presets, A/B, Delta, Loudness Match \n"
|
| 463 |
+
f"**Device:** {'GPU' if USE_GPU else 'CPU'} · "
|
| 464 |
+
f"SepFormer limits — CPU≤{MAX_SEPFORMER_SEC_CPU:.0f}s, GPU≤{MAX_SEPFORMER_SEC_GPU:.0f}s"
|
| 465 |
+
+ ("" if USE_GPU or ALLOW_SEPFORMER_CPU else " · (SepFormer disabled on CPU)")
|
| 466 |
+
)
|
| 467 |
|
| 468 |
with gr.Row():
|
| 469 |
with gr.Column(scale=1):
|
|
|
|
| 478 |
label="Preset",
|
| 479 |
)
|
| 480 |
|
|
|
|
| 481 |
mode = gr.Radio(
|
| 482 |
choices=[
|
| 483 |
"MetricGAN+ (denoise)",
|
|
|
|
| 505 |
label="Output Sample Rate",
|
| 506 |
)
|
| 507 |
|
|
|
|
| 508 |
preset.change(
|
| 509 |
_apply_preset,
|
| 510 |
inputs=[preset],
|
|
|
|
| 525 |
outputs=[out_audio, ab_audio, delta_audio, metrics],
|
| 526 |
)
|
| 527 |
|
| 528 |
+
# Launch unguarded so Spaces initializes
|
| 529 |
demo.launch()
|