Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| import os | |
| os.environ["HF_HOME"] = "/tmp/hf_cache" | |
| import sys | |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
| os.makedirs("/tmp/hf_cache", exist_ok=True) | |
| from huggingface_hub import whoami | |
| os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" | |
| from fastapi import HTTPException | |
| import spaces | |
| from fastapi import FastAPI, Query | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| import io | |
| import requests | |
| from fastapi import BackgroundTasks | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pathlib import Path | |
| from pathlib import Path | |
| from flux_train_ui import start_training | |
| import uuid | |
| import shutil | |
| import json | |
| import os | |
| import os | |
| import os | |
| import zipfile | |
| import tempfile # ✅ Add this! | |
| import yaml | |
| sys.path.insert(0, os.getcwd()) | |
| import gradio as gr | |
| from PIL import Image | |
| import torch | |
| import uuid | |
| import os | |
| import shutil | |
| import json | |
| import yaml | |
| from slugify import slugify | |
| sys.path.insert(0, "ai-toolkit") | |
| from toolkit.job import get_job | |
# FastAPI application instance for this module.
app = FastAPI()
# CORS setup to allow requests from your frontend
# NOTE(review): wildcard origins are combined with allow_credentials=True here;
# FastAPI's CORS documentation asks for explicit origins when credentials are
# enabled — confirm the intended policy and tighten before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace "*" with your frontend domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def health_check():
    """Return a static payload signalling that the service is up."""
    payload = {}
    payload["status"] = "✅ FastAPI running on Hugging Face Spaces!"
    return payload
# Hugging Face repository and the folder inside it whose images are listed.
REPO_ID = "rahul7star/ohamlab"
FOLDER = "demo"
# Prefix that turns a repo-relative file path into a direct "resolve" URL.
BASE_URL = "https://huggingface.co/" + REPO_ID + "/resolve/main/"
# List all images in a repo folder so the frontend UI can display them
def list_images():
    """Collect download URLs for the images stored directly inside ``FOLDER``.

    Files in sub-folders of ``FOLDER`` and files without a
    ``.png``/``.jpg``/``.jpeg``/``.webp`` suffix are ignored.

    Returns:
        ``{"images": [url, ...]}`` on success, or ``{"error": "<message>"}``
        when any step raises.
    """
    image_suffixes = (".png", ".jpg", ".jpeg", ".webp")
    try:
        repo_files = list_repo_files(REPO_ID)
        prefix = FOLDER.rstrip("/") + "/"
        urls = []
        for repo_path in repo_files:
            if not repo_path.startswith(prefix):
                continue
            # A further "/" after the prefix means the file sits in a subfolder.
            if "/" in repo_path[len(prefix):]:
                continue
            if repo_path.lower().endswith(image_suffixes):
                urls.append(BASE_URL + repo_path)
        return {"images": urls}
    except Exception as exc:
        return {"error": str(exc)}
| from datetime import datetime | |
| import tempfile | |
| import uuid | |
| # upload zip from UI | |
async def upload_zip(file: UploadFile = File(...)):
    """Extract an uploaded ZIP archive and push every contained file to the Hub.

    The archive is unpacked inside a private temporary directory and each
    extracted file is uploaded to ``demo/upload_<timestamp>_<id>/...`` in the
    ``rahul7star/ohamlab`` model repo, one commit per file.

    Args:
        file: The uploaded archive; its name must end in ``.zip``
            (case-insensitive).

    Returns:
        On success a dict with ``message``, ``folder`` (the generated folder
        name) and ``files`` (repo paths that were uploaded). On any failure a
        dict with a single ``error`` key.
    """
    # The client may omit the filename entirely; treat that as "not a zip".
    upload_name = file.filename or ""
    if not upload_name.lower().endswith(".zip"):
        return {"error": "Please upload a .zip file"}

    # Create a unique subfolder name inside 'demo/'
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"demo/{folder_name}"

    try:
        with tempfile.TemporaryDirectory() as work_dir:
            # Store the archive under a fixed name inside the private temp
            # directory: the client-supplied filename never touches the
            # filesystem (no path traversal, no collisions between requests)
            # and the archive is removed together with the directory.
            temp_zip_path = os.path.join(work_dir, "upload.zip")
            with open(temp_zip_path, "wb") as f:
                f.write(await file.read())

            extract_dir = os.path.join(work_dir, "extracted")
            os.makedirs(extract_dir)
            with zipfile.ZipFile(temp_zip_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)

            uploaded_files = []
            # Upload all extracted files, preserving their relative layout.
            for root_dir, _, files in os.walk(extract_dir):
                for name in files:
                    file_path = os.path.join(root_dir, name)
                    relative_path = os.path.relpath(file_path, extract_dir)
                    # Repo paths always use forward slashes.
                    repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/")
                    upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=repo_path,
                        repo_id="rahul7star/ohamlab",
                        repo_type="model",
                        commit_message=f"Upload {relative_path} to {folder_name}",
                        token=True,
                    )
                    uploaded_files.append(repo_path)

        return {
            "message": f"✅ Uploaded {len(uploaded_files)} files",
            "folder": folder_name,
            "files": uploaded_files,
        }
    except Exception as e:
        return {"error": f"❌ Failed to process zip: {str(e)}"}
# upload one or more image files from UI
| from typing import List | |
| from fastapi import UploadFile, File, APIRouter | |
| import os | |
| from fastapi import UploadFile, File, APIRouter | |
| from typing import List | |
| from datetime import datetime | |
| import uuid, os | |
async def upload_images(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...)
):
    """Upload a batch of images to a fresh repo folder, then filter them.

    Each file is pushed to ``demo/upload_<timestamp>_<id>/<filename>`` in
    ``T_REPO_ID``. A failure on one file is recorded in the results and does
    not stop the remaining uploads. Afterwards ``filter_and_rename_images`` is
    scheduled as a background task for the new folder.

    Args:
        background_tasks: Task queue used to run the filter step after the
            response is produced.
        files: The uploaded image files.

    Returns:
        A dict with ``message``, ``upload_folder``, per-file ``results`` and a
        ``note`` that filtering was started.
    """
    # Step 1: Generate dynamic folder name
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"demo/{folder_name}"

    responses = []

    # Step 2: Upload each image
    for file in files:
        # Keep only the last path component of the client-supplied name so it
        # cannot inject extra directories (or "..") into the repo path.
        filename = os.path.basename((file.filename or "").replace("\\", "/"))
        if not filename:
            filename = f"upload_{uuid.uuid4().hex[:6]}"
        contents = await file.read()

        try:
            # Upload the bytes directly: no temp file is written, so nothing
            # needs cleaning up and concurrent requests with identical
            # filenames cannot overwrite each other's data on disk.
            upload_file(
                path_or_fileobj=contents,
                path_in_repo=f"{hf_folder_prefix}/{filename}",
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload {filename} to {hf_folder_prefix}",
                token=True,
            )
            responses.append({
                "filename": filename,
                "status": "✅ uploaded",
                "path": f"{hf_folder_prefix}/{filename}"
            })
        except Exception as e:
            responses.append({
                "filename": filename,
                "status": f"❌ failed: {str(e)}"
            })

    # Step 3: Add filter job to background
    def run_filter():
        try:
            result = filter_and_rename_images(folder=hf_folder_prefix)
            print(f"🧼 Filter result: {result}")
        except Exception as e:
            print(f"❌ Filter failed: {str(e)}")

    background_tasks.add_task(run_filter)

    return {
        "message": f"{len(files)} file(s) uploaded",
        "upload_folder": hf_folder_prefix,
        "results": responses,
        "note": "Filtering started in background"
    }
# Training dataset: filter and rename the uploaded images for training
# Repository used by the image upload and filtering helpers.
T_REPO_ID = "rahul7star/ohamlab"
# Caption text that is uploaded as the .txt companion of every filtered image.
DESCRIPTION_TEXT = " ".join(
    [
        "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans.",
        "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background.",
    ]
)
def is_image_file(filename: str) -> bool:
    """Tell whether ``filename`` ends with a supported image extension.

    The comparison ignores case; supported suffixes are ``.png``, ``.jpg``,
    ``.jpeg`` and ``.webp``.
    """
    lowered = filename.lower()
    return any(lowered.endswith(suffix) for suffix in (".png", ".jpg", ".jpeg", ".webp"))
def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")):
    """Copy the images of ``folder`` into ``filter-<folder>/`` under numbered names.

    Every image found directly inside ``folder`` (sub-folders are ignored) is
    downloaded and re-uploaded as ``image<N>.jpeg`` (the bytes are uploaded
    unchanged), followed by an ``image<N>.txt`` file containing
    ``DESCRIPTION_TEXT``. Numbering starts at 1.

    Args:
        folder: Repo-relative folder to scan; trailing slashes are ignored.

    Returns:
        ``{"message": ..., "files": [...]}`` listing every uploaded repo path,
        or ``{"error": ...}`` when no image is found or any step raises.
    """
    try:
        repo_files = list_repo_files(T_REPO_ID)

        source_dir = folder.rstrip("/")
        source_prefix = source_dir + "/"
        filter_folder = f"filter-{source_dir}"
        filter_prefix = filter_folder + "/"

        # Keep only images that sit directly in the folder (no deeper path).
        image_files = []
        for repo_path in repo_files:
            if not repo_path.startswith(source_prefix):
                continue
            if "/" in repo_path[len(source_prefix):]:
                continue
            if is_image_file(repo_path):
                image_files.append(repo_path)

        if not image_files:
            return {"error": f"No images found in folder '{folder}'"}

        uploaded_files = []
        for idx, orig_path in enumerate(image_files, start=1):
            # hf_hub_download returns a local file path for the repo file.
            local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path)
            with open(local_path, "rb") as handle:
                image_bytes = handle.read()

            image_name = f"image{idx}.jpeg"
            caption_name = f"image{idx}.txt"

            # The image goes first, then its caption file, each in its own commit.
            pending = (
                (
                    image_name,
                    image_bytes,
                    f"Upload renamed image {image_name} to {filter_folder}",
                ),
                (
                    caption_name,
                    DESCRIPTION_TEXT.encode("utf-8"),
                    f"Upload text file {caption_name} to {filter_folder}",
                ),
            )
            for target_name, payload, message in pending:
                upload_file(
                    path_or_fileobj=io.BytesIO(payload),
                    path_in_repo=filter_prefix + target_name,
                    repo_id=T_REPO_ID,
                    repo_type="model",
                    commit_message=message,
                    token=True,
                )
                uploaded_files.append(filter_prefix + target_name)

        return {
            "message": f"Processed and uploaded {len(image_files)} images and text files.",
            "files": uploaded_files,
        }
    except Exception as exc:
        return {"error": str(exc)}
| # Test call another space and send the payload | |
def call_other_space():
    """POST a trigger payload to the companion Space and relay its JSON reply.

    Returns:
        The decoded JSON body of the response. If the body is not valid JSON,
        a dict with ``error`` (including the HTTP status code) and the raw
        ``text``. If the request itself fails, ``{"error": "<message>"}``.
    """
    endpoint = "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger"
    try:
        res = requests.post(
            endpoint,
            json={"input": "Start training from external trigger"},
            timeout=30,
        )
        # The response may be empty or not JSON at all; report that explicitly.
        try:
            return res.json()
        except ValueError:
            return {
                "error": f"Invalid JSON response. Status: {res.status_code}",
                "text": res.text
            }
    except Exception as exc:
        return {"error": str(exc)}
| # ========== TRAIN CONFIGURATION ========== | |
| ##checking model sample | |
| import os | |
| import uuid | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download | |
| from fastapi.responses import JSONResponse | |
| from huggingface_hub import snapshot_download | |
| # Constants | |
# Repo that snapshot_download pulls from (re-binds REPO_ID to the same value
# it was given earlier in this module).
REPO_ID: str = "rahul7star/ohamlab"
# Sub-folder of the repo whose files are downloaded.
FOLDER_IN_REPO: str = "filter-demo/upload_20250708_041329_9c5c81"
# Phrase embedded in every auto-generated caption.
CONCEPT_SENTENCE: str = "ohamlab style"
# Name used for the local working directory under /tmp.
LORA_NAME: str = "ohami_filter_autorun"
def fetch_images_and_generate_captions():
    """Download ``FOLDER_IN_REPO`` from the Hub and build a caption per image.

    The files are downloaded into a fresh ``/tmp/<LORA_NAME>-<uuid>``
    directory; ``.jpg``, ``.jpeg`` and ``.png`` files are then collected
    recursively from the downloaded folder.

    Returns:
        A dict with ``local_dir`` (the image folder), ``images`` (image paths
        as strings) and ``captions`` (one per image, same order), or a
        ``JSONResponse`` with status 400 when no image is found.
    """
    # Create a unique local directory
    local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}")
    local_dir.mkdir(parents=True, exist_ok=True)

    # Restrict the download to the files inside the subfolder.
    snapshot_path = snapshot_download(
        repo_id=REPO_ID,
        repo_type="model",
        local_dir=local_dir,
        local_dir_use_symlinks=False,
        allow_patterns=[f"{FOLDER_IN_REPO}/*"],
    )

    # Resolve the image folder relative to the downloaded snapshot.
    image_dir = Path(snapshot_path) / FOLDER_IN_REPO
    image_paths = [
        found
        for pattern in ("*.jpg", "*.jpeg", "*.png")
        for found in image_dir.rglob(pattern)
    ]
    if not image_paths:
        return JSONResponse(status_code=400, content={"error": "No images found in the HF repo folder."})

    captions = []
    for img in image_paths:
        captions.append(f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]")

    return {
        "local_dir": str(image_dir),
        "images": list(map(str, image_paths)),
        "captions": captions
    }
# These four assignments repeat, with identical values, the constants bound
# just before fetch_images_and_generate_captions.
REPO_ID: str = "rahul7star/ohamlab"
FOLDER_IN_REPO: str = "filter-demo/upload_20250708_041329_9c5c81"
CONCEPT_SENTENCE: str = "ohamlab style"
LORA_NAME: str = "ohami_filter_autorun"
| # ========== FASTAPI APP ========== | |
| # ========== HELPERS ========== | |
def create_dataset(images, *captions):
    """Copy images into a new dataset folder and write their captions.

    A fresh ``/tmp/datasets_<uuid>`` directory is created; each image is
    copied into it and one JSON line ``{"file_name": ..., "prompt": ...}`` is
    appended to ``metadata.jsonl`` per image/caption pair.

    Args:
        images: Paths of the images to copy.
        *captions: One caption per image, in the same order.

    Returns:
        The dataset folder path as a string.

    Raises:
        ValueError: If the number of captions differs from the number of images.
    """
    if len(captions) != len(images):
        raise ValueError("Number of images and captions must be the same.")

    dataset_dir = Path(f"/tmp/datasets_{uuid.uuid4()}")
    dataset_dir.mkdir(parents=True, exist_ok=True)

    with (dataset_dir / "metadata.jsonl").open("a", encoding="utf-8") as metadata:
        for source, prompt in zip(images, captions):
            # shutil.copy returns the path of the newly created copy.
            copied = Path(shutil.copy(str(source), dataset_dir))
            record = json.dumps({"file_name": copied.name, "prompt": prompt}, ensure_ascii=False)
            metadata.write(record + "\n")

    return str(dataset_dir)
def recursive_update(d, u):
    """Merge mapping ``u`` into dict ``d`` in place, descending into nested dicts.

    A non-empty dict value in ``u`` is merged key by key into the matching
    entry of ``d``. Any other value (including an empty dict) replaces the
    entry outright.

    Args:
        d: Dict to update; it is modified in place.
        u: Mapping holding the overrides.

    Returns:
        ``d`` itself, after the update.
    """
    for key, value in u.items():
        if isinstance(value, dict) and value:
            existing = d.get(key)
            # A missing, ``None`` or scalar entry cannot be merged into; start
            # from an empty dict so the override replaces it instead of
            # raising a TypeError on item assignment.
            if not isinstance(existing, dict):
                existing = {}
            d[key] = recursive_update(existing, value)
        else:
            d[key] = value
    return d
def start_training0(
    lora_name,
    concept_sentence,
    steps,
    lr,
    rank,
    model_to_train,
    low_vram,
    dataset_folder,
    sample_1,
    sample_2,
    sample_3,
    use_more_advanced_options,
    more_advanced_options,
):
    """Build a FLUX LoRA training config, write it to disk and run the job.

    The config is saved as YAML under ``/tmp/tmp_configs`` and handed to
    ``get_job``; the job is run synchronously, so this call returns only after
    ``job.run()`` has finished.

    Args:
        lora_name: Display name; lower-cased with spaces replaced by
            underscores to form the job name and Hub repo name.
        concept_sentence: Stored as the config's ``trigger_word``.
        steps: Number of training steps; also used as ``sample_every``.
        lr: Learning rate.
        rank: Used for both ``linear`` and ``linear_alpha`` of the network.
        model_to_train: Currently unused; the model path is fixed to
            ``black-forest-labs/FLUX.1-dev``.
        low_vram: Forwarded to the model section of the config.
        dataset_folder: Folder path of the training dataset.
        sample_1: Sample prompt; empty values are dropped.
        sample_2: Sample prompt; empty values are dropped.
        sample_3: Sample prompt; empty values are dropped.
        use_more_advanced_options: Currently unused (see note in the body).
        more_advanced_options: Currently unused (see note in the body).

    Returns:
        A status string containing the path of the written config file.
    """
    # Pushing to the Hub needs an authenticated user; fall back to a local-only
    # run when the identity lookup fails for any reason.
    try:
        user = whoami()
        username = user.get("name", "anonymous")
        push_to_hub = True
    except Exception:
        username = "anonymous"
        push_to_hub = False

    slugged_lora_name = lora_name.replace(" ", "_").lower()
    print(username)

    # Load base config
    config = {
        "job": "extension",
        "config": {
            "name": slugged_lora_name,
            "process": [
                {
                    "type": "sd_trainer",
                    "model": {
                        "low_vram": low_vram,
                        "is_flux": True,
                        "quantize": True,
                        "name_or_path": "black-forest-labs/FLUX.1-dev",
                    },
                    "network": {
                        "linear": rank,
                        "linear_alpha": rank,
                        "type": "lora",
                    },
                    "train": {
                        "steps": steps,
                        "lr": lr,
                        "skip_first_sample": True,
                        "batch_size": 1,
                        "dtype": "bf16",
                        "gradient_accumulation_steps": 1,
                        "gradient_checkpointing": True,
                        "noise_scheduler": "flowmatch",
                        "optimizer": "adamw8bit",
                        "ema_config": {
                            "use_ema": True,
                            "ema_decay": 0.99,
                        },
                    },
                    "datasets": [
                        {"folder_path": dataset_folder},
                    ],
                    "save": {
                        "dtype": "float16",
                        "save_every": 10000,
                        "push_to_hub": push_to_hub,
                        "hf_repo_id": f"{username}/{slugged_lora_name}",
                        "hf_private": True,
                        "max_step_saves_to_keep": 4,
                    },
                    "sample": {
                        "guidance_scale": 3.5,
                        "sample_every": steps,
                        "sample_steps": 28,
                        "width": 1024,
                        "height": 1024,
                        "walk_seed": True,
                        "seed": 42,
                        "sampler": "flowmatch",
                        "prompts": [p for p in [sample_1, sample_2, sample_3] if p],
                    },
                    "trigger_word": concept_sentence,
                }
            ],
        },
    }

    # NOTE: advanced YAML overrides are intentionally not applied here;
    # `use_more_advanced_options` and `more_advanced_options` are accepted
    # for signature compatibility only.

    # Save YAML config
    os.makedirs("/tmp/tmp_configs", exist_ok=True)
    config_path = f"/tmp/tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)
    print(config_path)

    # Log the effective configuration before the (blocking) run so it is
    # visible even if training fails part-way.
    print(f"[INFO] Starting training with config: {config_path}")
    print(json.dumps(config, indent=2))

    # Run the training job; always give it the chance to clean up, even when
    # the run raises.
    job = get_job(config_path)
    try:
        job.run()
    finally:
        job.cleanup()

    return f"Training started successfully with config: {config_path}"
| # ========== MAIN ENDPOINT ========== | |
def auto_run_lora_from_repo():
    """Download the filtered images from the Hub and launch a LoRA training run.

    Steps: download ``FOLDER_IN_REPO`` into a fresh ``/tmp`` directory,
    generate one caption per image, copy the images plus a ``metadata.jsonl``
    into a new dataset folder, write the sample prompts to a YAML file and
    call ``start_training``.

    Returns:
        ``JSONResponse`` with ``status: success`` and the message returned by
        ``start_training``.

    Raises:
        HTTPException: 400 when the downloaded folder contains no
            ``.jpg``/``.jpeg``/``.png`` files; 500 for any other failure.
    """
    try:
        # ✅ Static or dynamic config
        REPO_ID = "rahul7star/ohamlab"
        FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81"
        CONCEPT_SENTENCE = "ohamlab style"
        LORA_NAME = "ohami_filter_autorun"

        # ✅ Setup HF cache
        os.environ["HF_HOME"] = "/tmp/hf_cache"
        os.makedirs("/tmp/hf_cache", exist_ok=True)

        # ✅ Download dataset from HF
        local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}")
        os.makedirs(local_dir, exist_ok=True)
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="model",
            local_dir=local_dir,
            local_dir_use_symlinks=False,
            allow_patterns=[f"{FOLDER_IN_REPO}/*"],  # only files inside the subfolder
        )

        image_dir = local_dir / FOLDER_IN_REPO
        image_paths = (
            list(image_dir.rglob("*.jpg"))
            + list(image_dir.rglob("*.jpeg"))
            + list(image_dir.rglob("*.png"))
        )
        if not image_paths:
            raise HTTPException(status_code=400, detail="No images found in the Hugging Face folder.")

        # ✅ Auto-generate captions
        captions = [
            f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]"
            for img in image_paths
        ]

        # ✅ Create dataset folder with metadata.jsonl
        dataset_folder = os.path.join("/tmp", f"datasets_{uuid.uuid4()}")
        os.makedirs(dataset_folder, exist_ok=True)
        print('DATA SET iS CREATED =================================================')
        jsonl_file_path = os.path.join(dataset_folder, "metadata.jsonl")
        with open(jsonl_file_path, "a") as jsonl_file:
            for image, caption in zip(image_paths, captions):
                new_image_path = shutil.copy(str(image), dataset_folder)
                file_name = os.path.basename(new_image_path)
                data = {"file_name": file_name, "prompt": caption}
                jsonl_file.write(json.dumps(data) + "\n")

        # ✅ Optional advanced config
        slugged_lora_name = LORA_NAME.replace(" ", "_")
        os.makedirs("/tmp/tmp_configs", exist_ok=True)
        config_path = f"/tmp/tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml"
        config = {
            "sample_1": "a stylish anime character with ohamlab style",
            "sample_2": "a cartoon car in ohamlab style",
            "sample_3": "portrait in ohamlab lighting"
        }
        with open(config_path, "w") as f:
            yaml.dump(config, f)

        # ✅ Final call to train
        print(f" slugged_lora{slugged_lora_name}")
        print('Now Start Trainng Set called all data si rADYU =================================================')
        # NOTE(review): `more_advanced_options` receives the *path* of the YAML
        # file written above, not its contents — confirm this is what
        # flux_train_ui.start_training expects.
        result = start_training(
            lora_name=LORA_NAME,
            concept_sentence=CONCEPT_SENTENCE,
            steps=45,
            lr=1e-4,
            rank=32,
            model_to_train="flux",
            low_vram=True,
            dataset_folder=dataset_folder,
            sample_1=config["sample_1"],
            sample_2=config["sample_2"],
            sample_3=config["sample_3"],
            use_more_advanced_options=True,
            more_advanced_options=config_path
        )
        return JSONResponse(content={"status": "success", "message": result})
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 above) must keep their status
        # code instead of being re-wrapped as a 500 by the handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
