my-tide-env / preprocessing.py
alwaysgood's picture
Update preprocessing.py
2ff7d22 verified
# preprocessing.py - 전처리 및 데이터 변환 모듈
import pandas as pd
import numpy as np
from datetime import datetime
import pytz
from dateutil import parser as date_parser
from supabase_utils import get_supabase_client, get_harmonic_predictions
import traceback
def convert_tide_level_to_residual(df, station_id):
"""
tide_level 데이터를 residual로 변환
residual = tide_level - harmonic_level
"""
print(f"🔄 {station_id} 관측소의 tide_level → residual 변환 시작")
# 1. 입력 데이터 검증
if 'tide_level' not in df.columns:
raise ValueError("tide_level 컬럼이 없습니다.")
if 'date' not in df.columns:
raise ValueError("date 컬럼이 없습니다.")
# 2. 시간 데이터 정리
df = df.copy()
df['date'] = pd.to_datetime(df['date'])
# 3. KST 시간대 설정
kst = pytz.timezone('Asia/Seoul')
if df['date'].dt.tz is None:
df['date'] = df['date'].dt.tz_localize(kst)
else:
df['date'] = df['date'].dt.tz_convert(kst)
# 4. 입력 데이터는 이미 144개로 슬라이싱된 상태
df_input = df.copy()
start_time = df_input['date'].min()
end_time = df_input['date'].max()
print(f"📅 입력 데이터 시간 범위: {start_time} ~ {end_time}")
# 5. Supabase에서 harmonic_level 조회
try:
harmonic_data = get_harmonic_predictions(station_id, start_time, end_time)
print(f"📊 조화 예측 데이터 {len(harmonic_data) if harmonic_data else 0}개 조회")
if not harmonic_data:
print("⚠️ 조화 예측 데이터가 없습니다. 가상 데이터로 대체합니다.")
return create_mock_residual_data(df_input)
except Exception as e:
print(f"❌ Supabase 조회 오류: {e}")
print("⚠️ 가상 데이터로 대체합니다.")
return create_mock_residual_data(df_input)
# 6. harmonic_data를 딕셔너리로 변환 (시간 기준)
harmonic_dict = {}
for h_data in harmonic_data:
try:
h_time_str = h_data['predicted_at']
h_time = parse_time_string(h_time_str)
if h_time is None:
continue
# KST로 변환
if h_time.tzinfo is None:
h_time = pytz.UTC.localize(h_time)
h_time = h_time.astimezone(kst)
# 5분 단위로 정규화
minutes = (h_time.minute // 5) * 5
h_time = h_time.replace(minute=minutes, second=0, microsecond=0)
harmonic_dict[h_time] = float(h_data['harmonic_level'])
except Exception as e:
print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}")
continue
print(f"📊 사용 가능한 조화 데이터: {len(harmonic_dict)}개")
# 7. residual 계산
residual_values = []
successful_conversions = 0
for idx, row in df_input.iterrows():
tide_level = row['tide_level']
timestamp = row['date']
# 5분 단위로 정규화
minutes = (timestamp.minute // 5) * 5
normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0)
# 정확히 매칭되는 harmonic_level 찾기
harmonic_level = harmonic_dict.get(normalized_time)
# 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내)
if harmonic_level is None:
harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5)
# 이상치 플래그 확인
is_outlier = False
if '_tide_outlier_flag' in df_input.columns:
is_outlier = df_input.at[idx, '_tide_outlier_flag'] if not pd.isna(df_input.at[idx, '_tide_outlier_flag']) else False
if is_outlier:
# 이상치로 탐지된 경우 residual = 0 (harmonic만 사용)
residual_values.append(0.0)
successful_conversions += 1
print(f" 🚨 이상치 탐지된 시점 {timestamp}: residual=0 적용")
elif harmonic_level is not None:
residual = tide_level - harmonic_level
residual_values.append(residual)
successful_conversions += 1
else:
# 조화 데이터가 없으면 평균값으로 대체
residual_values.append(0.0)
# 8. residual 컬럼 추가
df_input['residual'] = residual_values
# 9. 불필요한 컬럼 제거 (모델에서 사용하지 않음)
columns_to_remove = ['tide_level', '_tide_outlier_flag']
for col in columns_to_remove:
if col in df_input.columns:
df_input = df_input.drop(columns=[col])
print(f"🗑️ {col} 컬럼 제거")
print(f"📊 최종 컬럼: {list(df_input.columns)}")
print(f"📊 최종 shape: {df_input.shape}")
conversion_rate = successful_conversions / len(df_input) * 100
print(f"✅ 변환 완료: {successful_conversions}/{len(df_input)} ({conversion_rate:.1f}%)")
return df_input
def parse_time_string(time_str):
"""다양한 형태의 시간 문자열 파싱"""
try:
if 'T' in time_str:
if time_str.endswith('Z'):
return datetime.fromisoformat(time_str[:-1] + '+00:00')
elif '+' in time_str or time_str.count('-') > 2:
return datetime.fromisoformat(time_str)
else:
return datetime.fromisoformat(time_str + '+00:00')
else:
return date_parser.parse(time_str)
except Exception as e:
print(f"시간 파싱 실패: {time_str}, {e}")
return None
def find_closest_harmonic(target_time, harmonic_dict, max_diff_minutes=5):
"""가장 가까운 시간의 harmonic_level 찾기"""
min_diff = float('inf')
closest_value = None
for h_time, h_value in harmonic_dict.items():
diff_seconds = abs((h_time - target_time).total_seconds())
if diff_seconds < min_diff and diff_seconds <= max_diff_minutes * 60:
min_diff = diff_seconds
closest_value = h_value
return closest_value
def create_mock_residual_data(df):
"""Supabase 연결 실패 시 가상 residual 데이터 생성"""
print("🎭 가상 residual 데이터 생성 중...")
df = df.copy()
# 간단한 사인파 기반 가상 residual 생성
timestamps = df['date']
time_hours = [(t - timestamps.iloc[0]).total_seconds() / 3600 for t in timestamps]
# 조위의 일반적인 패턴을 모방한 가상 residual
# 주기적 성분 + 노이즈 + 트렌드
residual_values = []
for i, hour in enumerate(time_hours):
# 반일주조 (12.42시간 주기)
semi_diurnal = 15 * np.sin(2 * np.pi * hour / 12.42)
# 일주조 (24.84시간 주기)
diurnal = 8 * np.sin(2 * np.pi * hour / 24.84)
# 노이즈
noise = np.random.normal(0, 3)
# 작은 트렌드
trend = 0.1 * hour
residual = semi_diurnal + diurnal + noise + trend
residual_values.append(residual)
df['residual'] = residual_values
# 불필요한 컬럼 제거 (모델에서 사용하지 않음)
columns_to_remove = ['tide_level', '_tide_outlier_flag']
for col in columns_to_remove:
if col in df.columns:
df = df.drop(columns=[col])
print(f"🗑️ {col} 컬럼 제거")
print(f"📊 최종 컬럼: {list(df.columns)}")
print(f"📊 최종 shape: {df.shape}")
print(f"✅ 가상 residual 데이터 {len(residual_values)}개 생성")
return df
def detect_harmonic_based_outliers(df, station_id):
"""조화 예측 기반 tide_level 이상치 탐지"""
print("🌊 Harmonic 기반 tide_level 이상치 탐지 시작...")
if 'tide_level' not in df.columns:
print("⚠️ tide_level 컬럼이 없어서 이상치 탐지를 건너뜁니다.")
return pd.Series(False, index=df.index)
try:
# 1. 해당 시간대 harmonic_level 조회
df_copy = df.copy()
df_copy['date'] = pd.to_datetime(df_copy['date'])
# KST 시간대 설정
kst = pytz.timezone('Asia/Seoul')
if df_copy['date'].dt.tz is None:
df_copy['date'] = df_copy['date'].dt.tz_localize(kst)
else:
df_copy['date'] = df_copy['date'].dt.tz_convert(kst)
start_time = df_copy['date'].min()
end_time = df_copy['date'].max()
print(f"📅 이상치 탐지 시간 범위: {start_time} ~ {end_time}")
# 2. Harmonic 데이터 조회
harmonic_data = get_harmonic_predictions(station_id, start_time, end_time)
if not harmonic_data:
print("❌ 조화 예측 데이터가 없어서 물리적 한계로 대체합니다.")
# 물리적 한계로 폴백
physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000)
return physical_outliers
print(f"📊 조화 예측 데이터: {len(harmonic_data)}개 조회")
# 3. Harmonic 딕셔너리 생성
harmonic_dict = {}
for h_data in harmonic_data:
try:
h_time_str = h_data['predicted_at']
h_time = parse_time_string(h_time_str)
if h_time is None:
continue
# KST로 변환
if h_time.tzinfo is None:
h_time = pytz.UTC.localize(h_time)
h_time = h_time.astimezone(kst)
# 5분 단위로 정규화
minutes = (h_time.minute // 5) * 5
h_time = h_time.replace(minute=minutes, second=0, microsecond=0)
harmonic_dict[h_time] = float(h_data['harmonic_level'])
except Exception as e:
print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}")
continue
print(f"📊 사용 가능한 조화 예측: {len(harmonic_dict)}개")
# 4. Residual 계산
residuals = []
outlier_flags = []
for idx, row in df_copy.iterrows():
tide_level = row['tide_level']
timestamp = row['date']
# 5분 단위로 정규화
minutes = (timestamp.minute // 5) * 5
normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0)
# 정확히 매칭되는 harmonic_level 찾기
harmonic_level = harmonic_dict.get(normalized_time)
# 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내)
if harmonic_level is None:
harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5)
if harmonic_level is not None:
residual = tide_level - harmonic_level
residuals.append(residual)
outlier_flags.append(False) # 일단 정상으로 표시
else:
# Harmonic 데이터가 없으면 물리적 한계로 판정
outlier_flags.append(tide_level < -300 or tide_level > 2000)
residuals.append(0.0)
# 5. Residual 기반 3σ 이상치 탐지
if len(residuals) > 0:
residuals_array = np.array(residuals)
# NaN이 아닌 residual만으로 통계 계산
valid_residuals = residuals_array[~np.isnan(residuals_array)]
if len(valid_residuals) > 3: # 최소 3개 이상 필요
residual_mean = np.mean(valid_residuals)
residual_std = np.std(valid_residuals)
print(f"📈 Residual 통계: 평균={residual_mean:.1f}cm, 표준편차={residual_std:.1f}cm")
if residual_std > 0: # 표준편차가 0이 아닌 경우만
# 3σ 기준 이상치 탐지
threshold = 3 * residual_std
for i, residual in enumerate(residuals):
if not np.isnan(residual):
if abs(residual - residual_mean) > threshold:
outlier_flags[i] = True
outlier_count = sum(outlier_flags)
print(f"🚨 Harmonic 기반 이상치 탐지: {outlier_count}개 (3σ={threshold:.1f}cm 기준)")
else:
print("📊 Residual 표준편차가 0이므로 물리적 한계만 적용")
else:
print("📊 유효한 residual이 부족하여 물리적 한계만 적용")
return pd.Series(outlier_flags, index=df.index)
except Exception as e:
print(f"❌ Harmonic 기반 이상치 탐지 실패: {e}")
traceback.print_exc()
# 폴백: 물리적 한계로 탐지
physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000)
return physical_outliers
def detect_weather_outliers(df):
"""기상 데이터 물리적 한계 기반 이상치 탐지"""
print("🌡️ 기상 데이터 물리적 한계 기반 이상치 탐지 시작...")
# 물리적 한계 정의
PHYSICAL_LIMITS = {
'air_pres': (850, 1100), # hPa (극한 기상 포함)
'wind_speed': (0, 80), # m/s (한국 최대풍속 고려)
'air_temp': (-35, 45), # °C (한국 기록 극값)
'wind_dir': (0, 360) # degree
}
outliers = pd.DataFrame(False, index=df.index, columns=df.columns)
for col, (min_val, max_val) in PHYSICAL_LIMITS.items():
if col in df.columns:
col_outliers = (df[col] < min_val) | (df[col] > max_val)
outlier_count = col_outliers.sum()
if outlier_count > 0:
print(f"🌡️ {col} 물리적 한계 이상치: {outlier_count}개 (범위: {min_val}~{max_val})")
outliers[col] = col_outliers
return outliers
def validate_input_data(df):
"""입력 데이터 유효성 검증"""
print("🔍 입력 데이터 검증 중...")
issues = []
# 필수 컬럼 체크
required_columns = ['date', 'air_pres', 'wind_dir', 'wind_speed', 'air_temp', 'tide_level']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
issues.append(f"필수 컬럼 누락: {missing_columns}")
# 결측치 체크 (경고만 표시, 진행은 허용)
warnings = []
missing_counts = df.isnull().sum()
for col, count in missing_counts.items():
if count > 0:
missing_rate = count / len(df) * 100
warnings.append(f"결측치 발견: {col} 컬럼에 {count}개 ({missing_rate:.1f}%)")
# tide_level 범위 체크 (일반적인 조위 범위)
if 'tide_level' in df.columns:
valid_tide_data = df['tide_level'].dropna()
if len(valid_tide_data) > 0:
min_tide = valid_tide_data.min()
max_tide = valid_tide_data.max()
if min_tide < -200 or max_tide > 1500:
warnings.append(f"조위 범위 이상: {min_tide:.1f}~{max_tide:.1f}cm (일반 범위: -200~1500cm)")
# 심각한 이슈가 있으면 실패
critical_issues = []
# 데이터가 너무 적으면 중단
if len(df) < 72:
critical_issues.append(f"데이터 부족: 최소 72개 필요, 현재 {len(df)}개")
# 필수 컬럼 전체가 결측인 경우 중단
for col in ['date', 'tide_level']:
if col in missing_counts and missing_counts[col] == len(df):
critical_issues.append(f"필수 컬럼 {col}이 완전히 비어있습니다")
if critical_issues:
print(f"❌ 심각한 문제: {len(critical_issues)}개")
for issue in critical_issues:
print(f" - {issue}")
return False, critical_issues
if warnings:
print(f"⚠️ 경고사항: {len(warnings)}개")
for warning in warnings:
print(f" - {warning}")
print("⏩ 경고가 있지만 처리를 계속합니다")
print("✅ 데이터 검증 통과")
return True, warnings
def adaptive_interpolation(df, col, missing_groups, station_id=None):
"""적응적 보간: 결측치 패턴에 따라 다른 보간 방법 적용"""
for start_idx, end_idx in missing_groups:
gap_size = end_idx - start_idx + 1
print(f"🔧 {col} 컬럼: {gap_size}개 연속 결측치 처리 중...")
if gap_size <= 3: # 1-3개 (5-15분)
# 선형보간
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \
df[col].iloc[start_idx:end_idx+1].interpolate(method='linear')
print(f" → 선형보간 적용")
elif gap_size <= 12: # 4-12개 (20분-1시간)
if col == 'tide_level' and station_id:
# 조화 예측 기반 보간
filled_values = harmonic_based_imputation(df, start_idx, end_idx, station_id)
if filled_values is not None:
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values
print(f" → 조화 예측 기반 보간 적용")
else:
# 폴백: 스플라인 보간
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \
df[col].iloc[start_idx:end_idx+1].interpolate(method='spline', order=2)
print(f" → 스플라인 보간 적용 (조화 데이터 없음)")
else:
# 기상 데이터: 스플라인 보간
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \
df[col].iloc[start_idx:end_idx+1].interpolate(method='spline', order=2)
print(f" → 스플라인 보간 적용")
elif gap_size <= 36: # 13-36개 (1-3시간)
# 주기 패턴 기반 (12시간 전 데이터)
filled_values = seasonal_pattern_imputation(df, col, start_idx, end_idx)
if filled_values is not None:
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values
print(f" → 주기 패턴 기반 보간 적용")
else:
# 폴백: 이동평균 보간
window_size = min(12, (len(df) - end_idx - 1), start_idx)
if window_size > 0:
before_avg = df[col].iloc[start_idx-window_size:start_idx].mean()
after_avg = df[col].iloc[end_idx+1:end_idx+1+window_size].mean()
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = \
np.linspace(before_avg, after_avg, gap_size)
print(f" → 이동평균 기반 보간 적용")
else: # 37개 이상 (3시간+) - 극한 상황
print(f" ⚠️ 극한 결측치 상황 ({gap_size}개)")
if col == 'tide_level' and station_id:
# tide_level: 조화 예측만으로 복원 시도
filled_values = extreme_harmonic_imputation(df, start_idx, end_idx, station_id)
if filled_values is not None:
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = filled_values
print(f" → 조화 예측 기반 극한 보간 적용")
else:
# 조화 예측 실패시 폴백
fill_value = get_fallback_value(df, col, start_idx, end_idx)
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = fill_value
print(f" → 폴백 상수값 채움 (값: {fill_value:.2f})")
else:
# 기상 데이터: 기후 평년값 또는 상수값
fill_value = get_climate_normal_value(col) or get_fallback_value(df, col, start_idx, end_idx)
df.iloc[start_idx:end_idx+1, df.columns.get_loc(col)] = fill_value
print(f" → 기후 평년값/상수값 채움 (값: {fill_value:.2f})")
def harmonic_based_imputation(df, start_idx, end_idx, station_id):
"""조화 예측 기반 tide_level 보간"""
try:
# 결측 구간의 시간 정보
missing_times = df['date'].iloc[start_idx:end_idx+1]
start_time = missing_times.iloc[0]
end_time = missing_times.iloc[-1]
# 조화 예측 데이터 조회
harmonic_data = get_harmonic_predictions(station_id, start_time, end_time)
if not harmonic_data:
return None
# 주변 residual 평균 계산
before_residual = 0
after_residual = 0
if start_idx > 5: # 이전 5개 평균
before_tide = df['tide_level'].iloc[start_idx-5:start_idx].mean()
# 조화값이 있다면 residual 계산 가능
if end_idx < len(df) - 6: # 이후 5개 평균
after_tide = df['tide_level'].iloc[end_idx+1:end_idx+6].mean()
# 조화값 + 추정 residual로 tide_level 복원
estimated_residual = 0 # 단순화: 주변 패턴 분석하여 residual 추정
harmonic_dict = {}
for h_data in harmonic_data:
h_time = pd.to_datetime(h_data['predicted_at']).tz_convert('Asia/Seoul')
h_time = h_time.replace(minute=(h_time.minute // 5) * 5, second=0, microsecond=0)
harmonic_dict[h_time] = float(h_data['harmonic_level'])
# 각 시점별로 조화값 + 추정 residual
filled_values = []
for time in missing_times:
time_normalized = time.replace(minute=(time.minute // 5) * 5, second=0, microsecond=0)
if time_normalized in harmonic_dict:
estimated_tide = harmonic_dict[time_normalized] + estimated_residual
filled_values.append(estimated_tide)
else:
return None # 조화 데이터 부족
return filled_values
except Exception as e:
print(f" ❌ 조화 기반 보간 실패: {e}")
return None
def seasonal_pattern_imputation(df, col, start_idx, end_idx):
"""주기 패턴 기반 보간 (12시간/24시간 전 데이터 활용)"""
try:
gap_size = end_idx - start_idx + 1
# 12시간 전 (144개 전) 데이터 확인
cycle_12h = 144 # 5분 * 144 = 12시간
if start_idx >= cycle_12h:
pattern_12h = df[col].iloc[start_idx-cycle_12h:start_idx-cycle_12h+gap_size]
if not pattern_12h.isna().any():
print(f" → 12시간 전 패턴 사용")
return pattern_12h.values
# 24시간 전 (288개 전) 데이터 확인
cycle_24h = 288
if start_idx >= cycle_24h:
pattern_24h = df[col].iloc[start_idx-cycle_24h:start_idx-cycle_24h+gap_size]
if not pattern_24h.isna().any():
print(f" → 24시간 전 패턴 사용")
return pattern_24h.values
return None
except Exception as e:
print(f" ❌ 주기 패턴 보간 실패: {e}")
return None
def extreme_harmonic_imputation(df, start_idx, end_idx, station_id):
"""극한 상황에서 조화 예측만으로 tide_level 복원"""
try:
# 결측 구간의 시간 정보
missing_times = df['date'].iloc[start_idx:end_idx+1]
start_time = missing_times.iloc[0]
end_time = missing_times.iloc[-1]
print(f" 🔍 조화 예측 조회: {start_time} ~ {end_time}")
# 조화 예측 데이터 조회
harmonic_data = get_harmonic_predictions(station_id, start_time, end_time)
if not harmonic_data:
print(f" ❌ 조화 예측 데이터 없음")
return None
# 조화 예측 딕셔너리 생성
harmonic_dict = {}
kst = pytz.timezone('Asia/Seoul')
for h_data in harmonic_data:
try:
h_time = pd.to_datetime(h_data['predicted_at'])
if h_time.tzinfo is None:
h_time = pytz.UTC.localize(h_time)
h_time = h_time.astimezone(kst)
# 5분 단위로 정규화
h_time = h_time.replace(minute=(h_time.minute // 5) * 5, second=0, microsecond=0)
harmonic_dict[h_time] = float(h_data['harmonic_level'])
except Exception as e:
continue
print(f" 📊 사용 가능한 조화 예측: {len(harmonic_dict)}개")
# 주변 잔차(residual) 추정 - 극한 상황이므로 보수적으로 0 사용
# 실제로는 주변 정상 데이터에서 평균 residual을 계산할 수 있음
estimated_residual = 0
# 주변 잔차 패턴이 있으면 활용
if start_idx > 10:
# 이전 정상 데이터에서 tide_level - harmonic_level 패턴 분석
recent_data = df.iloc[max(0, start_idx-20):start_idx]
if len(recent_data) > 5:
# 간단한 추정: 최근 추세 반영
recent_tide_mean = recent_data['tide_level'].mean()
# 조화값이 있다면 residual 추정 가능하지만 복잡하므로 단순화
estimated_residual = 0 # 보수적 접근
# 각 시점별로 조화값 + 추정 잔차로 tide_level 복원
filled_values = []
missing_count = 0
for time in missing_times:
time_normalized = time.replace(minute=(time.minute // 5) * 5, second=0, microsecond=0)
if time_normalized in harmonic_dict:
# tide_level = harmonic_level + residual
estimated_tide = harmonic_dict[time_normalized] + estimated_residual
filled_values.append(estimated_tide)
else:
missing_count += 1
# 조화 데이터가 없으면 보간 실패
filled_values.append(None)
# 모든 시점에 조화 데이터가 있는지 확인
if missing_count == 0:
print(f" ✅ 조화 예측으로 {len(filled_values)}개 포인트 복원")
return filled_values
else:
print(f" ❌ {missing_count}개 시점에 조화 데이터 부족")
return None
except Exception as e:
print(f" ❌ 극한 조화 보간 실패: {e}")
return None
def get_climate_normal_value(col):
"""기상 요소별 기후 평년값 반환"""
climate_normals = {
'air_pres': 1013.25, # 표준 대기압 (hPa)
'air_temp': 15.0, # 한국 연평균 기온 (°C)
'wind_speed': 2.5, # 평균 풍속 (m/s)
'wind_dir': 225.0 # 남서풍 (도)
}
return climate_normals.get(col)
def get_fallback_value(df, col, start_idx, end_idx):
"""상수값 결정 (기존 로직)"""
if start_idx > 0 and end_idx < len(df) - 1:
return (df[col].iloc[start_idx-1] + df[col].iloc[end_idx+1]) / 2
elif start_idx > 0:
return df[col].iloc[start_idx-1]
elif end_idx < len(df) - 1:
return df[col].iloc[end_idx+1]
else:
return df[col].mean()
def find_missing_groups(series):
"""연속된 결측치 구간들을 찾아 반환"""
missing_mask = series.isna()
groups = []
start = None
for i, is_missing in enumerate(missing_mask):
if is_missing and start is None:
start = i
elif not is_missing and start is not None:
groups.append((start, i-1))
start = None
# 마지막까지 결측치인 경우
if start is not None:
groups.append((start, len(series)-1))
return groups
def handle_missing_values(df, station_id=None):
"""적응적 결측치 처리"""
print("🧠 적응적 결측치 처리 시작...")
df = df.copy()
# 각 컬럼별로 적응적 처리
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if df[col].isna().any():
before_na = df[col].isna().sum()
print(f"📊 {col}: {before_na}개 결측치 발견")
# 연속 결측치 구간 찾기
missing_groups = find_missing_groups(df[col])
print(f" → {len(missing_groups)}개 결측 구간")
# 적응적 보간 적용
adaptive_interpolation(df, col, missing_groups, station_id)
after_na = df[col].isna().sum()
filled_count = before_na - after_na
print(f"✅ {col}: {filled_count}개 결측치 보간 완료")
if after_na > 0:
print(f"⚠️ {col}{after_na}개 결측치가 여전히 남아있습니다")
print(f"✅ 적응적 결측치 처리 완료: {len(df)}행 유지")
return df
def preprocess_uploaded_file(file_path, station_id):
"""
업로드된 파일의 전체 전처리 파이프라인
슬라이싱 → 이상치 탐지 → 결측치 처리 → tide_level → residual 변환 + 검증
"""
try:
print(f"\n🚀 {station_id} 관측소 데이터 전처리 시작")
print(f"📁 파일: {file_path}")
# 1. 파일 읽기
df = pd.read_csv(file_path)
print(f"📊 원본 데이터: {len(df)}행 × {len(df.columns)}열")
# 2. 입력 데이터 검증
is_valid, issues = validate_input_data(df)
if not is_valid:
return None, f"입력 데이터 오류:\n" + "\n".join(issues)
# 3. 마지막 144개로 먼저 슬라이싱 (모델 입력 크기)
print(f"✂️ 마지막 144개 데이터로 슬라이싱 (모델 입력 크기)")
df_sliced = df.tail(144).copy()
print(f"📊 슬라이싱 후 데이터: {len(df_sliced)}행 × {len(df_sliced.columns)}열")
# 4. 이상치 탐지 및 처리 (144개만 대상)
print("\n🔍 이상치 탐지 및 처리 단계 (144개 데이터 기준)")
# 4-1. Harmonic 기반 tide_level 이상치 탐지
tide_outliers = detect_harmonic_based_outliers(df_sliced, station_id)
if tide_outliers.any():
print(f"🌊 tide_level 이상치 {tide_outliers.sum()}개 → residual=0 처리 예정")
df_sliced.loc[tide_outliers, '_tide_outlier_flag'] = True
# 4-2. 기상 데이터 물리적 한계 기반 이상치 탐지
weather_outliers = detect_weather_outliers(df_sliced)
for col in weather_outliers.columns:
if weather_outliers[col].any():
print(f"🌡️ {col} 이상치 {weather_outliers[col].sum()}개 → NaN 변환")
df_sliced.loc[weather_outliers[col], col] = np.nan
# 5. 결측치 처리
df_cleaned = handle_missing_values(df_sliced, station_id)
# 6. tide_level → residual 변환 (이상치 플래그 반영)
converted_df = convert_tide_level_to_residual(df_cleaned, station_id)
# 5. 변환된 데이터를 임시 파일로 저장
output_path = file_path.replace('.csv', '_processed.csv')
converted_df.to_csv(output_path, index=False)
print(f"💾 전처리 완료: {output_path}")
# 6. 처리 결과 요약
summary = {
'original_rows': len(df),
'processed_rows': len(converted_df),
'has_residual': 'residual' in converted_df.columns,
'residual_mean': converted_df['residual'].mean() if 'residual' in converted_df.columns else None,
'residual_std': converted_df['residual'].std() if 'residual' in converted_df.columns else None,
'output_file': output_path
}
return converted_df, summary
except Exception as e:
error_msg = f"전처리 오류: {str(e)}\n{traceback.format_exc()}"
print(f"❌ {error_msg}")
return None, error_msg