# cams_downloader.py
# Download CAMS atmospheric composition data
import cdsapi
import zipfile
import os
from pathlib import Path
from datetime import datetime, timedelta
import pandas as pd


class CAMSDownloader:
    def __init__(self, download_dir="downloads"):
        """
        Initialize CAMS downloader

        Parameters:
            download_dir (str): Directory to store downloaded files
        """
        self.download_dir = Path(download_dir)
        # parents=True so nested download paths (e.g. "data/downloads") also work
        self.download_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        self.extracted_dir = self.download_dir / "extracted"
        self.extracted_dir.mkdir(parents=True, exist_ok=True)

        self.client = None
        self._init_client()
    def _init_client(self):
        """Initialize CDS API client"""
        try:
            # First, try environment variables (preferred for cloud deployments)
            cdsapi_url = os.getenv('CDSAPI_URL')
            cdsapi_key = os.getenv('CDSAPI_KEY')
            if cdsapi_url and cdsapi_key:
                self.client = cdsapi.Client(key=cdsapi_key, url=cdsapi_url)
                print("✅ CDS API client initialized from environment variables")
                return

            # Fallback: read .cdsapirc from the current directory first, then the home directory
            cdsapirc_path = Path.cwd() / ".cdsapirc"
            if not cdsapirc_path.exists():
                cdsapirc_path = Path.home() / ".cdsapirc"

            if cdsapirc_path.exists():
                # Parse credentials from .cdsapirc
                with open(cdsapirc_path, 'r') as f:
                    lines = f.readlines()

                url = None
                key = None
                for line in lines:
                    line = line.strip()
                    if line.startswith('url:'):
                        url = line.split(':', 1)[1].strip()
                    elif line.startswith('key:'):
                        key = line.split(':', 1)[1].strip()

                if url and key:
                    self.client = cdsapi.Client(key=key, url=url)
                    print("✅ CDS API client initialized from .cdsapirc file")
                    return
                else:
                    raise ValueError("Could not parse URL or key from .cdsapirc file")

            # Last resort: try default initialization
            self.client = cdsapi.Client()
            print("✅ CDS API client initialized with default settings")
        except Exception as e:
            print(f"⚠️ Warning: Could not initialize CDS API client: {str(e)}")
            print("Please ensure you have:")
            print("1. Created an account at https://cds.climate.copernicus.eu/")
            print("2. Set CDSAPI_URL and CDSAPI_KEY environment variables (recommended for cloud deployments)")
            print("3. Or created a .cdsapirc file in your home directory with your credentials")
            self.client = None
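    # Example of the .cdsapirc layout the parser above expects (illustrative only;
    # the exact URL depends on whether your account is on the CDS or the
    # Atmosphere Data Store, and <your-api-key> is a placeholder):
    #
    #   url: https://cds.climate.copernicus.eu/api
    #   key: <your-api-key>
    #
    # The same values can instead be supplied via the CDSAPI_URL and CDSAPI_KEY
    # environment variables, which _init_client checks first.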
    def is_client_ready(self):
        """Check if CDS API client is ready"""
        return self.client is not None
    def download_cams_data(self, date_str, variables=None, pressure_levels=None):
        """
        Download CAMS atmospheric composition data for a specific date

        Parameters:
            date_str (str): Date in YYYY-MM-DD format
            variables (list): List of variables to download (default: common air pollution variables)
            pressure_levels (list): List of pressure levels (default: standard levels)

        Returns:
            str: Path to downloaded ZIP file
        """
        if not self.is_client_ready():
            raise Exception("CDS API client not initialized. Please check your credentials.")

        # Validate date
        try:
            target_date = pd.to_datetime(date_str)
            date_str = target_date.strftime('%Y-%m-%d')
        except Exception:
            raise ValueError(f"Invalid date format: {date_str}. Use YYYY-MM-DD format.")

        # Check if data already exists
        filename = f"{date_str}-cams.nc.zip"
        filepath = self.download_dir / filename
        if filepath.exists():
            print(f"✅ Data for {date_str} already exists: {filename}")
            return str(filepath)

        # Default variables (common air pollution variables)
        if variables is None:
            variables = [
                # Meteorological surface-level variables
                "10m_u_component_of_wind",
                "10m_v_component_of_wind",
                "2m_temperature",
                "mean_sea_level_pressure",
                # Pollution surface-level variables
                "particulate_matter_1um",
                "particulate_matter_2.5um",
                "particulate_matter_10um",
                "total_column_carbon_monoxide",
                "total_column_nitrogen_monoxide",
                "total_column_nitrogen_dioxide",
                "total_column_ozone",
                "total_column_sulphur_dioxide",
                # Meteorological atmospheric variables
                "u_component_of_wind",
                "v_component_of_wind",
                "temperature",
                "geopotential",
                "specific_humidity",
                # Pollution atmospheric variables
                "carbon_monoxide",
                "nitrogen_dioxide",
                "nitrogen_monoxide",
                "ozone",
                "sulphur_dioxide",
            ]

        # Default pressure levels
        if pressure_levels is None:
            pressure_levels = [
                "50", "100", "150", "200", "250", "300", "400",
                "500", "600", "700", "850", "925", "1000",
            ]
| print(f"π Downloading CAMS data for {date_str}...") | |
| print(f"Variables: {len(variables)} selected") | |
| print(f"Pressure levels: {len(pressure_levels)} levels") | |
| try: | |
| # Make the API request | |
| print("π‘ Requesting data from CAMS API...") | |
| self.client.retrieve( | |
| "cams-global-atmospheric-composition-forecasts", | |
| { | |
| "type": "forecast", | |
| "leadtime_hour": "0", | |
| "variable": variables, | |
| "pressure_level": pressure_levels, | |
| "date": date_str, | |
| "time": ["00:00", "12:00"], # Two time steps | |
| "format": "netcdf_zip", | |
| }, | |
| str(filepath), | |
| ) | |
| # Validate the downloaded file | |
| if filepath.exists(): | |
| file_size = filepath.stat().st_size | |
| print(f"π Downloaded file size: {file_size / 1024 / 1024:.2f} MB") | |
| # Basic validation - CAMS files should be reasonably large | |
| if file_size < 10000: # Less than 10KB is suspicious | |
| print(f"β οΈ Warning: Downloaded file is very small ({file_size} bytes)") | |
| # Read first few bytes to check for error messages | |
| with open(filepath, 'rb') as f: | |
| header = f.read(200) | |
| if b'error' in header.lower() or b'html' in header.lower(): | |
| filepath.unlink() | |
| raise Exception("CAMS API returned an error response instead of data") | |
| print(f"β Successfully downloaded: {filename}") | |
| return str(filepath) | |
| else: | |
| raise Exception("Download completed but file was not created") | |
| except Exception as e: | |
| # Clean up partial download | |
| if filepath.exists(): | |
| print(f"ποΈ Cleaning up failed download: {filepath}") | |
| filepath.unlink() | |
| raise Exception(f"Error downloading CAMS data: {str(e)}") | |
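    # Illustrative call with a reduced variable set (the date and values below
    # are example inputs only, not part of the module):
    #
    #   downloader = CAMSDownloader()
    #   zip_path = downloader.download_cams_data(
    #       "2024-01-01",
    #       variables=["particulate_matter_2.5um", "ozone"],
    #       pressure_levels=["500", "850", "1000"],
    #   )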
    def extract_cams_files(self, zip_path):
        """
        Extract surface and atmospheric data from CAMS ZIP file

        Parameters:
            zip_path (str): Path to CAMS ZIP file

        Returns:
            dict: Paths to extracted files
        """
        zip_path = Path(zip_path)
        if not zip_path.exists():
            raise FileNotFoundError(f"ZIP file not found: {zip_path}")

        # Validate file is actually a ZIP file
        try:
            # Check file size first
            file_size = zip_path.stat().st_size
            if file_size < 1000:  # Less than 1KB is probably an error response
                print(f"⚠️ Downloaded file is too small ({file_size} bytes), likely an error response")
                # Try to read first few bytes to see what we got
                with open(zip_path, 'rb') as f:
                    header = f.read(100)
                if b'html' in header.lower() or b'error' in header.lower():
                    raise Exception("Downloaded file appears to be an HTML error page, not ZIP data")

            # Test if it's a valid ZIP file
            if not zipfile.is_zipfile(zip_path):
                print(f"❌ File is not a valid ZIP file: {zip_path}")
                # Try to read first few lines to diagnose
                with open(zip_path, 'r', errors='ignore') as f:
                    first_lines = f.read(200)
                print(f"File contents preview: {first_lines[:100]}...")
                raise Exception(f"Downloaded file is not a valid ZIP archive. File size: {file_size} bytes")
        except Exception as e:
            if "ZIP" in str(e) or "zip" in str(e):
                raise e
            else:
                raise Exception(f"Error validating ZIP file: {str(e)}")

        # Extract date from filename
        date_str = zip_path.stem.replace("-cams.nc", "")
        surface_path = self.extracted_dir / f"{date_str}-cams-surface.nc"
        atmospheric_path = self.extracted_dir / f"{date_str}-cams-atmospheric.nc"

        extracted_files = {}
        try:
            with zipfile.ZipFile(zip_path, "r") as zf:
                zip_contents = zf.namelist()

                # Extract surface data
                surface_file = None
                for file in zip_contents:
                    if 'sfc' in file.lower() or file.endswith('_sfc.nc'):
                        surface_file = file
                        break

                if surface_file and not surface_path.exists():
                    with open(surface_path, "wb") as f:
                        f.write(zf.read(surface_file))
                    print(f"✅ Extracted surface data: {surface_path.name}")
                    extracted_files['surface'] = str(surface_path)
                elif surface_path.exists():
                    extracted_files['surface'] = str(surface_path)

                # Extract atmospheric data
                atmospheric_file = None
                for file in zip_contents:
                    if 'plev' in file.lower() or file.endswith('_plev.nc'):
                        atmospheric_file = file
                        break

                if atmospheric_file and not atmospheric_path.exists():
                    with open(atmospheric_path, "wb") as f:
                        f.write(zf.read(atmospheric_file))
                    print(f"✅ Extracted atmospheric data: {atmospheric_path.name}")
                    extracted_files['atmospheric'] = str(atmospheric_path)
                elif atmospheric_path.exists():
                    extracted_files['atmospheric'] = str(atmospheric_path)

                # If no specific files found, extract all .nc files
                if not extracted_files:
                    nc_files = [f for f in zip_contents if f.endswith('.nc')]
                    for nc_file in nc_files:
                        output_path = self.extracted_dir / nc_file
                        if not output_path.exists():
                            with open(output_path, "wb") as f:
                                f.write(zf.read(nc_file))
                        extracted_files[nc_file] = str(output_path)
        except Exception as e:
            raise Exception(f"Error extracting ZIP file: {str(e)}")

        if not extracted_files:
            raise Exception("No NetCDF files found in ZIP archive")

        return extracted_files
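    # Illustrative follow-up (not used by this module): the extracted NetCDF
    # files can be opened with xarray, assuming xarray and a NetCDF backend
    # (e.g. netCDF4) are installed. The ZIP path and date are example values.
    #
    #   import xarray as xr
    #   files = downloader.extract_cams_files("downloads/2024-01-01-cams.nc.zip")
    #   surface = xr.open_dataset(files["surface"])
    #   print(surface.data_vars)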
    def get_available_dates(self, start_date=None, end_date=None):
        """
        Get list of dates for which CAMS data is typically available

        Note: This doesn't check actual availability, it just generates a reasonable date range

        Parameters:
            start_date (str): Start date (default: 30 days ago)
            end_date (str): End date (default: yesterday)

        Returns:
            list: List of date strings in YYYY-MM-DD format
        """
        if start_date is None:
            start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
        if end_date is None:
            end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

        # Generate date range
        date_range = pd.date_range(start=start_date, end=end_date, freq='D')
        return [date.strftime('%Y-%m-%d') for date in date_range]
    def list_downloaded_files(self):
        """List all downloaded CAMS files"""
        downloaded_files = []
        for zip_file in self.download_dir.glob("*-cams.nc.zip"):
            date_str = zip_file.stem.replace("-cams.nc", "")
            file_info = {
                'date': date_str,
                'zip_path': str(zip_file),
                'size_mb': zip_file.stat().st_size / (1024 * 1024),
                'downloaded': zip_file.stat().st_mtime
            }
            downloaded_files.append(file_info)

        # Sort by date (newest first)
        downloaded_files.sort(key=lambda x: x['date'], reverse=True)
        return downloaded_files
    def cleanup_old_files(self, days_old=30):
        """
        Clean up downloaded files older than specified days

        Parameters:
            days_old (int): Delete files older than this many days
        """
        try:
            cutoff_date = datetime.now() - timedelta(days=days_old)
            deleted_count = 0

            for zip_file in self.download_dir.glob("*-cams.nc.zip"):
                if datetime.fromtimestamp(zip_file.stat().st_mtime) < cutoff_date:
                    zip_file.unlink()
                    deleted_count += 1

            # Also clean extracted files
            for nc_file in self.extracted_dir.glob("*.nc"):
                if datetime.fromtimestamp(nc_file.stat().st_mtime) < cutoff_date:
                    nc_file.unlink()
                    deleted_count += 1

            print(f"🧹 Cleaned up {deleted_count} old files")
            return deleted_count
        except Exception as e:
            print(f"Error during cleanup: {str(e)}")
            return 0
def test_cams_downloader():
    """Test function for CAMS downloader"""
    print("Testing CAMS downloader...")

    downloader = CAMSDownloader()
    if not downloader.is_client_ready():
        print("❌ CDS API client not ready. Please check your credentials.")
        return False

    # Test with a date far enough in the past that the data should already be archived
    test_date = (datetime.now() - timedelta(days=600)).strftime('%Y-%m-%d')
    print(f"Testing download for date: {test_date}")
    print("⚠️ This may take several minutes for the first download...")

    try:
        # Download data (will skip if already exists)
        zip_path = downloader.download_cams_data(test_date)
        print(f"✅ Download successful: {zip_path}")

        # Test extraction
        extracted_files = downloader.extract_cams_files(zip_path)
        print(f"✅ Extraction successful: {len(extracted_files)} files")

        # List downloaded files
        downloaded = downloader.list_downloaded_files()
        print(f"✅ Found {len(downloaded)} downloaded files")

        return True
    except Exception as e:
        print(f"❌ Test failed: {str(e)}")
        return False
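# A minimal backfill sketch (hypothetical helper, not part of the original module):
# loops over get_available_dates() to download and extract a recent window of data.
# Assumes valid CDS API credentials are configured as described in _init_client.
def backfill_recent_days(days=7, download_dir="downloads"):
    """Download and extract CAMS data for the last `days` days (best effort)."""
    downloader = CAMSDownloader(download_dir=download_dir)
    if not downloader.is_client_ready():
        print("CDS API client not ready; skipping backfill.")
        return {}

    results = {}
    start = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
    for date_str in downloader.get_available_dates(start_date=start):
        try:
            zip_path = downloader.download_cams_data(date_str)
            results[date_str] = downloader.extract_cams_files(zip_path)
        except Exception as e:
            # Missing or failed dates are skipped rather than aborting the backfill
            print(f"Skipping {date_str}: {e}")
    return results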
if __name__ == "__main__":
    test_cams_downloader()