| """ | |
| Real-time Performance Evaluation API | |
| 실시간 예측 성능 평가 및 모니터링 API | |
| """ | |
| from fastapi import FastAPI, HTTPException, Header, Query | |
| from datetime import datetime, timedelta | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Optional, List, Dict, Any | |
| import logging | |
| from config import INTERNAL_API_KEY | |
| logger = logging.getLogger(__name__) | |
# Performance data store (a real deployment would use a database)
performance_cache = {
    "realtime_metrics": {},
    "historical_data": [],
    "station_performance": {},
    "alert_thresholds": {
        "rmse_warning": 30.0,
        "rmse_critical": 50.0,
        "accuracy_warning": 80.0,
        "accuracy_critical": 70.0
    }
}

def verify_internal_api_key(authorization: str = Header(None)):
    """Verify the internal API key."""
    if authorization and authorization.startswith("Bearer "):
        return authorization == f"Bearer {INTERNAL_API_KEY}"
    return False
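
# Callers that need internal-level access send the key as a bearer token in the
# Authorization header checked above. Illustrative client-side sketch (the
# endpoint name is a placeholder, not defined in this module):
#
#     headers = {"Authorization": f"Bearer {INTERNAL_API_KEY}"}
#     httpx.get(internal_endpoint, headers=headers)
#
# External callers may omit the header and receive the read-only responses.
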
def register_performance_routes(app: FastAPI):
    """Register the performance-evaluation API routes."""

    # NOTE: the route paths below are assumptions for illustration; adjust them
    # to the paths actually used by the deployment.
    @app.get("/performance/realtime")
    async def get_realtime_performance(
        station_id: Optional[str] = Query(None, description="Specific station (None = whole system)"),
        authorization: str = Header(None)
    ):
        """Query real-time prediction performance metrics."""
        # The internal API requires authentication; external access is read-only
        is_internal = verify_internal_api_key(authorization)
        try:
            current_time = datetime.now()
            if station_id:
                # Performance of a single station
                station_metrics = await get_station_performance(station_id)
                return {
                    "timestamp": current_time.isoformat(),
                    "station_id": station_id,
                    **station_metrics,
                    "data_source": "realtime"
                }
            else:
                # Overall system performance
                overall_metrics = await get_overall_performance()
                response_data = {
                    "timestamp": current_time.isoformat(),
                    "rmse": overall_metrics["rmse"],
                    "mae": overall_metrics["mae"],
                    "accuracy": overall_metrics["accuracy"],
                    "prediction_count": overall_metrics["prediction_count"],
                    "active_stations": overall_metrics["active_stations"],
                    "data_quality_score": overall_metrics["data_quality_score"],
                    "status": overall_metrics["status"]
                }
                # Internal requests receive additional detail
                if is_internal:
                    response_data.update({
                        "detailed_metrics": overall_metrics.get("detailed_metrics", {}),
                        "station_breakdown": overall_metrics.get("station_breakdown", {}),
                        "recent_alerts": overall_metrics.get("recent_alerts", [])
                    })
                return response_data
        except Exception as e:
            logger.error(f"Realtime performance query failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
    @app.get("/performance/history")  # path assumed
    async def get_historical_performance(
        hours: int = Query(24, description="Time range to query (hours)"),
        station_id: Optional[str] = Query(None, description="Specific station"),
        metric: str = Query("rmse", description="Performance metric (rmse/mae/accuracy)"),
        authorization: str = Header(None)
    ):
        """Query performance history."""
        is_internal = verify_internal_api_key(authorization)
        try:
            # Validate the time range (cap at one week)
            if hours > 168:
                hours = 168
            end_time = datetime.now()
            start_time = end_time - timedelta(hours=hours)
            historical_data = await get_performance_history(
                start_time, end_time, station_id, metric
            )
            # Compute summary statistics
            if historical_data:
                values = [item[metric] for item in historical_data if item.get(metric) is not None]
                if values:
                    statistics = {
                        "mean": round(float(np.mean(values)), 2),
                        "std": round(float(np.std(values)), 2),
                        "min": round(min(values), 2),
                        "max": round(max(values), 2),
                        "trend": calculate_trend(values)
                    }
                else:
                    statistics = None
            else:
                statistics = None
            return {
                "timestamp": end_time.isoformat(),
                "query_range": {
                    "start_time": start_time.isoformat(),
                    "end_time": end_time.isoformat(),
                    "hours": hours
                },
                "station_id": station_id,
                "metric": metric,
                "data_points": len(historical_data),
                # External callers only receive the most recent 100 points
                "data": historical_data[-100:] if not is_internal else historical_data,
                "statistics": statistics
            }
        except Exception as e:
            logger.error(f"Historical performance query failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
    @app.get("/performance/compare")  # path assumed
    async def compare_station_performance(
        station_ids: str = Query(..., description="Stations to compare (comma-separated)"),
        metric: str = Query("rmse", description="Performance metric to compare"),
        period: str = Query("24h", description="Comparison period (1h/6h/24h/7d)"),
        authorization: str = Header(None)
    ):
        """Compare performance across stations."""
        is_internal = verify_internal_api_key(authorization)
        try:
            # Parse the station list (at most 10 stations)
            stations = [s.strip() for s in station_ids.split(",")]
            if len(stations) > 10:
                stations = stations[:10]
            # Parse the period
            period_hours = {
                "1h": 1, "6h": 6, "24h": 24, "7d": 168
            }.get(period, 24)
            comparison_data = {}
            for station_id in stations:
                station_metrics = await get_station_performance_summary(
                    station_id, period_hours, metric
                )
                comparison_data[station_id] = station_metrics
            # Rank the stations
            if metric in ["rmse", "mae"]:
                # Lower is better
                sorted_stations = sorted(
                    comparison_data.items(),
                    key=lambda x: x[1].get("current_value", float('inf'))
                )
            else:  # accuracy and similar metrics
                # Higher is better
                sorted_stations = sorted(
                    comparison_data.items(),
                    key=lambda x: x[1].get("current_value", 0),
                    reverse=True
                )
            return {
                "timestamp": datetime.now().isoformat(),
                "metric": metric,
                "period": period,
                "stations_count": len(stations),
                "comparison": comparison_data,
                "ranking": [{"rank": i + 1, "station_id": station, "value": data["current_value"]}
                            for i, (station, data) in enumerate(sorted_stations)],
                "best_performer": sorted_stations[0][0] if sorted_stations else None,
                "summary": {
                    "best_value": sorted_stations[0][1]["current_value"] if sorted_stations else None,
                    "worst_value": sorted_stations[-1][1]["current_value"] if sorted_stations else None,
                    "average_value": round(float(np.mean([data["current_value"] for data in comparison_data.values()])), 2)
                }
            }
        except Exception as e:
            logger.error(f"Station comparison failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
    @app.get("/performance/alerts")  # path assumed
    async def get_performance_alerts(
        active_only: bool = Query(True, description="Return only active alerts"),
        hours: int = Query(24, description="Time range to query (hours)"),
        authorization: str = Header(None)
    ):
        """Query performance alerts."""
        # Accessible to the internal API only
        if not verify_internal_api_key(authorization):
            raise HTTPException(status_code=403, detail="Internal API key required")
        try:
            alerts = await get_current_alerts(active_only, hours)
            # Classify the alerts
            critical_alerts = [a for a in alerts if a["severity"] == "critical"]
            warning_alerts = [a for a in alerts if a["severity"] == "warning"]
            return {
                "timestamp": datetime.now().isoformat(),
                "query_range_hours": hours,
                "active_only": active_only,
                "total_alerts": len(alerts),
                "critical_count": len(critical_alerts),
                "warning_count": len(warning_alerts),
                "alerts": alerts,
                "summary": {
                    "system_status": "critical" if critical_alerts else ("warning" if warning_alerts else "normal"),
                    "requires_attention": len(critical_alerts) > 0,
                    "most_recent": alerts[0] if alerts else None
                }
            }
        except Exception as e:
            logger.error(f"Performance alerts query failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
    @app.post("/performance/update")  # path assumed
    async def update_performance_metrics(
        request_data: Dict[str, Any],
        authorization: str = Header(None)
    ):
        """Update performance metrics (internal use)."""
        # Accessible to the internal API only
        if not verify_internal_api_key(authorization):
            raise HTTPException(status_code=403, detail="Internal API key required")
        try:
            station_id = request_data.get("station_id")
            predictions = request_data.get("predictions", [])
            actual_values = request_data.get("actual_values", [])
            timestamp = request_data.get("timestamp", datetime.now().isoformat())
            if not predictions or not actual_values:
                raise HTTPException(status_code=400, detail="Predictions and actual values required")
            if len(predictions) != len(actual_values):
                raise HTTPException(status_code=400, detail="Predictions and actual values must have same length")
            # Compute performance metrics
            metrics = calculate_performance_metrics(predictions, actual_values)
            # Persist the performance record
            performance_record = {
                "station_id": station_id,
                "timestamp": timestamp,
                "predictions": predictions,
                "actual_values": actual_values,
                "metrics": metrics,
                "data_points": len(predictions)
            }
            # save_performance_record also appends the record to the historical cache
            await save_performance_record(performance_record)
            # Update the real-time cache
            performance_cache["realtime_metrics"][station_id] = metrics
            # Trim old data (memory management)
            if len(performance_cache["historical_data"]) > 1000:
                performance_cache["historical_data"] = performance_cache["historical_data"][-500:]
            # Check for alerts
            alerts = check_performance_alerts(station_id, metrics)
            return {
                "success": True,
                "timestamp": datetime.now().isoformat(),
                "station_id": station_id,
                "metrics": metrics,
                "data_points": len(predictions),
                "alerts_triggered": len(alerts),
                "alerts": alerts
            }
        except HTTPException:
            # Re-raise validation errors as-is instead of converting them to 500s
            raise
        except Exception as e:
            logger.error(f"Performance update failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
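
    # Illustrative request body for the update handler above (values are made
    # up; "timestamp" is optional and defaults to the current time):
    #
    #     {
    #         "station_id": "DT_0001",
    #         "predictions": [142.1, 150.3, 148.7],
    #         "actual_values": [140.0, 152.2, 147.5],
    #         "timestamp": "2024-01-01T00:00:00"
    #     }
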
# ============================================================================
# Performance calculation and analysis helpers
# ============================================================================
async def get_overall_performance():
    """Query overall system performance."""
    # Demo mode uses simulated data
    from internal_api import demo_session, active_issues
    base_rmse = 18.5
    base_mae = 14.2
    base_accuracy = 89.2
    # Account for the impact of active issues
    rmse_multiplier = 1.0
    accuracy_penalty = 0
    for issue in active_issues.values():
        if issue["type"] == "extreme_weather":
            rmse_multiplier *= 1.8
            accuracy_penalty += 15
        elif issue["type"] == "sensor_malfunction":
            rmse_multiplier *= 1.3
            accuracy_penalty += 8
        elif issue["type"] == "data_corruption":
            rmse_multiplier *= 1.5
            accuracy_penalty += 12
        elif issue["type"] == "network_failure":
            rmse_multiplier *= 1.2
            accuracy_penalty += 5
    # Add random variation
    rmse_variation = np.random.normal(0, 2)
    accuracy_variation = np.random.normal(0, 3)
    current_rmse = max(10, base_rmse * rmse_multiplier + rmse_variation)
    current_mae = max(8, base_mae * rmse_multiplier * 0.8 + rmse_variation * 0.7)
    current_accuracy = max(50, min(98, base_accuracy - accuracy_penalty + accuracy_variation))
    # Data quality score
    data_quality = 95 - len(active_issues) * 10
    # Determine the overall status
    if current_rmse > 40 or current_accuracy < 75:
        status = "critical"
    elif current_rmse > 25 or current_accuracy < 85:
        status = "warning"
    else:
        status = "good"
    return {
        "rmse": round(float(current_rmse), 1),
        "mae": round(float(current_mae), 1),
        "accuracy": round(float(current_accuracy), 1),
        "prediction_count": demo_session.get("total_processed", 0),
        "active_stations": len(demo_session.get("active_stations", [])),
        "data_quality_score": round(data_quality, 1),
        "status": status,
        "detailed_metrics": {
            "rmse_trend": "increasing" if len(active_issues) > 0 else "stable",
            # Cast NumPy scalars to built-ins so the response is JSON-serializable
            "prediction_latency_ms": int(np.random.randint(50, 200)),
            "data_freshness_minutes": int(np.random.randint(1, 10))
        },
        "station_breakdown": await get_all_stations_summary(),
        "recent_alerts": await get_recent_alerts_summary()
    }
async def get_station_performance(station_id: str):
    """Query the performance of a single station."""
    from internal_api import active_issues, demo_session
    # Base performance plus per-station variation; a local generator avoids
    # reseeding NumPy's global random state on every call
    rng = np.random.default_rng(hash(station_id) % 1000)
    base_rmse = 18.5 + rng.normal(0, 3)
    base_mae = 14.2 + rng.normal(0, 2)
    base_accuracy = 89.2 + rng.normal(0, 5)
    # Impact of active issues
    for issue in active_issues.values():
        if issue.get("station_id") == station_id or not issue.get("station_id"):
            if issue["type"] == "sensor_malfunction" and issue.get("station_id") == station_id:
                base_rmse *= 2.0
                base_accuracy -= 25
            elif issue["type"] == "extreme_weather":
                base_rmse *= 1.6
                base_accuracy -= 12
    return {
        "rmse": round(float(max(10, base_rmse)), 1),
        "mae": round(float(max(8, base_mae)), 1),
        "accuracy": round(float(max(50, min(98, base_accuracy))), 1),
        "data_points": int(rng.integers(50, 200)),
        "last_prediction": (datetime.now() - timedelta(minutes=int(rng.integers(1, 15)))).isoformat(),
        "status": "active" if station_id in demo_session.get("active_stations", []) else "inactive"
    }
async def get_performance_history(start_time, end_time, station_id, metric):
    """Generate performance history (simulation)."""
    data_points = []
    current_time = start_time
    # Generate data at one-hour intervals
    while current_time <= end_time:
        # Baseline value that varies with the hour of day
        hour = current_time.hour
        base_value = {
            "rmse": 18.5 + 5 * np.sin(2 * np.pi * hour / 24),
            "mae": 14.2 + 3 * np.sin(2 * np.pi * hour / 24),
            "accuracy": 89.2 - 8 * np.sin(2 * np.pi * hour / 24)
        }
        # Random variation
        variation = np.random.normal(0, 2)
        value = base_value[metric] + variation
        # Per-station adjustment
        if station_id:
            station_offset = hash(station_id) % 10 - 5
            value += station_offset
        data_points.append({
            "timestamp": current_time.isoformat(),
            "station_id": station_id,
            metric: round(float(value), 1),
            "data_points": int(np.random.randint(10, 50))
        })
        current_time += timedelta(hours=1)
    return data_points
async def get_station_performance_summary(station_id, period_hours, metric):
    """Summarize a station's performance over a period."""
    # Current performance
    current_perf = await get_station_performance(station_id)
    current_value = current_perf[metric]
    # Period average (simulated)
    period_variation = np.random.normal(0, 5)
    period_average = current_value + period_variation
    # Trend estimate
    trend_change = np.random.uniform(-10, 10)
    trend = "improving" if trend_change < -2 else ("degrading" if trend_change > 2 else "stable")
    return {
        "station_id": station_id,
        "current_value": current_value,
        "period_average": round(float(period_average), 1),
        "trend": trend,
        "trend_change": round(float(trend_change), 1),
        "data_points": int(np.random.randint(20, 100)),
        "last_update": datetime.now().isoformat()
    }
def calculate_performance_metrics(predictions, actual_values):
    """Compute performance metrics for a batch of predictions."""
    predictions = np.array(predictions, dtype=float)
    actual_values = np.array(actual_values, dtype=float)
    # RMSE
    rmse = np.sqrt(np.mean((predictions - actual_values) ** 2))
    # MAE
    mae = np.mean(np.abs(predictions - actual_values))
    # "Accuracy": share of predictions whose error falls within the batch's own
    # 95th-percentile error (note this is self-referential, so it is roughly 95%
    # by construction for any batch)
    errors = np.abs(predictions - actual_values)
    threshold = np.percentile(errors, 95)
    accuracy = np.mean(errors <= threshold) * 100
    # Additional metrics (MAPE is undefined when an actual value is zero)
    mape = np.mean(np.abs((actual_values - predictions) / actual_values)) * 100
    r2 = 1 - (np.sum((actual_values - predictions) ** 2) / np.sum((actual_values - np.mean(actual_values)) ** 2))
    # Cast NumPy scalars to built-in floats so the result is JSON-serializable
    return {
        "rmse": round(float(rmse), 2),
        "mae": round(float(mae), 2),
        "accuracy": round(float(accuracy), 1),
        "mape": round(float(mape), 2),
        "r2_score": round(float(r2), 3),
        "data_points": len(predictions)
    }
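
# Worked example for calculate_performance_metrics (tiny made-up batch):
#
#     calculate_performance_metrics([10.0, 14.0], [11.0, 13.0])
#     # -> {"rmse": 1.0, "mae": 1.0, "accuracy": 100.0,
#     #     "mape": 8.39, "r2_score": 0.0, "data_points": 2}
#
# Both absolute errors are 1.0, so RMSE = MAE = 1.0, and MAPE averages
# 1/11 and 1/13 (about 8.39%).
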
def calculate_trend(values):
    """Estimate the trend of a series of metric values."""
    if len(values) < 3:
        return "insufficient_data"
    # Slope from a simple linear fit
    x = np.arange(len(values))
    slope = np.polyfit(x, values, 1)[0]
    if slope > 0.5:
        return "increasing"
    elif slope < -0.5:
        return "decreasing"
    else:
        return "stable"
def check_performance_alerts(station_id, metrics):
    """Check the latest metrics against the alert thresholds."""
    alerts = []
    thresholds = performance_cache["alert_thresholds"]
    # RMSE checks
    rmse = metrics["rmse"]
    if rmse > thresholds["rmse_critical"]:
        alerts.append({
            "type": "rmse_critical",
            "severity": "critical",
            "message": f"Critical RMSE level: {rmse} cm",
            "station_id": station_id,
            "threshold": thresholds["rmse_critical"],
            "current_value": rmse,
            "timestamp": datetime.now().isoformat()
        })
    elif rmse > thresholds["rmse_warning"]:
        alerts.append({
            "type": "rmse_warning",
            "severity": "warning",
            "message": f"High RMSE level: {rmse} cm",
            "station_id": station_id,
            "threshold": thresholds["rmse_warning"],
            "current_value": rmse,
            "timestamp": datetime.now().isoformat()
        })
    # Accuracy checks
    accuracy = metrics["accuracy"]
    if accuracy < thresholds["accuracy_critical"]:
        alerts.append({
            "type": "accuracy_critical",
            "severity": "critical",
            "message": f"Critical accuracy drop: {accuracy}%",
            "station_id": station_id,
            "threshold": thresholds["accuracy_critical"],
            "current_value": accuracy,
            "timestamp": datetime.now().isoformat()
        })
    elif accuracy < thresholds["accuracy_warning"]:
        alerts.append({
            "type": "accuracy_warning",
            "severity": "warning",
            "message": f"Low accuracy: {accuracy}%",
            "station_id": station_id,
            "threshold": thresholds["accuracy_warning"],
            "current_value": accuracy,
            "timestamp": datetime.now().isoformat()
        })
    return alerts
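
# For example, with the default thresholds defined in performance_cache above,
# metrics of {"rmse": 55.0, "accuracy": 78.0} produce two alerts for a station:
# "rmse_critical" (55.0 > 50.0) and "accuracy_warning" (78.0 < 80.0).
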
async def get_current_alerts(active_only, hours):
    """Query current alerts (simulation)."""
    from internal_api import active_issues, demo_session
    alerts = []
    # Convert active issues into alerts
    for issue_id, issue in active_issues.items():
        alerts.append({
            "id": issue_id,
            "type": f"{issue['type']}_alert",
            "severity": "critical" if issue["type"] in ["extreme_weather", "sensor_malfunction"] else "warning",
            "message": f"{issue['type'].replace('_', ' ').title()} detected",
            "station_id": issue.get("station_id"),
            "timestamp": issue["start_time"].isoformat(),
            "duration_minutes": (datetime.now() - issue["start_time"]).total_seconds() / 60,
            "active": True
        })
    # Simulate an additional performance alert
    if demo_session.get("active"):
        current_perf = await get_overall_performance()
        if current_perf["rmse"] > 30:
            alerts.append({
                "id": "perf_rmse_high",
                "type": "performance_degradation",
                "severity": "warning",
                "message": f"High system RMSE: {current_perf['rmse']} cm",
                "timestamp": (datetime.now() - timedelta(minutes=5)).isoformat(),
                "duration_minutes": 5,
                "active": True
            })
    # Sort newest first
    alerts.sort(key=lambda x: x["timestamp"], reverse=True)
    return alerts
async def get_all_stations_summary():
    """Summarize the performance of every active station."""
    from internal_api import demo_session
    stations = demo_session.get("active_stations", ["DT_0001", "DT_0002"])
    summary = {}
    for station_id in stations:
        perf = await get_station_performance(station_id)
        summary[station_id] = {
            "rmse": perf["rmse"],
            "accuracy": perf["accuracy"],
            "status": perf["status"]
        }
    return summary
async def get_recent_alerts_summary():
    """Summarize recent alerts."""
    alerts = await get_current_alerts(True, 1)  # last hour
    return {
        "total_count": len(alerts),
        "critical_count": sum(1 for a in alerts if a["severity"] == "critical"),
        "most_recent": alerts[0] if alerts else None
    }
async def save_performance_record(performance_record):
    """Persist a performance record (a real deployment would use a database)."""
    # Stored in memory only for the simulation
    performance_cache["historical_data"].append(performance_record)
    # A real implementation would write to Supabase or another database, e.g.
    # await supabase.table("performance_metrics").insert(performance_record)
logger.info("Performance API module loaded successfully")
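
# Minimal wiring sketch (assumption; the importing module and file names are
# illustrative, not defined in this file):
#
#     from fastapi import FastAPI
#     from performance_api import register_performance_routes
#
#     app = FastAPI()
#     register_performance_routes(app)
#
# After registration, the handlers defined above are served by the application.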