import json
import os
import tempfile
import time
import uuid
from pathlib import Path

import datasets
import pandas as pd
from datasets import Image
from huggingface_hub import repo_exists, CommitScheduler


def update_dataset_with_new_splits(new_splits: dict, process_name: str = "Main"):
    """
    Add new splits to a regular HuggingFace dataset without downloading existing data.

    This function pushes individual splits to the Hub using the split parameter,
    which preserves all existing splits and only adds/updates the specified ones.

    Key Features:
    - No downloading of the existing dataset required
    - Existing splits are preserved (not overwritten)
    - Each split is pushed individually using dataset.push_to_hub(split="name")
    - Efficient for large datasets with many splits

    Args:
        new_splits: dict of {split_name: DataFrame} - splits to add/update
        process_name: Name for logging and commit messages

    Example:
        new_splits = {
            "validation_2024": val_df,
            "test_batch_1": test_df,
            "custom_split": custom_df
        }
        update_dataset_with_new_splits(new_splits)
    """
    repo_id = os.environ["REPO_ID"]
    hf_token = os.environ["HF_TOKEN"]

    print(f"\n[{process_name}] Starting dataset splits update process...")

    # --- Start of Critical Section ---
    if not repo_exists(repo_id, repo_type="dataset", token=hf_token):
        print(f"[{process_name}] Repository {repo_id} not found. Cannot update.")
        return

    # Skip downloading the existing dataset - we'll push only the new splits
    print(f"[{process_name}] Preparing to push {len(new_splits)} new splits individually...")

    # Prepare each split for individual pushing
    splits_to_push = []
    for split_id, df in new_splits.items():
        new_split_dataset = datasets.Dataset.from_pandas(df)
        splits_to_push.append((split_id, new_split_dataset))
        print(f"[{process_name}] Prepared split '{split_id}' with {len(new_split_dataset)} entries.")

    # Push individual splits to the Hub with a retry mechanism
    _push_splits_to_hub(splits_to_push, repo_id, hf_token, process_name)

    print(f"[{process_name}] Finished pushing new dataset splits to Hub.")


def update_dataset_with_new_images(image_df: pd.DataFrame, process_name: str = "Main",
                                   scheduler: CommitScheduler = None, dataset_dir: Path = None,
                                   jsonl_path: Path = None):
    """
    Add new images to an image HuggingFace dataset using a CommitScheduler
    for efficient incremental updates.

    Key Features:
    - Uses CommitScheduler for incremental, batched commits to the image repository
    - Saves images as PNG files with unique names
    - Stores metadata in JSONL format for the file-based approach
    - Thread-safe via the scheduler's lock

    Args:
        image_df: DataFrame with 'id', 'image' and 'annotated_image' columns
            (images should be PIL Image objects or numpy arrays)
        process_name: Name for logging and commit messages
        scheduler: CommitScheduler that commits dataset_dir to the image repository
        dataset_dir: Local directory watched by the scheduler where images are written
        jsonl_path: Path to the JSONL metadata file inside dataset_dir

    Example:
        img_df = pd.DataFrame([{"id": "img1", "image": pil_image, "annotated_image": annotated_pil}])
        update_dataset_with_new_images(img_df, scheduler=scheduler,
                                       dataset_dir=dataset_dir, jsonl_path=jsonl_path)
    """
    print(f"\n[{process_name}] Starting image dataset update...")

    # Validate input format for image datasets
    if not hasattr(image_df, 'columns'):
        raise ValueError(f"image_df must be a pandas DataFrame with 'id', 'image' and "
                         f"'annotated_image' columns, got {type(image_df)}")

    # Validate required columns
    required_columns = ['id', 'image', 'annotated_image']
    missing_columns = [col for col in required_columns if col not in image_df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}. "
                         f"Found columns: {list(image_df.columns)}")

    print(f"[{process_name}] Validated DataFrame with {len(image_df)} entries "
          f"and columns: {list(image_df.columns)}")

    return _append_images_with_scheduler(image_df, process_name, scheduler, dataset_dir, jsonl_path)

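# --- Illustrative sketch (not part of the original pipeline) ----------------------
# update_dataset_with_new_images() expects the caller to provide a CommitScheduler
# plus the local folder and JSONL file it watches. One minimal way to build them
# could look like the helper below. The helper name, the "metadata.jsonl" filename
# and the 10-minute commit interval are assumptions for illustration, not taken
# from the original code.
def _example_build_image_scheduler():
    """Hypothetical helper: create a CommitScheduler and the matching local paths."""
    dataset_dir = Path(tempfile.mkdtemp(prefix="image_dataset_"))
    jsonl_path = dataset_dir / "metadata.jsonl"  # assumed filename
    scheduler = CommitScheduler(
        repo_id=os.environ["IMAGE_REPO_ID"],
        repo_type="dataset",
        folder_path=dataset_dir,
        every=10,  # commit pending files every 10 minutes (assumed interval)
        token=os.environ["HF_TOKEN"],
    )
    return scheduler, dataset_dir, jsonl_path
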
def _append_images_with_scheduler(image_df: pd.DataFrame, process_name: str, scheduler, dataset_dir, jsonl_path):
    """
    Append images to an existing dataset using CommitScheduler for efficient incremental updates.

    Images and their metadata are written into dataset_dir; the scheduler commits the
    folder to the image repository in the background.
    """
    print(f"[{process_name}] Using CommitScheduler for incremental updates...")
    print(f"[IMAGE_SCHEDULER] Using CommitScheduler for {os.environ['IMAGE_REPO_ID']} "
          f"with local path: {dataset_dir}")

    # Process each image
    saved_count = 0
    failed_count = 0

    for idx, row in image_df.iterrows():
        # Resolve the id up front so it is available in the error handler below
        image_id = row.get('id', '<unknown>')
        try:
            image = row['image']
            annotated_image = row['annotated_image']

            # Skip entries with missing images
            if image is None:
                print(f"[{process_name}] Skipping image {image_id}: image is None")
                failed_count += 1
                continue
            if annotated_image is None:
                print(f"[{process_name}] Warning: annotated_image is None for {image_id}")
                failed_count += 1
                continue

            # Generate unique filenames for the original and annotated images
            unique_filename_orig = f"{uuid.uuid4()}_orig.png"
            unique_filename_ann = f"{uuid.uuid4()}_ann.png"
            image_path_orig = dataset_dir / unique_filename_orig
            image_path_ann = dataset_dir / unique_filename_ann

            # Save the image files and metadata under the scheduler's lock so a
            # background commit never runs while files are being written
            with scheduler.lock:
                # Save the original image file
                if hasattr(image, 'save'):
                    # PIL Image object
                    image.save(image_path_orig, format='PNG')
                elif hasattr(image, 'shape'):
                    # Numpy array
                    from PIL import Image as PILImage
                    PILImage.fromarray(image).save(image_path_orig, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported image type for {image_id}: {type(image)}")
                    failed_count += 1
                    continue

                # Save the annotated image file
                if hasattr(annotated_image, 'save'):
                    annotated_image.save(image_path_ann, format='PNG')
                elif hasattr(annotated_image, 'shape'):
                    from PIL import Image as PILImage
                    PILImage.fromarray(annotated_image).save(image_path_ann, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported annotated_image type for {image_id}: "
                          f"{type(annotated_image)}")
                    failed_count += 1
                    continue

                # Append metadata to the JSONL file
                metadata = {
                    "id": image_id,
                    "file_name": unique_filename_orig,
                    "annotated_file_name": unique_filename_ann
                }
                with jsonl_path.open("a", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False)
                    f.write("\n")

            saved_count += 1
            print(f"[{process_name}] Saved image {saved_count}/{len(image_df)}: {image_id} -> {unique_filename_orig}")

        except Exception as e:
            print(f"[{process_name}] Error processing image {image_id}: {e}")
            failed_count += 1
            continue

    print(f"[{process_name}] Finished image dataset update:")
    print(f"[{process_name}] - Successfully saved: {saved_count} images")
    print(f"[{process_name}] - Failed: {failed_count} images")
    print(f"[{process_name}] - Images will be automatically committed to the dataset repository")

    if saved_count == 0:
        print(f"[{process_name}] Warning: No images were successfully saved")

    return saved_count, failed_count

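# --- Illustrative sketch (not part of the original pipeline) ----------------------
# Each successful iteration above appends one JSON line with "file_name" and
# "annotated_file_name" relative to dataset_dir. A minimal reader for that layout
# might look like this; the helper name is hypothetical and it assumes the JSONL
# and PNG files were written by _append_images_with_scheduler.
def _example_read_saved_images(dataset_dir: Path, jsonl_path: Path):
    """Hypothetical helper: yield (id, original PIL image, annotated PIL image)."""
    from PIL import Image as PILImage
    with jsonl_path.open("r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            original = PILImage.open(dataset_dir / record["file_name"])
            annotated = PILImage.open(dataset_dir / record["annotated_file_name"])
            yield record["id"], original, annotated
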
def _push_splits_to_hub(splits_to_push: list, repo_id: str, hf_token: str, process_name: str):
    """
    Helper function to push individual splits to the Hub with a retry mechanism.

    Args:
        splits_to_push: List of (split_name, dataset) tuples
        repo_id: HuggingFace repository ID
        hf_token: HuggingFace token
        process_name: Process name for logging
    """
    max_retries = 5
    successful_splits = []
    failed_splits = []

    for split_name, split_dataset in splits_to_push:
        print(f"[{process_name}] Pushing split '{split_name}' with {len(split_dataset)} entries...")

        for attempt in range(max_retries):
            try:
                print(f"[{process_name}] Pushing split '{split_name}' (Attempt {attempt + 1}/{max_retries})...")
                split_dataset.push_to_hub(
                    repo_id=repo_id,
                    split=split_name,  # Pushing a single split preserves the existing ones
                    token=hf_token,
                    commit_message=(f"feat: Add split '{split_name}' from {process_name} "
                                    f"with {len(split_dataset)} entries")
                )
                print(f"[{process_name}] Split '{split_name}' pushed successfully on attempt {attempt + 1}.")
                successful_splits.append(split_name)
                break  # Exit retry loop on success
            except Exception as e:
                print(f"[{process_name}] Split '{split_name}' push attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 5
                    print(f"[{process_name}] Waiting for {wait_time} seconds before retrying...")
                    time.sleep(wait_time)
                else:
                    print(f"[{process_name}] All {max_retries} push attempts failed for split '{split_name}'.")
                    failed_splits.append(split_name)

    # Report results
    if successful_splits:
        print(f"[{process_name}] Successfully pushed splits: {successful_splits}")
    if failed_splits:
        print(f"[{process_name}] Failed to push splits: {failed_splits}")
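

# --- Illustrative usage sketch (not part of the original pipeline) ----------------
# A minimal end-to-end call, assuming REPO_ID and HF_TOKEN are set in the
# environment. The split name and DataFrame contents are placeholders.
if __name__ == "__main__":
    example_df = pd.DataFrame([
        {"id": "sample-1", "value": 1},  # placeholder row
    ])
    update_dataset_with_new_splits({"example_split": example_df}, process_name="ExampleRun")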