Spaces:
Sleeping
Sleeping
| # preprocessing.py - 전처리 및 데이터 변환 모듈 | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime | |
| import pytz | |
| from dateutil import parser as date_parser | |
| from supabase_utils import get_supabase_client, get_harmonic_predictions | |
| import traceback | |
| def convert_tide_level_to_residual(df, station_id): | |
| """ | |
| tide_level 데이터를 residual로 변환 | |
| residual = tide_level - harmonic_level | |
| """ | |
| print(f"🔄 {station_id} 관측소의 tide_level → residual 변환 시작") | |
| # 1. 입력 데이터 검증 | |
| if 'tide_level' not in df.columns: | |
| raise ValueError("tide_level 컬럼이 없습니다.") | |
| if 'date' not in df.columns: | |
| raise ValueError("date 컬럼이 없습니다.") | |
| # 2. 시간 데이터 정리 | |
| df = df.copy() | |
| df['date'] = pd.to_datetime(df['date']) | |
| # 3. KST 시간대 설정 | |
| kst = pytz.timezone('Asia/Seoul') | |
| if df['date'].dt.tz is None: | |
| df['date'] = df['date'].dt.tz_localize(kst) | |
| else: | |
| df['date'] = df['date'].dt.tz_convert(kst) | |
| # 4. 입력 데이터는 이미 144개로 슬라이싱된 상태 | |
| df_input = df.copy() | |
| start_time = df_input['date'].min() | |
| end_time = df_input['date'].max() | |
| print(f"📅 입력 데이터 시간 범위: {start_time} ~ {end_time}") | |
| # 5. Supabase에서 harmonic_level 조회 | |
| try: | |
| harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) | |
| print(f"📊 조화 예측 데이터 {len(harmonic_data) if harmonic_data else 0}개 조회") | |
| if not harmonic_data: | |
| print("⚠️ 조화 예측 데이터가 없습니다. 가상 데이터로 대체합니다.") | |
| return create_mock_residual_data(df_input) | |
| except Exception as e: | |
| print(f"❌ Supabase 조회 오류: {e}") | |
| print("⚠️ 가상 데이터로 대체합니다.") | |
| return create_mock_residual_data(df_input) | |
| # 6. harmonic_data를 딕셔너리로 변환 (시간 기준) | |
| harmonic_dict = {} | |
| for h_data in harmonic_data: | |
| try: | |
| h_time_str = h_data['predicted_at'] | |
| h_time = parse_time_string(h_time_str) | |
| if h_time is None: | |
| continue | |
| # KST로 변환 | |
| if h_time.tzinfo is None: | |
| h_time = pytz.UTC.localize(h_time) | |
| h_time = h_time.astimezone(kst) | |
| # 5분 단위로 정규화 | |
| minutes = (h_time.minute // 5) * 5 | |
| h_time = h_time.replace(minute=minutes, second=0, microsecond=0) | |
| harmonic_dict[h_time] = float(h_data['harmonic_level']) | |
| except Exception as e: | |
| print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}") | |
| continue | |
| print(f"📊 사용 가능한 조화 데이터: {len(harmonic_dict)}개") | |
| # 7. residual 계산 | |
| residual_values = [] | |
| successful_conversions = 0 | |
| for idx, row in df_input.iterrows(): | |
| tide_level = row['tide_level'] | |
| timestamp = row['date'] | |
| # 5분 단위로 정규화 | |
| minutes = (timestamp.minute // 5) * 5 | |
| normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0) | |
| # 정확히 매칭되는 harmonic_level 찾기 | |
| harmonic_level = harmonic_dict.get(normalized_time) | |
| # 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내) | |
| if harmonic_level is None: | |
| harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5) | |
| # 이상치 플래그 확인 | |
| is_outlier = False | |
| if '_tide_outlier_flag' in df_input.columns: | |
| is_outlier = df_input.at[idx, '_tide_outlier_flag'] if not pd.isna(df_input.at[idx, '_tide_outlier_flag']) else False | |
| if is_outlier: | |
| # 이상치로 탐지된 경우 residual = 0 (harmonic만 사용) | |
| residual_values.append(0.0) | |
| successful_conversions += 1 | |
| print(f" 🚨 이상치 탐지된 시점 {timestamp}: residual=0 적용") | |
| elif harmonic_level is not None: | |
| residual = tide_level - harmonic_level | |
| residual_values.append(residual) | |
| successful_conversions += 1 | |
| else: | |
| # 조화 데이터가 없으면 평균값으로 대체 | |
| residual_values.append(0.0) | |
| # 8. residual 컬럼 추가 | |
| df_input['residual'] = residual_values | |
| # 9. 불필요한 컬럼 제거 (모델에서 사용하지 않음) | |
| columns_to_remove = ['tide_level', '_tide_outlier_flag'] | |
| for col in columns_to_remove: | |
| if col in df_input.columns: | |
| df_input = df_input.drop(columns=[col]) | |
| print(f"🗑️ {col} 컬럼 제거") | |
| print(f"📊 최종 컬럼: {list(df_input.columns)}") | |
| print(f"📊 최종 shape: {df_input.shape}") | |
| conversion_rate = successful_conversions / len(df_input) * 100 | |
| print(f"✅ 변환 완료: {successful_conversions}/{len(df_input)} ({conversion_rate:.1f}%)") | |
| return df_input | |
| def parse_time_string(time_str): | |
| """다양한 형태의 시간 문자열 파싱""" | |
| try: | |
| if 'T' in time_str: | |
| if time_str.endswith('Z'): | |
| return datetime.fromisoformat(time_str[:-1] + '+00:00') | |
| elif '+' in time_str or time_str.count('-') > 2: | |
| return datetime.fromisoformat(time_str) | |
| else: | |
| return datetime.fromisoformat(time_str + '+00:00') | |
| else: | |
| return date_parser.parse(time_str) | |
| except Exception as e: | |
| print(f"시간 파싱 실패: {time_str}, {e}") | |
| return None | |
| def find_closest_harmonic(target_time, harmonic_dict, max_diff_minutes=5): | |
| """가장 가까운 시간의 harmonic_level 찾기""" | |
| min_diff = float('inf') | |
| closest_value = None | |
| for h_time, h_value in harmonic_dict.items(): | |
| diff_seconds = abs((h_time - target_time).total_seconds()) | |
| if diff_seconds < min_diff and diff_seconds <= max_diff_minutes * 60: | |
| min_diff = diff_seconds | |
| closest_value = h_value | |
| return closest_value | |
| def create_mock_residual_data(df): | |
| """Supabase 연결 실패 시 가상 residual 데이터 생성""" | |
| print("🎭 가상 residual 데이터 생성 중...") | |
| df = df.copy() | |
| # 간단한 사인파 기반 가상 residual 생성 | |
| timestamps = df['date'] | |
| time_hours = [(t - timestamps.iloc[0]).total_seconds() / 3600 for t in timestamps] | |
| # 조위의 일반적인 패턴을 모방한 가상 residual | |
| # 주기적 성분 + 노이즈 + 트렌드 | |
| residual_values = [] | |
| for i, hour in enumerate(time_hours): | |
| # 반일주조 (12.42시간 주기) | |
| semi_diurnal = 15 * np.sin(2 * np.pi * hour / 12.42) | |
| # 일주조 (24.84시간 주기) | |
| diurnal = 8 * np.sin(2 * np.pi * hour / 24.84) | |
| # 노이즈 | |
| noise = np.random.normal(0, 3) | |
| # 작은 트렌드 | |
| trend = 0.1 * hour | |
| residual = semi_diurnal + diurnal + noise + trend | |
| residual_values.append(residual) | |
| df['residual'] = residual_values | |
| # 불필요한 컬럼 제거 (모델에서 사용하지 않음) | |
| columns_to_remove = ['tide_level', '_tide_outlier_flag'] | |
| for col in columns_to_remove: | |
| if col in df.columns: | |
| df = df.drop(columns=[col]) | |
| print(f"🗑️ {col} 컬럼 제거") | |
| print(f"📊 최종 컬럼: {list(df.columns)}") | |
| print(f"📊 최종 shape: {df.shape}") | |
| print(f"✅ 가상 residual 데이터 {len(residual_values)}개 생성") | |
| return df | |
| def detect_harmonic_based_outliers(df, station_id): | |
| """조화 예측 기반 tide_level 이상치 탐지""" | |
| print("🌊 Harmonic 기반 tide_level 이상치 탐지 시작...") | |
| if 'tide_level' not in df.columns: | |
| print("⚠️ tide_level 컬럼이 없어서 이상치 탐지를 건너뜁니다.") | |
| return pd.Series(False, index=df.index) | |
| try: | |
| # 1. 해당 시간대 harmonic_level 조회 | |
| df_copy = df.copy() | |
| df_copy['date'] = pd.to_datetime(df_copy['date']) | |
| # KST 시간대 설정 | |
| kst = pytz.timezone('Asia/Seoul') | |
| if df_copy['date'].dt.tz is None: | |
| df_copy['date'] = df_copy['date'].dt.tz_localize(kst) | |
| else: | |
| df_copy['date'] = df_copy['date'].dt.tz_convert(kst) | |
| start_time = df_copy['date'].min() | |
| end_time = df_copy['date'].max() | |
| print(f"📅 이상치 탐지 시간 범위: {start_time} ~ {end_time}") | |
| # 2. Harmonic 데이터 조회 | |
| harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) | |
| if not harmonic_data: | |
| print("❌ 조화 예측 데이터가 없어서 물리적 한계로 대체합니다.") | |
| # 물리적 한계로 폴백 | |
| physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000) | |
| return physical_outliers | |
| print(f"📊 조화 예측 데이터: {len(harmonic_data)}개 조회") | |
| # 3. Harmonic 딕셔너리 생성 | |
| harmonic_dict = {} | |
| for h_data in harmonic_data: | |
| try: | |
| h_time_str = h_data['predicted_at'] | |
| h_time = parse_time_string(h_time_str) | |
| if h_time is None: | |
| continue | |
| # KST로 변환 | |
| if h_time.tzinfo is None: | |
| h_time = pytz.UTC.localize(h_time) | |
| h_time = h_time.astimezone(kst) | |
| # 5분 단위로 정규화 | |
| minutes = (h_time.minute // 5) * 5 | |
| h_time = h_time.replace(minute=minutes, second=0, microsecond=0) | |
| harmonic_dict[h_time] = float(h_data['harmonic_level']) | |
| except Exception as e: | |
| print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}") | |
| continue | |
| print(f"📊 사용 가능한 조화 예측: {len(harmonic_dict)}개") | |
| # 4. Residual 계산 | |
| residuals = [] | |
| outlier_flags = [] | |
| for idx, row in df_copy.iterrows(): | |
| tide_level = row['tide_level'] | |
| timestamp = row['date'] | |
| # 5분 단위로 정규화 | |
| minutes = (timestamp.minute // 5) * 5 | |
| normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0) | |
| # 정확히 매칭되는 harmonic_level 찾기 | |
| harmonic_level = harmonic_dict.get(normalized_time) | |
| # 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내) | |
| if harmonic_level is None: | |
| harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5) | |
| if harmonic_level is not None: | |
| residual = tide_level - harmonic_level | |
| residuals.append(residual) | |
| outlier_flags.append(False) # 일단 정상으로 표시 | |
| else: | |
| # Harmonic 데이터가 없으면 물리적 한계로 판정 | |
| outlier_flags.append(tide_level < -300 or tide_level > 2000) | |
| residuals.append(0.0) | |
| # 5. Residual 기반 3σ 이상치 탐지 | |
| if len(residuals) > 0: | |
| residuals_array = np.array(residuals) | |
| # NaN이 아닌 residual만으로 통계 계산 | |
| valid_residuals = residuals_array[~np.isnan(residuals_array)] | |
| if len(valid_residuals) > 3: # 최소 3개 이상 필요 | |
| residual_mean = np.mean(valid_residuals) | |
| residual_std = np.std(valid_residuals) | |
| print(f"📈 Residual 통계: 평균={residual_mean:.1f}cm, 표준편차={residual_std:.1f}cm") | |
| if residual_std > 0: # 표준편차가 0이 아닌 경우만 | |
| # 3σ 기준 이상치 탐지 | |
| threshold = 3 * residual_std | |
| for i, residual in enumerate(residuals): | |
| if not np.isnan(residual): | |
| if abs(residual - residual_mean) > threshold: | |
| outlier_flags[i] = True | |
| outlier_count = sum(outlier_flags) | |
| print(f"🚨 Harmonic 기반 이상치 탐지: {outlier_count}개 (3σ={threshold:.1f}cm 기준)") | |
| else: | |
| print("📊 Residual 표준편차가 0이므로 물리적 한계만 적용") | |
| else: | |
| print("📊 유효한 residual이 부족하여 물리적 한계만 적용") | |
| return pd.Series(outlier_flags, index=df.index) | |
| except Exception as e: | |
| print(f"❌ Harmonic 기반 이상치 탐지 실패: {e}") | |
| traceback.print_exc() | |
| # 폴백: 물리적 한계로 탐지 | |
| physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000) | |
| return physical_outliers | |
| def detect_weather_outliers(df): | |
| """기상 데이터 물리적 한계 기반 이상치 탐지""" | |
| print("🌡️ 기상 데이터 물리적 한계 기반 이상치 탐지 시작...") | |
| # 물리적 한계 정의 | |
| PHYSICAL_LIMITS = { | |
| 'air_pres': (850, 1100), # hPa (극한 기상 포함) | |
| 'wind_speed': (0, 80), # m/s (한국 최대풍속 고려) | |
| 'air_temp': (-35, 45), # °C (한국 기록 극값) | |
| 'wind_dir': (0, 360) # degree | |
| } | |
| outliers = pd.DataFrame(False, index=df.index, columns=df.columns) | |
| for col, (min_val, max_val) in PHYSICAL_LIMITS.items(): | |
| if col in df.columns: | |
| col_outliers = (df[col] < min_val) | (df[col] > max_val) | |
| outlier_count = col_outliers.sum() | |
| if outlier_count > 0: | |
| print(f"🌡️ {col} 물리적 한계 이상치: {outlier_count}개 (범위: {min_val}~{max_val})") | |
| outliers[col] = col_outliers | |
| return outliers | |
| def validate_input_data(df): | |
| """입력 데이터 유효성 검증""" | |
| print("🔍 입력 데이터 검증 중...") | |
| issues = [] | |
| # 필수 컬럼 체크 | |
| required_columns = ['date', 'air_pres', 'wind_dir', 'wind_speed', 'air_temp', 'tide_level'] | |
| missing_columns = [col for col in required_columns if col not in df.columns] | |
| if missing_columns: | |
| issues.append(f"필수 컬럼 누락: {missing_columns}") | |
| # 결측치 체크 (경고만 표시, 진행은 허용) | |
| warnings = [] | |
| missing_counts = df.isnull().sum() | |
| for col, count in missing_counts.items(): | |
| if count > 0: | |
| missing_rate = count / len(df) * 100 | |
| warnings.append(f"결측치 발견: {col} 컬럼에 {count}개 ({missing_rate:.1f}%)") | |
| # tide_level 범위 체크 (일반적인 조위 범위) | |
| if 'tide_level' in df.columns: | |
| valid_tide_data = df['tide_level'].dropna() | |
| if len(valid_tide_data) > 0: | |
| min_tide = valid_tide_data.min() | |
| max_tide = valid_tide_data.max() | |
| if min_tide < -200 or max_tide > 1500: | |
| warnings.append(f"조위 범위 이상: {min_tide:.1f}~{max_tide:.1f}cm (일반 범위: -200~1500cm)") | |
| # 심각한 이슈가 있으면 실패 | |
| critical_issues = [] | |
| # 데이터가 너무 적으면 중단 | |
| if len(df) < 72: | |
| critical_issues.append(f"데이터 부족: 최소 72개 필요, 현재 {len(df)}개") | |
| # 필수 컬럼 전체가 결측인 경우 중단 | |
| for col in ['date', 'tide_level']: | |
| if col in missing_counts and missing_counts[col] == len(df): | |
| critical_issues.append(f"필수 컬럼 {col}이 완전히 비어있습니다") | |
| if critical_issues: | |
| print(f"❌ 심각한 문제: {len(critical_issues)}개") | |
| for issue in critical_issues: | |
| print(f" - {issue}") | |
| return False, critical_issues | |
| if warnings: | |
| print(f"⚠️ 경고사항: {len(warnings)}개") | |
| for warning in warnings: | |
| print(f" - {warning}") | |
| print("⏩ 경고가 있지만 처리를 계속합니다") | |
| print("✅ 데이터 검증 통과") | |
| return True, warnings | |
| def adaptive_interpolation(df, col, missing_groups, station_id=None): | |
| """적응적 보간: 결측치 패턴에 따라 다른 보간 방법 적용""" | |
| for start_idx, end_idx in missing_groups: | |
| gap_size = end_idx - start_idx + 1 | |
| print(f"🔧 {col} 컬럼: {gap_size}개 연속 결측치 처리 중...") | |
| if gap_size <= 3: # 1-3개 (5-15분) | |
| # 선형보간 | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ | |
| df[col].iloc[start_idx:end_idx+1].interpolate(method='linear') | |
| print(f" → 선형보간 적용") | |
| elif gap_size <= 12: # 4-12개 (20분-1시간) | |
| if col == 'tide_level' and station_id: | |
| # 조화 예측 기반 보간 | |
| filled_values = harmonic_based_imputation(df, start_idx, end_idx, station_id) | |
| if filled_values is not None: | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values | |
| print(f" → 조화 예측 기반 보간 적용") | |
| else: | |
| # 폴백: 스플라인 보간 | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ | |
| df[col].iloc[start_idx:end_idx+1].interpolate(method='spline', order=2) | |
| print(f" → 스플라인 보간 적용 (조화 데이터 없음)") | |
| else: | |
| # 기상 데이터: 스플라인 보간 | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ | |
| df[col].iloc[start_idx:end_idx+1].interpolate(method='spline', order=2) | |
| print(f" → 스플라인 보간 적용") | |
| elif gap_size <= 36: # 13-36개 (1-3시간) | |
| # 주기 패턴 기반 (12시간 전 데이터) | |
| filled_values = seasonal_pattern_imputation(df, col, start_idx, end_idx) | |
| if filled_values is not None: | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values | |
| print(f" → 주기 패턴 기반 보간 적용") | |
| else: | |
| # 폴백: 이동평균 보간 | |
| window_size = min(12, (len(df) - end_idx - 1), start_idx) | |
| if window_size > 0: | |
| before_avg = df[col].iloc[start_idx-window_size:start_idx].mean() | |
| after_avg = df[col].iloc[end_idx+1:end_idx+1+window_size].mean() | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ | |
| np.linspace(before_avg, after_avg, gap_size) | |
| print(f" → 이동평균 기반 보간 적용") | |
| else: # 37개 이상 (3시간+) - 극한 상황 | |
| print(f" ⚠️ 극한 결측치 상황 ({gap_size}개)") | |
| if col == 'tide_level' and station_id: | |
| # tide_level: 조화 예측만으로 복원 시도 | |
| filled_values = extreme_harmonic_imputation(df, start_idx, end_idx, station_id) | |
| if filled_values is not None: | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values | |
| print(f" → 조화 예측 기반 극한 보간 적용") | |
| else: | |
| # 조화 예측 실패시 폴백 | |
| fill_value = get_fallback_value(df, col, start_idx, end_idx) | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = fill_value | |
| print(f" → 폴백 상수값 채움 (값: {fill_value:.2f})") | |
| else: | |
| # 기상 데이터: 기후 평년값 또는 상수값 | |
| fill_value = get_climate_normal_value(col) or get_fallback_value(df, col, start_idx, end_idx) | |
| df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = fill_value | |
| print(f" → 기후 평년값/상수값 채움 (값: {fill_value:.2f})") | |
| def harmonic_based_imputation(df, start_idx, end_idx, station_id): | |
| """조화 예측 기반 tide_level 보간""" | |
| try: | |
| # 결측 구간의 시간 정보 | |
| missing_times = df['date'].iloc[start_idx:end_idx+1] | |
| start_time = missing_times.iloc[0] | |
| end_time = missing_times.iloc[-1] | |
| # 조화 예측 데이터 조회 | |
| harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) | |
| if not harmonic_data: | |
| return None | |
| # 주변 residual 평균 계산 | |
| before_residual = 0 | |
| after_residual = 0 | |
| if start_idx > 5: # 이전 5개 평균 | |
| before_tide = df['tide_level'].iloc[start_idx-5:start_idx].mean() | |
| # 조화값이 있다면 residual 계산 가능 | |
| if end_idx < len(df) - 6: # 이후 5개 평균 | |
| after_tide = df['tide_level'].iloc[end_idx+1:end_idx+6].mean() | |
| # 조화값 + 추정 residual로 tide_level 복원 | |
| estimated_residual = 0 # 단순화: 주변 패턴 분석하여 residual 추정 | |
| harmonic_dict = {} | |
| for h_data in harmonic_data: | |
| h_time = pd.to_datetime(h_data['predicted_at']).tz_convert('Asia/Seoul') | |
| h_time = h_time.replace(minute=(h_time.minute // 5) * 5, second=0, microsecond=0) | |
| harmonic_dict[h_time] = float(h_data['harmonic_level']) | |
| # 각 시점별로 조화값 + 추정 residual | |
| filled_values = [] | |
| for time in missing_times: | |
| time_normalized = time.replace(minute=(time.minute // 5) * 5, second=0, microsecond=0) | |
| if time_normalized in harmonic_dict: | |
| estimated_tide = harmonic_dict[time_normalized] + estimated_residual | |
| filled_values.append(estimated_tide) | |
| else: | |
| return None # 조화 데이터 부족 | |
| return filled_values | |
| except Exception as e: | |
| print(f" ❌ 조화 기반 보간 실패: {e}") | |
| return None | |
| def seasonal_pattern_imputation(df, col, start_idx, end_idx): | |
| """주기 패턴 기반 보간 (12시간/24시간 전 데이터 활용)""" | |
| try: | |
| gap_size = end_idx - start_idx + 1 | |
| # 12시간 전 (144개 전) 데이터 확인 | |
| cycle_12h = 144 # 5분 * 144 = 12시간 | |
| if start_idx >= cycle_12h: | |
| pattern_12h = df[col].iloc[start_idx-cycle_12h:start_idx-cycle_12h+gap_size] | |
| if not pattern_12h.isna().any(): | |
| print(f" → 12시간 전 패턴 사용") | |
| return pattern_12h.values | |
| # 24시간 전 (288개 전) 데이터 확인 | |
| cycle_24h = 288 | |
| if start_idx >= cycle_24h: | |
| pattern_24h = df[col].iloc[start_idx-cycle_24h:start_idx-cycle_24h+gap_size] | |
| if not pattern_24h.isna().any(): | |
| print(f" → 24시간 전 패턴 사용") | |
| return pattern_24h.values | |
| return None | |
| except Exception as e: | |
| print(f" ❌ 주기 패턴 보간 실패: {e}") | |
| return None | |
| def extreme_harmonic_imputation(df, start_idx, end_idx, station_id): | |
| """극한 상황에서 조화 예측만으로 tide_level 복원""" | |
| try: | |
| # 결측 구간의 시간 정보 | |
| missing_times = df['date'].iloc[start_idx:end_idx+1] | |
| start_time = missing_times.iloc[0] | |
| end_time = missing_times.iloc[-1] | |
| print(f" 🔍 조화 예측 조회: {start_time} ~ {end_time}") | |
| # 조화 예측 데이터 조회 | |
| harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) | |
| if not harmonic_data: | |
| print(f" ❌ 조화 예측 데이터 없음") | |
| return None | |
| # 조화 예측 딕셔너리 생성 | |
| harmonic_dict = {} | |
| kst = pytz.timezone('Asia/Seoul') | |
| for h_data in harmonic_data: | |
| try: | |
| h_time = pd.to_datetime(h_data['predicted_at']) | |
| if h_time.tzinfo is None: | |
| h_time = pytz.UTC.localize(h_time) | |
| h_time = h_time.astimezone(kst) | |
| # 5분 단위로 정규화 | |
| h_time = h_time.replace(minute=(h_time.minute // 5) * 5, second=0, microsecond=0) | |
| harmonic_dict[h_time] = float(h_data['harmonic_level']) | |
| except Exception as e: | |
| continue | |
| print(f" 📊 사용 가능한 조화 예측: {len(harmonic_dict)}개") | |
| # 주변 잔차(residual) 추정 - 극한 상황이므로 보수적으로 0 사용 | |
| # 실제로는 주변 정상 데이터에서 평균 residual을 계산할 수 있음 | |
| estimated_residual = 0 | |
| # 주변 잔차 패턴이 있으면 활용 | |
| if start_idx > 10: | |
| # 이전 정상 데이터에서 tide_level - harmonic_level 패턴 분석 | |
| recent_data = df.iloc[max(0, start_idx-20):start_idx] | |
| if len(recent_data) > 5: | |
| # 간단한 추정: 최근 추세 반영 | |
| recent_tide_mean = recent_data['tide_level'].mean() | |
| # 조화값이 있다면 residual 추정 가능하지만 복잡하므로 단순화 | |
| estimated_residual = 0 # 보수적 접근 | |
| # 각 시점별로 조화값 + 추정 잔차로 tide_level 복원 | |
| filled_values = [] | |
| missing_count = 0 | |
| for time in missing_times: | |
| time_normalized = time.replace(minute=(time.minute // 5) * 5, second=0, microsecond=0) | |
| if time_normalized in harmonic_dict: | |
| # tide_level = harmonic_level + residual | |
| estimated_tide = harmonic_dict[time_normalized] + estimated_residual | |
| filled_values.append(estimated_tide) | |
| else: | |
| missing_count += 1 | |
| # 조화 데이터가 없으면 보간 실패 | |
| filled_values.append(None) | |
| # 모든 시점에 조화 데이터가 있는지 확인 | |
| if missing_count == 0: | |
| print(f" ✅ 조화 예측으로 {len(filled_values)}개 포인트 복원") | |
| return filled_values | |
| else: | |
| print(f" ❌ {missing_count}개 시점에 조화 데이터 부족") | |
| return None | |
| except Exception as e: | |
| print(f" ❌ 극한 조화 보간 실패: {e}") | |
| return None | |
| def get_climate_normal_value(col): | |
| """기상 요소별 기후 평년값 반환""" | |
| climate_normals = { | |
| 'air_pres': 1013.25, # 표준 대기압 (hPa) | |
| 'air_temp': 15.0, # 한국 연평균 기온 (°C) | |
| 'wind_speed': 2.5, # 평균 풍속 (m/s) | |
| 'wind_dir': 225.0 # 남서풍 (도) | |
| } | |
| return climate_normals.get(col) | |
| def get_fallback_value(df, col, start_idx, end_idx): | |
| """상수값 결정 (기존 로직)""" | |
| if start_idx > 0 and end_idx < len(df) - 1: | |
| return (df[col].iloc[start_idx-1] + df[col].iloc[end_idx+1]) / 2 | |
| elif start_idx > 0: | |
| return df[col].iloc[start_idx-1] | |
| elif end_idx < len(df) - 1: | |
| return df[col].iloc[end_idx+1] | |
| else: | |
| return df[col].mean() | |
| def find_missing_groups(series): | |
| """연속된 결측치 구간들을 찾아 반환""" | |
| missing_mask = series.isna() | |
| groups = [] | |
| start = None | |
| for i, is_missing in enumerate(missing_mask): | |
| if is_missing and start is None: | |
| start = i | |
| elif not is_missing and start is not None: | |
| groups.append((start, i-1)) | |
| start = None | |
| # 마지막까지 결측치인 경우 | |
| if start is not None: | |
| groups.append((start, len(series)-1)) | |
| return groups | |
| def handle_missing_values(df, station_id=None): | |
| """적응적 결측치 처리""" | |
| print("🧠 적응적 결측치 처리 시작...") | |
| df = df.copy() | |
| # 각 컬럼별로 적응적 처리 | |
| numeric_columns = df.select_dtypes(include=[np.number]).columns | |
| for col in numeric_columns: | |
| if df[col].isna().any(): | |
| before_na = df[col].isna().sum() | |
| print(f"📊 {col}: {before_na}개 결측치 발견") | |
| # 연속 결측치 구간 찾기 | |
| missing_groups = find_missing_groups(df[col]) | |
| print(f" → {len(missing_groups)}개 결측 구간") | |
| # 적응적 보간 적용 | |
| adaptive_interpolation(df, col, missing_groups, station_id) | |
| after_na = df[col].isna().sum() | |
| filled_count = before_na - after_na | |
| print(f"✅ {col}: {filled_count}개 결측치 보간 완료") | |
| if after_na > 0: | |
| print(f"⚠️ {col}에 {after_na}개 결측치가 여전히 남아있습니다") | |
| print(f"✅ 적응적 결측치 처리 완료: {len(df)}행 유지") | |
| return df | |
| def preprocess_uploaded_file(file_path, station_id): | |
| """ | |
| 업로드된 파일의 전체 전처리 파이프라인 | |
| 슬라이싱 → 이상치 탐지 → 결측치 처리 → tide_level → residual 변환 + 검증 | |
| """ | |
| try: | |
| print(f"\n🚀 {station_id} 관측소 데이터 전처리 시작") | |
| print(f"📁 파일: {file_path}") | |
| # 1. 파일 읽기 | |
| df = pd.read_csv(file_path) | |
| print(f"📊 원본 데이터: {len(df)}행 × {len(df.columns)}열") | |
| # 2. 입력 데이터 검증 | |
| is_valid, issues = validate_input_data(df) | |
| if not is_valid: | |
| return None, f"입력 데이터 오류:\n" + "\n".join(issues) | |
| # 3. 마지막 144개로 먼저 슬라이싱 (모델 입력 크기) | |
| print(f"✂️ 마지막 144개 데이터로 슬라이싱 (모델 입력 크기)") | |
| df_sliced = df.tail(144).copy() | |
| print(f"📊 슬라이싱 후 데이터: {len(df_sliced)}행 × {len(df_sliced.columns)}열") | |
| # 4. 이상치 탐지 및 처리 (144개만 대상) | |
| print("\n🔍 이상치 탐지 및 처리 단계 (144개 데이터 기준)") | |
| # 4-1. Harmonic 기반 tide_level 이상치 탐지 | |
| tide_outliers = detect_harmonic_based_outliers(df_sliced, station_id) | |
| if tide_outliers.any(): | |
| print(f"🌊 tide_level 이상치 {tide_outliers.sum()}개 → residual=0 처리 예정") | |
| df_sliced.loc[tide_outliers, '_tide_outlier_flag'] = True | |
| # 4-2. 기상 데이터 물리적 한계 기반 이상치 탐지 | |
| weather_outliers = detect_weather_outliers(df_sliced) | |
| for col in weather_outliers.columns: | |
| if weather_outliers[col].any(): | |
| print(f"🌡️ {col} 이상치 {weather_outliers[col].sum()}개 → NaN 변환") | |
| df_sliced.loc[weather_outliers[col], col] = np.nan | |
| # 5. 결측치 처리 | |
| df_cleaned = handle_missing_values(df_sliced, station_id) | |
| # 6. tide_level → residual 변환 (이상치 플래그 반영) | |
| converted_df = convert_tide_level_to_residual(df_cleaned, station_id) | |
| # 5. 변환된 데이터를 임시 파일로 저장 | |
| output_path = file_path.replace('.csv', '_processed.csv') | |
| converted_df.to_csv(output_path, index=False) | |
| print(f"💾 전처리 완료: {output_path}") | |
| # 6. 처리 결과 요약 | |
| summary = { | |
| 'original_rows': len(df), | |
| 'processed_rows': len(converted_df), | |
| 'has_residual': 'residual' in converted_df.columns, | |
| 'residual_mean': converted_df['residual'].mean() if 'residual' in converted_df.columns else None, | |
| 'residual_std': converted_df['residual'].std() if 'residual' in converted_df.columns else None, | |
| 'output_file': output_path | |
| } | |
| return converted_df, summary | |
| except Exception as e: | |
| error_msg = f"전처리 오류: {str(e)}\n{traceback.format_exc()}" | |
| print(f"❌ {error_msg}") | |
| return None, error_msg |