""" Time_RCD Processor for Time Series Preprocessing This processor handles: - Data windowing/sliding windows - Normalization (per-window z-score) - Padding to window size multiples - Creating attention masks Usage: >>> from huggingface_time_rcd import TimeRCDProcessor >>> processor = TimeRCDProcessor(win_size=5000, normalize=True) >>> inputs = processor(time_series_data) >>> # inputs contains: {'time_series': tensor, 'attention_mask': tensor} """ import numpy as np import torch from typing import Optional, Dict, Any from transformers import ProcessorMixin class TimeRCDProcessor(ProcessorMixin): """ Processor for preparing time series data for Time_RCD model. Mimics the AnomalyClipDataset preprocessing pipeline: - Creates sliding windows - Normalizes per-window (z-score normalization) - Pads to window_size multiples - Creates attention masks for padding Parameters ---------- win_size : int, default=5000 Window size for creating sliding windows stride : int, default=None Stride for sliding windows. If None, uses win_size (non-overlapping) normalize : bool, default=True Whether to normalize each window (zero mean, unit variance) pad_to_multiple : bool, default=True Whether to pad data to make length a multiple of window_size """ def __init__( self, win_size: int = 5000, stride: Optional[int] = None, normalize: bool = True, pad_to_multiple: bool = True, **kwargs ): # Set our processor-specific attributes BEFORE super().__init__ # so ProcessorMixin can validate them during initialization self.win_size = win_size self.stride = stride if stride is not None else win_size self.normalize = normalize self.pad_to_multiple = pad_to_multiple # Call parent init after setting attributes super().__init__(**kwargs) @property def model_input_names(self): """Return list of model input names.""" return ["time_series", "attention_mask"] @property def attributes(self): """Return list of attribute names for serialization.""" return ["win_size", "stride", "normalize", "pad_to_multiple"] def __call__( self, time_series: np.ndarray, return_tensors: Optional[str] = "pt", ) -> Dict[str, Any]: """ Preprocess time series data. Parameters ---------- time_series : np.ndarray Input time series data of shape (n_samples, n_features) or (n_samples,) return_tensors : str, optional Type of tensors to return: "pt" (PyTorch) or None Returns ------- dict Dictionary containing: - 'time_series': Processed time series windows - 'attention_mask': Attention masks indicating real vs padded data """ # Ensure numpy array time_series = np.asarray(time_series) # Ensure 2D shape (N, C) if time_series.ndim == 1: time_series = time_series.reshape(-1, 1) original_length = time_series.shape[0] # Normalize if requested if self.normalize: time_series = self._normalize_data(time_series) # Pad to multiple if requested if self.pad_to_multiple: time_series, padding_mask = self._pad_data_to_multiple(time_series) else: padding_mask = np.ones(time_series.shape[0], dtype=bool) # Create windows windows, masks = self._create_windows(time_series, padding_mask) # Convert to tensors if requested if return_tensors == "pt": windows = torch.tensor(windows, dtype=torch.float32) masks = torch.tensor(masks, dtype=torch.bool) return { "time_series": windows, "attention_mask": masks } def _normalize_data(self, data: np.ndarray, epsilon: float = 1e-8) -> np.ndarray: """Normalize data using mean and standard deviation (per-feature).""" mean = np.mean(data, axis=0) std = np.std(data, axis=0) std = np.where(std == 0, epsilon, std) return (data - mean) / std def _pad_data_to_multiple(self, data: np.ndarray) -> tuple: """ Pad data to make its length a multiple of window_size. Returns padded data and padding mask. """ data_length = data.shape[0] remainder = data_length % self.win_size if remainder == 0: # No padding needed padding_mask = np.ones(data_length, dtype=bool) return data, padding_mask # Calculate padding needed padding_length = self.win_size - remainder # Pad by repeating the last row last_row = data[-1:, :] padding_data = np.repeat(last_row, padding_length, axis=0) padded_data = np.vstack([data, padding_data]) # Create padding mask: True for real data, False for padded data padding_mask = np.ones(data_length + padding_length, dtype=bool) padding_mask[data_length:] = False return padded_data, padding_mask def _create_windows(self, data: np.ndarray, padding_mask: np.ndarray) -> tuple: """ Create sliding windows from time series data. Returns windows and corresponding masks. """ windows = [] masks = [] for i in range(0, len(data) - self.win_size + 1, self.stride): window = data[i:i + self.win_size, :] mask = padding_mask[i:i + self.win_size] windows.append(window) masks.append(mask) return np.array(windows), np.array(masks) def save_pretrained(self, save_directory: str): """Save processor configuration to directory.""" import json import os os.makedirs(save_directory, exist_ok=True) config = { "processor_type": "TimeRCDProcessor", "auto_map": { "AutoProcessor": "processing_time_rcd.TimeRCDProcessor" }, "win_size": self.win_size, "stride": self.stride, "normalize": self.normalize, "pad_to_multiple": self.pad_to_multiple, } with open(os.path.join(save_directory, "preprocessor_config.json"), "w") as f: json.dump(config, f, indent=2) @classmethod def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs): """Load processor from pretrained configuration.""" import json import os from huggingface_hub import hf_hub_download # Separate initialization kwargs from hf_hub_download kwargs init_kwargs = { k: v for k, v in kwargs.items() if k in ['win_size', 'stride', 'normalize', 'pad_to_multiple'] } # Filter kwargs to only include those accepted by hf_hub_download hf_hub_kwargs = { k: v for k, v in kwargs.items() if k in [ 'cache_dir', 'force_download', 'proxies', 'resume_download', 'token', 'revision', 'local_files_only', 'library_name', 'library_version', 'user_agent', 'subfolder' ] } # Try to load from local path first config_file = os.path.join(pretrained_model_name_or_path, "preprocessor_config.json") if os.path.exists(config_file): # Load from local path with open(config_file, "r") as f: config = json.load(f) else: # Try to download from HuggingFace Hub try: config_file = hf_hub_download( repo_id=pretrained_model_name_or_path, filename="preprocessor_config.json", **hf_hub_kwargs ) with open(config_file, "r") as f: config = json.load(f) except Exception as e: raise FileNotFoundError( f"Could not load preprocessor config from {pretrained_model_name_or_path}. " f"Error: {e}" ) # Remove processor_type and auto_map from config config.pop("processor_type", None) config.pop("auto_map", None) # Merge loaded config with any passed init_kwargs (init_kwargs take precedence) config.update(init_kwargs) return cls(**config)