|
|
import gradio as gr |
|
|
import os |
|
|
import json |
|
|
import uuid |
|
|
import boto3 |
|
|
import hashlib |
|
|
from datetime import datetime |
|
|
import re |
|
|
import time |
|
|
import random |
|
|
import shutil |
|
|
from werkzeug.security import generate_password_hash, check_password_hash |
|
|
from botocore.exceptions import NoCredentialsError, ClientError |
|
|
import soundfile as sf |
|
|
import shutil |
|
|
import threading |
|
|
from pathlib import Path |
|
|
from supabase import create_client, Client |
|
|
from supabase.client import ClientOptions |
|
|
from supabase_auth.errors import ( |
|
|
AuthWeakPasswordError, |
|
|
AuthInvalidCredentialsError, |
|
|
AuthApiError, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY", "") |
|
|
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "") |
|
|
S3_BUCKET = os.environ.get("S3_BUCKET", "tts-dataset-creator-593793024422-prod") |
|
|
AWS_REGION = os.environ.get("AWS_REGION", "me-south-1") |
|
|
|
|
|
SAMPLE_RATE = 44100 |
|
|
APP_CODE_DIR = "./" |
|
|
USER_ID = "" |
|
|
url = os.environ.get("url", "") |
|
|
key = os.environ.get("key", "") |
|
|
supabase = create_client(url, key) |
|
|
|
|
|
|
|
|
COUNTRIES = [ |
|
|
"Algeria", "Bahrain", "Egypt", "Iraq", "Jordan", "Kuwait", "Lebanon", |
|
|
"Libya", "Mauritania", "Morocco", "Oman", "Palestine", "Qatar", |
|
|
"Saudi Arabia", "Somalia", "Sudan", "Syria", "Tunisia", |
|
|
"United Arab Emirates", "Yemen" |
|
|
] |
|
|
|
|
|
|
|
|
COUNTRY_CODES = { |
|
|
"Algeria": "dz", |
|
|
"Bahrain": "bh", |
|
|
"Egypt": "eg", |
|
|
"Iraq": "iq", |
|
|
"Jordan": "jo", |
|
|
"Kuwait": "kw", |
|
|
"Lebanon": "lb", |
|
|
"Libya": "ly", |
|
|
"Mauritania": "mr", |
|
|
"Morocco": "ma", |
|
|
"Oman": "om", |
|
|
"Palestine": "ps", |
|
|
"Qatar": "qa", |
|
|
"Saudi Arabia": "sa", |
|
|
"Somalia": "so", |
|
|
"Sudan": "sd", |
|
|
"Syria": "sy", |
|
|
"Tunisia": "tn", |
|
|
"United Arab Emirates": "ae", |
|
|
"Yemen": "ye" |
|
|
} |
|
|
|
|
|
|
|
|
COUNTRY_DIALECTS = { |
|
|
"Saudi Arabia": { |
|
|
"حجازية": "hj", |
|
|
"حجازية بدوية": "hj-bd", |
|
|
"جنوبية": "jn", |
|
|
"تهامية": "th", |
|
|
"نجدية": "nj", |
|
|
"نجدية بدوية": "nj-bd", |
|
|
"قصيمية": "qm", |
|
|
"الشمال": "sh", |
|
|
"حساوية": "hs", |
|
|
"قطيفية": "qt", |
|
|
"سيهاتية": "sy", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Egypt": { |
|
|
"قاهرية": "ca", |
|
|
"إسكندرانية": "al", |
|
|
"صعيدية": "sa", |
|
|
"سيناوية": "si", |
|
|
"دلتاوية": "de", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Morocco": { |
|
|
"فاسية": "fe", |
|
|
"دار البيضاء": "ca", |
|
|
"مراكشية": "ma", |
|
|
"شمالية": "no", |
|
|
"أطلسية": "at", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Iraq": { |
|
|
"بغدادية": "ba", |
|
|
"بصراوية": "bs", |
|
|
"موصلية": "mo", |
|
|
"كردية": "ku", |
|
|
"جنوبية": "so", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Yemen": { |
|
|
"صنعانية": "sa", |
|
|
"عدنية": "ad", |
|
|
"حضرمية": "ha", |
|
|
"تهامية": "ti", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Jordan": { |
|
|
"عمانية": "am", |
|
|
"شمالية": "no", |
|
|
"جنوبية": "so", |
|
|
"بدوية": "be", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Lebanon": { |
|
|
"بيروتية": "be", |
|
|
"جبلية": "mo", |
|
|
"جنوبية": "so", |
|
|
"شمالية": "no", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Syria": { |
|
|
"دمشقية": "da", |
|
|
"حلبية": "al", |
|
|
"حمصية": "ho", |
|
|
"ساحلية": "co", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Palestine": { |
|
|
"قدسية": "je", |
|
|
"غزاوية": "ga", |
|
|
"خليلية": "he", |
|
|
"شمالية": "no", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"United Arab Emirates": { |
|
|
"إماراتية": "em", |
|
|
"دبية": "du", |
|
|
"أبوظبية": "ad", |
|
|
"شارقية": "sh", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Kuwait": { |
|
|
"كويتية": "ku", |
|
|
"بدوية": "be", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Qatar": { |
|
|
"قطرية": "qa", |
|
|
"بدوية": "be", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Bahrain": { |
|
|
"بحرينية": "ba", |
|
|
"مدنية": "ur", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Oman": { |
|
|
"عمانية": "om", |
|
|
"ظفارية": "dh", |
|
|
"داخلية": "in", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Algeria": { |
|
|
"جزائرية": "al", |
|
|
"قسنطينية": "co", |
|
|
"وهرانية": "or", |
|
|
"قبائلية": "ka", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Tunisia": { |
|
|
"تونسية": "tu", |
|
|
"صفاقسية": "sf", |
|
|
"سوسية": "so", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Libya": { |
|
|
"طرابلسية": "tr", |
|
|
"بنغازية": "be", |
|
|
"فزانية": "fe", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Sudan": { |
|
|
"خرطومية": "kh", |
|
|
"شمالية": "no", |
|
|
"دارفورية": "da", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Somalia": { |
|
|
"صومالية": "so", |
|
|
"شمالية": "no", |
|
|
"جنوبية": "so", |
|
|
"أخرى": "oth" |
|
|
}, |
|
|
"Mauritania": { |
|
|
"موريتانية": "mr", |
|
|
"حسانية": "ha", |
|
|
"أخرى": "oth" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
def get_dialect_code(country, dialect): |
|
|
"""Generate full dialect code from country and dialect.""" |
|
|
country_code = COUNTRY_CODES.get(country, "unk") |
|
|
dialect_code = COUNTRY_DIALECTS.get(country, {}).get(dialect, "gen") |
|
|
return f"{country_code}-{dialect_code}" |
|
|
|
|
|
def get_dialects_for_country(country): |
|
|
"""Get available dialects for a country.""" |
|
|
return list(COUNTRY_DIALECTS.get(country, {}).keys()) |
|
|
|
|
|
GENDERS = ["Male", "Female"] |
|
|
|
|
|
RECORDING_INSTRUCTIONS = """ |
|
|
<section dir="rtl" lang="ar" style="text-align: right"> |
|
|
<ul> |
|
|
<li>سجّل في مكان هادي ومفيهوش دوشة.</li> |
|
|
<li>خلّي المايك أو الموبايل قريب من وشّك (حوالي ٢٠ – ٣٠ سم).</li> |
|
|
<li>اتكلم بصوت واضح وطبيعي زي ما بتتكلم عادي.</li> |
|
|
<li>حاول تخلي الصوت ثابت، متعليش مرّة وتوطي مرّة.</li> |
|
|
<li>لو غلطت في كلمة، اقفل التسجيل وابدأ من الأول.</li> |
|
|
<li>لو في جملة مش عايز تسجّلها، ممكن تختار «تخطّي».</li> |
|
|
<li> ولو قابلتك أي مشكلة أو عندك سؤال، ابعتلنا على الإيميل: <a href="mailto:[email protected]">[email protected]</a> </li> </ul> |
|
|
</section> |
|
|
|
|
|
""" |
|
|
|
|
|
RECORDING_INSTRUCTIONS_F = """ |
|
|
<section> |
|
|
<ul> |
|
|
<li>سجّلي في مكان هادي ومفيهوش دوشة.</li> |
|
|
<li>خلّي المايك أو الموبايل قريب من وشّك (حوالي ٢٠ – ٣٠ سم).</li> |
|
|
<li>اتكلمي بصوت واضح وطبيعي زي ما بتتكلمي عادي.</li> |
|
|
<li>حاولي تخلي صوتِك ثابت، متعليش مرّة وتوطي مرّة.</li> |
|
|
<li>لو غلطتي في كلمة، اقفلي التسجيل وابدئي من الأول.</li> |
|
|
<li>لو في جملة مش عايزة تسجّليها، ممكن تختاري «تخطّي».</li> |
|
|
<li> |
|
|
لو قابلتك أي مشكلة أو عندِك سؤال، ابعتي لنا على الإيميل: <a href="mailto:[email protected]">[email protected]</a> |
|
|
</section> |
|
|
|
|
|
""" |
|
|
|
|
|
RECORDING_INSTRUCTIONS_M = """ |
|
|
<section dir="rtl" lang="ar" style="text-align: right"> <ul> |
|
|
<li>سجّل في مكان هادي ومفيهوش دوشة.</li> |
|
|
<li>خلّي المايك أو الموبايل قريب من وشّك (حوالي ٢٠ – ٣٠ سم).</li> |
|
|
<li>اتكلم بصوت واضح وطبيعي زي ما بتتكلم عادي.</li> |
|
|
<li>حاول تخلي الصوت ثابت، متعليش مرّة وتوطي مرّة.</li> |
|
|
<li>لو غلطت في كلمة، اقفل التسجيل وابدأ من الأول.</li> |
|
|
<li>لو في جملة مش عايز تسجّلها، ممكن تختار «تخطّي».</li> |
|
|
<li> ولو قابلتك أي مشكلة أو عندك سؤال، ابعتلنا على الإيميل: <a href="mailto:[email protected]">[email protected]</a> </li> </ul> |
|
|
</section> |
|
|
|
|
|
""" |
|
|
|
|
|
CONSENT_DETAILS = """ |
|
|
<section dir="rtl" lang="ar" style="text-align: right"> |
|
|
<h1>الموافقة على جمع واستخدام البيانات</h1> |
|
|
<p> |
|
|
الاتفاقية دي ما بين <strong>المشارك (حضرتك)</strong> وبين فريق البحث من |
|
|
<strong>جامعة الملك فهد للبترول والمعادن</strong> و<strong>جامعة طيبة</strong> |
|
|
(اللي هنسميه هنا "الجامعتين"). |
|
|
الاتفاقية مخصوص علشان جمع واستخدام وتوزيع تسجيلات صوتية هتساعد في أبحاث كشف الأصوات المزوّرة (Deepfake) |
|
|
وغيرها من الأبحاث غير التجارية. |
|
|
</p> |
|
|
<ol> |
|
|
<li> |
|
|
<strong>هدف جمع البيانات:</strong><br> |
|
|
الفريق بيجمع تسجيلات صوت علشان يعمل مجموعة بيانات (Dataset) خاصة بالكشف عن الأصوات المصنوعة بالذكاء الاصطناعي |
|
|
باستخدام تقنيات تحويل النص لصوت (TTS) أو تقليد الأصوات (Voice Conversion) وطرق تانية. |
|
|
البيانات دي هتتستخدم في أبحاث علمية وأكاديمية لتطوير طرق أفضل لاكتشاف الأصوات المزوّرة وغيرها من الأبحاث غير التجارية. |
|
|
</li> |
|
|
<li> |
|
|
<strong>طبيعة البيانات اللي هتتجمع:</strong><br> |
|
|
المشارك بيوافق إنه يدي: |
|
|
<ul> |
|
|
<li>تسجيلات صوتية بصوته الطبيعي أو من خلال نصوص/جمل هنطلب منه يقراها.</li> |
|
|
<li>بيانات اختيارية زي: النوع (ذكر/أنثى)، الفئة العمرية، اللهجة، إلخ.</li> |
|
|
<li>موافقة إن صوته يتعدل أو يتغيّر بأساليب صناعية.</li> |
|
|
</ul> |
|
|
</li> |
|
|
<li> |
|
|
<strong>الحقوق الممنوحة:</strong><br> |
|
|
المشارك بيدي الفريق الحق الكامل (من غير فلوس أو قيود) إنه: |
|
|
<ul> |
|
|
<li>يسجّل ويعالج ويستخدم الصوت الطبيعي والنسخ المصنوعة منه.</li> |
|
|
<li>يوزّع مجموعة البيانات (الطبيعية والمصنوعة) للباحثين في المجتمع العلمي لأغراض بحثية غير تجارية فقط.</li> |
|
|
<li>ينشر عينات صوتية على منصات مهنية أو أكاديمية زي LinkedIn، Twitter/X، YouTube علشان يزود الوعي بأبحاث الديب فيك أو يعرّف الناس عن توفر البيانات.</li> |
|
|
</ul> |
|
|
</li> |
|
|
<li> |
|
|
<strong>إتاحة البيانات:</strong><br> |
|
|
المجموعة الصوتية (الطبيعية والمصنوعة) هتتنشر بترخيص مفتوح |
|
|
<em>(Creative Commons Attribution 4.0)</em> |
|
|
يسمح لأي باحث يستخدمها ويشاركها للأغراض الأكاديمية غير التجارية. |
|
|
</li> |
|
|
<li> |
|
|
<strong>الخصوصية والسرية:</strong><br> |
|
|
<ul> |
|
|
<li>مش هيتم نشر اسم المشارك أو أي بيانات شخصية مباشرة إلا بموافقته المكتوبة.</li> |
|
|
<li>المشارك هيكون ليه ID مجهول في مجموعة البيانات.</li> |
|
|
</ul> |
|
|
</li> |
|
|
<li> |
|
|
<strong>المشاركة والانضمام:</strong><br> |
|
|
<ul> |
|
|
<li>المشاركة اختيارية ١٠٠٪.</li> |
|
|
<li>له الحق ينسحب أو يطلب حذف تسجيلاته قبل ما مجموعة البيانات تتنشر للعامة.</li> |
|
|
<li>بعد النشر العام، سحب البيانات مش هيكون ممكن بسبب طريقة توزيعها.</li> |
|
|
</ul> |
|
|
</li> |
|
|
<li> |
|
|
<strong>التعويض:</strong><br> |
|
|
المشارك عارف إن مفيش أي مقابل مادي أو فلوس مقابل تسجيلاته، والمشاركة هنا لدعم وتطوير البحث العلمي فقط. |
|
|
</li> |
|
|
</ol> |
|
|
</section> |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SENTENCES_DB_FILE = os.path.join(APP_CODE_DIR, "sentences.json") |
|
|
|
|
|
|
|
|
if not os.path.exists(SENTENCES_DB_FILE): |
|
|
with open(SENTENCES_DB_FILE, "w") as f: |
|
|
json.dump({ |
|
|
"sentences": [ |
|
|
{ |
|
|
"unique_id": "001", |
|
|
"text": "The birch canoe slid on the smooth planks.", |
|
|
"dialect": ["sa-hj", "sa-nj"] |
|
|
}, |
|
|
{ |
|
|
"unique_id": "002", |
|
|
"text": "Glue the sheet to the dark blue background.", |
|
|
"dialect": ["sa-jn", "sa-th"] |
|
|
} |
|
|
] |
|
|
}, f) |
|
|
|
|
|
|
|
|
def get_s3_client(): |
|
|
"""Create and return an S3 client using IAM role or environment variables.""" |
|
|
try: |
|
|
if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: |
|
|
print("Using IAM role for S3 authentication") |
|
|
return boto3.client('s3', region_name=AWS_REGION) |
|
|
else: |
|
|
print("Using access keys for S3 authentication") |
|
|
return boto3.client( |
|
|
's3', |
|
|
aws_access_key_id=AWS_ACCESS_KEY, |
|
|
aws_secret_access_key=AWS_SECRET_KEY, |
|
|
region_name=AWS_REGION |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"Error creating S3 client: {e}") |
|
|
raise |
|
|
|
|
|
def upload_to_s3(local_file, s3_file): |
|
|
"""Upload a file to S3 bucket and return True if successful.""" |
|
|
try: |
|
|
s3_client = get_s3_client() |
|
|
s3_client.upload_file(local_file, S3_BUCKET, s3_file) |
|
|
print(f"Successfully uploaded {local_file} to s3://{S3_BUCKET}/{s3_file}") |
|
|
return True |
|
|
except NoCredentialsError: |
|
|
print(f"No AWS credentials available for uploading {local_file} to s3://{S3_BUCKET}/{s3_file}") |
|
|
return False |
|
|
except ClientError as e: |
|
|
error_code = e.response['Error']['Code'] |
|
|
error_message = e.response['Error']['Message'] |
|
|
print(f"ClientError uploading {local_file} to s3://{S3_BUCKET}/{s3_file}: {error_code} - {error_message}") |
|
|
return False |
|
|
except Exception as e: |
|
|
print(f"Unexpected error uploading {local_file} to s3://{S3_BUCKET}/{s3_file}: {type(e).__name__} - {str(e)}") |
|
|
return False |
|
|
|
|
|
def download_from_s3(s3_file, local_file): |
|
|
"""Download a file from S3 bucket.""" |
|
|
try: |
|
|
s3_client = get_s3_client() |
|
|
s3_client.download_file(S3_BUCKET, s3_file, local_file) |
|
|
print(f"Successfully downloaded s3://{S3_BUCKET}/{s3_file} to {local_file}") |
|
|
return True |
|
|
except NoCredentialsError: |
|
|
print(f"No AWS credentials available for downloading s3://{S3_BUCKET}/{s3_file}") |
|
|
return False |
|
|
except ClientError as e: |
|
|
error_code = e.response['Error']['Code'] |
|
|
error_message = e.response['Error']['Message'] |
|
|
print(f"ClientError downloading s3://{S3_BUCKET}/{s3_file}: {error_code} - {error_message}") |
|
|
return False |
|
|
except Exception as e: |
|
|
print(f"Unexpected error downloading s3://{S3_BUCKET}/{s3_file}: {type(e).__name__} - {str(e)}") |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_audio_quality(audio_path): |
|
|
"""Check if audio meets minimum quality standards""" |
|
|
try: |
|
|
with sf.SoundFile(audio_path) as f: |
|
|
if f.samplerate < 16000: |
|
|
return False, f"Sample rate too low: {f.samplerate}Hz" |
|
|
if len(f) / f.samplerate < 1: |
|
|
return False, "Recording too short" |
|
|
return True, "Audio quality OK" |
|
|
except Exception as e: |
|
|
return False, f"Audio validation error: {e}" |
|
|
|
|
|
def cleanup_audio_resources(): |
|
|
"""Clean up temporary audio files""" |
|
|
temp_audio_dir = "/tmp" |
|
|
try: |
|
|
for file in os.listdir(temp_audio_dir): |
|
|
if file.startswith("gradio") and file.endswith(".wav"): |
|
|
file_path = os.path.join(temp_audio_dir, file) |
|
|
if os.path.getctime(file_path) < time.time() - 300: |
|
|
os.remove(file_path) |
|
|
except Exception as e: |
|
|
print(f"Audio cleanup error: {e}") |
|
|
|
|
|
def normalize_username(name, dialect_code, gender): |
|
|
"""Convert name to lowercase, replace spaces with underscores, add unique ID and dialect code.""" |
|
|
base_name = re.sub(r'\s+', '_', name.lower().strip()) |
|
|
unique_id = uuid.uuid4().hex[:7] |
|
|
gender_suffix = "_m" if gender == "Male" else "_f" if gender == "Female" else "" |
|
|
return f"{base_name}_{unique_id}_{dialect_code}{gender_suffix}" |
|
|
|
|
|
|
|
|
id = uuid.uuid4().hex[:7] |
|
|
|
|
|
|
|
|
|
|
|
AWS_DATA_DIR = "tmp/home/ubuntu/.tts_dataset_creator" |
|
|
CONFIG_AWS = os.path.join(AWS_DATA_DIR, "config") |
|
|
USER_DB_AWS = os.path.join(CONFIG_AWS, "users.json") |
|
|
SESSION_AWS = os.path.join(CONFIG_AWS, "sessions.json") |
|
|
|
|
|
|
|
|
APP_DATA_DIR = "/tmp/home/ubuntu/.tts_dataset_creator" + f"_{id}" |
|
|
CONFIG_DIR = os.path.join(APP_DATA_DIR, "config") |
|
|
USERS_DIR = os.path.join(".tts_dataset_creator", "users") |
|
|
USERS_DIR_TMP = os.path.join("tmp", USERS_DIR) |
|
|
|
|
|
USER_DB_FILE = os.path.join(CONFIG_DIR, "users.json") |
|
|
SESSION_FILE = os.path.join(CONFIG_DIR, "sessions.json") |
|
|
|
|
|
|
|
|
|
|
|
def create_app_directories(): |
|
|
"""Create the improved directory structure.""" |
|
|
directories = [CONFIG_DIR, USERS_DIR_TMP] |
|
|
for directory in directories: |
|
|
os.makedirs(directory, exist_ok=True) |
|
|
print(f"✅ Created directory structure at {APP_DATA_DIR}") |
|
|
|
|
|
create_app_directories() |
|
|
|
|
|
download_from_s3(USER_DB_AWS, USER_DB_FILE) |
|
|
download_from_s3(SESSION_AWS, SESSION_FILE) |
|
|
|
|
|
def load_users(id): |
|
|
"""Load users from the database file.""" |
|
|
try: |
|
|
response = ( |
|
|
supabase.table("users") |
|
|
.select("*") |
|
|
.eq("id", id) |
|
|
.execute() |
|
|
) |
|
|
return response.data[0] |
|
|
except Exception as e: |
|
|
print("Error fetching users data:", e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_users(user): |
|
|
"""Save users to the database file.""" |
|
|
try: |
|
|
response = ( |
|
|
supabase.table("users") |
|
|
.insert(user) |
|
|
.execute() |
|
|
) |
|
|
except Exception as e: |
|
|
print("Error saving session data:", e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_sessions(id): |
|
|
try: |
|
|
response = ( |
|
|
supabase.table("sessions") |
|
|
.select("id, current_index, completed_sentences, recorded_sentences, total_recording_duration") |
|
|
.eq("id", id) |
|
|
.execute() |
|
|
) |
|
|
return response.data[0] |
|
|
except Exception as e: |
|
|
print("WEEEEEE") |
|
|
print("Error fetching session data:", e) |
|
|
|
|
|
"""Load session data from file.""" |
|
|
|
|
|
|
|
|
|
|
|
def save_sessions(id, session): |
|
|
"""Save session data to file.""" |
|
|
session["id"] = id |
|
|
try: |
|
|
response = ( |
|
|
supabase.table("sessions") |
|
|
.insert(session) |
|
|
.execute() |
|
|
) |
|
|
except Exception as e: |
|
|
print("Error saving session data:", e) |
|
|
|
|
|
def register_user(name, email, password, country, dialect, city, gender, consent): |
|
|
"""Register a new user with country, dialect, city, and gender.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
dialect_code = get_dialect_code(country, dialect) |
|
|
|
|
|
|
|
|
username = normalize_username(name, dialect_code, gender) |
|
|
hashed_password = generate_password_hash(password) |
|
|
global supabase, USER_ID |
|
|
try: |
|
|
response = supabase.auth.sign_up( |
|
|
{ |
|
|
"email": email, |
|
|
"password": password, |
|
|
} |
|
|
) |
|
|
print(response.user.id) |
|
|
|
|
|
USER_ID = response.user.id |
|
|
data = { |
|
|
"id": USER_ID, |
|
|
"name": name, |
|
|
"email": email, |
|
|
"country": country, |
|
|
"dialect": dialect, |
|
|
"dialect_code": dialect_code, |
|
|
"city": city, |
|
|
"gender": gender, |
|
|
"created_at": datetime.now().isoformat() |
|
|
} |
|
|
print("HEERER") |
|
|
try: |
|
|
session = response.session |
|
|
supabase.postgrest.auth(session.access_token) |
|
|
supabase.table("users").insert(data).execute() |
|
|
|
|
|
|
|
|
|
|
|
create_user_directories(username) |
|
|
except Exception as e: |
|
|
print("Error inserting user data:", e) |
|
|
|
|
|
|
|
|
try: |
|
|
session = { |
|
|
"id": USER_ID, |
|
|
"current_index": 0, |
|
|
"completed_sentences": [], |
|
|
"recorded_sentences": [], |
|
|
"total_recording_duration": 0.0 |
|
|
} |
|
|
save_sessions(USER_ID, session) |
|
|
except Exception as e: |
|
|
print("Error creating session data:", e) |
|
|
|
|
|
return True, username |
|
|
except AuthWeakPasswordError as e: |
|
|
return( |
|
|
False, "⚠️ كلمة السر غير مقبولة. استخدم علي الأقل 6 أرقام" |
|
|
) |
|
|
except Exception as e: |
|
|
if(str(e) == "User already registered"): |
|
|
return( |
|
|
False, "⚠️ هذا المستخدم موجود بالفعل, قم بتسجيل الدخول علي حسابك" |
|
|
) |
|
|
elif(str(e) == "Unable to validate email address: invalid format"): |
|
|
return( |
|
|
False, "⚠️ يوجد خطأ بالإيميل, يرجي إعادة الإدخال مرة أخري بصورة صحيحة" |
|
|
) |
|
|
else: |
|
|
print(e) |
|
|
return( |
|
|
False, "⚠️ حدث خطأ غير متوقع, يرجى المحاولة مرة أخرى" |
|
|
) |
|
|
|
|
|
def authenticate_user(email, password): |
|
|
"""Authenticate a user.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
global supabase, USER_ID |
|
|
try: |
|
|
response = supabase.auth.sign_in_with_password( |
|
|
{ |
|
|
"email": email, |
|
|
"password": password, |
|
|
} |
|
|
) |
|
|
USER_ID = response.user.id |
|
|
session = response.session |
|
|
supabase.postgrest.auth(session.access_token) |
|
|
print(USER_ID) |
|
|
load_sessions(USER_ID) |
|
|
print("XDCE") |
|
|
return( |
|
|
True, |
|
|
email |
|
|
) |
|
|
except Exception as e: |
|
|
print(e) |
|
|
return( |
|
|
False, |
|
|
"⚠️ هناك خطأ بالإيميل أو كلمة السر, في حالة عدم وجود حساب يرجي إنشاء حساب أولاً", |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_user_session(id, username, session_data): |
|
|
"""Update session data for a user.""" |
|
|
sessions = load_sessions(id) |
|
|
sessions[username] = { |
|
|
"id": USER_ID, |
|
|
"current_index": session_data.get("current_index", 0), |
|
|
"completed_sentences": session_data.get("completed_sentences", []), |
|
|
"recorded_sentences": session_data.get("recorded_sentences", []), |
|
|
"total_recording_duration": session_data.get("total_recording_duration", 0.0) |
|
|
} |
|
|
save_sessions(USER_ID, session_data) |
|
|
update_done_sentences_file(username, session_data.get("completed_sentences", [])) |
|
|
|
|
|
def update_done_sentences_file(username, completed_sentences): |
|
|
"""Update the done sentences file in organized structure.""" |
|
|
user_dir = os.path.join(USERS_DIR_TMP, username) |
|
|
done_file = os.path.join(user_dir, f"{username}_DONE_SENTENCES.txt") |
|
|
|
|
|
with open(done_file, "w") as f: |
|
|
for sentence_id in completed_sentences: |
|
|
f.write(f"{sentence_id}\n") |
|
|
|
|
|
s3_path = f"{username}/{username}_DONE_SENTENCES.txt" |
|
|
if upload_to_s3(done_file, s3_path): |
|
|
try: |
|
|
os.remove(done_file) |
|
|
print(f"Deleted local file: {done_file}") |
|
|
except OSError as e: |
|
|
print(f"Error deleting local file {done_file}: {e}") |
|
|
|
|
|
|
|
|
def load_sentences(): |
|
|
"""Load sentences from the database file.""" |
|
|
with open(SENTENCES_DB_FILE, "r") as f: |
|
|
data = json.load(f) |
|
|
return [(s["unique_id"], s["text"], s.get("dialect", [])) for s in data["sentences"]] |
|
|
|
|
|
def get_available_sentences(completed_sentences, user_dialect_code): |
|
|
"""Get sentences that haven't been completed by the user and match their dialect.""" |
|
|
all_sentences = load_sentences() |
|
|
return [ |
|
|
(sid, text) |
|
|
for sid, text, dialects in all_sentences |
|
|
if sid not in completed_sentences and user_dialect_code in dialects |
|
|
] |
|
|
|
|
|
|
|
|
def create_user_directories(username): |
|
|
"""Create directory structure for a user in the organized users folder.""" |
|
|
user_dir = os.path.join(USERS_DIR_TMP, username) |
|
|
os.makedirs(os.path.join(user_dir, "wavs"), exist_ok=True) |
|
|
os.makedirs(os.path.join(user_dir, "txt"), exist_ok=True) |
|
|
|
|
|
metadata_file = os.path.join(user_dir, "metadata.csv") |
|
|
if not os.path.exists(metadata_file): |
|
|
with open(metadata_file, "w") as f: |
|
|
f.write("audio_file|text\n") |
|
|
|
|
|
done_file = os.path.join(user_dir, f"{username}_DONE_SENTENCES.txt") |
|
|
if not os.path.exists(done_file): |
|
|
with open(done_file, "w") as f: |
|
|
pass |
|
|
|
|
|
def save_recording(username, audio_path, sentence_id, sentence_text): |
|
|
"""Save recording in the organized user directory.""" |
|
|
user_dir = os.path.join(USERS_DIR, username) |
|
|
wav_filename = f"{username}_{sentence_id}.wav" |
|
|
local_wav_path = os.path.join(user_dir, "wavs", wav_filename) |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
shutil.copy2(audio_path, local_wav_path) |
|
|
print(f"Copied {audio_path} to {local_wav_path}") |
|
|
|
|
|
try: |
|
|
os.remove(audio_path) |
|
|
except OSError as e: |
|
|
print(f"Warning: Could not remove temp file {audio_path}: {e}") |
|
|
|
|
|
except OSError as e: |
|
|
print(f"Error renaming {audio_path} to {local_wav_path}: {e}") |
|
|
return False, 0.0 |
|
|
|
|
|
|
|
|
metadata_path = os.path.join(user_dir, "metadata.csv") |
|
|
try: |
|
|
with open(metadata_path, "a") as f: |
|
|
f.write(f"{wav_filename}|{sentence_text}\n") |
|
|
print(f"Updated {metadata_path}") |
|
|
except OSError as e: |
|
|
print(f"Error updating {metadata_path}: {e}") |
|
|
return False, 0.0 |
|
|
|
|
|
|
|
|
try: |
|
|
with sf.SoundFile(local_wav_path) as f: |
|
|
duration = len(f) / f.samplerate |
|
|
print(f"Recording duration: {duration} seconds") |
|
|
except Exception as e: |
|
|
print(f"Error reading audio duration for {local_wav_path}: {e}") |
|
|
duration = 0.0 |
|
|
|
|
|
|
|
|
s3_wav_path = f"{username}/wavs/{wav_filename}" |
|
|
s3_metadata_path = f"{username}/metadata.csv" |
|
|
all_uploads_successful = True |
|
|
|
|
|
if not upload_to_s3(local_wav_path, s3_wav_path): |
|
|
all_uploads_successful = False |
|
|
if not upload_to_s3(metadata_path, s3_metadata_path): |
|
|
all_uploads_successful = False |
|
|
|
|
|
|
|
|
if all_uploads_successful: |
|
|
try: |
|
|
os.remove(local_wav_path) |
|
|
print(f"Deleted local file: {local_wav_path}") |
|
|
except OSError as e: |
|
|
print(f"Error deleting local files: {e}") |
|
|
all_uploads_successful = False |
|
|
|
|
|
print(f"save_recording completed, success={all_uploads_successful}") |
|
|
return all_uploads_successful, duration |
|
|
|
|
|
|
|
|
def login_interface(): |
|
|
"""Create the login interface components.""" |
|
|
login_title = gr.Markdown("# TTS Dataset Creator - Login") |
|
|
with gr.Tabs(): |
|
|
with gr.Tab("Login"): |
|
|
login_email = gr.Textbox(label="Email") |
|
|
login_password = gr.Textbox(label="Password", type="password") |
|
|
login_button = gr.Button("Login") |
|
|
login_message = gr.Textbox(label="Message", interactive=False) |
|
|
with gr.Tab("Register"): |
|
|
register_name = gr.Textbox(label="Name (in English)") |
|
|
register_email = gr.Textbox(label="Email") |
|
|
register_password = gr.Textbox(label="Password", type="password") |
|
|
register_country = gr.Dropdown(choices=COUNTRIES, label="Country", value="Saudi Arabia") |
|
|
register_dialect = gr.Dropdown(choices=[], label="Dialect", value=None) |
|
|
register_city = gr.Textbox(label="City", placeholder="Enter your city name") |
|
|
register_gender = gr.Dropdown(choices=GENDERS, label="Gender") |
|
|
register_consent = gr.Checkbox( |
|
|
label="بمجرد إنشاء حساب فأنا موافق علي استخدام صوتي في الأبحاث ومنها أبحاث التزييف العميق, للمزيد أضغط علي اقرأ المزيد", |
|
|
value=False |
|
|
) |
|
|
with gr.Accordion("اقرأ المزيد", open=False): |
|
|
consent_details = gr.Markdown(CONSENT_DETAILS) |
|
|
register_button = gr.Button("Register") |
|
|
register_message = gr.Textbox(label="Message", interactive=False) |
|
|
|
|
|
return ( |
|
|
login_title, login_email, login_password, login_button, login_message, |
|
|
register_name, register_email, register_password, register_country, register_dialect, |
|
|
register_city, register_gender, register_consent, consent_details, |
|
|
register_button, register_message |
|
|
) |
|
|
|
|
|
def main_app_interface(): |
|
|
"""Create the main application interface components.""" |
|
|
main_title = gr.Markdown("# TTS Dataset Creator", visible=False) |
|
|
|
|
|
|
|
|
with gr.Row(visible=False) as user_row: |
|
|
username_display = gr.Textbox(label="Username", interactive=False) |
|
|
logout_button = gr.Button("Logout") |
|
|
|
|
|
|
|
|
with gr.Accordion("تعليمات التسجيل", open=True, visible=False) as instructions_accordion: |
|
|
instructions_display = gr.Markdown("", elem_id="instructions_display") |
|
|
|
|
|
|
|
|
with gr.Column(visible=False) as recording_row: |
|
|
current_sentence_display = gr.Textbox( |
|
|
label="Current Sentence", |
|
|
interactive=True, |
|
|
elem_classes="large-text", |
|
|
lines=2 |
|
|
) |
|
|
with gr.Row(): |
|
|
sentence_id_display = gr.Textbox(label="Sentence ID", interactive=False) |
|
|
progress_display = gr.Textbox(label="Progress", interactive=False) |
|
|
|
|
|
with gr.Row(visible=False) as audio_row: |
|
|
audio_recorder = gr.Audio( |
|
|
sources=["microphone"], |
|
|
format="wav", |
|
|
type="filepath", |
|
|
label="Record Audio", |
|
|
scale=4 |
|
|
) |
|
|
discard_button = gr.Button("Discard (D)", variant="stop", scale=1) |
|
|
|
|
|
|
|
|
with gr.Row(visible=False) as button_row: |
|
|
save_button = gr.Button("Save & Next (N)", variant="primary") |
|
|
skip_button = gr.Button("Skip (S)", variant="secondary") |
|
|
|
|
|
|
|
|
with gr.Row(visible=False) as status_row: |
|
|
session_status = gr.Textbox(label="Session Status", interactive=False) |
|
|
|
|
|
return ( |
|
|
main_title, user_row, username_display, logout_button, instructions_accordion, |
|
|
instructions_display, recording_row, current_sentence_display, sentence_id_display, |
|
|
progress_display, audio_row, audio_recorder, button_row, save_button, discard_button, |
|
|
skip_button, status_row, session_status |
|
|
) |
|
|
|
|
|
def create_app(): |
|
|
"""Create the main Gradio application.""" |
|
|
with gr.Blocks(css=""" |
|
|
#app-title { text-align: center; margin-bottom: 20px; } |
|
|
.recording-buttons { display: flex; justify-content: space-between; } |
|
|
.large-text { font-size: 1.5em; font-weight: bold; } |
|
|
#instructions_display { |
|
|
text-align: right; direction: rtl; |
|
|
} |
|
|
""") as app: |
|
|
|
|
|
state = gr.State({ |
|
|
"logged_in": False, |
|
|
"username": "", |
|
|
"dialect_code": "", |
|
|
"sentences": [], |
|
|
"current_index": 0, |
|
|
"completed_sentences": [], |
|
|
"recorded_sentences": [], |
|
|
"total_recording_duration": 0.0, |
|
|
"current_sentence_text": "", |
|
|
"current_sentence_id": "", |
|
|
"progress": "", |
|
|
"last_save_time": 0 |
|
|
}) |
|
|
|
|
|
|
|
|
( |
|
|
login_title, login_email, login_password, login_button, login_message, |
|
|
register_name, register_email, register_password, register_country, register_dialect, |
|
|
register_city, register_gender, register_consent, consent_details, |
|
|
register_button, register_message |
|
|
) = login_interface() |
|
|
|
|
|
( |
|
|
main_title, user_row, username_display, logout_button, instructions_accordion, |
|
|
instructions_display, recording_row, current_sentence_display, sentence_id_display, |
|
|
progress_display, audio_row, audio_recorder, button_row, save_button, discard_button, |
|
|
skip_button, status_row, session_status |
|
|
) = main_app_interface() |
|
|
|
|
|
def update_dialects(country): |
|
|
"""Update dialect choices based on selected country.""" |
|
|
dialects = get_dialects_for_country(country) |
|
|
return gr.update(choices=dialects, value=dialects[0] if dialects else None) |
|
|
|
|
|
|
|
|
def login(email, password, state_dict): |
|
|
email = email.lower() |
|
|
success, result = authenticate_user(email, password) |
|
|
if success: |
|
|
username = result |
|
|
session_data = load_sessions(USER_ID) |
|
|
user_data = load_users(USER_ID) |
|
|
user_dialect_code = user_data.get("dialect_code", "") |
|
|
user_gender = user_data.get("gender", "Male") |
|
|
|
|
|
available_sentences = get_available_sentences( |
|
|
session_data.get("completed_sentences", []), |
|
|
user_dialect_code |
|
|
) |
|
|
|
|
|
state_dict["logged_in"] = True |
|
|
state_dict["username"] = username |
|
|
state_dict["dialect_code"] = user_dialect_code |
|
|
state_dict["sentences"] = available_sentences |
|
|
state_dict["current_index"] = session_data.get("current_index", 0) |
|
|
state_dict["completed_sentences"] = session_data.get("completed_sentences", []) |
|
|
state_dict["recorded_sentences"] = session_data.get("recorded_sentences", []) |
|
|
state_dict["total_recording_duration"] = session_data.get("total_recording_duration", 0.0) |
|
|
state_dict["last_save_time"] = 0 |
|
|
state_dict["is_processing"] = False |
|
|
|
|
|
|
|
|
instructions = RECORDING_INSTRUCTIONS_M if user_gender == "Male" else RECORDING_INSTRUCTIONS_F |
|
|
|
|
|
if available_sentences: |
|
|
sentence_id, sentence_text = available_sentences[0] |
|
|
state_dict["current_sentence_text"] = sentence_text |
|
|
state_dict["current_sentence_id"] = sentence_id |
|
|
total = len(available_sentences) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
state_dict["progress"] = progress |
|
|
else: |
|
|
state_dict["current_sentence_text"] = "" |
|
|
state_dict["current_sentence_id"] = "" |
|
|
state_dict["progress"] = "0 sentences recorded, 0 minutes and 0 seconds total" |
|
|
|
|
|
return ( |
|
|
state_dict, |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
username, |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
instructions, |
|
|
gr.update(visible=True), |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
state_dict["progress"], |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
"Logged in successfully. Ready to record sentences." if available_sentences else "No sentences available." |
|
|
) |
|
|
else: |
|
|
return ( |
|
|
state_dict, |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
"Invalid email or password", |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
"", |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
"", |
|
|
gr.update(visible=False), |
|
|
"", |
|
|
"", |
|
|
"", |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
"" |
|
|
) |
|
|
|
|
|
|
|
|
def register(name, email, password, country, dialect, city, gender, consent, state_dict): |
|
|
email = email.lower() |
|
|
if not all([name, email, password, country, dialect, city, gender]): |
|
|
return state_dict, "Please fill in all fields", gr.update(visible=True) |
|
|
if not consent: |
|
|
return state_dict, "You must agree to the research data usage terms to register", gr.update(visible=True) |
|
|
|
|
|
success, result = register_user(name, email, password, country, dialect, city, gender, consent) |
|
|
if success: |
|
|
return state_dict, f"Registration successful! Your username is: {result}. Please go to the Login tab.", gr.update(visible=True) |
|
|
else: |
|
|
return state_dict, result, gr.update(visible=True) |
|
|
|
|
|
|
|
|
def logout(state_dict): |
|
|
state_dict["logged_in"] = False |
|
|
state_dict["username"] = "" |
|
|
state_dict["dialect_code"] = "" |
|
|
state_dict["sentences"] = [] |
|
|
state_dict["current_index"] = 0 |
|
|
state_dict["completed_sentences"] = [] |
|
|
state_dict["recorded_sentences"] = [] |
|
|
state_dict["total_recording_duration"] = 0.0 |
|
|
state_dict["current_sentence_text"] = "" |
|
|
state_dict["current_sentence_id"] = "" |
|
|
state_dict["progress"] = "" |
|
|
state_dict["last_save_time"] = 0 |
|
|
|
|
|
return ( |
|
|
state_dict, |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=True), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
"", |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
"", |
|
|
gr.update(visible=False), |
|
|
"", |
|
|
"", |
|
|
"", |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
gr.update(visible=False), |
|
|
"" |
|
|
) |
|
|
|
|
|
|
|
|
def save_recording_handler(audio_path, state_dict, edited_sentence): |
|
|
|
|
|
if state_dict.get("is_processing", False): |
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
"⏳ Processing previous recording...", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
current_time = time.time() |
|
|
if current_time - state_dict["last_save_time"] < 3: |
|
|
print("Ignoring rapid save request due to debounce") |
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
"⏳ Please wait before saving again...", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
state_dict["is_processing"] = True |
|
|
state_dict["last_save_time"] = current_time |
|
|
|
|
|
try: |
|
|
print(f"save_recording_handler called with audio_path={audio_path}") |
|
|
|
|
|
if not audio_path: |
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
"Please record audio before saving", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
is_valid, validation_message = validate_audio_quality(audio_path) |
|
|
if not is_valid: |
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
f"❌ Audio quality issue: {validation_message}. Please record again.", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
if not state_dict["sentences"]: |
|
|
total = len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
"No sentences available", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
current_index = state_dict["current_index"] |
|
|
sentence_id, _ = state_dict["sentences"][current_index] |
|
|
sentence_text = edited_sentence.strip() if edited_sentence.strip() else state_dict["current_sentence_text"] |
|
|
|
|
|
success, duration = save_recording(state_dict["username"], audio_path, sentence_id, sentence_text) |
|
|
|
|
|
if sentence_id not in state_dict["completed_sentences"]: |
|
|
state_dict["completed_sentences"].append(sentence_id) |
|
|
if sentence_id not in state_dict["recorded_sentences"]: |
|
|
state_dict["recorded_sentences"].append(sentence_id) |
|
|
state_dict["total_recording_duration"] += duration |
|
|
|
|
|
|
|
|
update_user_session(USER_ID, state_dict["username"], { |
|
|
"id": USER_ID, |
|
|
"current_index": state_dict["current_index"], |
|
|
"completed_sentences": state_dict["completed_sentences"], |
|
|
"recorded_sentences": state_dict["recorded_sentences"], |
|
|
"total_recording_duration": state_dict["total_recording_duration"] |
|
|
}) |
|
|
|
|
|
state_dict["sentences"] = get_available_sentences( |
|
|
state_dict["completed_sentences"], |
|
|
state_dict["dialect_code"] |
|
|
) |
|
|
|
|
|
|
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
|
|
|
if state_dict["sentences"]: |
|
|
next_id, next_text = random.choice(state_dict["sentences"]) |
|
|
state_dict["current_sentence_text"] = next_text |
|
|
state_dict["current_sentence_id"] = next_id |
|
|
state_dict["current_index"] = next((i for i, s in enumerate(state_dict["sentences"]) if s[0] == next_id), 0) |
|
|
status_message = "Recording saved successfully. Ready to record next sentence" if success else "Recording saved locally but failed to upload to S3. Moving to next sentence." |
|
|
else: |
|
|
state_dict["current_sentence_text"] = "" |
|
|
state_dict["current_sentence_id"] = "" |
|
|
state_dict["current_index"] = 0 |
|
|
next_id, next_text = "", "" |
|
|
status_message = "Recording saved, no more sentences available" if success else "Recording saved locally but failed to upload to S3. No more sentences available." |
|
|
|
|
|
return ( |
|
|
state_dict, |
|
|
next_text, |
|
|
next_id, |
|
|
progress, |
|
|
status_message, |
|
|
gr.update(value=None) |
|
|
) |
|
|
finally: |
|
|
state_dict["is_processing"] = False |
|
|
|
|
|
|
|
|
def discard_recording(state_dict): |
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
"Recording discarded. Please record again.", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
|
|
|
def skip_sentence(state_dict): |
|
|
if not state_dict["sentences"]: |
|
|
total = len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
return ( |
|
|
state_dict, |
|
|
state_dict["current_sentence_text"], |
|
|
state_dict["current_sentence_id"], |
|
|
progress, |
|
|
"No sentences available", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
current_index = state_dict["current_index"] |
|
|
current_id, _ = state_dict["sentences"][current_index] |
|
|
if current_id not in state_dict["completed_sentences"]: |
|
|
state_dict["completed_sentences"].append(current_id) |
|
|
|
|
|
state_dict["sentences"] = get_available_sentences( |
|
|
state_dict["completed_sentences"], |
|
|
state_dict["dialect_code"] |
|
|
) |
|
|
|
|
|
total = len(state_dict["sentences"]) if state_dict["sentences"] else len(load_sentences()) |
|
|
completed = len(state_dict["recorded_sentences"]) |
|
|
duration = state_dict["total_recording_duration"] |
|
|
minutes = int(duration // 60) |
|
|
seconds = int(duration % 60) |
|
|
progress = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total" |
|
|
|
|
|
if state_dict["sentences"]: |
|
|
next_id, next_text = random.choice(state_dict["sentences"]) |
|
|
state_dict["current_sentence_text"] = next_text |
|
|
state_dict["current_sentence_id"] = next_id |
|
|
state_dict["current_index"] = next((i for i, s in enumerate(state_dict["sentences"]) if s[0] == next_id), 0) |
|
|
else: |
|
|
next_id, next_text = "", "" |
|
|
state_dict["current_sentence_text"] = "" |
|
|
state_dict["current_sentence_id"] = "" |
|
|
state_dict["current_index"] = 0 |
|
|
|
|
|
|
|
|
|
|
|
print(USER_ID) |
|
|
update_user_session(USER_ID, state_dict["username"], { |
|
|
"id": USER_ID, |
|
|
"current_index": state_dict["current_index"], |
|
|
"completed_sentences": state_dict["completed_sentences"], |
|
|
"recorded_sentences": state_dict["recorded_sentences"], |
|
|
"total_recording_duration": state_dict["total_recording_duration"] |
|
|
}) |
|
|
|
|
|
return ( |
|
|
state_dict, |
|
|
next_text, |
|
|
next_id, |
|
|
progress, |
|
|
"Sentence skipped. Ready to record next sentence.", |
|
|
gr.update(value=None) |
|
|
) |
|
|
|
|
|
|
|
|
register_country.change( |
|
|
update_dialects, |
|
|
inputs=[register_country], |
|
|
outputs=[register_dialect] |
|
|
) |
|
|
|
|
|
login_button.click( |
|
|
login, |
|
|
inputs=[login_email, login_password, state], |
|
|
outputs=[ |
|
|
state, login_title, login_email, login_password, login_button, login_message, |
|
|
register_name, register_email, register_password, register_country, register_dialect, |
|
|
register_city, register_gender, register_consent, consent_details, register_button, |
|
|
register_message, main_title, user_row, username_display, logout_button, |
|
|
instructions_accordion, instructions_display, recording_row, current_sentence_display, |
|
|
sentence_id_display, progress_display, audio_row, audio_recorder, button_row, save_button, |
|
|
discard_button, skip_button, status_row, session_status |
|
|
] |
|
|
) |
|
|
|
|
|
register_button.click( |
|
|
register, |
|
|
inputs=[register_name, register_email, register_password, register_country, register_dialect, |
|
|
register_city, register_gender, register_consent, state], |
|
|
outputs=[state, register_message, consent_details] |
|
|
) |
|
|
|
|
|
logout_button.click( |
|
|
logout, |
|
|
inputs=[state], |
|
|
outputs=[ |
|
|
state, login_title, login_email, login_password, login_button, login_message, |
|
|
register_name, register_email, register_password, register_country, register_dialect, |
|
|
register_city, register_gender, register_consent, consent_details, register_button, |
|
|
register_message, main_title, user_row, username_display, logout_button, |
|
|
instructions_accordion, instructions_display, recording_row, current_sentence_display, |
|
|
sentence_id_display, progress_display, audio_row, audio_recorder, button_row, save_button, |
|
|
discard_button, skip_button, status_row, session_status |
|
|
] |
|
|
) |
|
|
|
|
|
save_button.click( |
|
|
save_recording_handler, |
|
|
inputs=[audio_recorder, state, current_sentence_display], |
|
|
outputs=[ |
|
|
state, |
|
|
current_sentence_display, |
|
|
sentence_id_display, |
|
|
progress_display, |
|
|
session_status, |
|
|
audio_recorder |
|
|
] |
|
|
) |
|
|
|
|
|
discard_button.click( |
|
|
discard_recording, |
|
|
inputs=[state], |
|
|
outputs=[state, current_sentence_display, sentence_id_display, progress_display, session_status, audio_recorder] |
|
|
) |
|
|
|
|
|
skip_button.click( |
|
|
skip_sentence, |
|
|
inputs=[state], |
|
|
outputs=[ |
|
|
state, |
|
|
current_sentence_display, |
|
|
sentence_id_display, |
|
|
progress_display, |
|
|
session_status, |
|
|
audio_recorder |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
app.load( |
|
|
lambda: update_dialects("Saudi Arabia"), |
|
|
outputs=[register_dialect] |
|
|
) |
|
|
|
|
|
return app |
|
|
|
|
|
if __name__ == "__main__": |
|
|
port = int(os.environ.get("GRADIO_SERVER_PORT", 7860)) |
|
|
|
|
|
app = create_app() |
|
|
app.launch(server_name="0.0.0.0", server_port=port, share=False) |
|
|
|