Spaces:
Running
on
Zero
Running
on
Zero
| # logging.py | |
| import os | |
| import uuid | |
| import time | |
| from huggingface_hub import CommitScheduler, HfApi | |
| from PIL import Image | |
| import numpy as np | |
| from diffusers.utils import load_image | |
# --- Logging configuration -------------------------------------------------
APP_VERSION = "0_4"
HF_DATASET_REPO = "LPX55/upscaler_logs"  # Change to your dataset repo
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Make sure this is set in your environment

# Local layout: logs_<version>/            -> CSV log file(s)
#               logs_<version>/upscaler/   -> before/after images
LOG_DIR = f"logs_{APP_VERSION}"
IMAGE_DIR = os.path.join(LOG_DIR, "upscaler")
# The CSV name embeds the module import time (epoch seconds), so every
# process start writes to a fresh log file.
LOG_FILE = os.path.join(LOG_DIR, f"{int(time.time())}-logs.csv")

api = HfApi(token=HF_TOKEN)

# Background uploader that periodically pushes LOG_DIR to the dataset repo,
# under the "v<version>" folder. `every` is expressed in minutes.
scheduler = CommitScheduler(
    repo_id=HF_DATASET_REPO,
    repo_type="dataset",
    folder_path=LOG_DIR,
    path_in_repo=f"v{APP_VERSION}",
    every=5,
    private=True,
    token=HF_TOKEN,
)
| # def cache_temp(img_id): | |
| # api.upload_file( | |
| # path_or_fileobj=os.path.join("/tmp/gradio", f"{img_id}"), | |
| # path_in_repo="/v" + APP_VERSION + "/" + img_id, | |
| # repo_id=HF_DATASET_REPO, | |
| # repo_type="dataset", | |
| # token=HF_TOKEN, | |
| # ) | |
def save_image(image_id, image_path: Image.Image) -> None:
    """Store one image in the scheduler-synced image folder.

    The image is normalised through ``load_image`` and written to
    ``IMAGE_DIR/<image_id>`` while holding ``scheduler.lock``, so the write
    is serialised with anything else that takes that lock.

    Args:
        image_id: File name (including its extension) to store the image
            under inside ``IMAGE_DIR``.
        image_path: The image to store. Despite the parameter name it is
            annotated as a PIL image; it is passed to ``load_image`` as-is.

    Returns:
        None. Failures are reported on stdout and never propagated, so a
        logging problem cannot break the caller.
    """
    os.makedirs(IMAGE_DIR, exist_ok=True)
    # f-strings instead of "+" so a non-string id cannot raise TypeError here.
    print(f"Image ID: {image_id}")
    print(f"Image ID Type: {type(image_id)}")
    save_image_path = os.path.join(IMAGE_DIR, f"{image_id}")
    print(f"Save image path: {save_image_path}")

    try:
        loaded = load_image(image_path)
    except Exception as e:
        print(f"Error loading image: {e}")
        return
    print(f"Loaded Type: {type(loaded)}")

    # Write straight to the destination. The previous implementation first
    # dumped a uuid-named PNG into /tmp/gradio and re-opened it, which leaked
    # one temp file per call, left the re-opened handle unclosed and failed
    # whenever /tmp/gradio did not exist.
    with scheduler.lock:
        try:
            loaded.save(save_image_path)
            print(f"Saved image: {save_image_path}")
        except Exception as e:
            print(f"Error saving image: {e}")
def log_params(
    prompt, scale, steps, controlnet_conditioning_scale, guidance_scale, seed, guidance_end,
    before_image, after_image, user=None
):
    """Record one upscaling run: save both images and append a CSV row.

    Args:
        prompt, scale, steps, controlnet_conditioning_scale, guidance_scale,
        seed, guidance_end: Run parameters, written to the CSV unchanged.
        before_image: Input image, stored via ``save_image``.
        after_image: Result image, stored via ``save_image``.
        user: Optional user identifier; any falsy value is logged as
            ``"anonymous"``.

    Returns:
        None.

    Note:
        ``save_image`` reports failures on stdout instead of raising, so the
        image paths written to the CSV are the intended locations and are
        not guaranteed to exist.
    """
    import csv

    before_id = f"{uuid.uuid4()}_before.png"
    after_id = f"{uuid.uuid4()}_after.png"
    before_path = os.path.join(IMAGE_DIR, before_id)
    after_path = os.path.join(IMAGE_DIR, after_id)

    print(f"Type before: {type(before_image)}")
    print(f"Type after: {type(after_image)}")

    # save_image takes scheduler.lock itself, so it must run BEFORE the lock
    # is taken below (the lock is not assumed to be re-entrant).
    save_image(before_id, before_image)
    save_image(after_id, after_image)

    # Do not rely on save_image having created the parent folder.
    os.makedirs(LOG_DIR, exist_ok=True)

    # Hold the scheduler lock for the header check and the append so they are
    # serialised with the other writers that take this lock.
    with scheduler.lock:
        is_new = not os.path.exists(LOG_FILE)
        # Explicit UTF-8 so prompts with non-ASCII text do not depend on the
        # process locale.
        with open(LOG_FILE, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if is_new:
                writer.writerow([
                    "timestamp", "user", "prompt", "scale", "steps", "controlnet_conditioning_scale",
                    "guidance_scale", "seed", "guidance_end", "before_image", "after_image"
                ])
            writer.writerow([
                time.strftime("%Y-%m-%dT%H:%M:%S"),
                user or "anonymous",
                prompt, scale, steps, controlnet_conditioning_scale,
                guidance_scale, seed, guidance_end, before_path, after_path
            ])