"""
Time_RCD Processor for Time Series Preprocessing

This processor handles:

- Windowing the data into sliding windows
- Normalization (z-score over the full series, per feature)
- Padding the series to a multiple of the window size
- Creating attention masks that flag padded timesteps

Usage:

>>> from huggingface_time_rcd import TimeRCDProcessor
>>> processor = TimeRCDProcessor(win_size=5000, normalize=True)
>>> inputs = processor(time_series_data)
>>> # inputs contains: {'time_series': tensor, 'attention_mask': tensor}
"""

import numpy as np
import torch
from typing import Any, Dict, Optional, Tuple
from transformers import ProcessorMixin


class TimeRCDProcessor(ProcessorMixin):
    """
    Processor for preparing time series data for the Time_RCD model.

    Mimics the AnomalyClipDataset preprocessing pipeline:

    - Creates sliding windows
    - Normalizes the series (z-score, per feature)
    - Pads the series to a multiple of win_size
    - Creates attention masks that mark the padded positions

    Parameters
    ----------
    win_size : int, default=5000
        Window size for creating sliding windows.
    stride : int, optional
        Stride for sliding windows. If None, uses win_size (non-overlapping windows).
    normalize : bool, default=True
        Whether to normalize the series (zero mean, unit variance per feature).
    pad_to_multiple : bool, default=True
        Whether to pad the data so its length is a multiple of win_size.
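
    Examples
    --------
    Illustrative example (hypothetical values; a small window size is used for
    brevity):

    >>> import numpy as np
    >>> processor = TimeRCDProcessor(win_size=4, normalize=False)
    >>> out = processor(np.arange(10, dtype=float))
    >>> tuple(out["time_series"].shape)  # 10 timesteps padded to 12 -> 3 windows of 4
    (3, 4, 1)
    >>> out["attention_mask"][-1].tolist()  # last 2 positions of the final window are padding
    [True, True, False, False]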
    """

    def __init__(
        self,
        win_size: int = 5000,
        stride: Optional[int] = None,
        normalize: bool = True,
        pad_to_multiple: bool = True,
        **kwargs,
    ):
        self.win_size = win_size
        self.stride = stride if stride is not None else win_size
        self.normalize = normalize
        self.pad_to_multiple = pad_to_multiple

        # Note: ProcessorMixin.__init__ expects one sub-processor object
        # (tokenizer, feature extractor, ...) per entry in `attributes`; this
        # processor only stores plain configuration values, so it does not
        # delegate to ProcessorMixin.__init__.

    @property
    def model_input_names(self):
        """Return the list of model input names."""
        return ["time_series", "attention_mask"]

    @property
    def attributes(self):
        """Return the list of attribute names used for serialization."""
        return ["win_size", "stride", "normalize", "pad_to_multiple"]

    def __call__(
        self,
        time_series: np.ndarray,
        return_tensors: Optional[str] = "pt",
    ) -> Dict[str, Any]:
        """
        Preprocess time series data.

        Parameters
        ----------
        time_series : np.ndarray
            Input time series of shape (n_samples, n_features) or (n_samples,).
        return_tensors : str, optional
            Type of tensors to return: "pt" (PyTorch tensors) or None (NumPy arrays).

        Returns
        -------
        dict
            Dictionary containing:

            - 'time_series': processed time series windows
            - 'attention_mask': attention masks indicating real vs. padded data
        """
        time_series = np.asarray(time_series)

        # Ensure a 2D (n_samples, n_features) layout.
        if time_series.ndim == 1:
            time_series = time_series.reshape(-1, 1)

        original_length = time_series.shape[0]

        if self.normalize:
            time_series = self._normalize_data(time_series)

        if self.pad_to_multiple:
            time_series, padding_mask = self._pad_data_to_multiple(time_series)
        else:
            padding_mask = np.ones(time_series.shape[0], dtype=bool)

        windows, masks = self._create_windows(time_series, padding_mask)

        if return_tensors == "pt":
            windows = torch.tensor(windows, dtype=torch.float32)
            masks = torch.tensor(masks, dtype=torch.bool)

        return {
            "time_series": windows,
            "attention_mask": masks,
        }

    def _normalize_data(self, data: np.ndarray, epsilon: float = 1e-8) -> np.ndarray:
        """Normalize data to zero mean and unit variance (per feature)."""
        mean = np.mean(data, axis=0)
        std = np.std(data, axis=0)
        # Guard against zero variance to avoid division by zero.
        std = np.where(std == 0, epsilon, std)
        return (data - mean) / std

    def _pad_data_to_multiple(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Pad data so its length is a multiple of win_size.

        Returns the padded data and a boolean padding mask (False marks padded rows).
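
        For example (illustrative numbers): with win_size=5000 and a series of
        length 12000, the remainder is 2000, so 3000 copies of the last row are
        appended and the final 3000 mask entries are False.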
        """
        data_length = data.shape[0]
        remainder = data_length % self.win_size

        if remainder == 0:
            padding_mask = np.ones(data_length, dtype=bool)
            return data, padding_mask

        padding_length = self.win_size - remainder

        # Pad by repeating the last row of the series.
        last_row = data[-1:, :]
        padding_data = np.repeat(last_row, padding_length, axis=0)
        padded_data = np.vstack([data, padding_data])

        # Mark the appended rows as padding (False) in the mask.
        padding_mask = np.ones(data_length + padding_length, dtype=bool)
        padding_mask[data_length:] = False

        return padded_data, padding_mask

    def _create_windows(self, data: np.ndarray, padding_mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Create sliding windows from the time series.

        Returns the stacked windows and the corresponding attention masks.
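
        For instance (illustrative numbers), a padded series of length 15000 with
        win_size=5000 and stride=5000 produces three windows starting at indices
        0, 5000, and 10000.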
        """
        windows = []
        masks = []

        for i in range(0, len(data) - self.win_size + 1, self.stride):
            window = data[i:i + self.win_size, :]
            mask = padding_mask[i:i + self.win_size]
            windows.append(window)
            masks.append(mask)

        return np.array(windows), np.array(masks)

    def save_pretrained(self, save_directory: str):
        """Save the processor configuration to a directory."""
        import json
        import os

        os.makedirs(save_directory, exist_ok=True)

        config = {
            "processor_type": "TimeRCDProcessor",
            "auto_map": {
                "AutoProcessor": "processing_time_rcd.TimeRCDProcessor"
            },
            "win_size": self.win_size,
            "stride": self.stride,
            "normalize": self.normalize,
            "pad_to_multiple": self.pad_to_multiple,
        }

        with open(os.path.join(save_directory, "preprocessor_config.json"), "w") as f:
            json.dump(config, f, indent=2)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """Load the processor from a pretrained configuration."""
        import json
        import os

        from huggingface_hub import hf_hub_download

        # Keyword arguments that override values from the stored configuration.
        init_kwargs = {
            k: v for k, v in kwargs.items()
            if k in ['win_size', 'stride', 'normalize', 'pad_to_multiple']
        }

        # Keyword arguments forwarded to hf_hub_download.
        hf_hub_kwargs = {
            k: v for k, v in kwargs.items()
            if k in [
                'cache_dir', 'force_download', 'proxies', 'resume_download',
                'token', 'revision', 'local_files_only', 'library_name',
                'library_version', 'user_agent', 'subfolder'
            ]
        }

        # Prefer a local config file; otherwise download it from the Hub.
        config_file = os.path.join(pretrained_model_name_or_path, "preprocessor_config.json")

        if os.path.exists(config_file):
            with open(config_file, "r") as f:
                config = json.load(f)
        else:
            try:
                config_file = hf_hub_download(
                    repo_id=pretrained_model_name_or_path,
                    filename="preprocessor_config.json",
                    **hf_hub_kwargs
                )
                with open(config_file, "r") as f:
                    config = json.load(f)
            except Exception as e:
                raise FileNotFoundError(
                    f"Could not load preprocessor config from {pretrained_model_name_or_path}. "
                    f"Error: {e}"
                )

        # Drop metadata keys that are not constructor arguments.
        config.pop("processor_type", None)
        config.pop("auto_map", None)

        # Explicit keyword arguments take precedence over the stored config.
        config.update(init_kwargs)

        return cls(**config)
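

# Minimal smoke test (illustrative only; the synthetic data, shapes, and temporary
# directory below are assumptions for demonstration, not part of the published API).
if __name__ == "__main__":
    import tempfile

    rng = np.random.default_rng(0)
    series = rng.normal(size=(12000, 3))  # 12000 timesteps, 3 features

    processor = TimeRCDProcessor(win_size=5000)
    inputs = processor(series)

    # 12000 timesteps are padded to 15000, giving 3 non-overlapping windows of 5000.
    print(inputs["time_series"].shape)     # torch.Size([3, 5000, 3])
    print(inputs["attention_mask"].shape)  # torch.Size([3, 5000])

    # Round-trip the configuration through save_pretrained / from_pretrained.
    save_dir = tempfile.mkdtemp()
    processor.save_pretrained(save_dir)
    reloaded = TimeRCDProcessor.from_pretrained(save_dir)
    assert reloaded.win_size == processor.win_size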