cams-pollution-dashboard / cams_downloader.py
aditya-me13's picture
integrated both the versions
610152e
raw
history blame
16.6 kB
# cams_downloader.py
# Download CAMS atmospheric composition data
import cdsapi
import zipfile
import os
from pathlib import Path
from datetime import datetime, timedelta
import pandas as pd
class CAMSDownloader:
def __init__(self, download_dir="downloads"):
"""
Initialize CAMS downloader
Parameters:
download_dir (str): Directory to store downloaded files
"""
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
# Create subdirectories
self.extracted_dir = self.download_dir / "extracted"
self.extracted_dir.mkdir(exist_ok=True)
self.client = None
self._init_client()
def _init_client(self):
"""Initialize CDS API client"""
try:
# First, try environment variables (preferred for cloud deployments)
cdsapi_url = os.getenv('CDSAPI_URL')
cdsapi_key = os.getenv('CDSAPI_KEY')
if cdsapi_url and cdsapi_key:
self.client = cdsapi.Client(key=cdsapi_key, url=cdsapi_url)
print("βœ… CDS API client initialized from environment variables")
return
# Fallback: Try to read .cdsapirc file from current directory first, then home directory
cdsapirc_path = Path.cwd() / ".cdsapirc"
if not cdsapirc_path.exists():
cdsapirc_path = Path.home() / ".cdsapirc"
if cdsapirc_path.exists():
# Parse credentials from .cdsapirc
with open(cdsapirc_path, 'r') as f:
lines = f.readlines()
url = None
key = None
for line in lines:
line = line.strip()
if line.startswith('url:'):
url = line.split(':', 1)[1].strip()
elif line.startswith('key:'):
key = line.split(':', 1)[1].strip()
if url and key:
self.client = cdsapi.Client(key=key, url=url)
print("βœ… CDS API client initialized from .cdsapirc file")
return
else:
raise ValueError("Could not parse URL or key from .cdsapirc file")
# Last resort: Try default initialization
self.client = cdsapi.Client()
print("βœ… CDS API client initialized with default settings")
except Exception as e:
print(f"⚠️ Warning: Could not initialize CDS API client: {str(e)}")
print("Please ensure you have:")
print("1. Created an account at https://cds.climate.copernicus.eu/")
print("2. Set CDSAPI_URL and CDSAPI_KEY environment variables (recommended for cloud deployments)")
print("3. Or created a .cdsapirc file in your home directory with your credentials")
self.client = None
def is_client_ready(self):
"""Check if CDS API client is ready"""
return self.client is not None
def download_cams_data(self, date_str, variables=None, pressure_levels=None):
"""
Download CAMS atmospheric composition data for a specific date
Parameters:
date_str (str): Date in YYYY-MM-DD format
variables (list): List of variables to download (default: common air pollution variables)
pressure_levels (list): List of pressure levels (default: standard levels)
Returns:
str: Path to downloaded ZIP file
"""
if not self.is_client_ready():
raise Exception("CDS API client not initialized. Please check your credentials.")
# Validate date
try:
target_date = pd.to_datetime(date_str)
date_str = target_date.strftime('%Y-%m-%d')
except:
raise ValueError(f"Invalid date format: {date_str}. Use YYYY-MM-DD format.")
# Check if data already exists
filename = f"{date_str}-cams.nc.zip"
filepath = self.download_dir / filename
if filepath.exists():
print(f"βœ… Data for {date_str} already exists: {filename}")
return str(filepath)
# Default variables (common air pollution variables)
if variables is None:
variables = [
# Meteorological surface-level variables
"10m_u_component_of_wind",
"10m_v_component_of_wind",
"2m_temperature",
"mean_sea_level_pressure",
# Pollution surface-level variables
"particulate_matter_1um",
"particulate_matter_2.5um",
"particulate_matter_10um",
"total_column_carbon_monoxide",
"total_column_nitrogen_monoxide",
"total_column_nitrogen_dioxide",
"total_column_ozone",
"total_column_sulphur_dioxide",
# Meteorological atmospheric variables
"u_component_of_wind",
"v_component_of_wind",
"temperature",
"geopotential",
"specific_humidity",
# Pollution atmospheric variables
"carbon_monoxide",
"nitrogen_dioxide",
"nitrogen_monoxide",
"ozone",
"sulphur_dioxide",
]
# Default pressure levels
if pressure_levels is None:
pressure_levels = [
"50", "100", "150", "200", "250", "300", "400",
"500", "600", "700", "850", "925", "1000",
]
print(f"πŸ”„ Downloading CAMS data for {date_str}...")
print(f"Variables: {len(variables)} selected")
print(f"Pressure levels: {len(pressure_levels)} levels")
try:
# Make the API request
print("πŸ“‘ Requesting data from CAMS API...")
self.client.retrieve(
"cams-global-atmospheric-composition-forecasts",
{
"type": "forecast",
"leadtime_hour": "0",
"variable": variables,
"pressure_level": pressure_levels,
"date": date_str,
"time": ["00:00", "12:00"], # Two time steps
"format": "netcdf_zip",
},
str(filepath),
)
# Validate the downloaded file
if filepath.exists():
file_size = filepath.stat().st_size
print(f"πŸ“ Downloaded file size: {file_size / 1024 / 1024:.2f} MB")
# Basic validation - CAMS files should be reasonably large
if file_size < 10000: # Less than 10KB is suspicious
print(f"⚠️ Warning: Downloaded file is very small ({file_size} bytes)")
# Read first few bytes to check for error messages
with open(filepath, 'rb') as f:
header = f.read(200)
if b'error' in header.lower() or b'html' in header.lower():
filepath.unlink()
raise Exception("CAMS API returned an error response instead of data")
print(f"βœ… Successfully downloaded: {filename}")
return str(filepath)
else:
raise Exception("Download completed but file was not created")
except Exception as e:
# Clean up partial download
if filepath.exists():
print(f"πŸ—‘οΈ Cleaning up failed download: {filepath}")
filepath.unlink()
raise Exception(f"Error downloading CAMS data: {str(e)}")
def extract_cams_files(self, zip_path):
"""
Extract surface and atmospheric data from CAMS ZIP file
Parameters:
zip_path (str): Path to CAMS ZIP file
Returns:
dict: Paths to extracted files
"""
zip_path = Path(zip_path)
if not zip_path.exists():
raise FileNotFoundError(f"ZIP file not found: {zip_path}")
# Validate file is actually a ZIP file
try:
# Check file size first
file_size = zip_path.stat().st_size
if file_size < 1000: # Less than 1KB is probably an error response
print(f"⚠️ Downloaded file is too small ({file_size} bytes), likely an error response")
# Try to read first few bytes to see what we got
with open(zip_path, 'rb') as f:
header = f.read(100)
if b'html' in header.lower() or b'error' in header.lower():
raise Exception("Downloaded file appears to be an HTML error page, not ZIP data")
# Test if it's a valid ZIP file
if not zipfile.is_zipfile(zip_path):
print(f"❌ File is not a valid ZIP file: {zip_path}")
# Try to read first few lines to diagnose
with open(zip_path, 'r', errors='ignore') as f:
first_lines = f.read(200)
print(f"File contents preview: {first_lines[:100]}...")
raise Exception(f"Downloaded file is not a valid ZIP archive. File size: {file_size} bytes")
except Exception as e:
if "ZIP" in str(e) or "zip" in str(e):
raise e
else:
raise Exception(f"Error validating ZIP file: {str(e)}")
# Extract date from filename
date_str = zip_path.stem.replace("-cams.nc", "")
surface_path = self.extracted_dir / f"{date_str}-cams-surface.nc"
atmospheric_path = self.extracted_dir / f"{date_str}-cams-atmospheric.nc"
extracted_files = {}
try:
with zipfile.ZipFile(zip_path, "r") as zf:
zip_contents = zf.namelist()
# Extract surface data
surface_file = None
for file in zip_contents:
if 'sfc' in file.lower() or file.endswith('_sfc.nc'):
surface_file = file
break
if surface_file and not surface_path.exists():
with open(surface_path, "wb") as f:
f.write(zf.read(surface_file))
print(f"βœ… Extracted surface data: {surface_path.name}")
extracted_files['surface'] = str(surface_path)
elif surface_path.exists():
extracted_files['surface'] = str(surface_path)
# Extract atmospheric data
atmospheric_file = None
for file in zip_contents:
if 'plev' in file.lower() or file.endswith('_plev.nc'):
atmospheric_file = file
break
if atmospheric_file and not atmospheric_path.exists():
with open(atmospheric_path, "wb") as f:
f.write(zf.read(atmospheric_file))
print(f"βœ… Extracted atmospheric data: {atmospheric_path.name}")
extracted_files['atmospheric'] = str(atmospheric_path)
elif atmospheric_path.exists():
extracted_files['atmospheric'] = str(atmospheric_path)
# If no specific files found, extract all .nc files
if not extracted_files:
nc_files = [f for f in zip_contents if f.endswith('.nc')]
for nc_file in nc_files:
output_path = self.extracted_dir / nc_file
if not output_path.exists():
with open(output_path, "wb") as f:
f.write(zf.read(nc_file))
extracted_files[nc_file] = str(output_path)
except Exception as e:
raise Exception(f"Error extracting ZIP file: {str(e)}")
if not extracted_files:
raise Exception("No NetCDF files found in ZIP archive")
return extracted_files
def get_available_dates(self, start_date=None, end_date=None):
"""
Get list of dates for which CAMS data is typically available
Note: This doesn't check actual availability, just generates reasonable date range
Parameters:
start_date (str): Start date (default: 30 days ago)
end_date (str): End date (default: yesterday)
Returns:
list: List of date strings in YYYY-MM-DD format
"""
if start_date is None:
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
if end_date is None:
end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# Generate date range
date_range = pd.date_range(start=start_date, end=end_date, freq='D')
return [date.strftime('%Y-%m-%d') for date in date_range]
def list_downloaded_files(self):
"""List all downloaded CAMS files"""
downloaded_files = []
for zip_file in self.download_dir.glob("*-cams.nc.zip"):
date_str = zip_file.stem.replace("-cams.nc", "")
file_info = {
'date': date_str,
'zip_path': str(zip_file),
'size_mb': zip_file.stat().st_size / (1024 * 1024),
'downloaded': zip_file.stat().st_mtime
}
downloaded_files.append(file_info)
# Sort by date (newest first)
downloaded_files.sort(key=lambda x: x['date'], reverse=True)
return downloaded_files
def cleanup_old_files(self, days_old=30):
"""
Clean up downloaded files older than specified days
Parameters:
days_old (int): Delete files older than this many days
"""
try:
cutoff_date = datetime.now() - timedelta(days=days_old)
deleted_count = 0
for zip_file in self.download_dir.glob("*-cams.nc.zip"):
if datetime.fromtimestamp(zip_file.stat().st_mtime) < cutoff_date:
zip_file.unlink()
deleted_count += 1
# Also clean extracted files
for nc_file in self.extracted_dir.glob("*.nc"):
if datetime.fromtimestamp(nc_file.stat().st_mtime) < cutoff_date:
nc_file.unlink()
deleted_count += 1
print(f"🧹 Cleaned up {deleted_count} old files")
return deleted_count
except Exception as e:
print(f"Error during cleanup: {str(e)}")
return 0
def test_cams_downloader():
"""Test function for CAMS downloader"""
print("Testing CAMS downloader...")
downloader = CAMSDownloader()
if not downloader.is_client_ready():
print("❌ CDS API client not ready. Please check your credentials.")
return False
# Test with recent date
test_date = (datetime.now() - timedelta(days=600)).strftime('%Y-%m-%d')
print(f"Testing download for date: {test_date}")
print("⚠️ This may take several minutes for the first download...")
try:
# Download data (will skip if already exists)
zip_path = downloader.download_cams_data(test_date)
print(f"βœ… Download successful: {zip_path}")
# Test extraction
extracted_files = downloader.extract_cams_files(zip_path)
print(f"βœ… Extraction successful: {len(extracted_files)} files")
# List downloaded files
downloaded = downloader.list_downloaded_files()
print(f"βœ… Found {len(downloaded)} downloaded files")
return True
except Exception as e:
print(f"❌ Test failed: {str(e)}")
return False
if __name__ == "__main__":
test_cams_downloader()