File size: 8,806 Bytes
4d65552 2c2b38d 4d65552 2c2b38d 4d65552 2c2b38d 4d65552 aded123 4d65552 2c2b38d aded123 2c2b38d d1a3231 2c2b38d 4d65552 2c2b38d 4d65552 2c2b38d 4d65552 2c2b38d 4d65552 c8f39e8 f24cd00 2c2b38d 4d65552 2c2b38d f24cd00 2c2b38d 4d65552 f24cd00 4d65552 f24cd00 4d65552 c8f39e8 4d65552 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
"""
Time_RCD Processor for Time Series Preprocessing
This processor handles:
- Data windowing/sliding windows
- Normalization (per-feature z-score over the whole series, applied before windowing)
- Padding to window size multiples
- Creating attention masks
Usage:
>>> from huggingface_time_rcd import TimeRCDProcessor
>>> processor = TimeRCDProcessor(win_size=5000, normalize=True)
>>> inputs = processor(time_series_data)
>>> # inputs contains: {'time_series': tensor, 'attention_mask': tensor}
"""
import numpy as np
import torch
from typing import Optional, Dict, Any
from transformers import ProcessorMixin
class TimeRCDProcessor(ProcessorMixin):
    """
    Processor for preparing time series data for the Time_RCD model.

    Mimics the AnomalyClipDataset preprocessing pipeline. ``__call__`` applies
    the following steps, in this order:
    - Normalizes each feature over the whole series (z-score)
    - Pads the series so its length is a multiple of ``win_size``
    - Creates sliding windows
    - Creates attention masks marking real vs. padded time steps

    Parameters
    ----------
    win_size : int, default=5000
        Window size for creating sliding windows. Must be positive.
    stride : int, default=None
        Stride for sliding windows. If None, uses win_size (non-overlapping).
        Must be positive.
    normalize : bool, default=True
        Whether to z-score each feature (zero mean, unit variance). The
        statistics are computed over the full series, before windowing.
    pad_to_multiple : bool, default=True
        Whether to pad data to make length a multiple of win_size

    Raises
    ------
    ValueError
        If ``win_size`` or the effective stride is not positive.
    """

    def __init__(
        self,
        win_size: int = 5000,
        stride: Optional[int] = None,
        normalize: bool = True,
        pad_to_multiple: bool = True,
        **kwargs
    ):
        effective_stride = win_size if stride is None else stride

        # Fail early with a clear message: a non-positive win_size would
        # otherwise surface as ZeroDivisionError in the padding modulo, and a
        # zero stride as an opaque range() error inside windowing.
        if win_size <= 0:
            raise ValueError(f"win_size must be a positive integer, got {win_size!r}")
        if effective_stride <= 0:
            raise ValueError(f"stride must be a positive integer, got {effective_stride!r}")

        # Processor-specific attributes are assigned BEFORE the parent
        # initializer runs (ordering kept from the original implementation).
        self.win_size = win_size
        self.stride = effective_stride
        self.normalize = normalize
        self.pad_to_multiple = pad_to_multiple

        super().__init__(**kwargs)

    @property
    def model_input_names(self):
        """Return list of model input names."""
        return ["time_series", "attention_mask"]

    @property
    def attributes(self):
        """Return list of attribute names for serialization."""
        return ["win_size", "stride", "normalize", "pad_to_multiple"]

    def __call__(
        self,
        time_series: np.ndarray,
        return_tensors: Optional[str] = "pt",
    ) -> Dict[str, Any]:
        """
        Preprocess time series data.

        Parameters
        ----------
        time_series : np.ndarray
            Input time series data of shape (n_samples, n_features) or (n_samples,)
        return_tensors : str, optional
            "pt" returns PyTorch tensors; any other value returns NumPy arrays.

        Returns
        -------
        dict
            Dictionary containing:
            - 'time_series': windows of shape (n_windows, win_size, n_features)
            - 'attention_mask': boolean masks of shape (n_windows, win_size),
              True for real samples and False for padded samples

        Notes
        -----
        Samples at the end of the series that do not fill a complete window
        (possible when ``pad_to_multiple`` is False or ``stride != win_size``)
        are not included in any window.
        """
        time_series = np.asarray(time_series)

        # A 1-D series is treated as a single feature column: (N,) -> (N, 1).
        if time_series.ndim == 1:
            time_series = time_series.reshape(-1, 1)

        # Normalization runs before padding so the repeated padding rows do
        # not influence the mean/std statistics.
        if self.normalize:
            time_series = self._normalize_data(time_series)

        if self.pad_to_multiple:
            time_series, padding_mask = self._pad_data_to_multiple(time_series)
        else:
            padding_mask = np.ones(time_series.shape[0], dtype=bool)

        windows, masks = self._create_windows(time_series, padding_mask)

        if return_tensors == "pt":
            windows = torch.tensor(windows, dtype=torch.float32)
            masks = torch.tensor(masks, dtype=torch.bool)

        return {
            "time_series": windows,
            "attention_mask": masks
        }

    def _normalize_data(self, data: np.ndarray, epsilon: float = 1e-8) -> np.ndarray:
        """
        Z-score each feature column using statistics of the whole series.

        Columns with zero standard deviation are divided by ``epsilon``
        instead, which avoids a division by zero (such columns become 0).
        """
        mean = np.mean(data, axis=0)
        std = np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)
        return (data - mean) / std

    def _pad_data_to_multiple(self, data: np.ndarray) -> tuple:
        """
        Pad data to make its length a multiple of win_size.

        Padding repeats the last row of ``data``.

        Returns
        -------
        tuple
            ``(padded_data, padding_mask)`` where ``padding_mask`` is a 1-D
            boolean array, True for real samples and False for padded ones.
        """
        data_length = data.shape[0]
        remainder = data_length % self.win_size

        if remainder == 0:
            # Already aligned (this includes an empty series): nothing to pad.
            return data, np.ones(data_length, dtype=bool)

        padding_length = self.win_size - remainder

        # Repeat the final observation to fill the last window.
        padding_data = np.repeat(data[-1:, :], padding_length, axis=0)
        padded_data = np.concatenate([data, padding_data], axis=0)

        padding_mask = np.ones(data_length + padding_length, dtype=bool)
        padding_mask[data_length:] = False

        return padded_data, padding_mask

    def _create_windows(self, data: np.ndarray, padding_mask: np.ndarray) -> tuple:
        """
        Create sliding windows from time series data.

        Returns
        -------
        tuple
            ``(windows, masks)`` with shapes (n_windows, win_size, ...) and
            (n_windows, win_size). When no complete window fits, both arrays
            have a leading dimension of 0 but keep the remaining dimensions.
        """
        starts = range(0, len(data) - self.win_size + 1, self.stride)

        if len(starts) == 0:
            # Keep the window/feature axes so downstream code sees a
            # consistently shaped (albeit empty) batch.
            empty_windows = np.empty((0, self.win_size) + data.shape[1:], dtype=data.dtype)
            empty_masks = np.empty((0, self.win_size), dtype=padding_mask.dtype)
            return empty_windows, empty_masks

        windows = np.stack([data[i:i + self.win_size] for i in starts])
        masks = np.stack([padding_mask[i:i + self.win_size] for i in starts])
        return windows, masks

    def save_pretrained(self, save_directory: str):
        """
        Save processor configuration to directory.

        Writes ``preprocessor_config.json`` into ``save_directory`` (created
        if missing). The resolved stride is stored, not the original ``None``.
        """
        import json
        import os

        os.makedirs(save_directory, exist_ok=True)

        config = {
            "processor_type": "TimeRCDProcessor",
            "auto_map": {
                "AutoProcessor": "processing_time_rcd.TimeRCDProcessor"
            },
            "win_size": self.win_size,
            "stride": self.stride,
            "normalize": self.normalize,
            "pad_to_multiple": self.pad_to_multiple,
        }

        config_path = os.path.join(save_directory, "preprocessor_config.json")
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """
        Load processor from pretrained configuration.

        Looks for ``preprocessor_config.json`` in the local directory
        ``pretrained_model_name_or_path`` first; if it is not there, the
        argument is treated as a Hub repo id and the file is downloaded.

        Parameters
        ----------
        pretrained_model_name_or_path : str
            Local directory or Hub repository id.
        **kwargs
            ``win_size``, ``stride``, ``normalize`` and ``pad_to_multiple``
            override the loaded configuration. Recognised download options
            (e.g. ``cache_dir``, ``revision``, ``token``) are forwarded to
            ``hf_hub_download``. Other keys are ignored.

        Raises
        ------
        FileNotFoundError
            If the configuration cannot be obtained from the Hub; the
            underlying error is attached as ``__cause__``.
        """
        import json
        import os

        init_keys = ("win_size", "stride", "normalize", "pad_to_multiple")
        hub_keys = (
            "cache_dir", "force_download", "proxies", "resume_download",
            "token", "revision", "local_files_only", "library_name",
            "library_version", "user_agent", "subfolder",
        )

        # Separate initialization kwargs from hf_hub_download kwargs.
        init_kwargs = {k: v for k, v in kwargs.items() if k in init_keys}
        hf_hub_kwargs = {k: v for k, v in kwargs.items() if k in hub_keys}

        config_file = os.path.join(pretrained_model_name_or_path, "preprocessor_config.json")

        if os.path.exists(config_file):
            with open(config_file, "r", encoding="utf-8") as f:
                config = json.load(f)
        else:
            # Imported only on the Hub path so that loading from a local
            # directory does not need huggingface_hub to be importable.
            from huggingface_hub import hf_hub_download

            try:
                config_file = hf_hub_download(
                    repo_id=pretrained_model_name_or_path,
                    filename="preprocessor_config.json",
                    **hf_hub_kwargs
                )
                with open(config_file, "r", encoding="utf-8") as f:
                    config = json.load(f)
            except Exception as e:
                # Chain the cause so the original traceback is preserved.
                raise FileNotFoundError(
                    f"Could not load preprocessor config from {pretrained_model_name_or_path}. "
                    f"Error: {e}"
                ) from e

        # These keys describe the processor class, not constructor arguments.
        config.pop("processor_type", None)
        config.pop("auto_map", None)

        # Explicit keyword overrides take precedence over the stored config.
        # NOTE(review): the stored stride is the resolved value, so overriding
        # only win_size keeps the saved stride — confirm this is intended.
        config.update(init_kwargs)

        return cls(**config)
|