AhmedAshrafMarzouk's picture
Update app.py
b8e9ab6 verified
raw
history blame
61.8 kB
import gradio as gr
import os
import json
import uuid
import boto3
import hashlib
from datetime import datetime
import re
import time
import random
import shutil
from werkzeug.security import generate_password_hash, check_password_hash
from botocore.exceptions import NoCredentialsError, ClientError
import soundfile as sf
import shutil
import threading
from pathlib import Path
from supabase import create_client, Client
from supabase.client import ClientOptions
from supabase_auth.errors import (
AuthWeakPasswordError,
AuthInvalidCredentialsError,
AuthApiError,
)
# AWS configuration: credentials, bucket and region are read from environment
# variables, with the literals below used as fallbacks.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY", "")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "")
# NOTE(review): the fallback bucket name embeds what looks like an AWS account
# id — confirm it is acceptable to ship this as a default.
S3_BUCKET = os.environ.get("S3_BUCKET", "tts-dataset-creator-593793024422-prod")
AWS_REGION = os.environ.get("AWS_REGION", "me-south-1")
SAMPLE_RATE = 44100
APP_CODE_DIR = "./" # App code (GitHub repo)
# Supabase id of the most recently registered/authenticated user.
# NOTE(review): this is a module-level global, so it is shared by every
# browser session served by this process — confirm that concurrent users
# cannot observe each other's id.
USER_ID = ""
# Supabase project URL and API key, read from the "url" and "key" environment
# variables (empty strings when unset).
url = os.environ.get("url", "")
key = os.environ.get("key", "")
# Shared Supabase client, created once at import time.
supabase = create_client(url, key)
# Country name -> two-letter country code used as the prefix of dialect codes.
# The insertion order below is also the order shown in the country dropdown.
COUNTRY_CODES = dict(
    [
        ("Algeria", "dz"),
        ("Bahrain", "bh"),
        ("Egypt", "eg"),
        ("Iraq", "iq"),
        ("Jordan", "jo"),
        ("Kuwait", "kw"),
        ("Lebanon", "lb"),
        ("Libya", "ly"),
        ("Mauritania", "mr"),
        ("Morocco", "ma"),
        ("Oman", "om"),
        ("Palestine", "ps"),
        ("Qatar", "qa"),
        ("Saudi Arabia", "sa"),
        ("Somalia", "so"),
        ("Sudan", "sd"),
        ("Syria", "sy"),
        ("Tunisia", "tn"),
        ("United Arab Emirates", "ae"),
        ("Yemen", "ye"),
    ]
)

# Supported countries, in the same order as the mapping above.
COUNTRIES = list(COUNTRY_CODES)
# Dialect configuration: country name -> {dialect label (Arabic) -> short
# dialect code}. get_dialect_code() joins the country code and this short
# code with "-" (e.g. "sa-hj"). Every country ends with a catch-all
# "أخرى" ("other") entry mapped to "oth".
COUNTRY_DIALECTS = {
"Saudi Arabia": {
"حجازية": "hj",
"حجازية بدوية": "hj-bd",
"جنوبية": "jn",
"تهامية": "th",
"نجدية": "nj",
"نجدية بدوية": "nj-bd",
"قصيمية": "qm",
"الشمال": "sh",
"حساوية": "hs",
"قطيفية": "qt",
"سيهاتية": "sy",
"أخرى": "oth"
},
"Egypt": {
"قاهرية": "ca",
"إسكندرانية": "al",
"صعيدية": "sa",
"سيناوية": "si",
"دلتاوية": "de",
"أخرى": "oth"
},
"Morocco": {
"فاسية": "fe",
"دار البيضاء": "ca",
"مراكشية": "ma",
"شمالية": "no",
"أطلسية": "at",
"أخرى": "oth"
},
"Iraq": {
"بغدادية": "ba",
"بصراوية": "bs",
"موصلية": "mo",
"كردية": "ku",
"جنوبية": "so",
"أخرى": "oth"
},
"Yemen": {
"صنعانية": "sa",
"عدنية": "ad",
"حضرمية": "ha",
"تهامية": "ti",
"أخرى": "oth"
},
"Jordan": {
"عمانية": "am",
"شمالية": "no",
"جنوبية": "so",
"بدوية": "be",
"أخرى": "oth"
},
"Lebanon": {
"بيروتية": "be",
"جبلية": "mo",
"جنوبية": "so",
"شمالية": "no",
"أخرى": "oth"
},
"Syria": {
"دمشقية": "da",
"حلبية": "al",
"حمصية": "ho",
"ساحلية": "co",
"أخرى": "oth"
},
"Palestine": {
"قدسية": "je",
"غزاوية": "ga",
"خليلية": "he",
"شمالية": "no",
"أخرى": "oth"
},
"United Arab Emirates": {
"إماراتية": "em",
"دبية": "du",
"أبوظبية": "ad",
"شارقية": "sh",
"أخرى": "oth"
},
"Kuwait": {
"كويتية": "ku",
"بدوية": "be",
"أخرى": "oth"
},
"Qatar": {
"قطرية": "qa",
"بدوية": "be",
"أخرى": "oth"
},
"Bahrain": {
"بحرينية": "ba",
"مدنية": "ur",
"أخرى": "oth"
},
"Oman": {
"عمانية": "om",
"ظفارية": "dh",
"داخلية": "in",
"أخرى": "oth"
},
"Algeria": {
"جزائرية": "al",
"قسنطينية": "co",
"وهرانية": "or",
"قبائلية": "ka",
"أخرى": "oth"
},
"Tunisia": {
"تونسية": "tu",
"صفاقسية": "sf",
"سوسية": "so",
"أخرى": "oth"
},
"Libya": {
"طرابلسية": "tr",
"بنغازية": "be",
"فزانية": "fe",
"أخرى": "oth"
},
"Sudan": {
"خرطومية": "kh",
"شمالية": "no",
"دارفورية": "da",
"أخرى": "oth"
},
# NOTE(review): two Somalia dialects below share the code "so", so both
# resolve to the full code "so-so" and cannot be told apart downstream.
# Picking a new code is a data decision — confirm with the dataset owners.
"Somalia": {
"صومالية": "so",
"شمالية": "no",
"جنوبية": "so",
"أخرى": "oth"
},
"Mauritania": {
"موريتانية": "mr",
"حسانية": "ha",
"أخرى": "oth"
}
}
def get_dialect_code(country, dialect):
    """Build the combined "<country>-<dialect>" code for a user.

    Unknown countries fall back to the prefix "unk" and unknown dialects to
    the suffix "gen".
    """
    prefix = COUNTRY_CODES.get(country, "unk")
    dialects_of_country = COUNTRY_DIALECTS.get(country, {})
    suffix = dialects_of_country.get(dialect, "gen")
    return "-".join((prefix, suffix))
def get_dialects_for_country(country):
    """Return the dialect labels configured for *country* (empty list if unknown)."""
    known_dialects = COUNTRY_DIALECTS.get(country, {})
    return [label for label in known_dialects.keys()]
# Choices offered by the gender dropdown on the registration form.
GENDERS = ["Male", "Female"]
# Recording instructions shown to the speaker, as right-to-left Arabic HTML.
# The login handler in this file picks RECORDING_INSTRUCTIONS_M or
# RECORDING_INSTRUCTIONS_F by gender.
RECORDING_INSTRUCTIONS = """
<section dir="rtl" lang="ar" style="text-align: right">
<ul>
<li>سجّل في مكان هادي ومفيهوش دوشة.</li>
<li>خلّي المايك أو الموبايل قريب من وشّك (حوالي ٢٠ – ٣٠ سم).</li>
<li>اتكلم بصوت واضح وطبيعي زي ما بتتكلم عادي.</li>
<li>حاول تخلي الصوت ثابت، متعليش مرّة وتوطي مرّة.</li>
<li>لو غلطت في كلمة، اقفل التسجيل وابدأ من الأول.</li>
<li>لو في جملة مش عايز تسجّلها، ممكن تختار «تخطّي».</li>
<li> ولو قابلتك أي مشكلة أو عندك سؤال، ابعتلنا على الإيميل: <a href="mailto:[email protected]">[email protected]</a> </li> </ul>
</section>
"""
# Recording instructions addressed to female speakers (Arabic HTML).
# The markup mirrors RECORDING_INSTRUCTIONS_M: the <section> carries the same
# right-to-left attributes, and the final <li> and the <ul> are closed so the
# fragment is well-formed.
RECORDING_INSTRUCTIONS_F = """
<section dir="rtl" lang="ar" style="text-align: right">
<ul>
<li>سجّلي في مكان هادي ومفيهوش دوشة.</li>
<li>خلّي المايك أو الموبايل قريب من وشّك (حوالي ٢٠ – ٣٠ سم).</li>
<li>اتكلمي بصوت واضح وطبيعي زي ما بتتكلمي عادي.</li>
<li>حاولي تخلي صوتِك ثابت، متعليش مرّة وتوطي مرّة.</li>
<li>لو غلطتي في كلمة، اقفلي التسجيل وابدئي من الأول.</li>
<li>لو في جملة مش عايزة تسجّليها، ممكن تختاري «تخطّي».</li>
<li>
لو قابلتك أي مشكلة أو عندِك سؤال، ابعتي لنا على الإيميل: <a href="mailto:[email protected]">[email protected]</a>
</li>
</ul>
</section>
"""
# Recording instructions addressed to male speakers (right-to-left Arabic
# HTML); the login handler in this file shows it when the user's gender is "Male".
RECORDING_INSTRUCTIONS_M = """
<section dir="rtl" lang="ar" style="text-align: right"> <ul>
<li>سجّل في مكان هادي ومفيهوش دوشة.</li>
<li>خلّي المايك أو الموبايل قريب من وشّك (حوالي ٢٠ – ٣٠ سم).</li>
<li>اتكلم بصوت واضح وطبيعي زي ما بتتكلم عادي.</li>
<li>حاول تخلي الصوت ثابت، متعليش مرّة وتوطي مرّة.</li>
<li>لو غلطت في كلمة، اقفل التسجيل وابدأ من الأول.</li>
<li>لو في جملة مش عايز تسجّلها، ممكن تختار «تخطّي».</li>
<li> ولو قابلتك أي مشكلة أو عندك سؤال، ابعتلنا على الإيميل: <a href="mailto:[email protected]">[email protected]</a> </li> </ul>
</section>
"""
# Full text of the data-collection consent agreement (right-to-left Arabic
# HTML). It is rendered inside the "اقرأ المزيد" accordion of the
# registration tab built by login_interface().
CONSENT_DETAILS = """
<section dir="rtl" lang="ar" style="text-align: right">
<h1>الموافقة على جمع واستخدام البيانات</h1>
<p>
الاتفاقية دي ما بين <strong>المشارك (حضرتك)</strong> وبين فريق البحث من
<strong>جامعة الملك فهد للبترول والمعادن</strong> و<strong>جامعة طيبة</strong>
(اللي هنسميه هنا "الجامعتين").
الاتفاقية مخصوص علشان جمع واستخدام وتوزيع تسجيلات صوتية هتساعد في أبحاث كشف الأصوات المزوّرة (Deepfake)
وغيرها من الأبحاث غير التجارية.
</p>
<ol>
<li>
<strong>هدف جمع البيانات:</strong><br>
الفريق بيجمع تسجيلات صوت علشان يعمل مجموعة بيانات (Dataset) خاصة بالكشف عن الأصوات المصنوعة بالذكاء الاصطناعي
باستخدام تقنيات تحويل النص لصوت (TTS) أو تقليد الأصوات (Voice Conversion) وطرق تانية.
البيانات دي هتتستخدم في أبحاث علمية وأكاديمية لتطوير طرق أفضل لاكتشاف الأصوات المزوّرة وغيرها من الأبحاث غير التجارية.
</li>
<li>
<strong>طبيعة البيانات اللي هتتجمع:</strong><br>
المشارك بيوافق إنه يدي:
<ul>
<li>تسجيلات صوتية بصوته الطبيعي أو من خلال نصوص/جمل هنطلب منه يقراها.</li>
<li>بيانات اختيارية زي: النوع (ذكر/أنثى)، الفئة العمرية، اللهجة، إلخ.</li>
<li>موافقة إن صوته يتعدل أو يتغيّر بأساليب صناعية.</li>
</ul>
</li>
<li>
<strong>الحقوق الممنوحة:</strong><br>
المشارك بيدي الفريق الحق الكامل (من غير فلوس أو قيود) إنه:
<ul>
<li>يسجّل ويعالج ويستخدم الصوت الطبيعي والنسخ المصنوعة منه.</li>
<li>يوزّع مجموعة البيانات (الطبيعية والمصنوعة) للباحثين في المجتمع العلمي لأغراض بحثية غير تجارية فقط.</li>
<li>ينشر عينات صوتية على منصات مهنية أو أكاديمية زي LinkedIn، Twitter/X، YouTube علشان يزود الوعي بأبحاث الديب فيك أو يعرّف الناس عن توفر البيانات.</li>
</ul>
</li>
<li>
<strong>إتاحة البيانات:</strong><br>
المجموعة الصوتية (الطبيعية والمصنوعة) هتتنشر بترخيص مفتوح
<em>(Creative Commons Attribution 4.0)</em>
يسمح لأي باحث يستخدمها ويشاركها للأغراض الأكاديمية غير التجارية.
</li>
<li>
<strong>الخصوصية والسرية:</strong><br>
<ul>
<li>مش هيتم نشر اسم المشارك أو أي بيانات شخصية مباشرة إلا بموافقته المكتوبة.</li>
<li>المشارك هيكون ليه ID مجهول في مجموعة البيانات.</li>
</ul>
</li>
<li>
<strong>المشاركة والانضمام:</strong><br>
<ul>
<li>المشاركة اختيارية ١٠٠٪.</li>
<li>له الحق ينسحب أو يطلب حذف تسجيلاته قبل ما مجموعة البيانات تتنشر للعامة.</li>
<li>بعد النشر العام، سحب البيانات مش هيكون ممكن بسبب طريقة توزيعها.</li>
</ul>
</li>
<li>
<strong>التعويض:</strong><br>
المشارك عارف إن مفيش أي مقابل مادي أو فلوس مقابل تسجيلاته، والمشاركة هنا لدعم وتطوير البحث العلمي فقط.
</li>
</ol>
</section>
"""
# Initialize on startup
# create_app_directories()
# Initialize user database if it doesn't exist
# if not os.path.exists(USER_DB_FILE):
# with open(USER_DB_FILE, "w") as f:
# json.dump({}, f)
# # Initialize session database if it doesn't exist
# if not os.path.exists(SESSION_FILE):
# with open(SESSION_FILE, "w") as f:
# json.dump({}, f)
# Location of the sentence database, next to the application code.
SENTENCES_DB_FILE = os.path.join(APP_CODE_DIR, "sentences.json")

# First run only: seed the database with two sample sentences so the app has
# something to serve before a real sentences.json is provided.
if not os.path.exists(SENTENCES_DB_FILE):
    with open(SENTENCES_DB_FILE, "w") as f:
        f.write(
            json.dumps(
                {
                    "sentences": [
                        {
                            "unique_id": "001",
                            "text": "The birch canoe slid on the smooth planks.",
                            "dialect": ["sa-hj", "sa-nj"],
                        },
                        {
                            "unique_id": "002",
                            "text": "Glue the sheet to the dark blue background.",
                            "dialect": ["sa-jn", "sa-th"],
                        },
                    ]
                }
            )
        )
# S3 Helper Functions
def get_s3_client():
    """Build a boto3 S3 client for AWS_REGION.

    Explicit access keys are used when both are configured; otherwise boto3
    is left to resolve credentials itself (e.g. an IAM role). Any failure is
    logged and re-raised to the caller.
    """
    try:
        has_static_keys = bool(AWS_ACCESS_KEY) and bool(AWS_SECRET_KEY)
        if has_static_keys:
            print("Using access keys for S3 authentication")
            return boto3.client(
                's3',
                aws_access_key_id=AWS_ACCESS_KEY,
                aws_secret_access_key=AWS_SECRET_KEY,
                region_name=AWS_REGION,
            )
        print("Using IAM role for S3 authentication")
        return boto3.client('s3', region_name=AWS_REGION)
    except Exception as err:
        print(f"Error creating S3 client: {err}")
        raise
def upload_to_s3(local_file, s3_file):
    """Copy *local_file* to key *s3_file* in S3_BUCKET.

    Returns True on success and False on any failure; failures are logged,
    never raised.
    """
    target = f"s3://{S3_BUCKET}/{s3_file}"
    try:
        client = get_s3_client()
        client.upload_file(local_file, S3_BUCKET, s3_file)
        print(f"Successfully uploaded {local_file} to {target}")
        return True
    except NoCredentialsError:
        print(f"No AWS credentials available for uploading {local_file} to {target}")
    except ClientError as err:
        code = err.response['Error']['Code']
        message = err.response['Error']['Message']
        print(f"ClientError uploading {local_file} to {target}: {code} - {message}")
    except Exception as err:
        print(f"Unexpected error uploading {local_file} to {target}: {type(err).__name__} - {str(err)}")
    return False
def download_from_s3(s3_file, local_file):
    """Fetch key *s3_file* from S3_BUCKET into *local_file*.

    Returns True on success and False on any failure; failures are logged,
    never raised.
    """
    origin = f"s3://{S3_BUCKET}/{s3_file}"
    try:
        client = get_s3_client()
        client.download_file(S3_BUCKET, s3_file, local_file)
        print(f"Successfully downloaded {origin} to {local_file}")
        return True
    except NoCredentialsError:
        print(f"No AWS credentials available for downloading {origin}")
    except ClientError as err:
        code = err.response['Error']['Code']
        message = err.response['Error']['Message']
        print(f"ClientError downloading {origin}: {code} - {message}")
    except Exception as err:
        print(f"Unexpected error downloading {origin}: {type(err).__name__} - {str(err)}")
    return False
# Audio quality validation
def validate_audio_quality(audio_path):
    """Check a recording against the minimum quality bar.

    Returns a ``(ok, message)`` pair. A recording is rejected when its sample
    rate is below 16000 Hz, when it lasts less than one second, or when the
    file cannot be opened/read at all.
    """
    try:
        with sf.SoundFile(audio_path) as audio:
            rate = audio.samplerate
            if rate < 16000:
                return False, f"Sample rate too low: {rate}Hz"
            seconds = len(audio) / rate
            if seconds < 1:  # shorter than one second
                return False, "Recording too short"
    except Exception as err:
        return False, f"Audio validation error: {err}"
    return True, "Audio quality OK"
def cleanup_audio_resources(temp_audio_dir="/tmp", max_age_seconds=300):
    """Delete stale Gradio ``.wav`` temp files.

    Args:
        temp_audio_dir: Directory to sweep. Defaults to "/tmp".
        max_age_seconds: Files whose ctime is older than this many seconds
            are removed. Defaults to 300 (five minutes).

    Only files named ``gradio*.wav`` are touched. Errors are logged and never
    raised; a failure on one file (for example it was removed concurrently)
    no longer aborts the sweep of the remaining files.
    """
    try:
        candidates = os.listdir(temp_audio_dir)
    except OSError as e:
        print(f"Audio cleanup error: {e}")
        return

    cutoff = time.time() - max_age_seconds
    for file in candidates:
        if not (file.startswith("gradio") and file.endswith(".wav")):
            continue
        file_path = os.path.join(temp_audio_dir, file)
        try:
            if os.path.getctime(file_path) < cutoff:
                os.remove(file_path)
        except OSError as e:
            # Keep going: one unreadable/vanished file must not block the rest.
            print(f"Audio cleanup error: {e}")
def normalize_username(name, dialect_code, gender):
    """Derive a filesystem-safe username from a display name.

    The name is lower-cased and stripped, whitespace runs become "_", and any
    remaining run of characters other than letters, digits, "_" or "-" is
    also replaced by "_". The result is used to build local paths and S3
    keys, so separators and dots in the user-supplied name must not survive
    (e.g. "../x" must not escape the users directory).

    A 7-hex-digit random id, the dialect code and a gender suffix ("_m" for
    "Male", "_f" for "Female", nothing otherwise) are appended.
    """
    base_name = re.sub(r'\s+', '_', name.lower().strip())
    # \w is Unicode-aware, so non-Latin letters are kept; "/", "\\", "." etc. are not.
    base_name = re.sub(r'[^\w-]+', '_', base_name)
    unique_id = uuid.uuid4().hex[:7]
    if gender == "Male":
        gender_suffix = "_m"
    elif gender == "Female":
        gender_suffix = "_f"
    else:
        gender_suffix = ""
    return f"{base_name}_{unique_id}_{dialect_code}{gender_suffix}"
# Short random suffix that makes this process's local data directory unique.
# NOTE(review): this module-level name shadows the builtin id().
id = uuid.uuid4().hex[:7]
# Relative path (no leading slash) under which the config files are addressed
# in the S3 bucket; the *_AWS paths below are passed to the S3 helpers as keys.
AWS_DATA_DIR = "tmp/home/ubuntu/.tts_dataset_creator"
CONFIG_AWS = os.path.join(AWS_DATA_DIR, "config")
USER_DB_AWS = os.path.join(CONFIG_AWS, "users.json")
SESSION_AWS = os.path.join(CONFIG_AWS, "sessions.json")
APP_DATA_DIR = "/tmp/home/ubuntu/.tts_dataset_creator" + f"_{id}" # App data (persistent)
CONFIG_DIR = os.path.join(APP_DATA_DIR, "config") # Configuration files
# Both user-data roots are relative paths, resolved against the process's
# working directory; USERS_DIR_TMP is USERS_DIR placed under a relative "tmp".
USERS_DIR = os.path.join(".tts_dataset_creator", "users") # User data folders
USERS_DIR_TMP = os.path.join("tmp", USERS_DIR)
# Local file paths of the users/sessions JSON files
USER_DB_FILE = os.path.join(CONFIG_DIR, "users.json")
SESSION_FILE = os.path.join(CONFIG_DIR, "sessions.json")
# Create necessary directories
def create_app_directories():
    """Ensure the local config and per-user data directories exist."""
    for required_dir in (CONFIG_DIR, USERS_DIR_TMP):
        os.makedirs(required_dir, exist_ok=True)
    print(f"✅ Created directory structure at {APP_DATA_DIR}")
# Import-time side effects: create the local directories, then try to fetch
# the users/sessions JSON files from S3. download_from_s3 reports failure by
# returning False; the result is ignored here, so a missing file is not fatal.
create_app_directories()
download_from_s3(USER_DB_AWS, USER_DB_FILE)
download_from_s3(SESSION_AWS, SESSION_FILE)
# User Authentication Functions
def load_users(id):
    """Fetch one user's profile row from the Supabase ``users`` table.

    Args:
        id: Value matched against the table's ``id`` column.

    Returns:
        The first matching row as a dict, or ``None`` when no row matches or
        the query fails (the failure is logged, not raised).
    """
    try:
        response = (
            supabase.table("users")
            .select("*")
            .eq("id", id)
            .execute()
        )
    except Exception as e:
        print("Error fetching users data:", e)
        return None

    rows = response.data
    if not rows:
        # A missing profile is an expected outcome, not an indexing error.
        print("Error fetching users data:", f"no user row found for id {id}")
        return None
    return rows[0]
def save_users(user):
    """Insert a user profile row into the Supabase ``users`` table.

    Args:
        user: Dict of column values for the new row.

    Failures are logged and swallowed; the function always returns ``None``.
    """
    try:
        supabase.table("users").insert(user).execute()
    except Exception as e:
        # The message names the right table: this writes users, not sessions.
        print("Error saving user data:", e)
# Session Management Functions
def load_sessions(id):
    """Fetch one user's recording-session row from the Supabase ``sessions`` table.

    Args:
        id: Value matched against the table's ``id`` column.

    Returns:
        A dict with the keys ``id``, ``current_index``,
        ``completed_sentences``, ``recorded_sentences`` and
        ``total_recording_duration``, or ``None`` when no row matches or the
        query fails (the failure is logged, not raised).
    """
    try:
        response = (
            supabase.table("sessions")
            .select("id, current_index, completed_sentences, recorded_sentences, total_recording_duration")
            .eq("id", id)
            .execute()
        )
        return response.data[0]
    except Exception as e:
        print("Error fetching session data:", e)
        return None
def save_sessions(id, session):
    """Persist a session record for user *id* in the Supabase ``sessions`` table.

    Args:
        id: Session/user id stored in the row's ``id`` column.
        session: Dict of column values. It is copied, not modified.

    The row is written with ``upsert`` so that saving again for the same id
    updates the existing row instead of attempting a second insert.
    NOTE(review): this assumes ``id`` is the table's primary key (or has a
    unique constraint) — confirm against the schema.

    Failures are logged and swallowed; the function always returns ``None``.
    """
    # Work on a copy so the caller's dict does not silently gain an "id" key.
    record = {**session, "id": id}
    try:
        supabase.table("sessions").upsert(record).execute()
    except Exception as e:
        print("Error saving session data:", e)
def register_user(name, email, password, country, dialect, city, gender, consent):
    """Register a new user through Supabase Auth and create their records.

    Steps: sign the user up, insert a profile row into ``users``, create the
    local user directories, and create an empty session row.

    Args:
        name, email, password: Account details entered on the form.
        country, dialect: Used to derive the combined dialect code.
        city, gender: Stored on the profile row.
        consent: Not used here; the caller checks it before calling.

    Returns:
        ``(True, username)`` once sign-up succeeds, or ``(False, message)``
        with a user-facing Arabic message when sign-up fails.

    NOTE(review): a failure while inserting the profile row or creating the
    session row is only logged; the function still reports success.
    """
    global USER_ID

    dialect_code = get_dialect_code(country, dialect)
    username = normalize_username(name, dialect_code, gender)

    try:
        response = supabase.auth.sign_up(
            {
                "email": email,
                "password": password,
            }
        )
        USER_ID = response.user.id
        print(f"Registered auth user {USER_ID}")

        profile = {
            "id": USER_ID,
            "name": name,
            "email": email,
            "country": country,
            "dialect": dialect,
            "dialect_code": dialect_code,
            "city": city,
            "gender": gender,
            "created_at": datetime.now().isoformat()
        }
        try:
            # Authenticate table requests as the freshly created user.
            auth_session = response.session
            supabase.postgrest.auth(auth_session.access_token)
            supabase.table("users").insert(profile).execute()
            create_user_directories(username)
        except Exception as e:
            print("Error inserting user data:", e)

        try:
            initial_session = {
                "id": USER_ID,
                "current_index": 0,
                "completed_sentences": [],
                "recorded_sentences": [],
                "total_recording_duration": 0.0
            }
            save_sessions(USER_ID, initial_session)
        except Exception as e:
            print("Error creating session data:", e)

        return True, username
    except AuthWeakPasswordError:
        return (
            False, "⚠️ كلمة السر غير مقبولة. استخدم علي الأقل 6 أرقام"
        )
    except Exception as e:
        # Supabase reports these cases only through the error text.
        error_text = str(e)
        if error_text == "User already registered":
            return (
                False, "⚠️ هذا المستخدم موجود بالفعل, قم بتسجيل الدخول علي حسابك"
            )
        if error_text == "Unable to validate email address: invalid format":
            return (
                False, "⚠️ يوجد خطأ بالإيميل, يرجي إعادة الإدخال مرة أخري بصورة صحيحة"
            )
        print(e)
        return (
            False, "⚠️ حدث خطأ غير متوقع, يرجى المحاولة مرة أخرى"
        )
def authenticate_user(email, password):
    """Sign a user in through Supabase Auth.

    On success the module-level ``USER_ID`` is set to the signed-in user's id
    and subsequent table requests are authenticated with the new session's
    access token.

    Returns:
        ``(True, email)`` on success, or ``(False, message)`` with a
        user-facing Arabic message when sign-in fails for any reason.
    """
    global USER_ID
    try:
        response = supabase.auth.sign_in_with_password(
            {
                "email": email,
                "password": password,
            }
        )
        USER_ID = response.user.id
        supabase.postgrest.auth(response.session.access_token)
        print(USER_ID)
        # The session row is not fetched here: the result was never used, and
        # callers load it themselves when they need it.
        return (
            True,
            email
        )
    except Exception as e:
        print(e)
        return (
            False,
            "⚠️ هناك خطأ بالإيميل أو كلمة السر, في حالة عدم وجود حساب يرجي إنشاء حساب أولاً",
        )
# with open(SESSION_FILE, "w") as f:
# json.dump(sessions, f)
# upload_to_s3(SESSION_FILE, SESSION_AWS)
# def get_user_session(id):
# """Get session data for a user."""
# sessions = load_sessions(id)
# return
def update_user_session(id, username, session_data):
    """Persist a user's progress and refresh their done-sentences file.

    Args:
        id: Session/user id of the row to save; falls back to the
            module-level ``USER_ID`` when empty.
        username: Used to locate the user's done-sentences file.
        session_data: Dict holding the progress fields (it may hold other
            keys too; only the session columns are persisted).
    """
    session_id = id or USER_ID
    completed_sentences = session_data.get("completed_sentences", [])
    # Persist only the session columns. Saving the whole state dict would send
    # unrelated UI fields to the table, and nothing needs to be loaded first.
    record = {
        "id": session_id,
        "current_index": session_data.get("current_index", 0),
        "completed_sentences": completed_sentences,
        "recorded_sentences": session_data.get("recorded_sentences", []),
        "total_recording_duration": session_data.get("total_recording_duration", 0.0)
    }
    save_sessions(session_id, record)
    update_done_sentences_file(username, completed_sentences)
def update_done_sentences_file(username, completed_sentences):
    """Rewrite the user's done-sentences file and upload it to S3.

    The file lists one completed sentence id per line. The local copy is
    deleted once the upload succeeds.
    """
    user_dir = os.path.join(USERS_DIR_TMP, username)
    # The directory may be missing (e.g. after a restart); create it on demand
    # instead of failing on open().
    os.makedirs(user_dir, exist_ok=True)
    done_file = os.path.join(user_dir, f"{username}_DONE_SENTENCES.txt")
    with open(done_file, "w", encoding="utf-8") as f:
        f.writelines(f"{sentence_id}\n" for sentence_id in completed_sentences)
    s3_path = f"{username}/{username}_DONE_SENTENCES.txt"
    if upload_to_s3(done_file, s3_path):
        try:
            os.remove(done_file)
            print(f"Deleted local file: {done_file}")
        except OSError as e:
            print(f"Error deleting local file {done_file}: {e}")
# Sentence Database Functions
def load_sentences():
    """Load all sentences from SENTENCES_DB_FILE.

    Returns:
        A list of ``(unique_id, text, dialects)`` tuples; ``dialects`` is the
        entry's "dialect" list, or ``[]`` when the entry has none.
    """
    # Read as UTF-8 explicitly so the result does not depend on the host locale.
    with open(SENTENCES_DB_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    return [(s["unique_id"], s["text"], s.get("dialect", [])) for s in data["sentences"]]
def get_available_sentences(completed_sentences, user_dialect_code):
    """Return the sentences a user can still record.

    Args:
        completed_sentences: Ids the user has already completed; ``None`` is
            treated as "nothing completed yet".
        user_dialect_code: Combined dialect code such as "sa-hj".

    Returns:
        A list of ``(sentence_id, text)`` pairs, in database order, whose
        dialect list contains *user_dialect_code* and whose id is not in
        *completed_sentences*.
    """
    # A set gives constant-time membership tests instead of a list scan per sentence.
    done = set(completed_sentences or ())
    return [
        (sid, text)
        for sid, text, dialects in load_sentences()
        if sid not in done and user_dialect_code in dialects
    ]
# Directory and File Management
def create_user_directories(username):
    """Prepare the local folder layout for *username*.

    Creates ``wavs/`` and ``txt/`` under the user's folder, a ``metadata.csv``
    holding only the header row, and an empty done-sentences file. Files that
    already exist are left untouched.
    """
    user_dir = os.path.join(USERS_DIR_TMP, username)
    for subfolder in ("wavs", "txt"):
        os.makedirs(os.path.join(user_dir, subfolder), exist_ok=True)

    metadata_file = os.path.join(user_dir, "metadata.csv")
    if not os.path.exists(metadata_file):
        with open(metadata_file, "w") as handle:
            handle.write("audio_file|text\n")

    done_file = os.path.join(user_dir, f"{username}_DONE_SENTENCES.txt")
    if not os.path.exists(done_file):
        # Opening in write mode is enough to create the empty file.
        with open(done_file, "w"):
            pass
def save_recording(username, audio_path, sentence_id, sentence_text):
    """Store one recording for *username* and upload it to S3.

    The temporary recording at *audio_path* is copied into the user's
    ``wavs/`` folder, a ``<wav file>|<text>`` line is appended to the user's
    ``metadata.csv``, and both files are uploaded. The local wav copy is
    deleted after a fully successful upload.

    Args:
        username: Name of the user's folder and S3 key prefix.
        audio_path: Path of the temporary recording; removed after copying.
        sentence_id: Id of the recorded sentence, used in the wav file name.
        sentence_text: The (possibly user-edited) sentence text.

    Returns:
        ``(success, duration_seconds)``. ``success`` is False when the copy,
        the metadata update, an upload or the final cleanup fails;
        ``duration_seconds`` is 0.0 when it could not be determined.
    """
    # Use the same root as create_user_directories / update_done_sentences_file.
    user_dir = os.path.join(USERS_DIR_TMP, username)
    wavs_dir = os.path.join(user_dir, "wavs")
    wav_filename = f"{username}_{sentence_id}.wav"
    local_wav_path = os.path.join(wavs_dir, wav_filename)

    try:
        # The folder may not exist yet (e.g. after a restart); create it on demand.
        os.makedirs(wavs_dir, exist_ok=True)
        shutil.copy2(audio_path, local_wav_path)
        print(f"Copied {audio_path} to {local_wav_path}")
    except OSError as e:
        print(f"Error copying {audio_path} to {local_wav_path}: {e}")
        return False, 0.0

    # Clean up the original temp file; failing to do so is not fatal.
    try:
        os.remove(audio_path)
    except OSError as e:
        print(f"Warning: Could not remove temp file {audio_path}: {e}")

    # Append the edited sentence text to metadata.csv.
    metadata_path = os.path.join(user_dir, "metadata.csv")
    # The file holds one record per line, so line breaks inside the text are
    # folded into spaces.
    single_line_text = " ".join(str(sentence_text).splitlines())
    try:
        needs_header = not os.path.exists(metadata_path)
        with open(metadata_path, "a", encoding="utf-8") as f:
            if needs_header:
                f.write("audio_file|text\n")
            f.write(f"{wav_filename}|{single_line_text}\n")
        print(f"Updated {metadata_path}")
    except OSError as e:
        print(f"Error updating {metadata_path}: {e}")
        return False, 0.0

    # Measure the recording duration.
    try:
        with sf.SoundFile(local_wav_path) as f:
            duration = len(f) / f.samplerate
        print(f"Recording duration: {duration} seconds")
    except Exception as e:
        print(f"Error reading audio duration for {local_wav_path}: {e}")
        duration = 0.0

    # Upload both files; attempt the second even if the first fails.
    s3_wav_path = f"{username}/wavs/{wav_filename}"
    s3_metadata_path = f"{username}/metadata.csv"
    wav_uploaded = upload_to_s3(local_wav_path, s3_wav_path)
    metadata_uploaded = upload_to_s3(metadata_path, s3_metadata_path)
    all_uploads_successful = wav_uploaded and metadata_uploaded

    # Delete the local wav only after everything was uploaded.
    if all_uploads_successful:
        try:
            os.remove(local_wav_path)
            print(f"Deleted local file: {local_wav_path}")
        except OSError as e:
            print(f"Error deleting local files: {e}")
            all_uploads_successful = False

    print(f"save_recording completed, success={all_uploads_successful}")
    return all_uploads_successful, duration
# Gradio Interface Components
def login_interface():
    """Build the login and registration widgets.

    Must be called inside an active ``gr.Blocks`` context. Returns every
    created component as one flat tuple, title first, then the login widgets,
    then the registration widgets.

    NOTE(review): the original indentation was lost, so the nesting of the
    tabs and the accordion below is reconstructed from the widget names —
    confirm against the deployed layout.
    """
    login_title = gr.Markdown("# TTS Dataset Creator - Login")

    with gr.Tabs():
        # --- Login tab ---
        with gr.Tab("Login"):
            login_email = gr.Textbox(label="Email")
            login_password = gr.Textbox(label="Password", type="password")
            login_button = gr.Button("Login")
            login_message = gr.Textbox(label="Message", interactive=False)

        # --- Registration tab ---
        with gr.Tab("Register"):
            register_name = gr.Textbox(label="Name (in English)")
            register_email = gr.Textbox(label="Email")
            register_password = gr.Textbox(label="Password", type="password")
            register_country = gr.Dropdown(choices=COUNTRIES, label="Country", value="Saudi Arabia")
            register_dialect = gr.Dropdown(choices=[], label="Dialect", value=None)
            register_city = gr.Textbox(label="City", placeholder="Enter your city name")
            register_gender = gr.Dropdown(choices=GENDERS, label="Gender")
            register_consent = gr.Checkbox(
                label="بمجرد إنشاء حساب فأنا موافق علي استخدام صوتي في الأبحاث ومنها أبحاث التزييف العميق, للمزيد أضغط علي اقرأ المزيد",
                value=False
            )
            # Full consent text, collapsed by default.
            with gr.Accordion("اقرأ المزيد", open=False):
                consent_details = gr.Markdown(CONSENT_DETAILS)
            register_button = gr.Button("Register")
            register_message = gr.Textbox(label="Message", interactive=False)

    components = (
        login_title, login_email, login_password, login_button, login_message,
        register_name, register_email, register_password, register_country, register_dialect,
        register_city, register_gender, register_consent, consent_details,
        register_button, register_message
    )
    return components
def main_app_interface():
    """Build the recording screen widgets (all hidden until login).

    Must be called inside an active ``gr.Blocks`` context. Returns every
    created component and container as one flat tuple.

    NOTE(review): the original indentation was lost, so the nesting of the
    rows/columns below is reconstructed from the widget names — confirm
    against the deployed layout.
    """
    main_title = gr.Markdown("# TTS Dataset Creator", visible=False)

    # Signed-in user and logout control
    with gr.Row(visible=False) as user_row:
        username_display = gr.Textbox(label="Username", interactive=False)
        logout_button = gr.Button("Logout")

    # Collapsible recording instructions
    with gr.Accordion("تعليمات التسجيل", open=True, visible=False) as instructions_accordion:
        instructions_display = gr.Markdown("", elem_id="instructions_display")

    # Sentence to read, with its id and the user's progress
    with gr.Column(visible=False) as recording_row:
        current_sentence_display = gr.Textbox(
            label="Current Sentence",
            interactive=True,
            elem_classes="large-text",
            lines=2
        )
        with gr.Row():
            sentence_id_display = gr.Textbox(label="Sentence ID", interactive=False)
            progress_display = gr.Textbox(label="Progress", interactive=False)

    # Microphone recorder and discard control
    with gr.Row(visible=False) as audio_row:
        audio_recorder = gr.Audio(
            sources=["microphone"],
            format="wav",
            type="filepath",
            label="Record Audio",
            scale=4
        )
        discard_button = gr.Button("Discard (D)", variant="stop", scale=1)

    # Save / skip controls
    with gr.Row(visible=False) as button_row:
        save_button = gr.Button("Save & Next (N)", variant="primary")
        skip_button = gr.Button("Skip (S)", variant="secondary")

    # Session status
    with gr.Row(visible=False) as status_row:
        session_status = gr.Textbox(label="Session Status", interactive=False)

    components = (
        main_title, user_row, username_display, logout_button, instructions_accordion,
        instructions_display, recording_row, current_sentence_display, sentence_id_display,
        progress_display, audio_row, audio_recorder, button_row, save_button, discard_button,
        skip_button, status_row, session_status
    )
    return components
def create_app():
"""Create the main Gradio application."""
with gr.Blocks(css="""
#app-title { text-align: center; margin-bottom: 20px; }
.recording-buttons { display: flex; justify-content: space-between; }
.large-text { font-size: 1.5em; font-weight: bold; }
#instructions_display {
text-align: right; direction: rtl;
}
""") as app:
# Global state for the application
state = gr.State({
"logged_in": False,
"username": "",
"dialect_code": "",
"sentences": [],
"current_index": 0,
"completed_sentences": [],
"recorded_sentences": [],
"total_recording_duration": 0.0,
"current_sentence_text": "",
"current_sentence_id": "",
"progress": "",
"last_save_time": 0
})
# Create interface components
(
login_title, login_email, login_password, login_button, login_message,
register_name, register_email, register_password, register_country, register_dialect,
register_city, register_gender, register_consent, consent_details,
register_button, register_message
) = login_interface()
(
main_title, user_row, username_display, logout_button, instructions_accordion,
instructions_display, recording_row, current_sentence_display, sentence_id_display,
progress_display, audio_row, audio_recorder, button_row, save_button, discard_button,
skip_button, status_row, session_status
) = main_app_interface()
def update_dialects(country):
    """Refresh the dialect dropdown for the chosen country.

    The first dialect becomes the selected value; with no dialects the
    selection is cleared.
    """
    dialects = get_dialects_for_country(country)
    selected = dialects[0] if dialects else None
    return gr.update(choices=dialects, value=selected)
# Login function
def login(email, password, state_dict):
    """Authenticate the user and switch the UI to the recording screen.

    Returns a 35-item tuple in output-wiring order: the state dict, 16
    updates for the login/registration widgets, then 18 values for the
    recording-screen widgets.

    On success the state is filled from the user's stored session and
    profile; a missing session or profile row is treated as empty instead of
    crashing the handler. On failure the login form stays visible with an
    error message and the recording screen stays hidden.
    """

    def _main_outputs(visible, username="", instructions="", sentence="",
                      sentence_id="", progress="", status=""):
        """Return the 18 recording-screen outputs in wiring order."""
        def toggle():
            return gr.update(visible=visible)

        return (
            toggle(),       # main_title
            toggle(),       # user_row
            username,       # username_display
            toggle(),       # logout_button
            toggle(),       # instructions_accordion
            instructions,   # instructions_display
            toggle(),       # recording_row
            sentence,       # current_sentence_display
            sentence_id,    # sentence_id_display
            progress,       # progress_display
            toggle(),       # audio_row
            toggle(),       # audio_recorder
            toggle(),       # button_row
            toggle(),       # save_button
            toggle(),       # discard_button
            toggle(),       # skip_button
            toggle(),       # status_row
            status,         # session_status
        )

    email = email.lower()
    success, result = authenticate_user(email, password)

    if not success:
        # Keep all 16 login/registration widgets visible; the fifth output
        # (login_message) carries the error text instead of an update.
        auth_outputs = [gr.update(visible=True) for _ in range(16)]
        auth_outputs[4] = "Invalid email or password"
        return (state_dict, *auth_outputs, *_main_outputs(False))

    username = result
    # Either loader returns None when the row is missing or the query failed.
    session_data = load_sessions(USER_ID) or {}
    user_data = load_users(USER_ID) or {}
    user_dialect_code = user_data.get("dialect_code", "")
    user_gender = user_data.get("gender", "Male")
    completed_sentences = session_data.get("completed_sentences") or []
    recorded_sentences = session_data.get("recorded_sentences") or []
    total_duration = session_data.get("total_recording_duration") or 0.0

    available_sentences = get_available_sentences(completed_sentences, user_dialect_code)

    state_dict["logged_in"] = True
    state_dict["username"] = username
    state_dict["dialect_code"] = user_dialect_code
    state_dict["sentences"] = available_sentences
    state_dict["current_index"] = session_data.get("current_index") or 0
    state_dict["completed_sentences"] = completed_sentences
    state_dict["recorded_sentences"] = recorded_sentences
    state_dict["total_recording_duration"] = total_duration
    state_dict["last_save_time"] = 0
    state_dict["is_processing"] = False

    # Select instructions based on gender
    instructions = RECORDING_INSTRUCTIONS_M if user_gender == "Male" else RECORDING_INSTRUCTIONS_F

    if available_sentences:
        sentence_id, sentence_text = available_sentences[0]
        state_dict["current_sentence_text"] = sentence_text
        state_dict["current_sentence_id"] = sentence_id
        completed = len(recorded_sentences)
        minutes = int(total_duration // 60)
        seconds = int(total_duration % 60)
        state_dict["progress"] = f"{completed} sentences recorded, {minutes} minutes and {seconds} seconds total"
        status = "Logged in successfully. Ready to record sentences."
    else:
        state_dict["current_sentence_text"] = ""
        state_dict["current_sentence_id"] = ""
        state_dict["progress"] = "0 sentences recorded, 0 minutes and 0 seconds total"
        status = "No sentences available."

    # Hide all 16 login/registration widgets and reveal the recording screen.
    auth_outputs = [gr.update(visible=False) for _ in range(16)]
    return (
        state_dict,
        *auth_outputs,
        *_main_outputs(
            True,
            username=username,
            instructions=instructions,
            sentence=state_dict["current_sentence_text"],
            sentence_id=state_dict["current_sentence_id"],
            progress=state_dict["progress"],
            status=status,
        ),
    )
# Register function
def register(name, email, password, country, dialect, city, gender, consent, state_dict):
    """Validate the registration form and create the account.

    Returns ``(state_dict, message, update)``: the state is passed through
    unchanged, ``message`` reports the outcome, and the update keeps the
    target component visible.
    """
    email = email.lower()
    required_fields = (name, email, password, country, dialect, city, gender)

    if any(not value for value in required_fields):
        message = "Please fill in all fields"
    elif not consent:
        message = "You must agree to the research data usage terms to register"
    else:
        success, result = register_user(name, email, password, country, dialect, city, gender, consent)
        if success:
            message = f"Registration successful! Your username is: {result}. Please go to the Login tab."
        else:
            message = result

    return state_dict, message, gr.update(visible=True)
# Logout function
def logout(state_dict):
    """Clear the session state and switch the UI back to the login/register view.

    Returns the state followed by one value per component wired to
    ``logout_button.click``: login and registration widgets become visible,
    every recording-related container is hidden and its text fields are
    blanked.
    """

    def show():
        # Fresh update object per component, exactly like the literal calls.
        return gr.update(visible=True)

    def hide():
        return gr.update(visible=False)

    # Reset every per-user key in one go (same keys, same order, same values).
    state_dict.update({
        "logged_in": False,
        "username": "",
        "dialect_code": "",
        "sentences": [],
        "current_index": 0,
        "completed_sentences": [],
        "recorded_sentences": [],
        "total_recording_duration": 0.0,
        "current_sentence_text": "",
        "current_sentence_id": "",
        "progress": "",
        "last_save_time": 0,
    })

    login_and_register_view = (
        show(),  # login_title
        show(),  # login_email
        show(),  # login_password
        show(),  # login_button
        show(),  # login_message
        show(),  # register_name
        show(),  # register_email
        show(),  # register_password
        show(),  # register_country
        show(),  # register_dialect
        show(),  # register_city
        show(),  # register_gender
        show(),  # register_consent
        show(),  # consent_details
        show(),  # register_button
        show(),  # register_message
    )
    recording_view = (
        hide(),  # main_title
        hide(),  # user_row
        "",      # username_display
        hide(),  # logout_button
        hide(),  # instructions_accordion
        "",      # instructions_display
        hide(),  # recording_row
        "",      # current_sentence_display
        "",      # sentence_id_display
        "",      # progress_display
        hide(),  # audio_row
        hide(),  # audio_recorder
        hide(),  # button_row
        hide(),  # save_button
        hide(),  # discard_button
        hide(),  # skip_button
        hide(),  # status_row
        "",      # session_status
    )
    return (state_dict,) + login_and_register_view + recording_view
# Save recording function
def save_recording_handler(audio_path, state_dict, edited_sentence):
    """Validate and store the current recording, then move to another sentence.

    Args:
        audio_path: Path of the recorded audio file; falsy when nothing was
            recorded.
        state_dict: Session state; updated in place.
        edited_sentence: Sentence text as shown (and possibly edited) in the
            UI. When blank or missing, the stored sentence text is used.

    Returns:
        A 6-tuple for the outputs ``[state, current_sentence_display,
        sentence_id_display, progress_display, session_status,
        audio_recorder]``. The last item always clears the recorder.
    """

    def progress_text():
        # Single source for the progress line shown on every return path.
        recorded = len(state_dict["recorded_sentences"])
        duration = state_dict["total_recording_duration"]
        minutes = int(duration // 60)
        seconds = int(duration % 60)
        return f"{recorded} sentences recorded, {minutes} minutes and {seconds} seconds total"

    def keep_current(message):
        # Response that leaves the displayed sentence untouched.
        return (
            state_dict,
            state_dict["current_sentence_text"],
            state_dict["current_sentence_id"],
            progress_text(),
            message,
            gr.update(value=None),
        )

    # Re-entrancy guard: a previous save for this session is still running.
    if state_dict.get("is_processing", False):
        return keep_current("⏳ Processing previous recording...")

    # Debounce: ignore saves arriving less than 3 seconds after the last one.
    current_time = time.time()
    if current_time - state_dict.get("last_save_time", 0) < 3:
        print("Ignoring rapid save request due to debounce")
        return keep_current("⏳ Please wait before saving again...")

    state_dict["is_processing"] = True
    state_dict["last_save_time"] = current_time
    try:
        print(f"save_recording_handler called with audio_path={audio_path}")
        if not audio_path:
            return keep_current("Please record audio before saving")

        # Reject recordings that fail the quality check before storing them.
        is_valid, validation_message = validate_audio_quality(audio_path)
        if not is_valid:
            return keep_current(
                f"❌ Audio quality issue: {validation_message}. Please record again."
            )

        if not state_dict["sentences"]:
            return keep_current("No sentences available")

        sentence_id, _ = state_dict["sentences"][state_dict["current_index"]]
        # Tolerate a missing (None) textbox value instead of crashing on .strip().
        edited_text = (edited_sentence or "").strip()
        sentence_text = edited_text or state_dict["current_sentence_text"]

        success, duration = save_recording(
            state_dict["username"], audio_path, sentence_id, sentence_text
        )

        if sentence_id not in state_dict["completed_sentences"]:
            state_dict["completed_sentences"].append(sentence_id)
        if sentence_id not in state_dict["recorded_sentences"]:
            state_dict["recorded_sentences"].append(sentence_id)
        state_dict["total_recording_duration"] += duration

        # NOTE(review): USER_ID is a module-level value, presumably assigned at
        # login; confirm it cannot be shared between concurrent sessions.
        update_user_session(USER_ID, state_dict["username"], {
            "id": USER_ID,
            "current_index": state_dict["current_index"],
            "completed_sentences": state_dict["completed_sentences"],
            "recorded_sentences": state_dict["recorded_sentences"],
            "total_recording_duration": state_dict["total_recording_duration"],
        })

        # Refresh the pool so the sentence just handled is no longer offered.
        state_dict["sentences"] = get_available_sentences(
            state_dict["completed_sentences"],
            state_dict["dialect_code"],
        )

        if state_dict["sentences"]:
            next_id, next_text = random.choice(state_dict["sentences"])
            state_dict["current_sentence_text"] = next_text
            state_dict["current_sentence_id"] = next_id
            state_dict["current_index"] = next(
                (i for i, s in enumerate(state_dict["sentences"]) if s[0] == next_id),
                0,
            )
            if success:
                status_message = "Recording saved successfully. Ready to record next sentence"
            else:
                status_message = "Recording saved locally but failed to upload to S3. Moving to next sentence."
        else:
            next_id, next_text = "", ""
            state_dict["current_sentence_text"] = ""
            state_dict["current_sentence_id"] = ""
            state_dict["current_index"] = 0
            if success:
                status_message = "Recording saved, no more sentences available"
            else:
                status_message = "Recording saved locally but failed to upload to S3. No more sentences available."

        return (
            state_dict,
            next_text,
            next_id,
            progress_text(),
            status_message,
            gr.update(value=None),
        )
    finally:
        # Always release the guard, including on early returns and exceptions.
        state_dict["is_processing"] = False
# Discard recording function
def discard_recording(state_dict):
    """Drop the pending recording and keep the current sentence on screen.

    Returns a 6-tuple for the outputs ``[state, current_sentence_display,
    sentence_id_display, progress_display, session_status, audio_recorder]``;
    the state is not modified and the recorder is cleared.
    """
    # Only the recorded count and total duration feed the progress line, so the
    # sentence list is not consulted here (no needless load_sentences() call).
    recorded = len(state_dict["recorded_sentences"])
    duration = state_dict["total_recording_duration"]
    minutes = int(duration // 60)
    seconds = int(duration % 60)
    progress = f"{recorded} sentences recorded, {minutes} minutes and {seconds} seconds total"
    return (
        state_dict,
        state_dict["current_sentence_text"],
        state_dict["current_sentence_id"],
        progress,
        "Recording discarded. Please record again.",
        gr.update(value=None),
    )
# Skip sentence function
def skip_sentence(state_dict):
    """Mark the current sentence as completed without recording it and pick another.

    The skipped sentence id is added to ``completed_sentences`` (not to
    ``recorded_sentences``), the available sentences are refreshed, a random
    one becomes current, and the session is persisted via
    ``update_user_session``.

    Returns:
        A 6-tuple for the outputs ``[state, current_sentence_display,
        sentence_id_display, progress_display, session_status,
        audio_recorder]``.
    """

    def progress_text():
        # Progress depends only on what was actually recorded.
        recorded = len(state_dict["recorded_sentences"])
        duration = state_dict["total_recording_duration"]
        minutes = int(duration // 60)
        seconds = int(duration % 60)
        return f"{recorded} sentences recorded, {minutes} minutes and {seconds} seconds total"

    if not state_dict["sentences"]:
        return (
            state_dict,
            state_dict["current_sentence_text"],
            state_dict["current_sentence_id"],
            progress_text(),
            "No sentences available",
            gr.update(value=None),
        )

    current_id, _ = state_dict["sentences"][state_dict["current_index"]]
    if current_id not in state_dict["completed_sentences"]:
        state_dict["completed_sentences"].append(current_id)

    # Refresh the pool so the skipped sentence is no longer offered.
    state_dict["sentences"] = get_available_sentences(
        state_dict["completed_sentences"],
        state_dict["dialect_code"],
    )

    if state_dict["sentences"]:
        next_id, next_text = random.choice(state_dict["sentences"])
        state_dict["current_index"] = next(
            (i for i, s in enumerate(state_dict["sentences"]) if s[0] == next_id),
            0,
        )
    else:
        next_id, next_text = "", ""
        state_dict["current_index"] = 0
    state_dict["current_sentence_text"] = next_text
    state_dict["current_sentence_id"] = next_id

    # NOTE(review): USER_ID is a module-level value, presumably assigned at
    # login; confirm it cannot be shared between concurrent sessions.
    update_user_session(USER_ID, state_dict["username"], {
        "id": USER_ID,
        "current_index": state_dict["current_index"],
        "completed_sentences": state_dict["completed_sentences"],
        "recorded_sentences": state_dict["recorded_sentences"],
        "total_recording_duration": state_dict["total_recording_duration"],
    })

    return (
        state_dict,
        next_text,
        next_id,
        progress_text(),
        "Sentence skipped. Ready to record next sentence.",
        gr.update(value=None),
    )
# Connect event handlers
# Repopulate the dialect selector whenever a different country is chosen.
register_country.change(
update_dialects,
inputs=[register_country],
outputs=[register_dialect]
)
# Login drives the whole page: the handler's return values are assigned to
# these 35 components positionally, so this list must stay in the same order
# as the tuple the handler returns.
login_button.click(
login,
inputs=[login_email, login_password, state],
outputs=[
state, login_title, login_email, login_password, login_button, login_message,
register_name, register_email, register_password, register_country, register_dialect,
register_city, register_gender, register_consent, consent_details, register_button,
register_message, main_title, user_row, username_display, logout_button,
instructions_accordion, instructions_display, recording_row, current_sentence_display,
sentence_id_display, progress_display, audio_row, audio_recorder, button_row, save_button,
discard_button, skip_button, status_row, session_status
]
)
# Registration only reports a status message and keeps the consent text visible.
register_button.click(
register,
inputs=[register_name, register_email, register_password, register_country, register_dialect,
register_city, register_gender, register_consent, state],
outputs=[state, register_message, consent_details]
)
# Logout uses the same 35-component output list as login (same order as the
# tuple returned by logout).
logout_button.click(
logout,
inputs=[state],
outputs=[
state, login_title, login_email, login_password, login_button, login_message,
register_name, register_email, register_password, register_country, register_dialect,
register_city, register_gender, register_consent, consent_details, register_button,
register_message, main_title, user_row, username_display, logout_button,
instructions_accordion, instructions_display, recording_row, current_sentence_display,
sentence_id_display, progress_display, audio_row, audio_recorder, button_row, save_button,
discard_button, skip_button, status_row, session_status
]
)
# Save, discard and skip all return the same 6-tuple:
# (state, sentence text, sentence id, progress, status message, recorder update).
# Save additionally receives the recorder value and the sentence textbox content.
save_button.click(
save_recording_handler,
inputs=[audio_recorder, state, current_sentence_display],
outputs=[
state,
current_sentence_display,
sentence_id_display,
progress_display,
session_status,
audio_recorder
]
)
discard_button.click(
discard_recording,
inputs=[state],
outputs=[state, current_sentence_display, sentence_id_display, progress_display, session_status, audio_recorder]
)
skip_button.click(
skip_sentence,
inputs=[state],
outputs=[
state,
current_sentence_display,
sentence_id_display,
progress_display,
session_status,
audio_recorder
]
)
# Initialize dialects for default country on startup
# (runs update_dialects("Saudi Arabia") when the page loads).
app.load(
lambda: update_dialects("Saudi Arabia"),
outputs=[register_dialect]
)
return app
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on every interface.
    # The port is read from GRADIO_SERVER_PORT and falls back to 7860.
    port = int(os.getenv("GRADIO_SERVER_PORT", 7860))
    app = create_app()
    app.launch(share=False, server_port=port, server_name="0.0.0.0")