#!/usr/bin/env python3 """ 권한 문제를 해결한 EXAONE Fine-tuning Space FastAPI 애플리케이션 """ import os import json import subprocess import asyncio from pathlib import Path from typing import Dict, Any import logging from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.responses import StreamingResponse from pydantic import BaseModel import uvicorn # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI( title="EXAONE Fine-tuning", description="EXAONE 4.0 1.2B 모델 파인튜닝 API", version="1.0.0" ) # 전역 변수 training_status = { "is_running": False, "progress": 0, "current_epoch": 0, "total_epochs": 3, "loss": 0.0, "status": "idle", "log_file": "/tmp/training.log" # 권한 문제 해결을 위해 /tmp 사용 } class TrainingRequest(BaseModel): model_name: str = "amis5895/exaone-1p2b-nutrition-kdri" @app.get("/") async def root(): """루트 엔드포인트""" return { "message": "EXAONE Fine-tuning API", "status": "running", "version": "1.0.0" } @app.post("/start_training") async def start_training(request: TrainingRequest, background_tasks: BackgroundTasks): """학습 시작""" global training_status if training_status["is_running"]: raise HTTPException(status_code=400, detail="Training is already running") training_status.update({ "is_running": True, "progress": 0, "current_epoch": 0, "status": "starting" }) # 백그라운드에서 학습 시작 background_tasks.add_task(run_real_training, request) return { "message": "Training started", "status": "starting", "model_name": request.model_name } async def run_real_training(request: TrainingRequest): """실제 AutoTrain을 사용한 학습 실행""" global training_status try: logger.info("Starting real AutoTrain training process...") training_status["status"] = "running" # 데이터 파일 확인 train_file = Path("/app/train.csv") val_file = Path("/app/validation.csv") config_file = Path("/app/autotrain_ultra_low_final.yaml") if not train_file.exists(): logger.error(f"Training file not found: {train_file}") training_status.update({ "is_running": False, "status": "failed", "error": "Training file not found" }) return if not val_file.exists(): logger.error(f"Validation file not found: {val_file}") training_status.update({ "is_running": False, "status": "failed", "error": "Validation file not found" }) return if not config_file.exists(): logger.error(f"Config file not found: {config_file}") training_status.update({ "is_running": False, "status": "failed", "error": "Config file not found" }) return logger.info("All files found, starting real AutoTrain training...") # 로그 파일 초기화 (/tmp 사용) log_file = Path(training_status["log_file"]) try: log_file.write_text("Starting AutoTrain training...\n", encoding="utf-8") except Exception as e: logger.warning(f"Could not write to log file: {e}") # 로그 파일을 사용할 수 없으면 메모리에 저장 training_status["log_content"] = "Starting AutoTrain training...\n" # AutoTrain 명령어 실행 cmd = [ "autotrain", "llm", "--train", "--project_name", "exaone-finetuning", "--model", "LGAI-EXAONE/EXAONE-4.0-1.2B", "--data_path", "/app", "--text_column", "text", "--use_peft", "--quantization", "int4", "--lora_r", "16", "--lora_alpha", "32", "--lora_dropout", "0.05", "--target_modules", "all-linear", "--epochs", "3", "--batch_size", "4", "--gradient_accumulation", "4", "--learning_rate", "2e-4", "--warmup_ratio", "0.03", "--mixed_precision", "fp16", "--push_to_hub", "--hub_model_id", request.model_name, "--username", "amis5895" ] logger.info(f"Running command: {' '.join(cmd)}") # 로그 파일에 명령어 기록 try: with open(log_file, "a", encoding="utf-8") as f: f.write(f"Command: {' '.join(cmd)}\n") f.write("=" * 50 + "\n") except: if "log_content" not in training_status: training_status["log_content"] = "" training_status["log_content"] += f"Command: {' '.join(cmd)}\n" + "=" * 50 + "\n" # AutoTrain 프로세스 실행 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, cwd="/app" ) # 학습 진행 상황 모니터링 for line in process.stdout: logger.info(line.strip()) # 로그 파일에 기록 try: with open(log_file, "a", encoding="utf-8") as f: f.write(line) except: if "log_content" not in training_status: training_status["log_content"] = "" training_status["log_content"] += line # 진행률 파싱 if "epoch" in line.lower() and "/" in line: try: # "Epoch 1/3" 형태에서 진행률 추출 parts = line.split() for i, part in enumerate(parts): if part.lower() == "epoch" and i + 1 < len(parts): epoch_info = parts[i + 1] if "/" in epoch_info: current, total = epoch_info.split("/") training_status["current_epoch"] = int(current) training_status["total_epochs"] = int(total) training_status["progress"] = (int(current) / int(total)) * 100 break except: pass # 손실값 파싱 if "loss" in line.lower(): try: parts = line.split() for i, part in enumerate(parts): if part.lower() == "loss" and i + 1 < len(parts): loss_value = float(parts[i + 1]) training_status["loss"] = loss_value break except: pass process.wait() if process.returncode == 0: training_status.update({ "is_running": False, "progress": 100, "status": "completed" }) logger.info("Training completed successfully!") # 완료 로그 기록 try: with open(log_file, "a", encoding="utf-8") as f: f.write("\n" + "=" * 50 + "\n") f.write("Training completed successfully!\n") except: if "log_content" not in training_status: training_status["log_content"] = "" training_status["log_content"] += "\n" + "=" * 50 + "\nTraining completed successfully!\n" else: training_status.update({ "is_running": False, "status": "failed" }) logger.error("Training failed!") # 실패 로그 기록 try: with open(log_file, "a", encoding="utf-8") as f: f.write("\n" + "=" * 50 + "\n") f.write(f"Training failed with return code: {process.returncode}\n") except: if "log_content" not in training_status: training_status["log_content"] = "" training_status["log_content"] += "\n" + "=" * 50 + f"\nTraining failed with return code: {process.returncode}\n" except Exception as e: logger.error(f"Training error: {str(e)}") training_status.update({ "is_running": False, "status": "error", "error": str(e) }) # 오류 로그 기록 try: with open(log_file, "a", encoding="utf-8") as f: f.write(f"\nError: {str(e)}\n") except: if "log_content" not in training_status: training_status["log_content"] = "" training_status["log_content"] += f"\nError: {str(e)}\n" @app.get("/status") async def get_status(): """학습 상태 조회""" return training_status @app.get("/logs") async def get_logs(): """로그 조회""" log_file = Path(training_status["log_file"]) if log_file.exists(): try: with open(log_file, "r", encoding="utf-8") as f: logs = f.read() return {"logs": logs} except: pass # 파일을 읽을 수 없으면 메모리에서 가져오기 if "log_content" in training_status: return {"logs": training_status["log_content"]} else: return {"logs": "No logs available"} @app.get("/logs/stream") async def stream_logs(): """실시간 로그 스트리밍""" def generate_logs(): log_file = Path(training_status["log_file"]) if log_file.exists(): try: with open(log_file, "r", encoding="utf-8") as f: for line in f: yield f"data: {line}\\n\\n" except: pass # 파일을 읽을 수 없으면 메모리에서 가져오기 if "log_content" in training_status: for line in training_status["log_content"].split('\n'): yield f"data: {line}\\n\\n" else: yield "data: No logs available\\n\\n" return StreamingResponse(generate_logs(), media_type="text/plain") @app.post("/stop_training") async def stop_training(): """학습 중지""" global training_status if not training_status["is_running"]: raise HTTPException(status_code=400, detail="No training is running") training_status.update({ "is_running": False, "status": "stopped" }) return {"message": "Training stopped"} @app.get("/health") async def health_check(): """헬스 체크""" return {"status": "healthy", "timestamp": "2024-01-01T00:00:00Z"} @app.get("/data_info") async def get_data_info(): """데이터 정보 조회""" train_file = Path("/app/train.csv") val_file = Path("/app/validation.csv") config_file = Path("/app/autotrain_ultra_low_final.yaml") info = { "train_file_exists": train_file.exists(), "validation_file_exists": val_file.exists(), "config_file_exists": config_file.exists(), "train_file_size": train_file.stat().st_size if train_file.exists() else 0, "validation_file_size": val_file.stat().st_size if val_file.exists() else 0, "config_file_size": config_file.stat().st_size if config_file.exists() else 0 } return info if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)