Spaces:
Sleeping
Sleeping
| import os, json, time, threading, logging | |
| from datetime import datetime | |
| from typing import List, Tuple | |
| try: | |
| import boto3 | |
| from botocore.exceptions import ClientError, NoCredentialsError | |
| except Exception: | |
| boto3 = None | |
| ClientError = NoCredentialsError = Exception | |
| LOG_FILE = os.getenv("CONVO_LOG_FILE", "conversation_history.jsonl") | |
| UPLOAD_ENABLED = os.getenv("SPACES_UPLOAD_CONVO", "true").lower() == "true" | |
| SPACES_KEY = os.getenv("SPACES_KEY") | |
| SPACES_SECRET = os.getenv("SPACES_SECRET") | |
| SPACES_BUCKET = os.getenv("SPACES_BUCKET") | |
| SPACES_REGION = os.getenv("SPACES_REGION", "ams3") | |
| _lock = threading.Lock() | |
def load_history(max_lines: int = 500) -> List[Tuple[str, str]]:
    """Return the most recent logged exchanges as ``(user, assistant)`` pairs.

    Only the last ``max_lines`` lines of ``LOG_FILE`` are inspected, and only
    records whose ``"role"`` is ``"exchange"`` are returned. A missing
    ``"user"`` or ``"assistant"`` key yields an empty string for that slot.

    Args:
        max_lines: Maximum number of trailing log lines to inspect. A value
            of zero or less yields an empty list.

    Returns:
        Pairs in file order (oldest first). An empty list when the log file
        does not exist. The function is best-effort: unexpected errors are
        logged and whatever was collected so far is returned.
    """
    # Function-scope import: the module's import block does not provide deque.
    from collections import deque

    pairs: List[Tuple[str, str]] = []
    try:
        # Guard explicitly: slicing with [-0:] would select the whole file.
        if max_lines <= 0:
            return pairs

        with open(LOG_FILE, "r", encoding="utf-8") as f:
            # A bounded deque keeps only the tail while streaming the file,
            # instead of holding every line in memory.
            tail = deque(f, maxlen=max_lines)

        for line in tail:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Corrupt or partially written line: skip it.
                continue
            # Valid JSON that is not an object (list, number, ...) has no
            # .get(); skip it rather than aborting the remaining lines.
            if not isinstance(obj, dict) or obj.get("role") != "exchange":
                continue
            pairs.append((obj.get("user", ""), obj.get("assistant", "")))
    except FileNotFoundError:
        # No log yet: not an error.
        pass
    except Exception as e:
        logging.error(f"Failed to load history: {e}")
    return pairs
def _write_line(obj: dict):
    """Append ``obj`` to ``LOG_FILE`` as a single JSON line.

    The record is serialised with ``ensure_ascii=False`` so non-ASCII text is
    stored verbatim in the UTF-8 file, and is terminated by a newline.
    """
    with open(LOG_FILE, mode="a", encoding="utf-8") as log:
        serialized = json.dumps(obj, ensure_ascii=False)
        log.write(f"{serialized}\n")
def _upload_file():
    """Upload ``LOG_FILE`` to the configured Spaces bucket, best-effort.

    Does nothing unless uploading is enabled, boto3 was importable, and the
    key, secret and bucket are all set. Any failure is logged and swallowed.
    """
    # All of these must be truthy for an upload to be attempted.
    prerequisites = (UPLOAD_ENABLED, boto3, SPACES_KEY, SPACES_SECRET, SPACES_BUCKET)
    if not all(prerequisites):
        return

    try:
        # Spaces is reached through the S3 client pointed at a per-region
        # digitaloceanspaces.com endpoint.
        endpoint = f"https://{SPACES_REGION}.digitaloceanspaces.com"
        s3 = boto3.session.Session().client(
            's3',
            region_name=SPACES_REGION,
            endpoint_url=endpoint,
            aws_access_key_id=SPACES_KEY,
            aws_secret_access_key=SPACES_SECRET,
        )

        # The destination key can be overridden via SPACES_CONVO_OBJECT;
        # otherwise the log's base name is placed under "chat-logs/".
        fallback_key = f"chat-logs/{os.path.basename(LOG_FILE)}"
        destination = os.getenv("SPACES_CONVO_OBJECT", fallback_key)

        s3.upload_file(LOG_FILE, SPACES_BUCKET, destination)
    except (ClientError, NoCredentialsError) as err:
        logging.error(f"Spaces upload failed: {err}")
    except Exception as err:
        logging.error(f"Unexpected upload error: {err}")
def log_exchange(user_msg: str, assistant_msg: str, meta: dict = None):
    """Record one user/assistant exchange and trigger a background upload.

    The exchange is appended to the local log as a JSON line under the module
    lock, then the log upload is started on a daemon thread so the caller is
    not blocked by network I/O.

    Args:
        user_msg: The user's message.
        assistant_msg: The assistant's reply.
        meta: Optional extra data stored under ``"meta"``; a falsy value is
            stored as an empty dict.
    """
    # Function-scope import: the module only imports `datetime` itself.
    from datetime import timezone

    ts = time.time()
    # datetime.utcfromtimestamp() is deprecated (Python 3.12+). Convert with an
    # explicit UTC tzinfo, then drop it so isoformat() emits no "+00:00" offset
    # and the stored string keeps its existing "...Z" shape.
    utc_now = datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)
    record = {
        "role": "exchange",
        "timestamp": utc_now.isoformat() + "Z",
        "user": user_msg,
        "assistant": assistant_msg,
        "meta": meta or {},
    }
    with _lock:
        _write_line(record)
    # Upload in background thread to avoid blocking UI
    threading.Thread(target=_upload_file, daemon=True).start()