cams-pollution-dashboard / data_processor.py
aditya-me13's picture
Integration of Aurora
4f0125c
raw
history blame
33.9 kB
# NetCDF file processing and air pollution variable detection
import os
import zipfile
import warnings
import tempfile
import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path
from datetime import datetime
# India geographical bounds for coordinate trimming
INDIA_BOUNDS = {
'lat_min': 6.0, # Southern tip (including southern islands)
'lat_max': 38.0, # Northern border (including Kashmir)
'lon_min': 68.0, # Western border
'lon_max': 98.0 # Eastern border (including Andaman & Nicobar)
}
# Imports from our Modules
from constants import NETCDF_VARIABLES, AIR_POLLUTION_VARIABLES, PRESSURE_LEVELS
warnings.filterwarnings('ignore')
class NetCDFProcessor:
def __init__(self, file_path):
"""
Initialize NetCDF processor
Parameters:
file_path (str): Path to NetCDF or ZIP file
"""
self.file_path = Path(file_path)
self.dataset = None
self.surface_dataset = None
self.atmospheric_dataset = None
self.detected_variables = {}
def load_dataset(self):
"""Load NetCDF dataset from file or ZIP"""
try:
if self.file_path.suffix.lower() == '.zip':
return self._load_from_zip()
elif self.file_path.suffix.lower() == '.nc':
return self._load_from_netcdf()
else:
raise ValueError("Unsupported file format. Use .nc or .zip files.")
except Exception as e:
raise Exception(f"Error loading dataset: {str(e)}")
def _load_from_zip(self):
"""Load dataset from ZIP file (CAMS format)"""
with zipfile.ZipFile(self.file_path, 'r') as zf:
zip_contents = zf.namelist()
# Look for surface and atmospheric data files
surface_file = None
atmospheric_file = None
for file in zip_contents:
if 'sfc' in file.lower() or 'surface' in file.lower():
surface_file = file
elif 'plev' in file.lower() or 'pressure' in file.lower() or 'atmospheric' in file.lower():
atmospheric_file = file
# Load surface data if available
if surface_file:
with zf.open(surface_file) as f:
with tempfile.NamedTemporaryFile(suffix='.nc') as tmp:
tmp.write(f.read())
tmp.flush()
self.surface_dataset = xr.open_dataset(tmp.name, engine='netcdf4')
print(f"Loaded surface data: {surface_file}")
# Load atmospheric data if available
if atmospheric_file:
with zf.open(atmospheric_file) as f:
with tempfile.NamedTemporaryFile(suffix='.nc') as tmp:
tmp.write(f.read())
tmp.flush()
self.atmospheric_dataset = xr.open_dataset(tmp.name, engine='netcdf4')
print(f"Loaded atmospheric data: {atmospheric_file}")
# If no specific files found, try to load the first .nc file
if not surface_file and not atmospheric_file:
nc_files = [f for f in zip_contents if f.endswith('.nc')]
if nc_files:
with zf.open(nc_files[0]) as f:
with tempfile.NamedTemporaryFile(suffix='.nc') as tmp:
tmp.write(f.read())
tmp.flush()
self.dataset = xr.open_dataset(tmp.name, engine='netcdf4')
print(f"Loaded dataset: {nc_files[0]}")
else:
raise ValueError("No NetCDF files found in ZIP")
return True
def _load_from_netcdf(self):
"""Load dataset from single NetCDF file"""
self.dataset = xr.open_dataset(self.file_path)
print(f"Loaded NetCDF file: {self.file_path.name}")
return True
def detect_variables(self):
"""Detect all supported variables (pollution, meteorological, etc.) in all loaded datasets"""
self.detected_variables = {}
# Check surface dataset
if self.surface_dataset is not None:
surface_vars = self._detect_variables_in_dataset(self.surface_dataset, 'surface')
self.detected_variables.update(surface_vars)
# Check atmospheric dataset
if self.atmospheric_dataset is not None:
atmo_vars = self._detect_variables_in_dataset(self.atmospheric_dataset, 'atmospheric')
self.detected_variables.update(atmo_vars)
# Check main dataset if no separate files
if self.dataset is not None:
main_vars = self._detect_variables_in_dataset(self.dataset, 'unknown')
self.detected_variables.update(main_vars)
return self.detected_variables
def _detect_variables_in_dataset(self, dataset, dataset_type):
"""Detect all supported variables in a specific dataset"""
detected = {}
for var_name in dataset.data_vars:
var_name_lower = var_name.lower()
var_dims = list(dataset[var_name].dims)
# Determine actual variable type based on dimensions (not just dictionary)
actual_var_type = 'surface'
if any(dim in ['level', 'plev', 'pressure_level', 'height'] for dim in [d.lower() for d in var_dims]):
actual_var_type = 'atmospheric'
# Check exact matches first in NETCDF_VARIABLES
if var_name in NETCDF_VARIABLES:
detected[var_name] = NETCDF_VARIABLES[var_name].copy()
detected[var_name]['original_name'] = var_name
detected[var_name]['dataset_type'] = dataset_type
detected[var_name]['shape'] = dataset[var_name].shape
detected[var_name]['dims'] = var_dims
# Override type based on actual dimensions
detected[var_name]['type'] = actual_var_type
elif var_name_lower in NETCDF_VARIABLES:
detected[var_name] = NETCDF_VARIABLES[var_name_lower].copy()
detected[var_name]['original_name'] = var_name
detected[var_name]['dataset_type'] = dataset_type
detected[var_name]['shape'] = dataset[var_name].shape
detected[var_name]['dims'] = var_dims
# Override type based on actual dimensions
detected[var_name]['type'] = actual_var_type
else:
# Auto-detect unknown variables by examining their attributes
var_info = dataset[var_name]
long_name = getattr(var_info, 'long_name', '').lower()
standard_name = getattr(var_info, 'standard_name', '').lower()
units = getattr(var_info, 'units', 'unknown')
# Try to match against any known variable in NETCDF_VARIABLES by keywords
matched = False
for known_var, properties in NETCDF_VARIABLES.items():
if (known_var in var_name_lower or
known_var in long_name or
known_var in standard_name or
properties['name'].lower() in var_name_lower or
properties['name'].lower() in long_name):
detected[var_name] = properties.copy()
detected[var_name]['original_name'] = var_name
detected[var_name]['dataset_type'] = dataset_type
detected[var_name]['shape'] = dataset[var_name].shape
detected[var_name]['dims'] = var_dims
# Override type based on actual dimensions
detected[var_name]['type'] = actual_var_type
if units != 'unknown':
detected[var_name]['units'] = units # Use actual units from file
matched = True
break
# If still no match, create a generic entry for any 2D+ variable
if not matched and len(dataset[var_name].dims) >= 2:
# Check if it has lat/lon dimensions
has_spatial = any(dim in ['lat', 'lon', 'latitude', 'longitude', 'x', 'y']
for dim in [d.lower() for d in var_dims])
if has_spatial:
# Use the already determined variable type
var_type = actual_var_type
# Auto-determine color scheme based on variable name or units
cmap = 'viridis' # default
if 'temp' in var_name_lower or 'temperature' in long_name:
cmap = 'RdYlBu'
elif any(word in var_name_lower for word in ['wind', 'u', 'v']):
cmap = 'coolwarm'
elif any(word in var_name_lower for word in ['precip', 'rain', 'cloud', 'humid']):
cmap = 'Blues'
elif 'pressure' in var_name_lower or 'pressure' in long_name:
cmap = 'RdYlBu'
elif any(word in var_name_lower for word in ['radiation', 'solar']):
cmap = 'YlOrRd'
detected[var_name] = {
'units': units,
'name': long_name.title() if long_name else var_name.replace('_', ' ').title(),
'cmap': cmap,
'vmax_percentile': 95,
'type': var_type,
'original_name': var_name,
'dataset_type': dataset_type,
'shape': dataset[var_name].shape,
'dims': var_dims,
'auto_detected': True # Flag to indicate this was auto-detected
}
return detected
def get_coordinates(self, dataset):
"""Get coordinate names from dataset"""
coords = list(dataset.coords.keys())
# Find latitude coordinate
lat_names = ['latitude', 'lat', 'y', 'Latitude', 'LATITUDE']
lat_coord = next((name for name in lat_names if name in coords), None)
# Find longitude coordinate
lon_names = ['longitude', 'lon', 'x', 'Longitude', 'LONGITUDE']
lon_coord = next((name for name in lon_names if name in coords), None)
# Find time coordinate
time_names = ['time', 'Time', 'TIME', 'forecast_reference_time']
time_coord = next((name for name in time_names if name in coords), None)
# Find pressure/level coordinate
level_names = ['pressure_level', 'plev', 'level', 'pressure', 'lev']
level_coord = next((name for name in level_names if name in coords), None)
return {
'lat': lat_coord,
'lon': lon_coord,
'time': time_coord,
'level': level_coord
}
def format_timestamp(self, timestamp):
"""Format timestamp for display in plots"""
try:
if pd.isna(timestamp):
return "Unknown Time"
# Convert to pandas datetime if it isn't already
if not isinstance(timestamp, pd.Timestamp):
timestamp = pd.to_datetime(timestamp)
# Format as "YYYY-MM-DD HH:MM"
return timestamp.strftime('%Y-%m-%d %H:%M')
except:
return str(timestamp)
def extract_data(self, variable_name, time_index=1, pressure_level=None):
"""
Extract data for a specific variable
Parameters:
variable_name (str): Name of the variable to extract
time_index (int): Time index to extract (default: 0 for current time)
pressure_level (float): Pressure level for atmospheric variables (default: surface level)
Returns:
tuple: (data_array, metadata)
"""
if variable_name not in self.detected_variables:
raise ValueError(f"Variable {variable_name} not found in detected variables")
var_info = self.detected_variables[variable_name]
dataset_type = var_info['dataset_type']
# Determine which dataset to use
if dataset_type == 'surface' and self.surface_dataset is not None:
dataset = self.surface_dataset
elif dataset_type == 'atmospheric' and self.atmospheric_dataset is not None:
dataset = self.atmospheric_dataset
elif self.dataset is not None:
dataset = self.dataset
else:
raise ValueError(f"No suitable dataset found for variable {variable_name}")
# Get the data variable
data_var = dataset[variable_name]
coords = self.get_coordinates(dataset)
print(f"Coordinates: {coords}\n\n")
# Handle different data shapes
data_array = data_var
print(f"Data array shape: {data_array.dims} \n\n")
# Get timestamp information before extracting data
selected_timestamp = None
timestamp_str = "Unknown Time"
# Handle time dimension
if coords['time'] and coords['time'] in data_array.dims:
# Get all available times
available_times = pd.to_datetime(dataset[coords['time']].values)
if time_index == -1: # Latest time
time_index = len(available_times) - 1
# Ensure time_index is within bounds
if 0 <= time_index < len(available_times):
selected_timestamp = available_times[time_index]
timestamp_str = self.format_timestamp(selected_timestamp)
print(f"Time index: {time_index} selected - {timestamp_str}")
data_array = data_array.isel({coords['time']: time_index})
else:
print(f"Warning: time_index {time_index} out of bounds, using index 0")
time_index = 0
selected_timestamp = available_times[time_index]
timestamp_str = self.format_timestamp(selected_timestamp)
data_array = data_array.isel({coords['time']: time_index})
# Handle pressure/level dimension for atmospheric variables
actual_pressure = None
if coords['level'] and coords['level'] in data_array.dims:
if pressure_level is None:
# Default to surface level (highest pressure)
pressure_level = 1000
# Find closest pressure level
pressure_values = dataset[coords['level']].values
level_index = np.argmin(np.abs(pressure_values - pressure_level))
actual_pressure = pressure_values[level_index]
data_array = data_array.isel({coords['level']: level_index})
print(f"Selected pressure level: {actual_pressure} hPa (requested: {pressure_level} hPa)")
elif pressure_level is not None:
# Store the requested pressure level even if no level dimension exists
actual_pressure = pressure_level
# Handle batch dimension (usually the first dimension for CAMS data)
shape = data_array.shape
if len(shape) == 4: # (batch, time, lat, lon) or similar
data_array = data_array[0, -1] # Take first batch, latest time
elif len(shape) == 3: # (batch, lat, lon) or (time, lat, lon)
data_array = data_array[-1] # Take latest
elif len(shape) == 5: # (batch, time, level, lat, lon)
data_array = data_array[0, -1] # Already handled level above
# Get coordinate arrays
lats = dataset[coords['lat']].values
lons = dataset[coords['lon']].values
# Crop data to India region for better performance
data_values, lats, lons = self._crop_to_india(data_array.values, lats, lons)
# Convert units if necessary
original_units = getattr(dataset[variable_name], 'units', '')
data_values = self._convert_units(data_values, original_units, var_info['units'])
metadata = {
'variable_name': variable_name,
'display_name': var_info['name'],
'units': var_info['units'],
'original_units': original_units,
'shape': data_values.shape,
'lats': lats,
'lons': lons,
'pressure_level': actual_pressure,
'time_index': time_index,
'timestamp': selected_timestamp,
'timestamp_str': timestamp_str,
'dataset_type': dataset_type
}
return data_values, metadata
def _convert_units(self, data, original_units, target_units):
"""Convert data units for air pollution variables"""
data_converted = data.copy()
if original_units and target_units:
orig_lower = original_units.lower()
target_lower = target_units.lower()
# kg/m³ to µg/m³
if 'kg' in orig_lower and 'µg' in target_lower:
data_converted = data_converted * 1e9
print(f"Converting from {original_units} to {target_units} (×1e9)")
# kg/m³ to mg/m³
elif 'kg' in orig_lower and 'mg' in target_lower:
data_converted = data_converted * 1e6
print(f"Converting from {original_units} to {target_units} (×1e6)")
# mol/m² conversions (keep as is)
elif 'mol' in orig_lower:
print(f"Units {original_units} kept as is")
# No unit (dimensionless) - keep as is
elif target_units == '':
print("Dimensionless variable - no unit conversion")
return data_converted
def _crop_to_india(self, data_values, lats, lons):
"""
Crop data to India region to improve performance and focus visualization
Parameters:
data_values (np.ndarray): 2D data array (lat, lon)
lats (np.ndarray): Latitude values
lons (np.ndarray): Longitude values
Returns:
tuple: (cropped_data, cropped_lats, cropped_lons)
"""
# Use same India bounds as Aurora processor for consistency
lat_min = INDIA_BOUNDS['lat_min'] - 2 # 6-2 = 4°N
lat_max = INDIA_BOUNDS['lat_max'] + 2 # 38+2 = 40°N
lon_min = INDIA_BOUNDS['lon_min'] - 2 # 68-2 = 66°E
lon_max = INDIA_BOUNDS['lon_max'] + 2 # 97+2 = 99°E
print(f"Original data shape: {data_values.shape}")
print(f"Original lat range: {lats.min():.2f} to {lats.max():.2f}")
print(f"Original lon range: {lons.min():.2f} to {lons.max():.2f}")
# Find indices within India bounds
lat_mask = (lats >= lat_min) & (lats <= lat_max)
lon_mask = (lons >= lon_min) & (lons <= lon_max)
# Get the indices
lat_indices = np.where(lat_mask)[0]
lon_indices = np.where(lon_mask)[0]
if len(lat_indices) == 0 or len(lon_indices) == 0:
print("Warning: No data found in India region, using full dataset")
return data_values, lats, lons
# Get the min and max indices to define the crop region
lat_start, lat_end = lat_indices[0], lat_indices[-1] + 1
lon_start, lon_end = lon_indices[0], lon_indices[-1] + 1
# Crop the data
if len(data_values.shape) == 2: # (lat, lon)
cropped_data = data_values[lat_start:lat_end, lon_start:lon_end]
else:
print(f"Warning: Unexpected data shape {data_values.shape}, cropping first two dimensions")
cropped_data = data_values[lat_start:lat_end, lon_start:lon_end]
# Crop coordinates
cropped_lats = lats[lat_start:lat_end]
cropped_lons = lons[lon_start:lon_end]
print(f"Cropped data shape: {cropped_data.shape}")
print(f"Cropped lat range: {cropped_lats.min():.2f} to {cropped_lats.max():.2f}")
print(f"Cropped lon range: {cropped_lons.min():.2f} to {cropped_lons.max():.2f}")
print(f"Data reduction: {(1 - cropped_data.size / data_values.size) * 100:.1f}%")
return cropped_data, cropped_lats, cropped_lons
def get_available_times(self, variable_name):
"""Get available time steps for a variable"""
if variable_name not in self.detected_variables:
return []
var_info = self.detected_variables[variable_name]
dataset_type = var_info['dataset_type']
# Determine which dataset to use
if dataset_type == 'surface' and self.surface_dataset is not None:
dataset = self.surface_dataset
elif dataset_type == 'atmospheric' and self.atmospheric_dataset is not None:
dataset = self.atmospheric_dataset
elif self.dataset is not None:
dataset = self.dataset
else:
return []
coords = self.get_coordinates(dataset)
if coords['time'] and coords['time'] in dataset.dims:
times = pd.to_datetime(dataset[coords['time']].values)
print(f"Times: {times.to_list()}")
return times.tolist()
return []
def get_available_pressure_levels(self, variable_name):
"""Get available pressure levels for atmospheric variables"""
if variable_name not in self.detected_variables:
return []
var_info = self.detected_variables[variable_name]
if var_info['type'] != 'atmospheric':
return []
dataset_type = var_info['dataset_type']
# Determine which dataset to use
if dataset_type == 'atmospheric' and self.atmospheric_dataset is not None:
dataset = self.atmospheric_dataset
elif self.dataset is not None:
dataset = self.dataset
else:
return []
coords = self.get_coordinates(dataset)
if coords['level'] and coords['level'] in dataset.dims:
levels = dataset[coords['level']].values
return levels.tolist()
return PRESSURE_LEVELS # Default pressure levels
def close(self):
"""Close all open datasets safely"""
try:
if self.dataset is not None:
self.dataset.close()
self.dataset = None
except (RuntimeError, OSError):
pass # Dataset already closed or invalid
try:
if self.surface_dataset is not None:
self.surface_dataset.close()
self.surface_dataset = None
except (RuntimeError, OSError):
pass # Dataset already closed or invalid
try:
if self.atmospheric_dataset is not None:
self.atmospheric_dataset.close()
self.atmospheric_dataset = None
except (RuntimeError, OSError):
pass # Dataset already closed or invalid
class AuroraPredictionProcessor:
def __init__(self, file_path):
"""
Initialize Aurora prediction processor for single NetCDF files with timestep data
Parameters:
file_path (str): Path to Aurora prediction NetCDF file
"""
self.file_path = Path(file_path)
self.dataset = None
self.detected_variables = {}
def _trim_to_india_bounds(self, var, lats, lons):
"""
Trim data and coordinates to India geographical bounds to reduce computation
Parameters:
var (xarray.DataArray): Variable data
lats (numpy.ndarray): Latitude coordinates
lons (numpy.ndarray): Longitude coordinates
Returns:
tuple: (trimmed_var, trimmed_lats, trimmed_lons)
"""
# Find indices within India bounds
lat_mask = (lats >= INDIA_BOUNDS['lat_min']) & (lats <= INDIA_BOUNDS['lat_max'])
lon_mask = (lons >= INDIA_BOUNDS['lon_min']) & (lons <= INDIA_BOUNDS['lon_max'])
lat_indices = np.where(lat_mask)[0]
lon_indices = np.where(lon_mask)[0]
if len(lat_indices) == 0 or len(lon_indices) == 0:
# If no points in India bounds, return original data
return var, lats, lons
# Get min/max indices for slicing
lat_start, lat_end = lat_indices[0], lat_indices[-1] + 1
lon_start, lon_end = lon_indices[0], lon_indices[-1] + 1
# Trim coordinates
trimmed_lats = lats[lat_start:lat_end]
trimmed_lons = lons[lon_start:lon_end]
# Trim data - handle different dimension orders
if var.ndim == 2: # (lat, lon)
trimmed_var = var[lat_start:lat_end, lon_start:lon_end]
elif var.ndim == 3 and 'latitude' in var.dims and 'longitude' in var.dims:
# Find latitude and longitude dimension positions
lat_dim_pos = var.dims.index('latitude') if 'latitude' in var.dims else var.dims.index('lat')
lon_dim_pos = var.dims.index('longitude') if 'longitude' in var.dims else var.dims.index('lon')
if lat_dim_pos == 1 and lon_dim_pos == 2: # (time/level, lat, lon)
trimmed_var = var[:, lat_start:lat_end, lon_start:lon_end]
elif lat_dim_pos == 0 and lon_dim_pos == 1: # (lat, lon, time/level)
trimmed_var = var[lat_start:lat_end, lon_start:lon_end, :]
else:
# Fall back to original if dimension order is unexpected
return var, lats, lons
else:
# For other dimensions or if trimming fails, return original
return var, lats, lons
return trimmed_var, trimmed_lats, trimmed_lons
def load_dataset(self):
"""Load Aurora prediction NetCDF dataset"""
try:
self.dataset = xr.open_dataset(self.file_path)
return True
except Exception as e:
raise Exception(f"Error loading Aurora prediction dataset: {str(e)}")
def extract_variable_data(self, variable_name, pressure_level=None, step=0):
"""
Extract variable data from Aurora prediction file
Parameters:
variable_name (str): Name of the variable to extract
pressure_level (float, optional): Pressure level for atmospheric variables
step (int): Time step index (default: 0)
Returns:
tuple: (data_array, metadata_dict)
"""
if self.dataset is None:
self.load_dataset()
if variable_name not in self.dataset.data_vars:
raise ValueError(f"Variable '{variable_name}' not found in dataset")
var = self.dataset[variable_name]
# Handle Aurora-specific dimensions
# Aurora files have: (forecast_period, forecast_reference_time, [pressure_level], latitude, longitude)
# First, squeeze singleton forecast_period dimension
if 'forecast_period' in var.dims and var.sizes['forecast_period'] == 1:
var = var.squeeze('forecast_period')
# Handle forecast_reference_time - take the first one (index 0)
if 'forecast_reference_time' in var.dims:
var = var.isel(forecast_reference_time=0)
# Handle step dimension if present (for backward compatibility)
if 'step' in var.dims:
if step >= var.sizes['step']:
raise ValueError(f"Step {step} not available. Dataset has {var.sizes['step']} steps.")
var = var.isel(step=step)
# Handle pressure level dimension if present
if pressure_level is not None and 'pressure_level' in var.dims:
pressure_level = float(pressure_level)
# Find closest pressure level
available_levels = var.pressure_level.values
closest_idx = np.argmin(np.abs(available_levels - pressure_level))
actual_level = available_levels[closest_idx]
var = var.isel(pressure_level=closest_idx)
pressure_info = f"at {actual_level:.0f} hPa"
else:
pressure_info = None
# Handle different coordinate naming conventions
if 'latitude' in self.dataset.coords:
lats = self.dataset['latitude'].values
lons = self.dataset['longitude'].values
else:
lats = self.dataset['lat'].values if 'lat' in self.dataset else self.dataset['latitude'].values
lons = self.dataset['lon'].values if 'lon' in self.dataset else self.dataset['longitude'].values
# Trim data and coordinates to India bounds to reduce computation
var, lats, lons = self._trim_to_india_bounds(var, lats, lons)
# Extract trimmed data
data_values = var.values
# Get variable information
from constants import NETCDF_VARIABLES
var_info = NETCDF_VARIABLES.get(variable_name, {})
display_name = var_info.get('name', variable_name.replace('_', ' ').title())
units = var.attrs.get('units', var_info.get('units', ''))
# Prepare metadata
metadata = {
'variable_name': variable_name,
'display_name': display_name,
'units': units,
'lats': lats,
'lons': lons,
'pressure_level': pressure_level if pressure_level else None,
'pressure_info': pressure_info,
'step': step,
'data_shape': data_values.shape,
'source': 'Aurora Prediction',
'file_path': str(self.file_path),
}
# Add timestamp information
# For Aurora predictions, step represents the forecast step (12-hour intervals)
hours_from_start = (step + 1) * 12 # Assuming 12-hour intervals
metadata['timestamp_str'] = f"T+{hours_from_start}h (Step {step + 1})"
return data_values, metadata
def get_available_variables(self):
"""Get list of available variables categorized by type"""
if self.dataset is None:
self.load_dataset()
surface_vars = []
atmospheric_vars = []
for var_name in self.dataset.data_vars:
var = self.dataset[var_name]
# Check if variable has pressure level dimension
if 'pressure_level' in var.dims:
atmospheric_vars.append(var_name)
else:
surface_vars.append(var_name)
return {
'surface_vars': surface_vars,
'atmospheric_vars': atmospheric_vars
}
def get_available_pressure_levels(self):
"""Get available pressure levels"""
if self.dataset is None:
self.load_dataset()
if 'pressure_level' in self.dataset.coords:
return self.dataset.pressure_level.values.tolist()
return []
def get_available_steps(self):
"""Get available time steps"""
if self.dataset is None:
self.load_dataset()
if 'step' in self.dataset.dims:
return list(range(self.dataset.sizes['step']))
return [0]
def close(self):
"""Close the dataset safely"""
try:
if self.dataset is not None:
self.dataset.close()
self.dataset = None
except (RuntimeError, OSError):
pass # Dataset already closed or invalid
def analyze_netcdf_file(file_path):
"""
Analyze NetCDF file structure and return summary
Parameters:
file_path (str): Path to NetCDF or ZIP file
Returns:
dict: Analysis summary
"""
processor = NetCDFProcessor(file_path)
try:
processor.load_dataset()
detected_vars = processor.detect_variables()
analysis = {
'success': True,
'file_path': str(file_path),
'detected_variables': detected_vars,
'total_variables': len(detected_vars),
'surface_variables': len([v for v in detected_vars.values() if v.get('type') == 'surface']),
'atmospheric_variables': len([v for v in detected_vars.values() if v.get('type') == 'atmospheric']),
}
# Get sample time information
if detected_vars:
sample_var = list(detected_vars.keys())[0]
times = processor.get_available_times(sample_var)
if times:
analysis['time_range'] = {
'start': str(times[0]),
'end': str(times[-1]),
'count': len(times)
}
processor.close()
return analysis
except Exception as e:
processor.close()
return {
'success': False,
'error': str(e),
'file_path': str(file_path)
}