# Time-RCD / processing_time_rcd.py
# oliverlevn's picture
# Upload processing_time_rcd.py
# d1a3231 verified
"""
Time_RCD Processor for Time Series Preprocessing
This processor handles:
- Data windowing/sliding windows
- Normalization (per-feature z-score computed over the whole series, before windowing)
- Padding to window size multiples
- Creating attention masks
Usage:
>>> from huggingface_time_rcd import TimeRCDProcessor
>>> processor = TimeRCDProcessor(win_size=5000, normalize=True)
>>> inputs = processor(time_series_data)
>>> # inputs contains: {'time_series': tensor, 'attention_mask': tensor}
"""
import numpy as np
import torch
from typing import Optional, Dict, Any
from transformers import ProcessorMixin
class TimeRCDProcessor(ProcessorMixin):
    """
    Processor for preparing time series data for the Time_RCD model.

    Steps applied by ``__call__``, in this order:
    - Normalizes each feature column (z-score over the whole series,
      computed once before windowing -- not per window)
    - Pads the series to a multiple of ``win_size`` by repeating the last row
    - Creates sliding windows of length ``win_size``
    - Creates attention masks (True = real sample, False = padding)

    NOTE(review): the original author describes this as mimicking the
    AnomalyClipDataset preprocessing pipeline; that dataset is not visible
    in this file, so the equivalence should be confirmed.

    Parameters
    ----------
    win_size : int, default=5000
        Window size for creating sliding windows
    stride : int, default=None
        Stride for sliding windows. If None, uses win_size (non-overlapping)
    normalize : bool, default=True
        Whether to z-score each feature column (zero mean, unit variance)
    pad_to_multiple : bool, default=True
        Whether to pad data to make length a multiple of window_size

    Raises
    ------
    ValueError
        If ``win_size`` or the resolved ``stride`` is not positive.
    """

    # Single source of truth for the configuration keys that are exposed via
    # ``attributes``, written by ``save_pretrained`` and accepted as
    # overrides by ``from_pretrained``.
    _CONFIG_KEYS = ("win_size", "stride", "normalize", "pad_to_multiple")

    def __init__(
        self,
        win_size: int = 5000,
        stride: Optional[int] = None,
        normalize: bool = True,
        pad_to_multiple: bool = True,
        **kwargs
    ):
        resolved_stride = win_size if stride is None else stride

        # Fail early with a clear message: a zero win_size would otherwise
        # surface as a ZeroDivisionError during padding, and a non-positive
        # stride as a range() error or a silently empty result.
        if win_size <= 0:
            raise ValueError(f"win_size must be a positive integer, got {win_size!r}")
        if resolved_stride <= 0:
            raise ValueError(f"stride must be a positive integer, got {resolved_stride!r}")

        # Set our processor-specific attributes BEFORE super().__init__
        # (ordering kept from the original implementation, whose author
        # intended ProcessorMixin to see them during initialization).
        self.win_size = win_size
        self.stride = resolved_stride
        self.normalize = normalize
        self.pad_to_multiple = pad_to_multiple

        # Call parent init after setting attributes
        super().__init__(**kwargs)

    @property
    def model_input_names(self):
        """Return list of model input names."""
        return ["time_series", "attention_mask"]

    @property
    def attributes(self):
        """Return list of attribute names for serialization."""
        return list(self._CONFIG_KEYS)

    def __call__(
        self,
        time_series: np.ndarray,
        return_tensors: Optional[str] = "pt",
    ) -> Dict[str, Any]:
        """
        Preprocess time series data.

        Parameters
        ----------
        time_series : np.ndarray
            Input time series data of shape (n_samples, n_features) or (n_samples,)
        return_tensors : str, optional
            "pt" converts the outputs to PyTorch tensors (float32 windows,
            bool masks); any other value leaves them as NumPy arrays.

        Returns
        -------
        dict
            Dictionary containing:
            - 'time_series': windows of shape (n_windows, win_size, n_features)
            - 'attention_mask': masks of shape (n_windows, win_size),
              True for real data and False for padded data

        Notes
        -----
        Only full windows are emitted. Samples after the last full window
        are dropped; this happens when ``pad_to_multiple`` is False and the
        length is not aligned, or when ``stride`` does not line up with the
        (padded) length.
        """
        # Ensure numpy array
        time_series = np.asarray(time_series)

        # Ensure 2D shape (N, C)
        if time_series.ndim == 1:
            time_series = time_series.reshape(-1, 1)

        # Normalize over the whole series (per feature) if requested
        if self.normalize:
            time_series = self._normalize_data(time_series)

        # Pad to multiple if requested; otherwise every sample is "real"
        if self.pad_to_multiple:
            time_series, padding_mask = self._pad_data_to_multiple(time_series)
        else:
            padding_mask = np.ones(time_series.shape[0], dtype=bool)

        # Create windows
        windows, masks = self._create_windows(time_series, padding_mask)

        # Convert to tensors if requested
        if return_tensors == "pt":
            windows = torch.tensor(windows, dtype=torch.float32)
            masks = torch.tensor(masks, dtype=torch.bool)

        return {
            "time_series": windows,
            "attention_mask": masks,
        }

    def _normalize_data(self, data: np.ndarray, epsilon: float = 1e-8) -> np.ndarray:
        """
        Z-score ``data`` along axis 0 (one mean/std per feature column).

        Columns whose standard deviation is exactly zero are divided by
        ``epsilon`` instead, which avoids a division by zero; since such a
        column equals its mean, it normalizes to all zeros.
        """
        mean = np.mean(data, axis=0)
        std = np.std(data, axis=0)
        safe_std = np.where(std == 0, epsilon, std)
        return (data - mean) / safe_std

    def _pad_data_to_multiple(self, data: np.ndarray) -> tuple:
        """
        Pad data to make its length a multiple of window_size.

        Padding rows are copies of the last row of ``data``.

        Returns
        -------
        tuple
            ``(padded_data, padding_mask)`` where ``padding_mask`` is a bool
            array over axis 0: True for real data, False for padded rows.
            When no padding is needed, ``data`` itself is returned.
        """
        data_length = data.shape[0]
        remainder = data_length % self.win_size

        if remainder == 0:
            # No padding needed (this also covers zero-length input)
            return data, np.ones(data_length, dtype=bool)

        # Calculate padding needed
        padding_length = self.win_size - remainder

        # Pad by repeating the last row
        last_row = data[-1:, :]
        padding_data = np.repeat(last_row, padding_length, axis=0)
        padded_data = np.vstack([data, padding_data])

        # Create padding mask: True for real data, False for padded data
        padding_mask = np.ones(data_length + padding_length, dtype=bool)
        padding_mask[data_length:] = False

        return padded_data, padding_mask

    def _create_windows(self, data: np.ndarray, padding_mask: np.ndarray) -> tuple:
        """
        Create sliding windows from time series data.

        Windows start every ``self.stride`` samples and are exactly
        ``self.win_size`` long; partial windows at the end are not emitted.

        Returns
        -------
        tuple
            ``(windows, masks)`` with shapes
            ``(n_windows, win_size, ...)`` and ``(n_windows, win_size)``.
            When no full window fits, both arrays have ``n_windows == 0``
            but keep those ranks, so downstream code sees consistent shapes.
        """
        starts = range(0, len(data) - self.win_size + 1, self.stride)

        if len(starts) == 0:
            # np.array([]) would collapse to shape (0,); keep the full rank.
            empty_windows = np.empty(
                (0, self.win_size) + data.shape[1:], dtype=data.dtype
            )
            empty_masks = np.empty((0, self.win_size), dtype=padding_mask.dtype)
            return empty_windows, empty_masks

        windows = np.stack([data[i:i + self.win_size] for i in starts])
        masks = np.stack([padding_mask[i:i + self.win_size] for i in starts])
        return windows, masks

    def save_pretrained(self, save_directory: str):
        """
        Save processor configuration to directory.

        Writes ``preprocessor_config.json`` into ``save_directory`` (created
        if missing) containing the processor type, the ``auto_map`` entry
        for ``AutoProcessor`` and the four configuration values.
        """
        import json
        import os

        os.makedirs(save_directory, exist_ok=True)

        config = {
            "processor_type": "TimeRCDProcessor",
            "auto_map": {
                "AutoProcessor": "processing_time_rcd.TimeRCDProcessor"
            },
        }
        # Same key order as before: win_size, stride, normalize, pad_to_multiple
        config.update({key: getattr(self, key) for key in self._CONFIG_KEYS})

        config_path = os.path.join(save_directory, "preprocessor_config.json")
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """
        Load processor from pretrained configuration.

        Looks for ``preprocessor_config.json`` inside
        ``pretrained_model_name_or_path`` treated as a local directory; if
        it is not there, the argument is treated as a Hub repo id and the
        file is fetched with ``hf_hub_download``.

        Parameters
        ----------
        pretrained_model_name_or_path : str
            Local directory or Hub repository id.
        **kwargs
            ``win_size`` / ``stride`` / ``normalize`` / ``pad_to_multiple``
            override the loaded configuration. A fixed set of download
            options (``cache_dir``, ``revision``, ``token``, ...) is
            forwarded to ``hf_hub_download``. Any other key is ignored.

        Raises
        ------
        FileNotFoundError
            If the configuration cannot be downloaded or read from the Hub;
            the underlying error is attached as ``__cause__``.
        """
        import json
        import os
        from huggingface_hub import hf_hub_download

        # Separate initialization kwargs from hf_hub_download kwargs
        init_kwargs = {
            k: v for k, v in kwargs.items()
            if k in cls._CONFIG_KEYS
        }

        # Filter kwargs to only include those accepted by hf_hub_download
        hub_kwarg_names = (
            'cache_dir', 'force_download', 'proxies', 'resume_download',
            'token', 'revision', 'local_files_only', 'library_name',
            'library_version', 'user_agent', 'subfolder',
        )
        hf_hub_kwargs = {
            k: v for k, v in kwargs.items()
            if k in hub_kwarg_names
        }

        # Try to load from local path first
        config_file = os.path.join(pretrained_model_name_or_path, "preprocessor_config.json")

        if os.path.exists(config_file):
            # Load from local path
            with open(config_file, "r", encoding="utf-8") as f:
                config = json.load(f)
        else:
            # Try to download from HuggingFace Hub. The broad except is kept
            # on purpose: download and parse failures are all reported
            # as FileNotFoundError, as before -- now with the cause chained.
            try:
                config_file = hf_hub_download(
                    repo_id=pretrained_model_name_or_path,
                    filename="preprocessor_config.json",
                    **hf_hub_kwargs
                )
                with open(config_file, "r", encoding="utf-8") as f:
                    config = json.load(f)
            except Exception as e:
                raise FileNotFoundError(
                    f"Could not load preprocessor config from {pretrained_model_name_or_path}. "
                    f"Error: {e}"
                ) from e

        # Remove processor_type and auto_map from config (not __init__ arguments)
        config.pop("processor_type", None)
        config.pop("auto_map", None)

        # Merge loaded config with any passed init_kwargs (init_kwargs take precedence)
        config.update(init_kwargs)

        return cls(**config)