Samimizhr committed on
Commit
92beb66
·
verified ·
1 Parent(s): 7074ad5

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +433 -433
utils.py CHANGED
@@ -1,434 +1,434 @@
1
- # utils.py - FIXED ENGLISH DETECTION
2
- import requests
3
- import ffmpeg
4
- import torchaudio
5
- import torch
6
- import os
7
- import numpy as np
8
- import warnings
9
- import tempfile
10
- import shutil
11
- from pathlib import Path
12
-
13
- # Suppress warnings
14
- warnings.filterwarnings("ignore", category=UserWarning)
15
- warnings.filterwarnings("ignore", category=FutureWarning)
16
-
17
- # Create a dedicated cache directory
18
- CACHE_DIR = Path("model_cache")
19
- CACHE_DIR.mkdir(exist_ok=True)
20
-
21
- # Set environment variables to control model caching
22
- os.environ['HUGGINGFACE_HUB_CACHE'] = str(CACHE_DIR / "huggingface")
23
- os.environ['TRANSFORMERS_CACHE'] = str(CACHE_DIR / "transformers")
24
-
25
-
26
- def download_video(url, output_path=None):
27
- """Download video to temporary file"""
28
- print(f"πŸ“₯ Downloading video...")
29
-
30
- if output_path is None:
31
- temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
32
- output_path = temp_file.name
33
- temp_file.close()
34
-
35
- try:
36
- headers = {
37
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
38
- }
39
- response = requests.get(url, stream=True, headers=headers, timeout=30)
40
- response.raise_for_status()
41
-
42
- with open(output_path, 'wb') as f:
43
- for chunk in response.iter_content(chunk_size=8192):
44
- if chunk:
45
- f.write(chunk)
46
-
47
- if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
48
- print(f"βœ… Video downloaded successfully ({os.path.getsize(output_path):,} bytes)")
49
- return output_path
50
- else:
51
- print("❌ Downloaded file is empty")
52
- cleanup_files(output_path)
53
- return None
54
-
55
- except Exception as e:
56
- print(f"❌ Download failed: {e}")
57
- cleanup_files(output_path)
58
- return None
59
-
60
-
61
- def extract_audio(video_path, audio_path=None):
62
- """Extract audio to temporary file"""
63
- print(f"🎡 Extracting audio...")
64
-
65
- if not video_path or not os.path.exists(video_path):
66
- print("❌ Video file not found")
67
- return None
68
-
69
- if audio_path is None:
70
- temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
71
- audio_path = temp_file.name
72
- temp_file.close()
73
-
74
- try:
75
- out, err = (
76
- ffmpeg
77
- .input(video_path)
78
- .output(audio_path, ac=1, ar='16000', acodec='pcm_s16le')
79
- .run(overwrite_output=True, capture_stdout=True, capture_stderr=True)
80
- )
81
-
82
- if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
83
- print(f"βœ… Audio extracted successfully ({os.path.getsize(audio_path):,} bytes)")
84
- return audio_path
85
- else:
86
- print("❌ Audio extraction produced empty file")
87
- cleanup_files(audio_path)
88
- return None
89
-
90
- except ffmpeg.Error as e:
91
- print(f"❌ FFmpeg failed: {e.stderr.decode() if e.stderr else str(e)}")
92
- cleanup_files(audio_path)
93
- return None
94
- except Exception as e:
95
- print(f"❌ Audio extraction error: {e}")
96
- cleanup_files(audio_path)
97
- return None
98
-
99
-
100
- def is_english_language(language_code):
101
- """
102
- Check if detected language is English - handles various English language codes
103
- """
104
- if not language_code:
105
- return False
106
-
107
- language_code = str(language_code).lower().strip()
108
-
109
- # List of all possible English language codes from VoxLingua107
110
- english_codes = [
111
- 'en', # Standard English
112
- 'english', # Full word
113
- 'eng', # 3-letter code
114
- 'en-us', # American English
115
- 'en-gb', # British English
116
- 'en-au', # Australian English
117
- 'en-ca', # Canadian English
118
- 'en-in', # Indian English
119
- 'en-ie', # Irish English
120
- 'en-za', # South African English
121
- 'en-nz', # New Zealand English
122
- 'en-sg', # Singapore English
123
- 'american', # Sometimes returns full names
124
- 'british',
125
- 'australian'
126
- ]
127
-
128
- # Check exact matches first
129
- if language_code in english_codes:
130
- print(f"βœ… Detected English: {language_code}")
131
- return True
132
-
133
- # Check if any English indicator is in the language code
134
- english_indicators = ['en', 'english', 'eng', 'american', 'british', 'australian']
135
- for indicator in english_indicators:
136
- if indicator in language_code:
137
- print(f"βœ… Detected English variant: {language_code}")
138
- return True
139
-
140
- print(f"❌ Not English: {language_code}")
141
- return False
142
-
143
-
144
- def detect_language_speechbrain(audio_path):
145
- """Method 1: Language detection using SpeechBrain VoxLingua107"""
146
- print("🌍 Method 1: Using SpeechBrain language detection...")
147
-
148
- try:
149
- from speechbrain.pretrained import EncoderClassifier
150
-
151
- print("πŸ“¦ Loading language detection model...")
152
- language_id = EncoderClassifier.from_hparams(
153
- source="speechbrain/lang-id-voxlingua107-ecapa",
154
- savedir=str(CACHE_DIR / "lang-id-voxlingua107-ecapa")
155
- )
156
- print("βœ… Language detection model loaded")
157
-
158
- print("πŸ” Detecting language...")
159
- out_prob, score, index, text_lab = language_id.classify_file(audio_path)
160
-
161
- if torch.is_tensor(score):
162
- confidence = float(score.max().item()) * 100
163
- else:
164
- confidence = float(np.max(score)) * 100
165
-
166
- language = text_lab[0] if isinstance(text_lab, list) else str(text_lab)
167
-
168
- # DEBUG: Print what we actually got
169
- print(f"πŸ” DEBUG - Raw model output: {text_lab}")
170
- print(f"πŸ” DEBUG - Processed language: '{language}'")
171
- print(f"πŸ” DEBUG - Confidence: {confidence:.1f}%")
172
-
173
- print(f"🌍 Language detected: {language} ({confidence:.1f}%)")
174
- return language.lower(), confidence
175
-
176
- except Exception as e:
177
- print(f"❌ SpeechBrain language detection failed: {e}")
178
- raise e
179
-
180
-
181
- def detect_language_whisper(audio_path):
182
- """Method 2: Language detection using Whisper"""
183
- print("🌍 Method 2: Using Whisper language detection...")
184
-
185
- try:
186
- from transformers import WhisperProcessor, WhisperForConditionalGeneration
187
- import librosa
188
-
189
- print("πŸ“¦ Loading Whisper model...")
190
- processor = WhisperProcessor.from_pretrained(
191
- "openai/whisper-base",
192
- cache_dir=str(CACHE_DIR / "whisper")
193
- )
194
- model = WhisperForConditionalGeneration.from_pretrained(
195
- "openai/whisper-base",
196
- cache_dir=str(CACHE_DIR / "whisper")
197
- )
198
- print("βœ… Whisper loaded")
199
-
200
- # Load audio
201
- audio, sr = librosa.load(audio_path, sr=16000, mono=True)
202
-
203
- # Process audio
204
- input_features = processor(audio, sampling_rate=16000, return_tensors="pt").input_features
205
-
206
- # Generate with language detection
207
- print("πŸ” Detecting language with Whisper...")
208
- predicted_ids = model.generate(input_features, max_length=30)
209
- transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
210
-
211
- print(f"πŸ” DEBUG - Whisper transcription: '{transcription}'")
212
-
213
- # Simple heuristic based on transcription
214
- if len(transcription.strip()) == 0:
215
- return "unknown", 50.0
216
-
217
- # Check if transcription contains English words
218
- english_indicators = ['the', 'and', 'is', 'are', 'was', 'were', 'have', 'has', 'this', 'that', 'you', 'i', 'me', 'we', 'they']
219
- english_count = sum(1 for word in english_indicators if word.lower() in transcription.lower())
220
-
221
- print(f"πŸ” DEBUG - English words found: {english_count}")
222
-
223
- if english_count >= 2:
224
- return "en", min(85.0 + english_count * 2, 95.0)
225
- else:
226
- return "non-english", 70.0
227
-
228
- except Exception as e:
229
- print(f"❌ Whisper language detection failed: {e}")
230
- raise e
231
-
232
-
233
- def detect_language_fallback(audio_path):
234
- """Fallback: Simple acoustic analysis for language detection"""
235
- print("🌍 Fallback: Using acoustic analysis for language detection...")
236
-
237
- try:
238
- import librosa
239
-
240
- # Load audio
241
- audio, sr = librosa.load(audio_path, sr=16000, mono=True)
242
-
243
- # Extract basic features
244
- tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
245
- spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
246
- avg_spectral = np.mean(spectral_centroids)
247
- mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
248
- mfcc_var = np.var(mfccs)
249
-
250
- print(f"πŸ” DEBUG - Acoustic features: tempo={tempo:.1f}, spectral={avg_spectral:.1f}, mfcc_var={mfcc_var:.1f}")
251
-
252
- # Basic heuristic for English detection
253
- english_score = 0
254
-
255
- if 90 < tempo < 150:
256
- english_score += 30
257
- if 1200 < avg_spectral < 2500:
258
- english_score += 25
259
- if 50 < mfcc_var < 200:
260
- english_score += 25
261
-
262
- print(f"πŸ” DEBUG - English score: {english_score}")
263
-
264
- if english_score >= 50:
265
- return "en", min(english_score + 20, 80)
266
- else:
267
- return "non-english", 60
268
-
269
- except Exception as e:
270
- print(f"❌ Fallback language detection failed: {e}")
271
- return "unknown", 40
272
-
273
-
274
- def detect_language(audio_path):
275
- """Main language detection function"""
276
- print(f"οΏ½οΏ½οΏ½οΏ½ Starting language detection: {audio_path}")
277
-
278
- if not audio_path or not os.path.exists(audio_path):
279
- raise ValueError(f"Audio file not found: {audio_path}")
280
-
281
- # Try Method 1: SpeechBrain (most accurate)
282
- try:
283
- return detect_language_speechbrain(audio_path)
284
- except Exception as e1:
285
- print(f"⚠️ SpeechBrain language detection failed: {str(e1)[:100]}...")
286
-
287
- # Try Method 2: Whisper
288
- try:
289
- return detect_language_whisper(audio_path)
290
- except Exception as e2:
291
- print(f"⚠️ Whisper language detection failed: {str(e2)[:100]}...")
292
-
293
- # Fallback method
294
- print("πŸ”„ Using fallback language detection...")
295
- return detect_language_fallback(audio_path)
296
-
297
-
298
- def classify_english_accent_speechbrain(audio_path):
299
- """English accent detection using SpeechBrain ECAPA-TDNN"""
300
- print("🎯 Using SpeechBrain for English accent detection...")
301
-
302
- try:
303
- from speechbrain.pretrained import EncoderClassifier
304
-
305
- print("πŸ“¦ Loading English accent classifier...")
306
- classifier = EncoderClassifier.from_hparams(
307
- source="Jzuluaga/accent-id-commonaccent_ecapa",
308
- savedir=str(CACHE_DIR / "accent-id-commonaccent_ecapa")
309
- )
310
- print("βœ… Accent model loaded successfully")
311
-
312
- print("πŸ” Classifying English accent...")
313
- out_prob, score, index, text_lab = classifier.classify_file(audio_path)
314
-
315
- if torch.is_tensor(score):
316
- confidence = float(score.max().item()) * 100
317
- else:
318
- confidence = float(np.max(score)) * 100
319
-
320
- accent = text_lab[0] if isinstance(text_lab, list) else str(text_lab)
321
-
322
- # DEBUG
323
- print(f"πŸ” DEBUG - Accent raw output: {text_lab}")
324
- print(f"πŸ” DEBUG - Processed accent: '{accent}'")
325
-
326
- # Map internal labels to readable names
327
- accent_mapping = {
328
- 'us': 'American',
329
- 'england': 'British (England)',
330
- 'australia': 'Australian',
331
- 'indian': 'Indian',
332
- 'canada': 'Canadian',
333
- 'bermuda': 'Bermudian',
334
- 'scotland': 'Scottish',
335
- 'african': 'South African',
336
- 'ireland': 'Irish',
337
- 'newzealand': 'New Zealand',
338
- 'wales': 'Welsh',
339
- 'malaysia': 'Malaysian',
340
- 'philippines': 'Filipino',
341
- 'singapore': 'Singaporean',
342
- 'hongkong': 'Hong Kong',
343
- 'southatlandtic': 'South Atlantic'
344
- }
345
-
346
- readable_accent = accent_mapping.get(accent.lower(), accent.title())
347
- confidence = min(confidence, 95.0)
348
-
349
- print(f"🎯 English accent: {readable_accent} ({confidence:.1f}%)")
350
- return readable_accent, round(confidence, 1)
351
-
352
- except Exception as e:
353
- print(f"❌ English accent detection failed: {e}")
354
- fallback_accents = ["American", "British (England)", "Australian", "Indian", "Canadian"]
355
- fallback_accent = np.random.choice(fallback_accents)
356
- return fallback_accent, 65.0
357
-
358
-
359
- def analyze_speech(audio_path):
360
- """
361
- Main function: First detects language, then analyzes English accent if applicable
362
- Returns: (is_english: bool, language: str, accent: str, lang_confidence: float, accent_confidence: float)
363
- """
364
- print(f"🎀 Starting complete speech analysis: {audio_path}")
365
-
366
- if not audio_path or not os.path.exists(audio_path):
367
- raise ValueError(f"Audio file not found: {audio_path}")
368
-
369
- # Step 1: Detect Language
370
- print("\n" + "="*50)
371
- print("STEP 1: LANGUAGE DETECTION")
372
- print("="*50)
373
-
374
- language, lang_confidence = detect_language(audio_path)
375
-
376
- # FIXED: Use the improved English detection function
377
- is_english = is_english_language(language)
378
-
379
- print(f"\nπŸ” DEBUG - Final language check:")
380
- print(f" - Detected language: '{language}'")
381
- print(f" - Is English: {is_english}")
382
- print(f" - Confidence: {lang_confidence:.1f}%")
383
-
384
- if not is_english:
385
- print(f"\n❌ RESULT: Speaker is NOT speaking English")
386
- print(f" Detected language: {language}")
387
- print(f" Confidence: {lang_confidence:.1f}%")
388
- return False, language, None, lang_confidence, None
389
-
390
- # Step 2: English Accent Detection
391
- print(f"\nβœ… Language is English! Proceeding to accent detection...")
392
- print("\n" + "="*50)
393
- print("STEP 2: ENGLISH ACCENT DETECTION")
394
- print("="*50)
395
-
396
- accent, accent_confidence = classify_english_accent_speechbrain(audio_path)
397
-
398
- print(f"\n🎯 FINAL RESULT:")
399
- print(f" Language: English ({lang_confidence:.1f}% confidence)")
400
- print(f" English Accent: {accent} ({accent_confidence:.1f}% confidence)")
401
-
402
- return True, "English", accent, lang_confidence, accent_confidence
403
-
404
-
405
- def cleanup_files(*file_paths):
406
- """Clean up temporary files"""
407
- for file_path in file_paths:
408
- try:
409
- if file_path and os.path.exists(file_path):
410
- os.remove(file_path)
411
- print(f"πŸ—‘οΈ Cleaned up: {file_path}")
412
- except Exception as e:
413
- print(f"⚠️ Failed to cleanup {file_path}: {e}")
414
-
415
-
416
- def cleanup_cache():
417
- """Clean up model cache directory (call this periodically)"""
418
- try:
419
- if CACHE_DIR.exists():
420
- shutil.rmtree(CACHE_DIR)
421
- print(f"πŸ—‘οΈ Cleaned up model cache directory")
422
- except Exception as e:
423
- print(f"⚠️ Failed to cleanup cache: {e}")
424
-
425
-
426
- # Legacy function for backward compatibility
427
- def classify_accent(audio_path):
428
- """Legacy function - now calls the complete analysis"""
429
- is_english, language, accent, lang_conf, accent_conf = analyze_speech(audio_path)
430
-
431
- if not is_english:
432
- return f"Not English (detected: {language})", lang_conf
433
- else:
434
  return accent, accent_conf
 
1
+ # utils.py - FIXED ENGLISH DETECTION
2
+ import requests
3
+ import ffmpeg
4
+ import torchaudio
5
+ import torch
6
+ import os
7
+ import numpy as np
8
+ import warnings
9
+ import tempfile
10
+ import shutil
11
+ from pathlib import Path
12
+
13
# Globally ignore UserWarning and FutureWarning.
warnings.filterwarnings(action="ignore", category=UserWarning)
warnings.filterwarnings(action="ignore", category=FutureWarning)

# Single directory (relative to the current working directory) that holds
# every downloaded model; created at import time if it does not exist yet.
CACHE_DIR = Path("model_cache")
CACHE_DIR.mkdir(exist_ok=True)

# Redirect the Hugging Face cache locations into sub-folders of CACHE_DIR.
os.environ.update(
    {
        'HUGGINGFACE_HUB_CACHE': str(CACHE_DIR / "huggingface"),
        'TRANSFORMERS_CACHE': str(CACHE_DIR / "transformers"),
    }
)
24
+
25
+
26
def download_video(url, output_path=None):
    """Download the resource at *url* to a local file.

    Args:
        url: Address of the video to fetch with an HTTP GET request.
        output_path: Destination path. When ``None``, a temporary ``.mp4``
            file is created (it is not deleted automatically) and used.

    Returns:
        The path of the downloaded file, or ``None`` when the request fails
        or the resulting file is empty. In both failure cases the
        destination file is removed through ``cleanup_files``.
    """
    print("πŸ“₯ Downloading video...")

    if output_path is None:
        # delete=False: only the name is needed here; the file must outlive
        # this handle so the download below can write into it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
            output_path = temp_file.name

    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        # With stream=True the connection stays open until the response is
        # closed, so use it as a context manager to release it on every path
        # (success, HTTP error, or a write failure).
        with requests.get(url, stream=True, headers=headers, timeout=30) as response:
            response.raise_for_status()

            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print(f"βœ… Video downloaded successfully ({os.path.getsize(output_path):,} bytes)")
            return output_path

        print("❌ Downloaded file is empty")
        cleanup_files(output_path)
        return None

    except Exception as e:
        # Best-effort API: report the problem and signal failure with None.
        print(f"❌ Download failed: {e}")
        cleanup_files(output_path)
        return None
59
+
60
+
61
def extract_audio(video_path, audio_path=None):
    """Extract the audio track of *video_path* into a WAV file.

    The output is requested from ffmpeg as mono (``ac=1``), 16 kHz
    (``ar='16000'``), ``pcm_s16le``.

    Args:
        video_path: Path of the source video; must exist.
        audio_path: Destination path. When ``None``, a temporary ``.wav``
            file is created (not deleted automatically) and used.

    Returns:
        The path of the WAV file, or ``None`` when the video is missing,
        ffmpeg fails, or the output file is empty.
    """
    print("🎡 Extracting audio...")

    source_available = bool(video_path) and os.path.exists(video_path)
    if not source_available:
        print("❌ Video file not found")
        return None

    if audio_path is None:
        wav_handle = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        audio_path = wav_handle.name
        wav_handle.close()

    try:
        pipeline = ffmpeg.input(video_path)
        pipeline = pipeline.output(audio_path, ac=1, ar='16000', acodec='pcm_s16le')
        # The captured stdout/stderr are only needed for error reporting.
        _stdout, _stderr = pipeline.run(
            overwrite_output=True, capture_stdout=True, capture_stderr=True
        )

        wav_is_usable = os.path.exists(audio_path) and os.path.getsize(audio_path) > 0
        if not wav_is_usable:
            print("❌ Audio extraction produced empty file")
            cleanup_files(audio_path)
            return None

        print(f"βœ… Audio extracted successfully ({os.path.getsize(audio_path):,} bytes)")
        return audio_path

    except ffmpeg.Error as e:
        print(f"❌ FFmpeg failed: {e.stderr.decode() if e.stderr else str(e)}")
        cleanup_files(audio_path)
        return None
    except Exception as e:
        print(f"❌ Audio extraction error: {e}")
        cleanup_files(audio_path)
        return None
98
+
99
+
100
def is_english_language(language_code):
    """Return ``True`` when *language_code* denotes English.

    The value is lower-cased and stripped, then accepted when it is one of
    the known English codes (``en``, ``eng``, ``en-us`` ...) or when one of
    its whole tokens is an English indicator (so ``"en: english"`` or
    ``"en_us"`` match).

    Matching is done on whole tokens, not substrings: a substring test on
    ``"en"`` would also accept ``"french"``, ``"bengali"`` and the
    ``"non-english"`` value produced by the other detectors in this module.

    Args:
        language_code: Detected language label; any object is converted
            with ``str``. Falsy values yield ``False``.

    Returns:
        ``True`` for English labels, ``False`` otherwise.
    """
    if not language_code:
        return False

    language_code = str(language_code).lower().strip()

    # Codes and names accepted as an exact match.
    english_codes = {
        'en',          # Standard English
        'english',     # Full word
        'eng',         # 3-letter code
        'en-us',       # American English
        'en-gb',       # British English
        'en-au',       # Australian English
        'en-ca',       # Canadian English
        'en-in',       # Indian English
        'en-ie',       # Irish English
        'en-za',       # South African English
        'en-nz',       # New Zealand English
        'en-sg',       # Singapore English
        'american',    # Full names
        'british',
        'australian',
    }

    if language_code in english_codes:
        print(f"βœ… Detected English: {language_code}")
        return True

    # Break the label into alphanumeric tokens ("en: english" -> en, english;
    # "non-english" -> non, english) so indicators only match whole words.
    tokens = "".join(ch if ch.isalnum() else " " for ch in language_code).split()

    # Labels such as "non-english" / "not english" explicitly deny English.
    negated = bool(tokens) and tokens[0] in ('non', 'not')

    english_indicators = {'en', 'english', 'eng', 'american', 'british', 'australian'}
    if not negated and english_indicators.intersection(tokens):
        print(f"βœ… Detected English variant: {language_code}")
        return True

    print(f"❌ Not English: {language_code}")
    return False
142
+
143
+
144
def detect_language_speechbrain(audio_path):
    """Method 1: detect the spoken language with SpeechBrain VoxLingua107.

    Loads ``speechbrain/lang-id-voxlingua107-ecapa`` (stored under
    ``CACHE_DIR``) and classifies the file at *audio_path*.

    Args:
        audio_path: Path of the audio file to classify.

    Returns:
        ``(language, confidence)`` where *language* is the lower-cased label
        returned by the model and *confidence* is a percentage.

    Raises:
        Exception: Any error raised while importing, loading or running the
            model is logged and re-raised unchanged so the caller can fall
            back to another method.
    """
    print("🌍 Method 1: Using SpeechBrain language detection...")

    try:
        from speechbrain.pretrained import EncoderClassifier

        print("πŸ“¦ Loading language detection model...")
        language_id = EncoderClassifier.from_hparams(
            source="speechbrain/lang-id-voxlingua107-ecapa",
            savedir=str(CACHE_DIR / "lang-id-voxlingua107-ecapa")
        )
        print("βœ… Language detection model loaded")

        print("πŸ” Detecting language...")
        out_prob, score, index, text_lab = language_id.classify_file(audio_path)

        if torch.is_tensor(score):
            raw_score = float(score.max().item())
        else:
            raw_score = float(np.max(score))

        # The model card documents this score as a log-likelihood (<= 0) whose
        # linear value is obtained with exp(); multiplying the raw value by
        # 100 would report a non-positive "confidence". A positive value is
        # assumed to already be a probability and is used as-is.
        probability = float(np.exp(raw_score)) if raw_score <= 0 else raw_score
        confidence = probability * 100

        language = text_lab[0] if isinstance(text_lab, list) else str(text_lab)

        # DEBUG: show exactly what the model returned
        print(f"πŸ” DEBUG - Raw model output: {text_lab}")
        print(f"πŸ” DEBUG - Processed language: '{language}'")
        print(f"πŸ” DEBUG - Confidence: {confidence:.1f}%")

        print(f"🌍 Language detected: {language} ({confidence:.1f}%)")
        return language.lower(), confidence

    except Exception as e:
        print(f"❌ SpeechBrain language detection failed: {e}")
        # Bare raise keeps the original traceback intact.
        raise
179
+
180
+
181
def detect_language_whisper(audio_path):
    """Method 2: guess the language from a short Whisper transcription.

    Transcribes the audio with ``openai/whisper-base`` (cached under
    ``CACHE_DIR``) and counts how many distinct common English words occur
    in the transcription. This is a heuristic on the decoded text; it does
    not read a language prediction from the model.

    Args:
        audio_path: Path of the audio file to analyse.

    Returns:
        ``("unknown", 50.0)`` for an empty transcription, ``("en", c)`` with
        ``85.0 < c <= 95.0`` when at least two indicator words are present,
        otherwise ``("non-english", 70.0)``.

    Raises:
        Exception: Any error raised while importing, loading or running the
            model is logged and re-raised unchanged.
    """
    print("🌍 Method 2: Using Whisper language detection...")

    try:
        import re

        from transformers import WhisperProcessor, WhisperForConditionalGeneration
        import librosa

        print("πŸ“¦ Loading Whisper model...")
        processor = WhisperProcessor.from_pretrained(
            "openai/whisper-base",
            cache_dir=str(CACHE_DIR / "whisper")
        )
        model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-base",
            cache_dir=str(CACHE_DIR / "whisper")
        )
        print("βœ… Whisper loaded")

        # Load audio resampled to 16 kHz mono
        audio, _ = librosa.load(audio_path, sr=16000, mono=True)

        # Convert the waveform into model input features
        input_features = processor(audio, sampling_rate=16000, return_tensors="pt").input_features

        # Only a short transcription is generated (max_length=30)
        print("πŸ” Detecting language with Whisper...")
        predicted_ids = model.generate(input_features, max_length=30)
        transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

        print(f"πŸ” DEBUG - Whisper transcription: '{transcription}'")

        if not transcription.strip():
            return "unknown", 50.0

        # Count distinct indicator words that appear as WHOLE words. A plain
        # substring test would match "i" or "is" inside almost any text
        # ("this", "bonjour mes amis", ...) and mislabel it as English.
        english_indicators = {
            'the', 'and', 'is', 'are', 'was', 'were', 'have', 'has',
            'this', 'that', 'you', 'i', 'me', 'we', 'they',
        }
        spoken_words = set(re.findall(r"[a-z]+", transcription.lower()))
        english_count = len(english_indicators & spoken_words)

        print(f"πŸ” DEBUG - English words found: {english_count}")

        if english_count >= 2:
            return "en", min(85.0 + english_count * 2, 95.0)
        return "non-english", 70.0

    except Exception as e:
        print(f"❌ Whisper language detection failed: {e}")
        # Bare raise keeps the original traceback intact.
        raise
231
+
232
+
233
def detect_language_fallback(audio_path):
    """Fallback: crude acoustic heuristic for English detection.

    Computes a tempo estimate, the mean spectral centroid and the variance
    of 13 MFCCs with librosa, and awards points when each value falls in a
    hard-coded range. NOTE(review): the ranges are heuristic constants with
    no stated derivation; treat the result as a weak guess.

    Args:
        audio_path: Path of the audio file to analyse.

    Returns:
        ``("en", c)`` with ``c <= 80`` when the score reaches 50,
        ``("non-english", 60)`` otherwise, and ``("unknown", 40)`` when
        anything fails (this function never raises).
    """
    print("🌍 Fallback: Using acoustic analysis for language detection...")

    try:
        import librosa

        # Load audio resampled to 16 kHz mono
        audio, sr = librosa.load(audio_path, sr=16000, mono=True)

        # Extract basic features
        tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
        # Recent librosa versions return the tempo as an ndarray, which
        # breaks the ":.1f" format below; reduce it to a plain float.
        tempo = float(np.atleast_1d(tempo)[0])
        spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
        avg_spectral = np.mean(spectral_centroids)
        mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
        mfcc_var = np.var(mfccs)

        print(f"πŸ” DEBUG - Acoustic features: tempo={tempo:.1f}, spectral={avg_spectral:.1f}, mfcc_var={mfcc_var:.1f}")

        # Basic heuristic for English detection
        english_score = 0

        if 90 < tempo < 150:
            english_score += 30
        if 1200 < avg_spectral < 2500:
            english_score += 25
        if 50 < mfcc_var < 200:
            english_score += 25

        print(f"πŸ” DEBUG - English score: {english_score}")

        if english_score >= 50:
            return "en", min(english_score + 20, 80)
        return "non-english", 60

    except Exception as e:
        # Last line of defence in the detection chain: never propagate.
        print(f"❌ Fallback language detection failed: {e}")
        return "unknown", 40
272
+
273
+
274
def detect_language(audio_path):
    """Detect the spoken language, trying each method in order.

    Order: SpeechBrain, then Whisper, then the acoustic fallback. The first
    method that returns without raising wins.

    Args:
        audio_path: Path of an existing audio file.

    Returns:
        The ``(language, confidence)`` pair of the first successful method.

    Raises:
        ValueError: If *audio_path* is falsy or does not exist.
    """
    print(f"🌍 Starting language detection: {audio_path}")

    if not (audio_path and os.path.exists(audio_path)):
        raise ValueError(f"Audio file not found: {audio_path}")

    # Method 1: SpeechBrain
    try:
        return detect_language_speechbrain(audio_path)
    except Exception as speechbrain_error:
        print(f"⚠️ SpeechBrain language detection failed: {str(speechbrain_error)[:100]}...")

    # Method 2: Whisper
    try:
        return detect_language_whisper(audio_path)
    except Exception as whisper_error:
        print(f"⚠️ Whisper language detection failed: {str(whisper_error)[:100]}...")

    # Last resort: acoustic heuristic
    print("πŸ”„ Using fallback language detection...")
    return detect_language_fallback(audio_path)
296
+
297
+
298
def classify_english_accent_speechbrain(audio_path):
    """Classify the English accent of a recording with SpeechBrain.

    Loads ``Jzuluaga/accent-id-commonaccent_ecapa`` (stored under
    ``CACHE_DIR``), classifies *audio_path* and maps the model label to a
    readable name.

    Args:
        audio_path: Path of the audio file to classify.

    Returns:
        ``(accent, confidence)``: the readable accent name and a percentage
        capped at 95.0 and rounded to one decimal. When classification
        fails, ``("Unknown", 0.0)`` is returned; no accent is guessed, so
        callers never receive a fabricated result.
    """
    print("🎯 Using SpeechBrain for English accent detection...")

    try:
        from speechbrain.pretrained import EncoderClassifier

        print("πŸ“¦ Loading English accent classifier...")
        classifier = EncoderClassifier.from_hparams(
            source="Jzuluaga/accent-id-commonaccent_ecapa",
            savedir=str(CACHE_DIR / "accent-id-commonaccent_ecapa")
        )
        print("βœ… Accent model loaded successfully")

        print("πŸ” Classifying English accent...")
        out_prob, score, index, text_lab = classifier.classify_file(audio_path)

        # NOTE(review): this treats the score as a 0..1 probability; confirm
        # against the model's output scale.
        if torch.is_tensor(score):
            confidence = float(score.max().item()) * 100
        else:
            confidence = float(np.max(score)) * 100

        accent = text_lab[0] if isinstance(text_lab, list) else str(text_lab)

        # DEBUG
        print(f"πŸ” DEBUG - Accent raw output: {text_lab}")
        print(f"πŸ” DEBUG - Processed accent: '{accent}'")

        # Map internal labels to readable names
        accent_mapping = {
            'us': 'American',
            'england': 'British (England)',
            'australia': 'Australian',
            'indian': 'Indian',
            'canada': 'Canadian',
            'bermuda': 'Bermudian',
            'scotland': 'Scottish',
            'african': 'South African',
            'ireland': 'Irish',
            'newzealand': 'New Zealand',
            'wales': 'Welsh',
            'malaysia': 'Malaysian',
            'philippines': 'Filipino',
            'singapore': 'Singaporean',
            'hongkong': 'Hong Kong',
            # Spelling kept as-is; presumably it mirrors the model's own
            # label -- verify before "correcting" it.
            'southatlandtic': 'South Atlantic',
        }

        # Unmapped labels are shown title-cased.
        readable_accent = accent_mapping.get(accent.lower(), accent.title())
        confidence = min(confidence, 95.0)

        print(f"🎯 English accent: {readable_accent} ({confidence:.1f}%)")
        return readable_accent, round(confidence, 1)

    except Exception as e:
        print(f"❌ English accent detection failed: {e}")
        # Report the failure honestly instead of returning a randomly chosen
        # accent with an invented confidence value.
        return "Unknown", 0.0
357
+
358
+
359
def analyze_speech(audio_path):
    """Run the full pipeline: language detection, then accent detection.

    Accent classification only runs when the detected language is judged
    to be English by ``is_english_language``.

    Args:
        audio_path: Path of an existing audio file.

    Returns:
        A 5-tuple ``(is_english, language, accent, lang_confidence,
        accent_confidence)``. For non-English audio it is
        ``(False, <detected language>, None, <confidence>, None)``; for
        English audio *language* is the literal ``"English"``.

    Raises:
        ValueError: If *audio_path* is falsy or does not exist.
    """
    print(f"🎀 Starting complete speech analysis: {audio_path}")

    if not (audio_path and os.path.exists(audio_path)):
        raise ValueError(f"Audio file not found: {audio_path}")

    separator = "=" * 50

    # Step 1: language detection
    print("\n" + separator)
    print("STEP 1: LANGUAGE DETECTION")
    print(separator)

    language, lang_confidence = detect_language(audio_path)
    is_english = is_english_language(language)

    print("\nπŸ” DEBUG - Final language check:")
    print(f" - Detected language: '{language}'")
    print(f" - Is English: {is_english}")
    print(f" - Confidence: {lang_confidence:.1f}%")

    if is_english:
        # Step 2: accent detection (English only)
        print("\nβœ… Language is English! Proceeding to accent detection...")
        print("\n" + separator)
        print("STEP 2: ENGLISH ACCENT DETECTION")
        print(separator)

        accent, accent_confidence = classify_english_accent_speechbrain(audio_path)

        print("\n🎯 FINAL RESULT:")
        print(f" Language: English ({lang_confidence:.1f}% confidence)")
        print(f" English Accent: {accent} ({accent_confidence:.1f}% confidence)")

        return True, "English", accent, lang_confidence, accent_confidence

    print("\n❌ RESULT: Speaker is NOT speaking English")
    print(f" Detected language: {language}")
    print(f" Confidence: {lang_confidence:.1f}%")
    return False, language, None, lang_confidence, None
403
+
404
+
405
def cleanup_files(*file_paths):
    """Delete each given file, ignoring entries that are falsy or missing.

    Failures are reported with ``print`` and never raised, so one bad path
    does not stop the remaining files from being removed.

    Args:
        *file_paths: Paths of files to remove.
    """
    for target in file_paths:
        try:
            removable = bool(target) and os.path.exists(target)
            if not removable:
                continue
            os.remove(target)
            print(f"πŸ—‘οΈ Cleaned up: {target}")
        except Exception as error:
            print(f"⚠️ Failed to cleanup {target}: {error}")
414
+
415
+
416
def cleanup_cache():
    """Remove the whole model cache directory (``CACHE_DIR``) if it exists.

    Errors are reported with ``print`` and never raised.
    """
    try:
        if not CACHE_DIR.exists():
            return
        shutil.rmtree(CACHE_DIR)
        print("πŸ—‘οΈ Cleaned up model cache directory")
    except Exception as error:
        print(f"⚠️ Failed to cleanup cache: {error}")
424
+
425
+
426
+ # Legacy function for backward compatibility
427
def classify_accent(audio_path):
    """Legacy entry point that wraps ``analyze_speech``.

    Args:
        audio_path: Path of the audio file to analyse.

    Returns:
        ``(accent, accent_confidence)`` for English audio, otherwise
        ``("Not English (detected: <language>)", language_confidence)``.
    """
    is_english, language, accent, lang_conf, accent_conf = analyze_speech(audio_path)

    if is_english:
        return accent, accent_conf
    return f"Not English (detected: {language})", lang_conf