import io
import os
import tempfile
import uuid
import zipfile
from datetime import datetime
from typing import List

# Point the Hugging Face cache at a writable location before importing hub utilities
os.environ["HF_HOME"] = "/tmp/hf_cache"
os.makedirs("/tmp/hf_cache", exist_ok=True)

import requests
from fastapi import BackgroundTasks, FastAPI, File, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import hf_hub_download, list_repo_files, upload_file

app = FastAPI()

# CORS setup to allow requests from your frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace "*" with your frontend domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Health check endpoint
@app.get("/")  # route path assumed; the decorator was missing from the original listing
def health_check():
    return {"status": "✅ FastAPI running on Hugging Face Spaces!"}


REPO_ID = "rahul7star/ohamlab"
FOLDER = "demo"
BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/"


# List all images in a folder so the frontend UI can display them
@app.get("/images")  # route path assumed
def list_images():
    try:
        all_files = list_repo_files(REPO_ID)
        folder_prefix = FOLDER.rstrip("/") + "/"
        files_in_folder = [
            f for f in all_files
            if f.startswith(folder_prefix)
            and "/" not in f[len(folder_prefix):]  # skip files in subfolders
            and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp"))
        ]
        urls = [BASE_URL + f for f in files_in_folder]
        return {"images": urls}
    except Exception as e:
        return {"error": str(e)}
# Upload a ZIP from the UI, extract it, and push every file into a unique subfolder on the Hub
@app.post("/upload-zip")  # route path assumed
async def upload_zip(file: UploadFile = File(...)):
    if not file.filename.endswith(".zip"):
        return {"error": "Please upload a .zip file"}

    # Save the ZIP to /tmp
    temp_zip_path = f"/tmp/{file.filename}"
    with open(temp_zip_path, "wb") as f:
        f.write(await file.read())

    # Create a unique subfolder name inside 'demo/'
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"demo/{folder_name}"

    try:
        with tempfile.TemporaryDirectory() as extract_dir:
            # Extract the ZIP
            with zipfile.ZipFile(temp_zip_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)

            uploaded_files = []

            # Upload all extracted files, preserving their relative paths
            for root_dir, _, files in os.walk(extract_dir):
                for name in files:
                    file_path = os.path.join(root_dir, name)
                    relative_path = os.path.relpath(file_path, extract_dir)
                    repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/")

                    upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=repo_path,
                        repo_id=REPO_ID,
                        repo_type="model",
                        commit_message=f"Upload {relative_path} to {folder_name}",
                        token=True,
                    )
                    uploaded_files.append(repo_path)

        return {
            "message": f"✅ Uploaded {len(uploaded_files)} files",
            "folder": folder_name,
            "files": uploaded_files,
        }
    except Exception as e:
        return {"error": f"❌ Failed to process zip: {str(e)}"}
# Upload one or more image files from the UI
@app.post("/upload-images")  # route path assumed
async def upload_images(
    background_tasks: BackgroundTasks,
    files: List[UploadFile] = File(...)
):
    # Step 1: Generate a dynamic folder name
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"demo/{folder_name}"

    responses = []

    # Step 2: Save each image to /tmp and upload it to the Hub
    for file in files:
        filename = file.filename
        contents = await file.read()
        temp_path = f"/tmp/{filename}"

        with open(temp_path, "wb") as f:
            f.write(contents)

        try:
            upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=f"{hf_folder_prefix}/{filename}",
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload {filename} to {hf_folder_prefix}",
                token=True,
            )
            responses.append({
                "filename": filename,
                "status": "✅ uploaded",
                "path": f"{hf_folder_prefix}/{filename}"
            })
        except Exception as e:
            responses.append({
                "filename": filename,
                "status": f"❌ failed: {str(e)}"
            })
        finally:
            os.remove(temp_path)

    # Step 3: Run the filtering job in the background after the response is sent
    def run_filter():
        try:
            result = filter_and_rename_images(folder=hf_folder_prefix)
            print(f"🧼 Filter result: {result}")
        except Exception as e:
            print(f"❌ Filter failed: {str(e)}")

    background_tasks.add_task(run_filter)

    return {
        "message": f"{len(files)} file(s) uploaded",
        "upload_folder": hf_folder_prefix,
        "results": responses,
        "note": "Filtering started in background"
    }
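# Example client call (sketch): the "/upload-images" path is the route assumed above.
#   import requests
#   files = [("files", open("photo1.jpg", "rb")), ("files", open("photo2.jpg", "rb"))]
#   requests.post("https://<your-space>.hf.space/upload-images", files=files).json()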
# Training data set: start filtering the uploaded data for training
T_REPO_ID = "rahul7star/ohamlab"
DESCRIPTION_TEXT = (
    "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. "
    "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background."
)


def is_image_file(filename: str) -> bool:
    return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp"))
# Copy images from a folder into "filter-<folder>", renaming them image1.jpeg, image2.jpeg, ...
# and pairing each with a caption .txt file
@app.get("/filter-images")  # route path assumed; also called directly as a background task
def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")):
    try:
        all_files = list_repo_files(T_REPO_ID)
        folder_prefix = folder.rstrip("/") + "/"
        filter_folder = f"filter-{folder.rstrip('/')}"
        filter_prefix = filter_folder + "/"

        # Keep only images directly inside the folder (no subfolders)
        image_files = [
            f for f in all_files
            if f.startswith(folder_prefix)
            and "/" not in f[len(folder_prefix):]  # no deeper path
            and is_image_file(f)
        ]

        if not image_files:
            return {"error": f"No images found in folder '{folder}'"}

        uploaded_files = []

        for idx, orig_path in enumerate(image_files, start=1):
            # Download the image bytes (uses the local cache)
            local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path)
            with open(local_path, "rb") as f:
                file_bytes = f.read()

            # Rename images as image1.jpeg, image2.jpeg, ...
            new_image_name = f"image{idx}.jpeg"

            # Upload the renamed image from memory
            upload_file(
                path_or_fileobj=io.BytesIO(file_bytes),
                path_in_repo=filter_prefix + new_image_name,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload renamed image {new_image_name} to {filter_folder}",
                token=True,
            )
            uploaded_files.append(filter_prefix + new_image_name)

            # Create and upload a caption text file for each image
            txt_filename = f"image{idx}.txt"
            upload_file(
                path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")),
                path_in_repo=filter_prefix + txt_filename,
                repo_id=T_REPO_ID,
                repo_type="model",
                commit_message=f"Upload text file {txt_filename} to {filter_folder}",
                token=True,
            )
            uploaded_files.append(filter_prefix + txt_filename)

        return {
            "message": f"Processed and uploaded {len(image_files)} images and text files.",
            "files": uploaded_files,
        }
    except Exception as e:
        return {"error": str(e)}
# Test: call another Space and send it a payload to trigger training
@app.get("/call-other-space")  # route path assumed
def call_other_space():
    try:
        payload = {"input": "Start training from external trigger"}
        res = requests.post(
            "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger",
            json=payload,
            timeout=30,
        )

        # ✅ Only return the body as JSON if it actually parses as JSON
        try:
            data = res.json()
        except ValueError:
            return {
                "error": f"Invalid JSON response. Status: {res.status_code}",
                "text": res.text
            }

        return data
    except Exception as e:
        return {"error": str(e)}