# NetCDF file processing and air pollution variable detection
import zipfile
import warnings
import tempfile
import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path

# Imports from our modules
from constants import NETCDF_VARIABLES, AIR_POLLUTION_VARIABLES, PRESSURE_LEVELS

warnings.filterwarnings('ignore')

# India geographical bounds for coordinate trimming
INDIA_BOUNDS = {
    'lat_min': 6.0,   # Southern tip (including southern islands)
    'lat_max': 38.0,  # Northern border (including Kashmir)
    'lon_min': 68.0,  # Western border
    'lon_max': 98.0   # Eastern border (including Andaman & Nicobar)
}

class NetCDFProcessor:
    def __init__(self, file_path):
        """
        Initialize the NetCDF processor.

        Parameters:
            file_path (str): Path to a NetCDF or ZIP file
        """
        self.file_path = Path(file_path)
        self.dataset = None
        self.surface_dataset = None
        self.atmospheric_dataset = None
        self.detected_variables = {}

    def load_dataset(self):
        """Load the NetCDF dataset from a file or ZIP archive."""
        try:
            if self.file_path.suffix.lower() == '.zip':
                return self._load_from_zip()
            elif self.file_path.suffix.lower() == '.nc':
                return self._load_from_netcdf()
            else:
                raise ValueError("Unsupported file format. Use .nc or .zip files.")
        except Exception as e:
            raise Exception(f"Error loading dataset: {str(e)}")

    def _load_from_zip(self):
        """Load dataset(s) from a ZIP file (CAMS format)."""
        with zipfile.ZipFile(self.file_path, 'r') as zf:
            zip_contents = zf.namelist()

            # Look for surface and atmospheric data files
            surface_file = None
            atmospheric_file = None
            for file in zip_contents:
                if 'sfc' in file.lower() or 'surface' in file.lower():
                    surface_file = file
                elif 'plev' in file.lower() or 'pressure' in file.lower() or 'atmospheric' in file.lower():
                    atmospheric_file = file

            # Load surface data if available
            if surface_file:
                with zf.open(surface_file) as f:
                    with tempfile.NamedTemporaryFile(suffix='.nc') as tmp:
                        tmp.write(f.read())
                        tmp.flush()
                        # .load() pulls the data into memory so the dataset
                        # remains usable after the temporary file is deleted
                        self.surface_dataset = xr.open_dataset(tmp.name, engine='netcdf4').load()
                print(f"Loaded surface data: {surface_file}")

            # Load atmospheric data if available
            if atmospheric_file:
                with zf.open(atmospheric_file) as f:
                    with tempfile.NamedTemporaryFile(suffix='.nc') as tmp:
                        tmp.write(f.read())
                        tmp.flush()
                        self.atmospheric_dataset = xr.open_dataset(tmp.name, engine='netcdf4').load()
                print(f"Loaded atmospheric data: {atmospheric_file}")

            # If no specific files were found, fall back to the first .nc file
            if not surface_file and not atmospheric_file:
                nc_files = [f for f in zip_contents if f.endswith('.nc')]
                if nc_files:
                    with zf.open(nc_files[0]) as f:
                        with tempfile.NamedTemporaryFile(suffix='.nc') as tmp:
                            tmp.write(f.read())
                            tmp.flush()
                            self.dataset = xr.open_dataset(tmp.name, engine='netcdf4').load()
                    print(f"Loaded dataset: {nc_files[0]}")
                else:
                    raise ValueError("No NetCDF files found in ZIP")

        return True

    def _load_from_netcdf(self):
        """Load a dataset from a single NetCDF file."""
        self.dataset = xr.open_dataset(self.file_path)
        print(f"Loaded NetCDF file: {self.file_path.name}")
        return True

    def detect_variables(self):
        """Detect all supported variables (pollution, meteorological, etc.) in all loaded datasets."""
        self.detected_variables = {}

        # Check the surface dataset
        if self.surface_dataset is not None:
            surface_vars = self._detect_variables_in_dataset(self.surface_dataset, 'surface')
            self.detected_variables.update(surface_vars)

        # Check the atmospheric dataset
        if self.atmospheric_dataset is not None:
            atmo_vars = self._detect_variables_in_dataset(self.atmospheric_dataset, 'atmospheric')
            self.detected_variables.update(atmo_vars)

        # Check the main dataset if there are no separate files
        if self.dataset is not None:
            main_vars = self._detect_variables_in_dataset(self.dataset, 'unknown')
            self.detected_variables.update(main_vars)

        return self.detected_variables

    def _detect_variables_in_dataset(self, dataset, dataset_type):
        """Detect all supported variables in a specific dataset."""
        detected = {}
        for var_name in dataset.data_vars:
            var_name_lower = var_name.lower()
            var_dims = list(dataset[var_name].dims)

            # Determine the actual variable type from the dimensions
            # (not just from the dictionary entry)
            actual_var_type = 'surface'
            if any(dim in ['level', 'plev', 'pressure_level', 'height'] for dim in [d.lower() for d in var_dims]):
                actual_var_type = 'atmospheric'

            # Check exact and case-insensitive matches in NETCDF_VARIABLES first
            known_key = None
            if var_name in NETCDF_VARIABLES:
                known_key = var_name
            elif var_name_lower in NETCDF_VARIABLES:
                known_key = var_name_lower

            if known_key is not None:
                detected[var_name] = NETCDF_VARIABLES[known_key].copy()
                detected[var_name]['original_name'] = var_name
                detected[var_name]['dataset_type'] = dataset_type
                detected[var_name]['shape'] = dataset[var_name].shape
                detected[var_name]['dims'] = var_dims
                # Override the type based on the actual dimensions
                detected[var_name]['type'] = actual_var_type
            else:
                # Auto-detect unknown variables by examining their attributes
                var_info = dataset[var_name]
                long_name = getattr(var_info, 'long_name', '').lower()
                standard_name = getattr(var_info, 'standard_name', '').lower()
                units = getattr(var_info, 'units', 'unknown')

                # Try to match against any known variable in NETCDF_VARIABLES by keywords
                matched = False
                for known_var, properties in NETCDF_VARIABLES.items():
                    if (known_var in var_name_lower or
                            known_var in long_name or
                            known_var in standard_name or
                            properties['name'].lower() in var_name_lower or
                            properties['name'].lower() in long_name):
                        detected[var_name] = properties.copy()
                        detected[var_name]['original_name'] = var_name
                        detected[var_name]['dataset_type'] = dataset_type
                        detected[var_name]['shape'] = dataset[var_name].shape
                        detected[var_name]['dims'] = var_dims
                        # Override the type based on the actual dimensions
                        detected[var_name]['type'] = actual_var_type
                        if units != 'unknown':
                            detected[var_name]['units'] = units  # Use the actual units from the file
                        matched = True
                        break

                # If there is still no match, create a generic entry for any 2D+ variable
                if not matched and len(dataset[var_name].dims) >= 2:
                    # Check whether it has lat/lon dimensions
                    has_spatial = any(dim in ['lat', 'lon', 'latitude', 'longitude', 'x', 'y']
                                      for dim in [d.lower() for d in var_dims])
                    if has_spatial:
                        # Auto-select a color scheme based on the variable name or long name
                        cmap = 'viridis'  # default
                        if 'temp' in var_name_lower or 'temperature' in long_name:
                            cmap = 'RdYlBu'
                        elif any(word in var_name_lower for word in ['wind', 'u', 'v']):
                            cmap = 'coolwarm'
                        elif any(word in var_name_lower for word in ['precip', 'rain', 'cloud', 'humid']):
                            cmap = 'Blues'
                        elif 'pressure' in var_name_lower or 'pressure' in long_name:
                            cmap = 'RdYlBu'
                        elif any(word in var_name_lower for word in ['radiation', 'solar']):
                            cmap = 'YlOrRd'

                        detected[var_name] = {
                            'units': units,
                            'name': long_name.title() if long_name else var_name.replace('_', ' ').title(),
                            'cmap': cmap,
                            'vmax_percentile': 95,
                            'type': actual_var_type,
                            'original_name': var_name,
                            'dataset_type': dataset_type,
                            'shape': dataset[var_name].shape,
                            'dims': var_dims,
                            'auto_detected': True  # Flag to indicate this entry was auto-detected
                        }
        return detected

    def get_coordinates(self, dataset):
        """Get coordinate names from a dataset."""
        coords = list(dataset.coords.keys())

        # Find the latitude coordinate
        lat_names = ['latitude', 'lat', 'y', 'Latitude', 'LATITUDE']
        lat_coord = next((name for name in lat_names if name in coords), None)

        # Find the longitude coordinate
        lon_names = ['longitude', 'lon', 'x', 'Longitude', 'LONGITUDE']
        lon_coord = next((name for name in lon_names if name in coords), None)

        # Find the time coordinate
        time_names = ['time', 'Time', 'TIME', 'forecast_reference_time']
        time_coord = next((name for name in time_names if name in coords), None)

        # Find the pressure/level coordinate
        level_names = ['pressure_level', 'plev', 'level', 'pressure', 'lev']
        level_coord = next((name for name in level_names if name in coords), None)

        return {
            'lat': lat_coord,
            'lon': lon_coord,
            'time': time_coord,
            'level': level_coord
        }

    def format_timestamp(self, timestamp):
        """Format a timestamp for display in plots."""
        try:
            if pd.isna(timestamp):
                return "Unknown Time"
            # Convert to a pandas Timestamp if it isn't one already
            if not isinstance(timestamp, pd.Timestamp):
                timestamp = pd.to_datetime(timestamp)
            # Format as "YYYY-MM-DD HH:MM"
            return timestamp.strftime('%Y-%m-%d %H:%M')
        except Exception:
            return str(timestamp)

    def extract_data(self, variable_name, time_index=0, pressure_level=None):
        """
        Extract data for a specific variable.

        Parameters:
            variable_name (str): Name of the variable to extract
            time_index (int): Time index to extract (default: 0 for current time)
            pressure_level (float): Pressure level for atmospheric variables (default: surface level)

        Returns:
            tuple: (data_array, metadata)
        """
        if variable_name not in self.detected_variables:
            raise ValueError(f"Variable {variable_name} not found in detected variables")

        var_info = self.detected_variables[variable_name]
        dataset_type = var_info['dataset_type']

        # Determine which dataset to use
        if dataset_type == 'surface' and self.surface_dataset is not None:
            dataset = self.surface_dataset
        elif dataset_type == 'atmospheric' and self.atmospheric_dataset is not None:
            dataset = self.atmospheric_dataset
        elif self.dataset is not None:
            dataset = self.dataset
        else:
            raise ValueError(f"No suitable dataset found for variable {variable_name}")

        # Get the data variable
        data_var = dataset[variable_name]
        coords = self.get_coordinates(dataset)
        print(f"Coordinates: {coords}\n")

        data_array = data_var
        print(f"Data array dims: {data_array.dims}\n")

        # Get timestamp information before extracting data
        selected_timestamp = None
        timestamp_str = "Unknown Time"

        # Handle the time dimension
        if coords['time'] and coords['time'] in data_array.dims:
            # Get all available times
            available_times = pd.to_datetime(dataset[coords['time']].values)
            if time_index == -1:  # Latest time
                time_index = len(available_times) - 1

            # Ensure time_index is within bounds
            if 0 <= time_index < len(available_times):
                selected_timestamp = available_times[time_index]
                timestamp_str = self.format_timestamp(selected_timestamp)
                print(f"Time index: {time_index} selected - {timestamp_str}")
                data_array = data_array.isel({coords['time']: time_index})
            else:
                print(f"Warning: time_index {time_index} out of bounds, using index 0")
                time_index = 0
                selected_timestamp = available_times[time_index]
                timestamp_str = self.format_timestamp(selected_timestamp)
                data_array = data_array.isel({coords['time']: time_index})

        # Handle the pressure/level dimension for atmospheric variables
        actual_pressure = None
        if coords['level'] and coords['level'] in data_array.dims:
            if pressure_level is None:
                # Default to surface level (highest pressure)
                pressure_level = 1000
            # Find the closest pressure level
            pressure_values = dataset[coords['level']].values
            level_index = np.argmin(np.abs(pressure_values - pressure_level))
            actual_pressure = pressure_values[level_index]
            data_array = data_array.isel({coords['level']: level_index})
            print(f"Selected pressure level: {actual_pressure} hPa (requested: {pressure_level} hPa)")
        elif pressure_level is not None:
            # Store the requested pressure level even if no level dimension exists
            actual_pressure = pressure_level

        # Handle any remaining leading dimensions (usually a batch dimension in CAMS data)
        shape = data_array.shape
        if len(shape) == 4:    # (batch, time, lat, lon) or similar
            data_array = data_array[0, -1]  # Take the first batch, latest time
        elif len(shape) == 3:  # (batch, lat, lon) or (time, lat, lon)
            data_array = data_array[-1]     # Take the latest
        elif len(shape) == 5:  # (batch, time, level, lat, lon)
            data_array = data_array[0, -1]  # Level was already handled above

        # Get the coordinate arrays
        lats = dataset[coords['lat']].values
        lons = dataset[coords['lon']].values

        # Crop data to the India region for better performance
        data_values, lats, lons = self._crop_to_india(data_array.values, lats, lons)

        # Convert units if necessary
        original_units = getattr(dataset[variable_name], 'units', '')
        data_values = self._convert_units(data_values, original_units, var_info['units'])

        metadata = {
            'variable_name': variable_name,
            'display_name': var_info['name'],
            'units': var_info['units'],
            'original_units': original_units,
            'shape': data_values.shape,
            'lats': lats,
            'lons': lons,
            'pressure_level': actual_pressure,
            'time_index': time_index,
            'timestamp': selected_timestamp,
            'timestamp_str': timestamp_str,
            'dataset_type': dataset_type
        }
        return data_values, metadata

    def _convert_units(self, data, original_units, target_units):
        """Convert data units for air pollution variables."""
        data_converted = data.copy()

        # Dimensionless variables need no conversion
        if not target_units:
            print("Dimensionless variable - no unit conversion")
            return data_converted

        if original_units:
            orig_lower = original_units.lower()
            target_lower = target_units.lower()

            # kg/m³ to µg/m³
            if 'kg' in orig_lower and 'µg' in target_lower:
                data_converted = data_converted * 1e9
                print(f"Converting from {original_units} to {target_units} (×1e9)")
            # kg/m³ to mg/m³
            elif 'kg' in orig_lower and 'mg' in target_lower:
                data_converted = data_converted * 1e6
                print(f"Converting from {original_units} to {target_units} (×1e6)")
            # mol/m² quantities are kept as is
            elif 'mol' in orig_lower:
                print(f"Units {original_units} kept as is")

        return data_converted

    def _crop_to_india(self, data_values, lats, lons):
        """
        Crop data to the India region to improve performance and focus the visualization.

        Parameters:
            data_values (np.ndarray): 2D data array (lat, lon)
            lats (np.ndarray): Latitude values
            lons (np.ndarray): Longitude values

        Returns:
            tuple: (cropped_data, cropped_lats, cropped_lons)
        """
        # Use the same India bounds as the Aurora processor, padded by 2 degrees
        lat_min = INDIA_BOUNDS['lat_min'] - 2  # 6 - 2 = 4°N
        lat_max = INDIA_BOUNDS['lat_max'] + 2  # 38 + 2 = 40°N
        lon_min = INDIA_BOUNDS['lon_min'] - 2  # 68 - 2 = 66°E
        lon_max = INDIA_BOUNDS['lon_max'] + 2  # 98 + 2 = 100°E

        print(f"Original data shape: {data_values.shape}")
        print(f"Original lat range: {lats.min():.2f} to {lats.max():.2f}")
        print(f"Original lon range: {lons.min():.2f} to {lons.max():.2f}")

        # Find indices within the India bounds
        lat_mask = (lats >= lat_min) & (lats <= lat_max)
        lon_mask = (lons >= lon_min) & (lons <= lon_max)
        lat_indices = np.where(lat_mask)[0]
        lon_indices = np.where(lon_mask)[0]

        if len(lat_indices) == 0 or len(lon_indices) == 0:
            print("Warning: No data found in India region, using full dataset")
            return data_values, lats, lons

        # Get the min and max indices to define the crop region
        lat_start, lat_end = lat_indices[0], lat_indices[-1] + 1
        lon_start, lon_end = lon_indices[0], lon_indices[-1] + 1

        # Crop the data
        if len(data_values.shape) == 2:  # (lat, lon)
            cropped_data = data_values[lat_start:lat_end, lon_start:lon_end]
        else:
            print(f"Warning: Unexpected data shape {data_values.shape}, cropping first two dimensions")
            cropped_data = data_values[lat_start:lat_end, lon_start:lon_end]

        # Crop the coordinates
        cropped_lats = lats[lat_start:lat_end]
        cropped_lons = lons[lon_start:lon_end]

        print(f"Cropped data shape: {cropped_data.shape}")
        print(f"Cropped lat range: {cropped_lats.min():.2f} to {cropped_lats.max():.2f}")
        print(f"Cropped lon range: {cropped_lons.min():.2f} to {cropped_lons.max():.2f}")
        print(f"Data reduction: {(1 - cropped_data.size / data_values.size) * 100:.1f}%")

        return cropped_data, cropped_lats, cropped_lons

    def get_available_times(self, variable_name):
        """Get the available time steps for a variable."""
        if variable_name not in self.detected_variables:
            return []

        var_info = self.detected_variables[variable_name]
        dataset_type = var_info['dataset_type']

        # Determine which dataset to use
        if dataset_type == 'surface' and self.surface_dataset is not None:
            dataset = self.surface_dataset
        elif dataset_type == 'atmospheric' and self.atmospheric_dataset is not None:
            dataset = self.atmospheric_dataset
        elif self.dataset is not None:
            dataset = self.dataset
        else:
            return []

        coords = self.get_coordinates(dataset)
        if coords['time'] and coords['time'] in dataset.dims:
            times = pd.to_datetime(dataset[coords['time']].values)
            print(f"Times: {times.tolist()}")
            return times.tolist()
        return []

    def get_available_pressure_levels(self, variable_name):
        """Get the available pressure levels for atmospheric variables."""
        if variable_name not in self.detected_variables:
            return []

        var_info = self.detected_variables[variable_name]
        if var_info['type'] != 'atmospheric':
            return []

        dataset_type = var_info['dataset_type']
        # Determine which dataset to use
        if dataset_type == 'atmospheric' and self.atmospheric_dataset is not None:
            dataset = self.atmospheric_dataset
        elif self.dataset is not None:
            dataset = self.dataset
        else:
            return []

        coords = self.get_coordinates(dataset)
        if coords['level'] and coords['level'] in dataset.dims:
            levels = dataset[coords['level']].values
            return levels.tolist()
        return PRESSURE_LEVELS  # Default pressure levels

    def close(self):
        """Close all open datasets safely."""
        try:
            if self.dataset is not None:
                self.dataset.close()
                self.dataset = None
        except (RuntimeError, OSError):
            pass  # Dataset already closed or invalid
        try:
            if self.surface_dataset is not None:
                self.surface_dataset.close()
                self.surface_dataset = None
        except (RuntimeError, OSError):
            pass  # Dataset already closed or invalid
        try:
            if self.atmospheric_dataset is not None:
                self.atmospheric_dataset.close()
                self.atmospheric_dataset = None
        except (RuntimeError, OSError):
            pass  # Dataset already closed or invalid

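# A minimal usage sketch for NetCDFProcessor, assuming a hypothetical local
# CAMS download named "cams_data.zip" (the path and the chosen variable are
# illustrative, not part of the module API):
#
#     processor = NetCDFProcessor("cams_data.zip")
#     processor.load_dataset()
#     detected = processor.detect_variables()
#     first_var = next(iter(detected))
#     values, meta = processor.extract_data(first_var, time_index=0)
#     print(meta['display_name'], meta['units'], values.shape)
#     processor.close()
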
class AuroraPredictionProcessor:
    def __init__(self, file_path):
        """
        Initialize an Aurora prediction processor for single NetCDF files with timestep data.

        Parameters:
            file_path (str): Path to an Aurora prediction NetCDF file
        """
        self.file_path = Path(file_path)
        self.dataset = None
        self.detected_variables = {}

    def _trim_to_india_bounds(self, var, lats, lons):
        """
        Trim data and coordinates to India's geographical bounds to reduce computation.

        Parameters:
            var (xarray.DataArray): Variable data
            lats (numpy.ndarray): Latitude coordinates
            lons (numpy.ndarray): Longitude coordinates

        Returns:
            tuple: (trimmed_var, trimmed_lats, trimmed_lons)
        """
        # Find indices within the India bounds
        lat_mask = (lats >= INDIA_BOUNDS['lat_min']) & (lats <= INDIA_BOUNDS['lat_max'])
        lon_mask = (lons >= INDIA_BOUNDS['lon_min']) & (lons <= INDIA_BOUNDS['lon_max'])
        lat_indices = np.where(lat_mask)[0]
        lon_indices = np.where(lon_mask)[0]

        if len(lat_indices) == 0 or len(lon_indices) == 0:
            # If no points fall within the India bounds, return the original data
            return var, lats, lons

        # Get min/max indices for slicing
        lat_start, lat_end = lat_indices[0], lat_indices[-1] + 1
        lon_start, lon_end = lon_indices[0], lon_indices[-1] + 1

        # Trim the coordinates
        trimmed_lats = lats[lat_start:lat_end]
        trimmed_lons = lons[lon_start:lon_end]

        # Trim the data, handling different dimension orders
        if var.ndim == 2:  # (lat, lon)
            trimmed_var = var[lat_start:lat_end, lon_start:lon_end]
        elif var.ndim == 3 and 'latitude' in var.dims and 'longitude' in var.dims:
            # Find the latitude and longitude dimension positions
            lat_dim_pos = var.dims.index('latitude')
            lon_dim_pos = var.dims.index('longitude')
            if lat_dim_pos == 1 and lon_dim_pos == 2:    # (time/level, lat, lon)
                trimmed_var = var[:, lat_start:lat_end, lon_start:lon_end]
            elif lat_dim_pos == 0 and lon_dim_pos == 1:  # (lat, lon, time/level)
                trimmed_var = var[lat_start:lat_end, lon_start:lon_end, :]
            else:
                # Fall back to the original if the dimension order is unexpected
                return var, lats, lons
        else:
            # For other dimensionalities, return the original
            return var, lats, lons

        return trimmed_var, trimmed_lats, trimmed_lons

    def load_dataset(self):
        """Load an Aurora prediction NetCDF dataset."""
        try:
            self.dataset = xr.open_dataset(self.file_path)
            return True
        except Exception as e:
            raise Exception(f"Error loading Aurora prediction dataset: {str(e)}")

    def extract_variable_data(self, variable_name, pressure_level=None, step=0):
        """
        Extract variable data from an Aurora prediction file.

        Parameters:
            variable_name (str): Name of the variable to extract
            pressure_level (float, optional): Pressure level for atmospheric variables
            step (int): Time step index (default: 0)

        Returns:
            tuple: (data_array, metadata_dict)
        """
        if self.dataset is None:
            self.load_dataset()

        if variable_name not in self.dataset.data_vars:
            raise ValueError(f"Variable '{variable_name}' not found in dataset")

        var = self.dataset[variable_name]

        # Handle Aurora-specific dimensions. Aurora files have:
        # (forecast_period, forecast_reference_time, [pressure_level], latitude, longitude)

        # First, squeeze the singleton forecast_period dimension
        if 'forecast_period' in var.dims and var.sizes['forecast_period'] == 1:
            var = var.squeeze('forecast_period')

        # Handle forecast_reference_time by taking the first one (index 0)
        if 'forecast_reference_time' in var.dims:
            var = var.isel(forecast_reference_time=0)

        # Handle the step dimension if present (for backward compatibility)
        if 'step' in var.dims:
            if step >= var.sizes['step']:
                raise ValueError(f"Step {step} not available. Dataset has {var.sizes['step']} steps.")
            var = var.isel(step=step)

        # Handle the pressure level dimension if present
        if pressure_level is not None and 'pressure_level' in var.dims:
            pressure_level = float(pressure_level)
            # Find the closest pressure level
            available_levels = var.pressure_level.values
            closest_idx = np.argmin(np.abs(available_levels - pressure_level))
            actual_level = available_levels[closest_idx]
            var = var.isel(pressure_level=closest_idx)
            pressure_info = f"at {actual_level:.0f} hPa"
        else:
            pressure_info = None

        # Handle different coordinate naming conventions
        if 'latitude' in self.dataset.coords:
            lats = self.dataset['latitude'].values
            lons = self.dataset['longitude'].values
        else:
            lats = self.dataset['lat'].values
            lons = self.dataset['lon'].values

        # Trim the data and coordinates to India bounds to reduce computation
        var, lats, lons = self._trim_to_india_bounds(var, lats, lons)

        # Extract the trimmed data
        data_values = var.values

        # Get variable information
        var_info = NETCDF_VARIABLES.get(variable_name, {})
        display_name = var_info.get('name', variable_name.replace('_', ' ').title())
        units = var.attrs.get('units', var_info.get('units', ''))

        # Prepare metadata
        metadata = {
            'variable_name': variable_name,
            'display_name': display_name,
            'units': units,
            'lats': lats,
            'lons': lons,
            'pressure_level': pressure_level if pressure_level else None,
            'pressure_info': pressure_info,
            'step': step,
            'data_shape': data_values.shape,
            'source': 'Aurora Prediction',
            'file_path': str(self.file_path),
        }

        # Add timestamp information. For Aurora predictions, step is the
        # forecast step, assuming 12-hour intervals.
        hours_from_start = (step + 1) * 12
        metadata['timestamp_str'] = f"T+{hours_from_start}h (Step {step + 1})"

        return data_values, metadata

    def get_available_variables(self):
        """Get the available variables, categorized by type."""
        if self.dataset is None:
            self.load_dataset()

        surface_vars = []
        atmospheric_vars = []
        for var_name in self.dataset.data_vars:
            var = self.dataset[var_name]
            # Variables with a pressure level dimension are atmospheric
            if 'pressure_level' in var.dims:
                atmospheric_vars.append(var_name)
            else:
                surface_vars.append(var_name)

        return {
            'surface_vars': surface_vars,
            'atmospheric_vars': atmospheric_vars
        }

    def get_available_pressure_levels(self):
        """Get the available pressure levels."""
        if self.dataset is None:
            self.load_dataset()
        if 'pressure_level' in self.dataset.coords:
            return self.dataset.pressure_level.values.tolist()
        return []

    def get_available_steps(self):
        """Get the available time steps."""
        if self.dataset is None:
            self.load_dataset()
        if 'step' in self.dataset.dims:
            return list(range(self.dataset.sizes['step']))
        return [0]

    def close(self):
        """Close the dataset safely."""
        try:
            if self.dataset is not None:
                self.dataset.close()
                self.dataset = None
        except (RuntimeError, OSError):
            pass  # Dataset already closed or invalid

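# A minimal usage sketch for AuroraPredictionProcessor, assuming a hypothetical
# prediction file "aurora_prediction.nc" (path and variable choice are illustrative):
#
#     aurora = AuroraPredictionProcessor("aurora_prediction.nc")
#     available = aurora.get_available_variables()
#     if available['surface_vars']:
#         values, meta = aurora.extract_variable_data(available['surface_vars'][0], step=0)
#         print(meta['timestamp_str'], values.shape)
#     aurora.close()
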
def analyze_netcdf_file(file_path):
    """
    Analyze a NetCDF file's structure and return a summary.

    Parameters:
        file_path (str): Path to a NetCDF or ZIP file

    Returns:
        dict: Analysis summary
    """
    processor = NetCDFProcessor(file_path)
    try:
        processor.load_dataset()
        detected_vars = processor.detect_variables()

        analysis = {
            'success': True,
            'file_path': str(file_path),
            'detected_variables': detected_vars,
            'total_variables': len(detected_vars),
            'surface_variables': len([v for v in detected_vars.values() if v.get('type') == 'surface']),
            'atmospheric_variables': len([v for v in detected_vars.values() if v.get('type') == 'atmospheric']),
        }

        # Get sample time information
        if detected_vars:
            sample_var = list(detected_vars.keys())[0]
            times = processor.get_available_times(sample_var)
            if times:
                analysis['time_range'] = {
                    'start': str(times[0]),
                    'end': str(times[-1]),
                    'count': len(times)
                }
        return analysis
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'file_path': str(file_path)
        }
    finally:
        processor.close()
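

if __name__ == '__main__':
    # Minimal command-line sketch: analyze a file passed as the first argument
    # and print a short summary. The module filename in the usage string is
    # illustrative; adjust it to match where this file lives in your project.
    import sys

    if len(sys.argv) > 1:
        summary = analyze_netcdf_file(sys.argv[1])
        if summary['success']:
            print(f"Detected {summary['total_variables']} variables "
                  f"({summary['surface_variables']} surface, "
                  f"{summary['atmospheric_variables']} atmospheric)")
        else:
            print(f"Analysis failed: {summary['error']}")
    else:
        print("Usage: python netcdf_processor.py <path to .nc or .zip>")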