my-tide-env / api_utils.py
alwaysgood's picture
Update api_utils.py
ca1121a verified
"""
API 유틸리티 모듈
실무에서 바로 사용 가능한 조위 예측 API 함수들
"""
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
import pytz
from supabase_utils import get_supabase_client
from config import STATION_NAMES
# API 응답 표준 포맷
def create_api_response(success: bool, data: any = None, error: str = None, meta: Dict = None) -> Dict:
"""표준 API 응답 포맷 생성"""
response = {
"success": success,
"timestamp": datetime.now(pytz.timezone('Asia/Seoul')).isoformat(),
}
if meta:
response["meta"] = meta
if success:
response["data"] = data
else:
response["error"] = error or "Unknown error"
return response
def get_station_meta(station_id: str) -> Dict:
"""관측소 메타 정보 반환"""
# 관측소 좌표 정보 (실제 좌표)
STATION_COORDS = {
"DT_0001": {"lat": 37.452, "lon": 126.592},
"DT_0002": {"lat": 36.9669, "lon": 126.823},
"DT_0003": {"lat": 35.4262, "lon": 126.421},
"DT_0008": {"lat": 37.1922, "lon": 126.647},
"DT_0017": {"lat": 37.0075, "lon": 126.353},
"DT_0018": {"lat": 35.9755, "lon": 126.563},
"DT_0024": {"lat": 36.0069, "lon": 126.688},
"DT_0025": {"lat": 36.4064, "lon": 126.486},
"DT_0037": {"lat": 36.1173, "lon": 125.985},
"DT_0043": {"lat": 37.2394, "lon": 126.429},
"DT_0050": {"lat": 36.9131, "lon": 126.239},
"DT_0051": {"lat": 36.1289, "lon": 126.495},
"DT_0052": {"lat": 37.3382, "lon": 126.586},
"DT_0065": {"lat": 37.2394, "lon": 126.155},
"DT_0066": {"lat": 35.6858, "lon": 126.334},
"DT_0067": {"lat": 36.6737, "lon": 126.132},
"DT_0068": {"lat": 35.6181, "lon": 126.302},
}
coords = STATION_COORDS.get(station_id, {"lat": 0, "lon": 0})
return {
"obs_post_id": station_id,
"obs_post_name": STATION_NAMES.get(station_id, "Unknown"),
"obs_lat": str(coords["lat"]),
"obs_lon": str(coords["lon"]),
"data_type": "prediction" # 예측 데이터임을 명시
}
# 1. 현재/미래 조위 조회 (조화 예측 폴백 포함)
def api_get_tide_level(
station_id: str,
target_time: Optional[str] = None,
use_harmonic_fallback: bool = True
) -> Dict:
"""
특정 시간의 조위 정보 조회
Args:
station_id: 관측소 ID
target_time: 조회 시간 (ISO format, None이면 현재 시간)
use_harmonic_fallback: 최종 예측이 없을 때 조화 예측 사용 여부
Returns:
API 응답 (최종 예측 우선, 없으면 조화 예측)
"""
supabase = get_supabase_client()
if not supabase:
return create_api_response(False, error="Database connection failed")
try:
# 대상 시간 파싱
kst = pytz.timezone('Asia/Seoul')
if target_time:
query_time = datetime.fromisoformat(target_time.replace('Z', '+00:00'))
if query_time.tzinfo is None:
query_time = kst.localize(query_time)
else:
query_time = datetime.now(kst)
# UTC로 변환하여 쿼리 (중요!)
query_time_utc = query_time.astimezone(pytz.UTC)
query_str = query_time_utc.strftime('%Y-%m-%dT%H:%M:%S')
# 가장 가까운 5분 단위로 반올림
minutes = query_time.minute
rounded_minutes = round(minutes / 5) * 5
if rounded_minutes == 60:
query_time_rounded = query_time.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
else:
query_time_rounded = query_time.replace(minute=rounded_minutes, second=0, microsecond=0)
query_time_rounded_utc = query_time_rounded.astimezone(pytz.UTC)
# 1차: 정확한 시간 매칭 시도
result = supabase.table('tide_predictions')\
.select('*')\
.eq('station_id', station_id)\
.eq('predicted_at', query_time_rounded_utc.strftime('%Y-%m-%dT%H:%M:%S'))\
.execute()
# 2차: 정확한 매칭이 없으면 전후 5분 범위에서 가장 가까운 것
if not result.data:
start_time = (query_time_rounded_utc - timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%S')
end_time = (query_time_rounded_utc + timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%S')
result = supabase.table('tide_predictions')\
.select('*')\
.eq('station_id', station_id)\
.gte('predicted_at', start_time)\
.lte('predicted_at', end_time)\
.order('predicted_at')\
.limit(1)\
.execute()
if result.data:
# 최종 예측 데이터가 있는 경우
data = result.data[0]
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(data['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
return create_api_response(
success=True,
data={
"record_time": time_kst.isoformat(), # KST로 변환
"record_time_kst": time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
"tide_level": round(data.get('final_tide_level', 0), 1),
"residual_value": round(data.get('predicted_residual', 0), 1),
"harmonic_value": round(data.get('harmonic_level', 0), 1),
"data_source": "final_prediction",
"confidence": "high"
},
meta=get_station_meta(station_id)
)
# 2차: 조화 예측 (harmonic_predictions) 폴백
if use_harmonic_fallback and not result.data:
# 1차: 정확한 시간 매칭 시도
result = supabase.table('harmonic_predictions')\
.select('*')\
.eq('station_id', station_id)\
.eq('predicted_at', query_time_rounded_utc.strftime('%Y-%m-%dT%H:%M:%S'))\
.execute()
# 2차: 정확한 매칭이 없으면 전후 5분 범위
if not result.data:
start_time = (query_time_rounded_utc - timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%S')
end_time = (query_time_rounded_utc + timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%S')
result = supabase.table('harmonic_predictions')\
.select('*')\
.eq('station_id', station_id)\
.gte('predicted_at', start_time)\
.lte('predicted_at', end_time)\
.order('predicted_at')\
.limit(1)\
.execute()
if result.data:
data = result.data[0]
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(data['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
return create_api_response(
success=True,
data={
"record_time": time_kst.isoformat(), # KST로 변환
"record_time_kst": time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
"tide_level": round(data.get('harmonic_level', 0), 1),
"residual_value": None, # 잔차 예측 없음
"harmonic_value": round(data.get('harmonic_level', 0), 1),
"data_source": "harmonic_only",
"confidence": "medium",
"note": "잔차 예측이 없어 조화 예측만 제공됩니다",
"query_time": query_time.strftime('%Y-%m-%d %H:%M:%S KST'),
"matched_time_diff_seconds": abs((time_utc - query_time_utc).total_seconds())
},
meta=get_station_meta(station_id)
)
return create_api_response(
success=False,
error=f"No data available for {query_str}",
meta=get_station_meta(station_id)
)
except Exception as e:
return create_api_response(False, error=str(e))
# 2. 시간대별 조위 조회 (공공 API 형식)
def api_get_tide_series(
station_id: str,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
interval_minutes: int = 60
) -> Dict:
"""
시간대별 조위 정보 조회 (공공 API 형식과 유사)
Args:
station_id: 관측소 ID
start_time: 시작 시간 (None이면 현재)
end_time: 종료 시간 (None이면 24시간 후)
interval_minutes: 데이터 간격 (기본 60분)
Returns:
시계열 데이터
"""
supabase = get_supabase_client()
if not supabase:
return create_api_response(False, error="Database connection failed")
try:
# 시간 범위 설정
kst = pytz.timezone('Asia/Seoul')
if start_time:
start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
else:
start_dt = datetime.now(kst)
if end_time:
end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
else:
end_dt = start_dt + timedelta(hours=24)
# 최종 예측 조회
result = supabase.table('tide_predictions')\
.select('predicted_at, final_tide_level, predicted_residual, harmonic_level')\
.eq('station_id', station_id)\
.gte('predicted_at', start_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.lte('predicted_at', end_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.order('predicted_at')\
.execute()
data_points = []
data_source = "final_prediction"
if result.data:
# 간격에 맞춰 데이터 필터링
for i, item in enumerate(result.data):
if i % (interval_minutes // 5) == 0: # 5분 간격 데이터 기준
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(item['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
data_points.append({
"record_time": time_kst.strftime('%Y-%m-%d %H:%M:%S'), # KST
"real_value": str(round(item['final_tide_level'], 0)), # 정수로 표시
"pre_value": str(round(item['harmonic_level'], 0)),
"residual": str(round(item['predicted_residual'], 0))
})
else:
# 조화 예측 폴백
result = supabase.table('harmonic_predictions')\
.select('predicted_at, harmonic_level')\
.eq('station_id', station_id)\
.gte('predicted_at', start_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.lte('predicted_at', end_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.order('predicted_at')\
.execute()
if result.data:
data_source = "harmonic_only"
for i, item in enumerate(result.data):
if i % (interval_minutes // 5) == 0:
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(item['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
data_points.append({
"record_time": time_kst.strftime('%Y-%m-%d %H:%M:%S'), # KST
"real_value": str(round(item['harmonic_level'], 0)),
"pre_value": str(round(item['harmonic_level'], 0)),
"residual": "0"
})
meta = get_station_meta(station_id)
meta["data_source"] = data_source
meta["data_count"] = len(data_points)
meta["interval_minutes"] = interval_minutes
return {
"result": {
"meta": meta,
"data": data_points
}
}
except Exception as e:
return create_api_response(False, error=str(e))
# 3. 만조/간조 정보
def api_get_extremes_info(
station_id: str,
date: Optional[str] = None,
include_secondary: bool = False
) -> Dict:
"""
특정 날짜의 만조/간조 정보
Args:
station_id: 관측소 ID
date: 날짜 (YYYY-MM-DD, None이면 오늘)
include_secondary: 부차 만조/간조 포함 여부
Returns:
만조/간조 시간과 수위
"""
supabase = get_supabase_client()
if not supabase:
return create_api_response(False, error="Database connection failed")
try:
# 날짜 범위 설정
if date:
target_date = datetime.strptime(date, '%Y-%m-%d')
else:
target_date = datetime.now(pytz.timezone('Asia/Seoul'))
start_dt = target_date.replace(hour=0, minute=0, second=0)
end_dt = target_date.replace(hour=23, minute=59, second=59)
# 데이터 조회 (최종 예측 우선)
result = supabase.table('tide_predictions')\
.select('predicted_at, final_tide_level')\
.eq('station_id', station_id)\
.gte('predicted_at', start_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.lte('predicted_at', end_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.order('predicted_at')\
.execute()
data_source = "final_prediction"
# 데이터가 없으면 조화 예측 사용
if not result.data:
result = supabase.table('harmonic_predictions')\
.select('predicted_at, harmonic_level')\
.eq('station_id', station_id)\
.gte('predicted_at', start_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.lte('predicted_at', end_dt.strftime('%Y-%m-%dT%H:%M:%S'))\
.order('predicted_at')\
.execute()
if result.data:
# 컬럼명 통일
for item in result.data:
item['final_tide_level'] = item.pop('harmonic_level')
data_source = "harmonic_only"
if not result.data or len(result.data) < 3:
return create_api_response(False, error="Insufficient data for extremes")
# 극값 찾기
extremes = []
data = result.data
for i in range(1, len(data) - 1):
prev_level = data[i-1]['final_tide_level']
curr_level = data[i]['final_tide_level']
next_level = data[i+1]['final_tide_level']
# 만조 (극대값)
if curr_level > prev_level and curr_level > next_level:
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(data[i]['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
extremes.append({
'type': 'high_tide',
'time': time_kst.isoformat(), # KST ISO format
'time_kst': time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
'level': round(curr_level, 2)
})
# 간조 (극소값)
elif curr_level < prev_level and curr_level < next_level:
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(data[i]['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
extremes.append({
'type': 'low_tide',
'time': time_kst.isoformat(), # KST ISO format
'time_kst': time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
'level': round(curr_level, 2)
})
# 주요 만조/간조만 필터링 (부차 제외)
if not include_secondary and len(extremes) > 4:
# 수위 차이가 큰 것들만 선택
high_tides = sorted([e for e in extremes if e['type'] == 'high_tide'],
key=lambda x: x['level'], reverse=True)[:2]
low_tides = sorted([e for e in extremes if e['type'] == 'low_tide'],
key=lambda x: x['level'])[:2]
extremes = sorted(high_tides + low_tides, key=lambda x: x['time'])
meta = get_station_meta(station_id)
meta["date"] = target_date.strftime('%Y-%m-%d')
meta["data_source"] = data_source
return create_api_response(
success=True,
data={
"extremes": extremes,
"summary": {
"high_tide_count": len([e for e in extremes if e['type'] == 'high_tide']),
"low_tide_count": len([e for e in extremes if e['type'] == 'low_tide']),
"max_level": max([e['level'] for e in extremes]) if extremes else None,
"min_level": min([e['level'] for e in extremes]) if extremes else None
}
},
meta=meta
)
except Exception as e:
return create_api_response(False, error=str(e))
# 4. 위험 수위 알림
def api_check_tide_alert(
station_id: str,
hours_ahead: int = 24,
warning_level: float = 700.0,
danger_level: float = 750.0
) -> Dict:
"""
위험 수위 체크 및 알림
Args:
station_id: 관측소 ID
hours_ahead: 확인할 시간 (기본 24시간)
warning_level: 주의 수위 (cm)
danger_level: 경고 수위 (cm)
Returns:
위험 수위 정보
"""
supabase = get_supabase_client()
if not supabase:
return create_api_response(False, error="Database connection failed")
try:
now = datetime.now(pytz.timezone('Asia/Seoul'))
end_time = now + timedelta(hours=hours_ahead)
# 위험 수위 데이터 조회
result = supabase.table('tide_predictions')\
.select('predicted_at, final_tide_level')\
.eq('station_id', station_id)\
.gte('predicted_at', now.strftime('%Y-%m-%dT%H:%M:%S'))\
.lte('predicted_at', end_time.strftime('%Y-%m-%dT%H:%M:%S'))\
.gte('final_tide_level', warning_level)\
.order('predicted_at')\
.execute()
alerts = []
alert_level = "safe"
if result.data:
for item in result.data:
level = item['final_tide_level']
if level >= danger_level:
severity = "danger"
alert_level = "danger"
elif level >= warning_level:
severity = "warning"
if alert_level != "danger":
alert_level = "warning"
else:
continue
# UTC를 KST로 변환
time_utc = datetime.fromisoformat(item['predicted_at'].replace('Z', '+00:00'))
time_kst = time_utc.astimezone(pytz.timezone('Asia/Seoul'))
alerts.append({
"time": time_kst.isoformat(), # KST ISO format
"time_kst": time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
"level": round(level, 2),
"severity": severity
})
# 첫 위험 시간 계산
first_alert_time = None
first_alert_time_kst = None
if alerts:
first_alert_time = alerts[0]['time'] # 이미 KST
first_alert_time_kst = alerts[0]['time_kst']
time_until = (datetime.fromisoformat(first_alert_time) - now).total_seconds() / 3600
else:
time_until = None
meta = get_station_meta(station_id)
meta["check_time"] = now.isoformat()
meta["hours_ahead"] = hours_ahead
return create_api_response(
success=True,
data={
"alert_level": alert_level,
"alert_count": len(alerts),
"first_alert_time": first_alert_time,
"hours_until_first": round(time_until, 1) if time_until else None,
"alerts": alerts[:10], # 최대 10개만
"thresholds": {
"warning": warning_level,
"danger": danger_level
}
},
meta=meta
)
except Exception as e:
return create_api_response(False, error=str(e))
# 5. 다중 관측소 비교
def api_compare_stations(
station_ids: List[str],
target_time: Optional[str] = None
) -> Dict:
"""
여러 관측소 동시 비교
Args:
station_ids: 관측소 ID 리스트
target_time: 비교 시간 (None이면 현재)
Returns:
관측소별 조위 비교 정보
"""
if not station_ids:
return create_api_response(False, error="No station IDs provided")
try:
comparison_data = []
for station_id in station_ids[:10]: # 최대 10개 관측소
result = api_get_tide_level(station_id, target_time)
if result.get("success") and result.get("data"):
data = result["data"]
comparison_data.append({
"station_id": station_id,
"station_name": STATION_NAMES.get(station_id, "Unknown"),
"tide_level": data.get("tide_level"),
"data_source": data.get("data_source"),
"time": data.get("record_time")
})
else:
comparison_data.append({
"station_id": station_id,
"station_name": STATION_NAMES.get(station_id, "Unknown"),
"tide_level": None,
"data_source": "no_data",
"time": None
})
# 수위 기준 정렬
comparison_data.sort(key=lambda x: x['tide_level'] if x['tide_level'] else 0, reverse=True)
# 통계 계산
valid_levels = [d['tide_level'] for d in comparison_data if d['tide_level']]
stats = {
"max_level": max(valid_levels) if valid_levels else None,
"min_level": min(valid_levels) if valid_levels else None,
"avg_level": round(sum(valid_levels) / len(valid_levels), 1) if valid_levels else None,
"station_count": len(comparison_data),
"valid_count": len(valid_levels)
}
return create_api_response(
success=True,
data={
"comparison": comparison_data,
"statistics": stats
},
meta={
"query_time": target_time or datetime.now(pytz.timezone('Asia/Seoul')).isoformat(),
"station_count": len(station_ids)
}
)
except Exception as e:
return create_api_response(False, error=str(e))
# 6. 건강 체크 / 상태 확인
def api_health_check() -> Dict:
"""
API 및 데이터베이스 상태 확인
Returns:
시스템 상태 정보
"""
try:
supabase = get_supabase_client()
db_status = "connected" if supabase else "disconnected"
# 데이터 가용성 체크
data_availability = {}
if supabase:
# 최종 예측 데이터 확인
result = supabase.table('tide_predictions')\
.select('station_id', count='exact')\
.limit(1)\
.execute()
tide_count = result.count if hasattr(result, 'count') else 0
# 조화 예측 데이터 확인
result = supabase.table('harmonic_predictions')\
.select('station_id', count='exact')\
.limit(1)\
.execute()
harmonic_count = result.count if hasattr(result, 'count') else 0
data_availability = {
"tide_predictions": tide_count,
"harmonic_predictions": harmonic_count
}
return create_api_response(
success=True,
data={
"status": "healthy" if db_status == "connected" else "degraded",
"database": db_status,
"data_availability": data_availability,
"api_version": "1.0.0",
"endpoints": [
"/api/tide_level",
"/api/tide_series",
"/api/extremes",
"/api/alert",
"/api/compare",
"/api/health"
]
}
)
except Exception as e:
return create_api_response(
success=False,
error=str(e),
data={"status": "error"}
)