Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import joblib | |
| import logging | |
| import zipfile | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| # Get model URLs from environment variables | |
| DIABETES_MODEL_URL = os.getenv("DIABETES_MODEL_URL") | |
| SCALER_URL = os.getenv("SCALER_URL") | |
| MULTI_MODEL_URL = os.getenv("MULTI_MODEL_URL") | |
| # Local paths for downloaded models | |
| MODEL_PATHS = { | |
| "DIABETES_MODEL": "finaliseddiabetes_model.zip", | |
| "SCALER": "finalisedscaler.zip", | |
| "MULTI_MODEL": "nodiabetes.zip", | |
| } | |
| # Extracted model names | |
| EXTRACTED_MODELS = { | |
| "DIABETES_MODEL": "finaliseddiabetes_model.joblib", | |
| "SCALER": "finalisedscaler.joblib", | |
| "MULTI_MODEL": "nodiabetes.joblib", | |
| } | |
| BASE_DIR = os.getcwd() # Get current working directory | |
| def download_model(url, zip_filename): | |
| """Downloads the model zip file from the given URL and saves it locally.""" | |
| zip_path = os.path.join(BASE_DIR, zip_filename) | |
| if not url: | |
| logging.error(f"URL for {zip_filename} is missing!") | |
| return False | |
| try: | |
| response = requests.get(url, allow_redirects=True) | |
| if response.status_code == 200: | |
| with open(zip_path, 'wb') as f: | |
| f.write(response.content) | |
| logging.info(f"Downloaded {zip_filename} successfully.") | |
| return True | |
| else: | |
| logging.error(f"Failed to download {zip_filename}. HTTP Status: {response.status_code}") | |
| return False | |
| except Exception as e: | |
| logging.error(f"Error downloading {zip_filename}: {e}") | |
| return False | |
| def extract_if_needed(zip_filename, extracted_filename): | |
| """Extracts model file from zip if not already extracted.""" | |
| zip_path = os.path.join(BASE_DIR, zip_filename) | |
| extracted_path = os.path.join(BASE_DIR, extracted_filename) | |
| if os.path.exists(extracted_path): | |
| logging.info(f"{extracted_filename} already exists. Skipping extraction.") | |
| return True | |
| if not os.path.exists(zip_path): | |
| logging.error(f"Zip file missing: {zip_path}") | |
| return False | |
| try: | |
| with zipfile.ZipFile(zip_path, 'r') as zip_ref: | |
| zip_ref.extractall(BASE_DIR) | |
| extracted_files = zip_ref.namelist() | |
| logging.info(f"Extracted {zip_filename}, contents: {extracted_files}") | |
| return True | |
| except Exception as e: | |
| logging.error(f"Error extracting {zip_filename}: {e}") | |
| return False | |
| def load_model(model_filename): | |
| """Loads a model from the given filename.""" | |
| model_path = os.path.join(BASE_DIR, model_filename) | |
| if not os.path.exists(model_path): | |
| logging.error(f"Model file not found: {model_path}") | |
| return None | |
| try: | |
| model = joblib.load(model_path) | |
| logging.info(f"Loaded {model_filename} successfully.") | |
| return model | |
| except Exception as e: | |
| logging.error(f"Error loading {model_filename}: {e}") | |
| return None | |
| # **Main Execution** | |
| for model_key, zip_filename in MODEL_PATHS.items(): | |
| extracted_filename = EXTRACTED_MODELS[model_key] | |
| # Step 1: Download model if not present | |
| if not os.path.exists(os.path.join(BASE_DIR, zip_filename)): | |
| download_model(globals()[f"{model_key}_URL"], zip_filename) | |
| # Step 2: Extract model file | |
| extract_if_needed(zip_filename, extracted_filename) | |
| # Step 3: Load models | |
| diabetes_model = load_model(EXTRACTED_MODELS["DIABETES_MODEL"]) | |
| scaler = load_model(EXTRACTED_MODELS["SCALER"]) | |
| multi_model = load_model(EXTRACTED_MODELS["MULTI_MODEL"]) | |
| # Final check | |
| if diabetes_model and scaler and multi_model: | |
| logging.info("All models loaded successfully! β ") | |
| else: | |
| logging.error("Some models failed to load. β Check logs for details.") | |