# preprocessing.py - 전처리 및 데이터 변환 모듈 import pandas as pd import numpy as np from datetime import datetime import pytz from dateutil import parser as date_parser from supabase_utils import get_supabase_client, get_harmonic_predictions import traceback def convert_tide_level_to_residual(df, station_id): """ tide_level 데이터를 residual로 변환 residual = tide_level - harmonic_level """ print(f"🔄 {station_id} 관측소의 tide_level → residual 변환 시작") # 1. 입력 데이터 검증 if 'tide_level' not in df.columns: raise ValueError("tide_level 컬럼이 없습니다.") if 'date' not in df.columns: raise ValueError("date 컬럼이 없습니다.") # 2. 시간 데이터 정리 df = df.copy() df['date'] = pd.to_datetime(df['date']) # 3. KST 시간대 설정 kst = pytz.timezone('Asia/Seoul') if df['date'].dt.tz is None: df['date'] = df['date'].dt.tz_localize(kst) else: df['date'] = df['date'].dt.tz_convert(kst) # 4. 입력 데이터는 이미 144개로 슬라이싱된 상태 df_input = df.copy() start_time = df_input['date'].min() end_time = df_input['date'].max() print(f"📅 입력 데이터 시간 범위: {start_time} ~ {end_time}") # 5. Supabase에서 harmonic_level 조회 try: harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) print(f"📊 조화 예측 데이터 {len(harmonic_data) if harmonic_data else 0}개 조회") if not harmonic_data: print("⚠️ 조화 예측 데이터가 없습니다. 가상 데이터로 대체합니다.") return create_mock_residual_data(df_input) except Exception as e: print(f"❌ Supabase 조회 오류: {e}") print("⚠️ 가상 데이터로 대체합니다.") return create_mock_residual_data(df_input) # 6. harmonic_data를 딕셔너리로 변환 (시간 기준) harmonic_dict = {} for h_data in harmonic_data: try: h_time_str = h_data['predicted_at'] h_time = parse_time_string(h_time_str) if h_time is None: continue # KST로 변환 if h_time.tzinfo is None: h_time = pytz.UTC.localize(h_time) h_time = h_time.astimezone(kst) # 5분 단위로 정규화 minutes = (h_time.minute // 5) * 5 h_time = h_time.replace(minute=minutes, second=0, microsecond=0) harmonic_dict[h_time] = float(h_data['harmonic_level']) except Exception as e: print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}") continue print(f"📊 사용 가능한 조화 데이터: {len(harmonic_dict)}개") # 7. residual 계산 residual_values = [] successful_conversions = 0 for idx, row in df_input.iterrows(): tide_level = row['tide_level'] timestamp = row['date'] # 5분 단위로 정규화 minutes = (timestamp.minute // 5) * 5 normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0) # 정확히 매칭되는 harmonic_level 찾기 harmonic_level = harmonic_dict.get(normalized_time) # 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내) if harmonic_level is None: harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5) # 이상치 플래그 확인 is_outlier = False if '_tide_outlier_flag' in df_input.columns: is_outlier = df_input.at[idx, '_tide_outlier_flag'] if not pd.isna(df_input.at[idx, '_tide_outlier_flag']) else False if is_outlier: # 이상치로 탐지된 경우 residual = 0 (harmonic만 사용) residual_values.append(0.0) successful_conversions += 1 print(f" 🚨 이상치 탐지된 시점 {timestamp}: residual=0 적용") elif harmonic_level is not None: residual = tide_level - harmonic_level residual_values.append(residual) successful_conversions += 1 else: # 조화 데이터가 없으면 평균값으로 대체 residual_values.append(0.0) # 8. residual 컬럼 추가 df_input['residual'] = residual_values # 9. 불필요한 컬럼 제거 (모델에서 사용하지 않음) columns_to_remove = ['tide_level', '_tide_outlier_flag'] for col in columns_to_remove: if col in df_input.columns: df_input = df_input.drop(columns=[col]) print(f"🗑️ {col} 컬럼 제거") print(f"📊 최종 컬럼: {list(df_input.columns)}") print(f"📊 최종 shape: {df_input.shape}") conversion_rate = successful_conversions / len(df_input) * 100 print(f"✅ 변환 완료: {successful_conversions}/{len(df_input)} ({conversion_rate:.1f}%)") return df_input def parse_time_string(time_str): """다양한 형태의 시간 문자열 파싱""" try: if 'T' in time_str: if time_str.endswith('Z'): return datetime.fromisoformat(time_str[:-1] + '+00:00') elif '+' in time_str or time_str.count('-') > 2: return datetime.fromisoformat(time_str) else: return datetime.fromisoformat(time_str + '+00:00') else: return date_parser.parse(time_str) except Exception as e: print(f"시간 파싱 실패: {time_str}, {e}") return None def find_closest_harmonic(target_time, harmonic_dict, max_diff_minutes=5): """가장 가까운 시간의 harmonic_level 찾기""" min_diff = float('inf') closest_value = None for h_time, h_value in harmonic_dict.items(): diff_seconds = abs((h_time - target_time).total_seconds()) if diff_seconds < min_diff and diff_seconds <= max_diff_minutes * 60: min_diff = diff_seconds closest_value = h_value return closest_value def create_mock_residual_data(df): """Supabase 연결 실패 시 가상 residual 데이터 생성""" print("🎭 가상 residual 데이터 생성 중...") df = df.copy() # 간단한 사인파 기반 가상 residual 생성 timestamps = df['date'] time_hours = [(t - timestamps.iloc[0]).total_seconds() / 3600 for t in timestamps] # 조위의 일반적인 패턴을 모방한 가상 residual # 주기적 성분 + 노이즈 + 트렌드 residual_values = [] for i, hour in enumerate(time_hours): # 반일주조 (12.42시간 주기) semi_diurnal = 15 * np.sin(2 * np.pi * hour / 12.42) # 일주조 (24.84시간 주기) diurnal = 8 * np.sin(2 * np.pi * hour / 24.84) # 노이즈 noise = np.random.normal(0, 3) # 작은 트렌드 trend = 0.1 * hour residual = semi_diurnal + diurnal + noise + trend residual_values.append(residual) df['residual'] = residual_values # 불필요한 컬럼 제거 (모델에서 사용하지 않음) columns_to_remove = ['tide_level', '_tide_outlier_flag'] for col in columns_to_remove: if col in df.columns: df = df.drop(columns=[col]) print(f"🗑️ {col} 컬럼 제거") print(f"📊 최종 컬럼: {list(df.columns)}") print(f"📊 최종 shape: {df.shape}") print(f"✅ 가상 residual 데이터 {len(residual_values)}개 생성") return df def detect_harmonic_based_outliers(df, station_id): """조화 예측 기반 tide_level 이상치 탐지""" print("🌊 Harmonic 기반 tide_level 이상치 탐지 시작...") if 'tide_level' not in df.columns: print("⚠️ tide_level 컬럼이 없어서 이상치 탐지를 건너뜁니다.") return pd.Series(False, index=df.index) try: # 1. 해당 시간대 harmonic_level 조회 df_copy = df.copy() df_copy['date'] = pd.to_datetime(df_copy['date']) # KST 시간대 설정 kst = pytz.timezone('Asia/Seoul') if df_copy['date'].dt.tz is None: df_copy['date'] = df_copy['date'].dt.tz_localize(kst) else: df_copy['date'] = df_copy['date'].dt.tz_convert(kst) start_time = df_copy['date'].min() end_time = df_copy['date'].max() print(f"📅 이상치 탐지 시간 범위: {start_time} ~ {end_time}") # 2. Harmonic 데이터 조회 harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) if not harmonic_data: print("❌ 조화 예측 데이터가 없어서 물리적 한계로 대체합니다.") # 물리적 한계로 폴백 physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000) return physical_outliers print(f"📊 조화 예측 데이터: {len(harmonic_data)}개 조회") # 3. Harmonic 딕셔너리 생성 harmonic_dict = {} for h_data in harmonic_data: try: h_time_str = h_data['predicted_at'] h_time = parse_time_string(h_time_str) if h_time is None: continue # KST로 변환 if h_time.tzinfo is None: h_time = pytz.UTC.localize(h_time) h_time = h_time.astimezone(kst) # 5분 단위로 정규화 minutes = (h_time.minute // 5) * 5 h_time = h_time.replace(minute=minutes, second=0, microsecond=0) harmonic_dict[h_time] = float(h_data['harmonic_level']) except Exception as e: print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}") continue print(f"📊 사용 가능한 조화 예측: {len(harmonic_dict)}개") # 4. Residual 계산 residuals = [] outlier_flags = [] for idx, row in df_copy.iterrows(): tide_level = row['tide_level'] timestamp = row['date'] # 5분 단위로 정규화 minutes = (timestamp.minute // 5) * 5 normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0) # 정확히 매칭되는 harmonic_level 찾기 harmonic_level = harmonic_dict.get(normalized_time) # 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내) if harmonic_level is None: harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5) if harmonic_level is not None: residual = tide_level - harmonic_level residuals.append(residual) outlier_flags.append(False) # 일단 정상으로 표시 else: # Harmonic 데이터가 없으면 물리적 한계로 판정 outlier_flags.append(tide_level < -300 or tide_level > 2000) residuals.append(0.0) # 5. Residual 기반 3σ 이상치 탐지 if len(residuals) > 0: residuals_array = np.array(residuals) # NaN이 아닌 residual만으로 통계 계산 valid_residuals = residuals_array[~np.isnan(residuals_array)] if len(valid_residuals) > 3: # 최소 3개 이상 필요 residual_mean = np.mean(valid_residuals) residual_std = np.std(valid_residuals) print(f"📈 Residual 통계: 평균={residual_mean:.1f}cm, 표준편차={residual_std:.1f}cm") if residual_std > 0: # 표준편차가 0이 아닌 경우만 # 3σ 기준 이상치 탐지 threshold = 3 * residual_std for i, residual in enumerate(residuals): if not np.isnan(residual): if abs(residual - residual_mean) > threshold: outlier_flags[i] = True outlier_count = sum(outlier_flags) print(f"🚨 Harmonic 기반 이상치 탐지: {outlier_count}개 (3σ={threshold:.1f}cm 기준)") else: print("📊 Residual 표준편차가 0이므로 물리적 한계만 적용") else: print("📊 유효한 residual이 부족하여 물리적 한계만 적용") return pd.Series(outlier_flags, index=df.index) except Exception as e: print(f"❌ Harmonic 기반 이상치 탐지 실패: {e}") traceback.print_exc() # 폴백: 물리적 한계로 탐지 physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000) return physical_outliers def detect_weather_outliers(df): """기상 데이터 물리적 한계 기반 이상치 탐지""" print("🌡️ 기상 데이터 물리적 한계 기반 이상치 탐지 시작...") # 물리적 한계 정의 PHYSICAL_LIMITS = { 'air_pres': (850, 1100), # hPa (극한 기상 포함) 'wind_speed': (0, 80), # m/s (한국 최대풍속 고려) 'air_temp': (-35, 45), # °C (한국 기록 극값) 'wind_dir': (0, 360) # degree } outliers = pd.DataFrame(False, index=df.index, columns=df.columns) for col, (min_val, max_val) in PHYSICAL_LIMITS.items(): if col in df.columns: col_outliers = (df[col] < min_val) | (df[col] > max_val) outlier_count = col_outliers.sum() if outlier_count > 0: print(f"🌡️ {col} 물리적 한계 이상치: {outlier_count}개 (범위: {min_val}~{max_val})") outliers[col] = col_outliers return outliers def validate_input_data(df): """입력 데이터 유효성 검증""" print("🔍 입력 데이터 검증 중...") issues = [] # 필수 컬럼 체크 required_columns = ['date', 'air_pres', 'wind_dir', 'wind_speed', 'air_temp', 'tide_level'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: issues.append(f"필수 컬럼 누락: {missing_columns}") # 결측치 체크 (경고만 표시, 진행은 허용) warnings = [] missing_counts = df.isnull().sum() for col, count in missing_counts.items(): if count > 0: missing_rate = count / len(df) * 100 warnings.append(f"결측치 발견: {col} 컬럼에 {count}개 ({missing_rate:.1f}%)") # tide_level 범위 체크 (일반적인 조위 범위) if 'tide_level' in df.columns: valid_tide_data = df['tide_level'].dropna() if len(valid_tide_data) > 0: min_tide = valid_tide_data.min() max_tide = valid_tide_data.max() if min_tide < -200 or max_tide > 1500: warnings.append(f"조위 범위 이상: {min_tide:.1f}~{max_tide:.1f}cm (일반 범위: -200~1500cm)") # 심각한 이슈가 있으면 실패 critical_issues = [] # 데이터가 너무 적으면 중단 if len(df) < 72: critical_issues.append(f"데이터 부족: 최소 72개 필요, 현재 {len(df)}개") # 필수 컬럼 전체가 결측인 경우 중단 for col in ['date', 'tide_level']: if col in missing_counts and missing_counts[col] == len(df): critical_issues.append(f"필수 컬럼 {col}이 완전히 비어있습니다") if critical_issues: print(f"❌ 심각한 문제: {len(critical_issues)}개") for issue in critical_issues: print(f" - {issue}") return False, critical_issues if warnings: print(f"⚠️ 경고사항: {len(warnings)}개") for warning in warnings: print(f" - {warning}") print("⏩ 경고가 있지만 처리를 계속합니다") print("✅ 데이터 검증 통과") return True, warnings def adaptive_interpolation(df, col, missing_groups, station_id=None): """적응적 보간: 결측치 패턴에 따라 다른 보간 방법 적용""" for start_idx, end_idx in missing_groups: gap_size = end_idx - start_idx + 1 print(f"🔧 {col} 컬럼: {gap_size}개 연속 결측치 처리 중...") if gap_size <= 3: # 1-3개 (5-15분) # 선형보간 df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ df[col].iloc[start_idx:end_idx+1].interpolate(method='linear') print(f" → 선형보간 적용") elif gap_size <= 12: # 4-12개 (20분-1시간) if col == 'tide_level' and station_id: # 조화 예측 기반 보간 filled_values = harmonic_based_imputation(df, start_idx, end_idx, station_id) if filled_values is not None: df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values print(f" → 조화 예측 기반 보간 적용") else: # 폴백: 스플라인 보간 df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ df[col].iloc[start_idx:end_idx+1].interpolate(method='spline', order=2) print(f" → 스플라인 보간 적용 (조화 데이터 없음)") else: # 기상 데이터: 스플라인 보간 df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ df[col].iloc[start_idx:end_idx+1].interpolate(method='spline', order=2) print(f" → 스플라인 보간 적용") elif gap_size <= 36: # 13-36개 (1-3시간) # 주기 패턴 기반 (12시간 전 데이터) filled_values = seasonal_pattern_imputation(df, col, start_idx, end_idx) if filled_values is not None: df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values print(f" → 주기 패턴 기반 보간 적용") else: # 폴백: 이동평균 보간 window_size = min(12, (len(df) - end_idx - 1), start_idx) if window_size > 0: before_avg = df[col].iloc[start_idx-window_size:start_idx].mean() after_avg = df[col].iloc[end_idx+1:end_idx+1+window_size].mean() df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \ np.linspace(before_avg, after_avg, gap_size) print(f" → 이동평균 기반 보간 적용") else: # 37개 이상 (3시간+) - 극한 상황 print(f" ⚠️ 극한 결측치 상황 ({gap_size}개)") if col == 'tide_level' and station_id: # tide_level: 조화 예측만으로 복원 시도 filled_values = extreme_harmonic_imputation(df, start_idx, end_idx, station_id) if filled_values is not None: df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values print(f" → 조화 예측 기반 극한 보간 적용") else: # 조화 예측 실패시 폴백 fill_value = get_fallback_value(df, col, start_idx, end_idx) df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = fill_value print(f" → 폴백 상수값 채움 (값: {fill_value:.2f})") else: # 기상 데이터: 기후 평년값 또는 상수값 fill_value = get_climate_normal_value(col) or get_fallback_value(df, col, start_idx, end_idx) df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = fill_value print(f" → 기후 평년값/상수값 채움 (값: {fill_value:.2f})") def harmonic_based_imputation(df, start_idx, end_idx, station_id): """조화 예측 기반 tide_level 보간""" try: # 결측 구간의 시간 정보 missing_times = df['date'].iloc[start_idx:end_idx+1] start_time = missing_times.iloc[0] end_time = missing_times.iloc[-1] # 조화 예측 데이터 조회 harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) if not harmonic_data: return None # 주변 residual 평균 계산 before_residual = 0 after_residual = 0 if start_idx > 5: # 이전 5개 평균 before_tide = df['tide_level'].iloc[start_idx-5:start_idx].mean() # 조화값이 있다면 residual 계산 가능 if end_idx < len(df) - 6: # 이후 5개 평균 after_tide = df['tide_level'].iloc[end_idx+1:end_idx+6].mean() # 조화값 + 추정 residual로 tide_level 복원 estimated_residual = 0 # 단순화: 주변 패턴 분석하여 residual 추정 harmonic_dict = {} for h_data in harmonic_data: h_time = pd.to_datetime(h_data['predicted_at']).tz_convert('Asia/Seoul') h_time = h_time.replace(minute=(h_time.minute // 5) * 5, second=0, microsecond=0) harmonic_dict[h_time] = float(h_data['harmonic_level']) # 각 시점별로 조화값 + 추정 residual filled_values = [] for time in missing_times: time_normalized = time.replace(minute=(time.minute // 5) * 5, second=0, microsecond=0) if time_normalized in harmonic_dict: estimated_tide = harmonic_dict[time_normalized] + estimated_residual filled_values.append(estimated_tide) else: return None # 조화 데이터 부족 return filled_values except Exception as e: print(f" ❌ 조화 기반 보간 실패: {e}") return None def seasonal_pattern_imputation(df, col, start_idx, end_idx): """주기 패턴 기반 보간 (12시간/24시간 전 데이터 활용)""" try: gap_size = end_idx - start_idx + 1 # 12시간 전 (144개 전) 데이터 확인 cycle_12h = 144 # 5분 * 144 = 12시간 if start_idx >= cycle_12h: pattern_12h = df[col].iloc[start_idx-cycle_12h:start_idx-cycle_12h+gap_size] if not pattern_12h.isna().any(): print(f" → 12시간 전 패턴 사용") return pattern_12h.values # 24시간 전 (288개 전) 데이터 확인 cycle_24h = 288 if start_idx >= cycle_24h: pattern_24h = df[col].iloc[start_idx-cycle_24h:start_idx-cycle_24h+gap_size] if not pattern_24h.isna().any(): print(f" → 24시간 전 패턴 사용") return pattern_24h.values return None except Exception as e: print(f" ❌ 주기 패턴 보간 실패: {e}") return None def extreme_harmonic_imputation(df, start_idx, end_idx, station_id): """극한 상황에서 조화 예측만으로 tide_level 복원""" try: # 결측 구간의 시간 정보 missing_times = df['date'].iloc[start_idx:end_idx+1] start_time = missing_times.iloc[0] end_time = missing_times.iloc[-1] print(f" 🔍 조화 예측 조회: {start_time} ~ {end_time}") # 조화 예측 데이터 조회 harmonic_data = get_harmonic_predictions(station_id, start_time, end_time) if not harmonic_data: print(f" ❌ 조화 예측 데이터 없음") return None # 조화 예측 딕셔너리 생성 harmonic_dict = {} kst = pytz.timezone('Asia/Seoul') for h_data in harmonic_data: try: h_time = pd.to_datetime(h_data['predicted_at']) if h_time.tzinfo is None: h_time = pytz.UTC.localize(h_time) h_time = h_time.astimezone(kst) # 5분 단위로 정규화 h_time = h_time.replace(minute=(h_time.minute // 5) * 5, second=0, microsecond=0) harmonic_dict[h_time] = float(h_data['harmonic_level']) except Exception as e: continue print(f" 📊 사용 가능한 조화 예측: {len(harmonic_dict)}개") # 주변 잔차(residual) 추정 - 극한 상황이므로 보수적으로 0 사용 # 실제로는 주변 정상 데이터에서 평균 residual을 계산할 수 있음 estimated_residual = 0 # 주변 잔차 패턴이 있으면 활용 if start_idx > 10: # 이전 정상 데이터에서 tide_level - harmonic_level 패턴 분석 recent_data = df.iloc[max(0, start_idx-20):start_idx] if len(recent_data) > 5: # 간단한 추정: 최근 추세 반영 recent_tide_mean = recent_data['tide_level'].mean() # 조화값이 있다면 residual 추정 가능하지만 복잡하므로 단순화 estimated_residual = 0 # 보수적 접근 # 각 시점별로 조화값 + 추정 잔차로 tide_level 복원 filled_values = [] missing_count = 0 for time in missing_times: time_normalized = time.replace(minute=(time.minute // 5) * 5, second=0, microsecond=0) if time_normalized in harmonic_dict: # tide_level = harmonic_level + residual estimated_tide = harmonic_dict[time_normalized] + estimated_residual filled_values.append(estimated_tide) else: missing_count += 1 # 조화 데이터가 없으면 보간 실패 filled_values.append(None) # 모든 시점에 조화 데이터가 있는지 확인 if missing_count == 0: print(f" ✅ 조화 예측으로 {len(filled_values)}개 포인트 복원") return filled_values else: print(f" ❌ {missing_count}개 시점에 조화 데이터 부족") return None except Exception as e: print(f" ❌ 극한 조화 보간 실패: {e}") return None def get_climate_normal_value(col): """기상 요소별 기후 평년값 반환""" climate_normals = { 'air_pres': 1013.25, # 표준 대기압 (hPa) 'air_temp': 15.0, # 한국 연평균 기온 (°C) 'wind_speed': 2.5, # 평균 풍속 (m/s) 'wind_dir': 225.0 # 남서풍 (도) } return climate_normals.get(col) def get_fallback_value(df, col, start_idx, end_idx): """상수값 결정 (기존 로직)""" if start_idx > 0 and end_idx < len(df) - 1: return (df[col].iloc[start_idx-1] + df[col].iloc[end_idx+1]) / 2 elif start_idx > 0: return df[col].iloc[start_idx-1] elif end_idx < len(df) - 1: return df[col].iloc[end_idx+1] else: return df[col].mean() def find_missing_groups(series): """연속된 결측치 구간들을 찾아 반환""" missing_mask = series.isna() groups = [] start = None for i, is_missing in enumerate(missing_mask): if is_missing and start is None: start = i elif not is_missing and start is not None: groups.append((start, i-1)) start = None # 마지막까지 결측치인 경우 if start is not None: groups.append((start, len(series)-1)) return groups def handle_missing_values(df, station_id=None): """적응적 결측치 처리""" print("🧠 적응적 결측치 처리 시작...") df = df.copy() # 각 컬럼별로 적응적 처리 numeric_columns = df.select_dtypes(include=[np.number]).columns for col in numeric_columns: if df[col].isna().any(): before_na = df[col].isna().sum() print(f"📊 {col}: {before_na}개 결측치 발견") # 연속 결측치 구간 찾기 missing_groups = find_missing_groups(df[col]) print(f" → {len(missing_groups)}개 결측 구간") # 적응적 보간 적용 adaptive_interpolation(df, col, missing_groups, station_id) after_na = df[col].isna().sum() filled_count = before_na - after_na print(f"✅ {col}: {filled_count}개 결측치 보간 완료") if after_na > 0: print(f"⚠️ {col}에 {after_na}개 결측치가 여전히 남아있습니다") print(f"✅ 적응적 결측치 처리 완료: {len(df)}행 유지") return df def preprocess_uploaded_file(file_path, station_id): """ 업로드된 파일의 전체 전처리 파이프라인 슬라이싱 → 이상치 탐지 → 결측치 처리 → tide_level → residual 변환 + 검증 """ try: print(f"\n🚀 {station_id} 관측소 데이터 전처리 시작") print(f"📁 파일: {file_path}") # 1. 파일 읽기 df = pd.read_csv(file_path) print(f"📊 원본 데이터: {len(df)}행 × {len(df.columns)}열") # 2. 입력 데이터 검증 is_valid, issues = validate_input_data(df) if not is_valid: return None, f"입력 데이터 오류:\n" + "\n".join(issues) # 3. 마지막 144개로 먼저 슬라이싱 (모델 입력 크기) print(f"✂️ 마지막 144개 데이터로 슬라이싱 (모델 입력 크기)") df_sliced = df.tail(144).copy() print(f"📊 슬라이싱 후 데이터: {len(df_sliced)}행 × {len(df_sliced.columns)}열") # 4. 이상치 탐지 및 처리 (144개만 대상) print("\n🔍 이상치 탐지 및 처리 단계 (144개 데이터 기준)") # 4-1. Harmonic 기반 tide_level 이상치 탐지 tide_outliers = detect_harmonic_based_outliers(df_sliced, station_id) if tide_outliers.any(): print(f"🌊 tide_level 이상치 {tide_outliers.sum()}개 → residual=0 처리 예정") df_sliced.loc[tide_outliers, '_tide_outlier_flag'] = True # 4-2. 기상 데이터 물리적 한계 기반 이상치 탐지 weather_outliers = detect_weather_outliers(df_sliced) for col in weather_outliers.columns: if weather_outliers[col].any(): print(f"🌡️ {col} 이상치 {weather_outliers[col].sum()}개 → NaN 변환") df_sliced.loc[weather_outliers[col], col] = np.nan # 5. 결측치 처리 df_cleaned = handle_missing_values(df_sliced, station_id) # 6. tide_level → residual 변환 (이상치 플래그 반영) converted_df = convert_tide_level_to_residual(df_cleaned, station_id) # 5. 변환된 데이터를 임시 파일로 저장 output_path = file_path.replace('.csv', '_processed.csv') converted_df.to_csv(output_path, index=False) print(f"💾 전처리 완료: {output_path}") # 6. 처리 결과 요약 summary = { 'original_rows': len(df), 'processed_rows': len(converted_df), 'has_residual': 'residual' in converted_df.columns, 'residual_mean': converted_df['residual'].mean() if 'residual' in converted_df.columns else None, 'residual_std': converted_df['residual'].std() if 'residual' in converted_df.columns else None, 'output_file': output_path } return converted_df, summary except Exception as e: error_msg = f"전처리 오류: {str(e)}\n{traceback.format_exc()}" print(f"❌ {error_msg}") return None, error_msg