import gradio as gr import pandas as pd import numpy as np import torch from momentfm import MOMENTPipeline from scipy.stats import zscore # Load the model model = MOMENTPipeline.from_pretrained( "AutonLab/MOMENT-1-large", model_kwargs={"task_name": "reconstruction", "n_channels": 3}, ) model.init() model.eval() def detect_anomalies(data, window_size=512, stride=128): n, num_features = data.shape errors = np.zeros(n) counts = np.zeros(n) with torch.no_grad(): for start in range(0, n - window_size + 1, stride): end = start + window_size window = data[start:end] x = torch.from_numpy(window).float().unsqueeze(0) # (1, window_size, 3) output = model(x) recon = output.reconstruction # (1, window_size, 3) step_errors = ((x - recon) ** 2).mean(dim=2).squeeze(0).numpy() # (window_size,) errors[start:end] += step_errors counts[start:end] += 1 errors /= np.maximum(counts, 1e-6) return errors def process_csv(file): if not file: return "Please upload a CSV file." try: df = pd.read_csv(file) required_cols = ['timestamp', 'voltage', 'current', 'frequency'] if not all(col in df.columns for col in required_cols): return "CSV must contain columns: timestamp, voltage, current, frequency" df['timestamp'] = pd.to_datetime(df['timestamp']) df = df.sort_values('timestamp') features = ['voltage', 'current', 'frequency'] data = df[features].values.astype(float) # (len, 3) # Normalize data per channel data = zscore(data, axis=0) seq_len = len(data) if seq_len < 512: # Pad with zeros pad = np.zeros((512 - seq_len, 3)) data_padded = np.vstack((data, pad)) errors = detect_anomalies(data_padded)[:seq_len] else: errors = detect_anomalies(data) # Compute threshold mean_e = np.mean(errors) std_e = np.std(errors) threshold = mean_e + 3 * std_e is_anomaly = errors > threshold # Severity score (scaled 0-10) severity = np.clip((errors - mean_e) / (3 * std_e), 0, np.inf) * 10 # For explanations, use statistical deviations means = df[features].mean().values stds = df[features].std().values z_scores = (df[features].values - means) / (stds + 1e-6) explanations = [] for i in range(seq_len): if not is_anomaly[i]: explanations.append("Normal") else: reasons = [] for j, feat in enumerate(features): if abs(z_scores[i, j]) > 3: direction = "High" if z_scores[i, j] > 0 else "Low" reasons.append(f"{direction} {feat}") exp = " and ".join(reasons) if reasons else "Unusual pattern detected by the model" explanations.append(exp) # Add columns to dataframe df['Anomaly'] = ['Yes' if a else 'No' for a in is_anomaly] df['Severity Score'] = severity.round(2) df['Explanation'] = explanations return df except Exception as e: return f"Error processing file: {str(e)}" # Define Gradio interface with gr.Blocks(title="Anomaly Detection in Smart Grid Sensor Data") as demo: gr.Markdown("# Anomaly Detection in Smart Grid Sensor Data Using Transformers") gr.Markdown("Upload a CSV file with columns: timestamp, voltage, current, frequency") input_file = gr.File(label="Upload CSV file", file_types=[".csv"]) output_df = gr.Dataframe(label="Results") btn = gr.Button("Detect Anomalies") btn.click(process_csv, inputs=input_file, outputs=output_df) # Launch the app demo.launch()