Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from fastapi import FastAPI, HTTPException, Request | |
| from huggingface_hub import HfApi, login | |
| app = FastAPI() | |
| KEY = os.environ.get("WEBHOOK_SECRET") | |
| HF_READ = os.environ.get("HF_READ") | |
| api = HfApi(token=HF_READ) | |
| login(HF_READ) | |
| LATEST_PAYLOAD = "latest_payload.json" | |
| def greet_json(): | |
| try: | |
| with open(LATEST_PAYLOAD, "r") as file: | |
| payload = json.load(file) | |
| except FileNotFoundError: | |
| payload = {"message": "No payload received yet"} | |
| except json.JSONDecodeError: | |
| payload = {"message": "Error reading payload file"} | |
| return payload | |
| async def handle_webhook(request: Request): | |
| secret = request.headers.get("X-Webhook-Secret") | |
| if secret != KEY: | |
| raise HTTPException(status_code=403, detail="Invalid webhook secret") | |
| try: | |
| payload = await request.json() | |
| except Exception: | |
| raise HTTPException(status_code=400, detail="Invalid JSON payload") | |
| with open(LATEST_PAYLOAD, "w") as file: | |
| json.dump(payload, file) | |
| return {"status": "success", "received": payload} | |