Spaces:
Running
Running
| # Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license | |
| import re | |
| import shutil | |
| import subprocess | |
| from itertools import repeat | |
| from multiprocessing.pool import ThreadPool | |
| from pathlib import Path | |
| from urllib import parse, request | |
| import requests | |
| import torch | |
| from ultralytics.utils import LOGGER, TQDM, checks, clean_url, emojis, is_online, url2file | |
| # Define Ultralytics GitHub assets maintained at https://github.com/ultralytics/assets | |
| GITHUB_ASSETS_REPO = "ultralytics/assets" | |
| GITHUB_ASSETS_NAMES = ( | |
| [f"yolov8{k}{suffix}.pt" for k in "nsmlx" for suffix in ("", "-cls", "-seg", "-pose", "-obb", "-oiv7")] | |
| + [f"yolo11{k}{suffix}.pt" for k in "nsmlx" for suffix in ("", "-cls", "-seg", "-pose", "-obb")] | |
| + [f"yolov5{k}{resolution}u.pt" for k in "nsmlx" for resolution in ("", "6")] | |
| + [f"yolov3{k}u.pt" for k in ("", "-spp", "-tiny")] | |
| + [f"yolov8{k}-world.pt" for k in "smlx"] | |
| + [f"yolov8{k}-worldv2.pt" for k in "smlx"] | |
| + [f"yolov9{k}.pt" for k in "tsmce"] | |
| + [f"yolov10{k}.pt" for k in "nsmblx"] | |
| + [f"yolo_nas_{k}.pt" for k in "sml"] | |
| + [f"sam_{k}.pt" for k in "bl"] | |
| + [f"FastSAM-{k}.pt" for k in "sx"] | |
| + [f"rtdetr-{k}.pt" for k in "lx"] | |
| + ["mobile_sam.pt"] | |
| + ["calibration_image_sample_data_20x128x128x3_float32.npy.zip"] | |
| ) | |
| GITHUB_ASSETS_STEMS = [Path(k).stem for k in GITHUB_ASSETS_NAMES] | |
| def is_url(url, check=False): | |
| """ | |
| Validates if the given string is a URL and optionally checks if the URL exists online. | |
| Args: | |
| url (str): The string to be validated as a URL. | |
| check (bool, optional): If True, performs an additional check to see if the URL exists online. | |
| Defaults to False. | |
| Returns: | |
| (bool): Returns True for a valid URL. If 'check' is True, also returns True if the URL exists online. | |
| Returns False otherwise. | |
| Example: | |
| ```python | |
| valid = is_url("https://www.example.com") | |
| ``` | |
| """ | |
| try: | |
| url = str(url) | |
| result = parse.urlparse(url) | |
| assert all([result.scheme, result.netloc]) # check if is url | |
| if check: | |
| with request.urlopen(url) as response: | |
| return response.getcode() == 200 # check if exists online | |
| return True | |
| except Exception: | |
| return False | |
| def delete_dsstore(path, files_to_delete=(".DS_Store", "__MACOSX")): | |
| """ | |
| Deletes all ".DS_store" files under a specified directory. | |
| Args: | |
| path (str, optional): The directory path where the ".DS_store" files should be deleted. | |
| files_to_delete (tuple): The files to be deleted. | |
| Example: | |
| ```python | |
| from ultralytics.utils.downloads import delete_dsstore | |
| delete_dsstore("path/to/dir") | |
| ``` | |
| Note: | |
| ".DS_store" files are created by the Apple operating system and contain metadata about folders and files. They | |
| are hidden system files and can cause issues when transferring files between different operating systems. | |
| """ | |
| for file in files_to_delete: | |
| matches = list(Path(path).rglob(file)) | |
| LOGGER.info(f"Deleting {file} files: {matches}") | |
| for f in matches: | |
| f.unlink() | |
| def zip_directory(directory, compress=True, exclude=(".DS_Store", "__MACOSX"), progress=True): | |
| """ | |
| Zips the contents of a directory, excluding files containing strings in the exclude list. The resulting zip file is | |
| named after the directory and placed alongside it. | |
| Args: | |
| directory (str | Path): The path to the directory to be zipped. | |
| compress (bool): Whether to compress the files while zipping. Default is True. | |
| exclude (tuple, optional): A tuple of filename strings to be excluded. Defaults to ('.DS_Store', '__MACOSX'). | |
| progress (bool, optional): Whether to display a progress bar. Defaults to True. | |
| Returns: | |
| (Path): The path to the resulting zip file. | |
| Example: | |
| ```python | |
| from ultralytics.utils.downloads import zip_directory | |
| file = zip_directory("path/to/dir") | |
| ``` | |
| """ | |
| from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile | |
| delete_dsstore(directory) | |
| directory = Path(directory) | |
| if not directory.is_dir(): | |
| raise FileNotFoundError(f"Directory '{directory}' does not exist.") | |
| # Unzip with progress bar | |
| files_to_zip = [f for f in directory.rglob("*") if f.is_file() and all(x not in f.name for x in exclude)] | |
| zip_file = directory.with_suffix(".zip") | |
| compression = ZIP_DEFLATED if compress else ZIP_STORED | |
| with ZipFile(zip_file, "w", compression) as f: | |
| for file in TQDM(files_to_zip, desc=f"Zipping {directory} to {zip_file}...", unit="file", disable=not progress): | |
| f.write(file, file.relative_to(directory)) | |
| return zip_file # return path to zip file | |
| def unzip_file(file, path=None, exclude=(".DS_Store", "__MACOSX"), exist_ok=False, progress=True): | |
| """ | |
| Unzips a *.zip file to the specified path, excluding files containing strings in the exclude list. | |
| If the zipfile does not contain a single top-level directory, the function will create a new | |
| directory with the same name as the zipfile (without the extension) to extract its contents. | |
| If a path is not provided, the function will use the parent directory of the zipfile as the default path. | |
| Args: | |
| file (str | Path): The path to the zipfile to be extracted. | |
| path (str, optional): The path to extract the zipfile to. Defaults to None. | |
| exclude (tuple, optional): A tuple of filename strings to be excluded. Defaults to ('.DS_Store', '__MACOSX'). | |
| exist_ok (bool, optional): Whether to overwrite existing contents if they exist. Defaults to False. | |
| progress (bool, optional): Whether to display a progress bar. Defaults to True. | |
| Raises: | |
| BadZipFile: If the provided file does not exist or is not a valid zipfile. | |
| Returns: | |
| (Path): The path to the directory where the zipfile was extracted. | |
| Example: | |
| ```python | |
| from ultralytics.utils.downloads import unzip_file | |
| dir = unzip_file("path/to/file.zip") | |
| ``` | |
| """ | |
| from zipfile import BadZipFile, ZipFile, is_zipfile | |
| if not (Path(file).exists() and is_zipfile(file)): | |
| raise BadZipFile(f"File '{file}' does not exist or is a bad zip file.") | |
| if path is None: | |
| path = Path(file).parent # default path | |
| # Unzip the file contents | |
| with ZipFile(file) as zipObj: | |
| files = [f for f in zipObj.namelist() if all(x not in f for x in exclude)] | |
| top_level_dirs = {Path(f).parts[0] for f in files} | |
| # Decide to unzip directly or unzip into a directory | |
| unzip_as_dir = len(top_level_dirs) == 1 # (len(files) > 1 and not files[0].endswith("/")) | |
| if unzip_as_dir: | |
| # Zip has 1 top-level directory | |
| extract_path = path # i.e. ../datasets | |
| path = Path(path) / list(top_level_dirs)[0] # i.e. extract coco8/ dir to ../datasets/ | |
| else: | |
| # Zip has multiple files at top level | |
| path = extract_path = Path(path) / Path(file).stem # i.e. extract multiple files to ../datasets/coco8/ | |
| # Check if destination directory already exists and contains files | |
| if path.exists() and any(path.iterdir()) and not exist_ok: | |
| # If it exists and is not empty, return the path without unzipping | |
| LOGGER.warning(f"WARNING ⚠️ Skipping {file} unzip as destination directory {path} is not empty.") | |
| return path | |
| for f in TQDM(files, desc=f"Unzipping {file} to {Path(path).resolve()}...", unit="file", disable=not progress): | |
| # Ensure the file is within the extract_path to avoid path traversal security vulnerability | |
| if ".." in Path(f).parts: | |
| LOGGER.warning(f"Potentially insecure file path: {f}, skipping extraction.") | |
| continue | |
| zipObj.extract(f, extract_path) | |
| return path # return unzip dir | |
| def check_disk_space(url="https://ultralytics.com/assets/coco8.zip", path=Path.cwd(), sf=1.5, hard=True): | |
| """ | |
| Check if there is sufficient disk space to download and store a file. | |
| Args: | |
| url (str, optional): The URL to the file. Defaults to 'https://ultralytics.com/assets/coco8.zip'. | |
| path (str | Path, optional): The path or drive to check the available free space on. | |
| sf (float, optional): Safety factor, the multiplier for the required free space. Defaults to 1.5. | |
| hard (bool, optional): Whether to throw an error or not on insufficient disk space. Defaults to True. | |
| Returns: | |
| (bool): True if there is sufficient disk space, False otherwise. | |
| """ | |
| try: | |
| r = requests.head(url) # response | |
| assert r.status_code < 400, f"URL error for {url}: {r.status_code} {r.reason}" # check response | |
| except Exception: | |
| return True # requests issue, default to True | |
| # Check file size | |
| gib = 1 << 30 # bytes per GiB | |
| data = int(r.headers.get("Content-Length", 0)) / gib # file size (GB) | |
| total, used, free = (x / gib for x in shutil.disk_usage(path)) # bytes | |
| if data * sf < free: | |
| return True # sufficient space | |
| # Insufficient space | |
| text = ( | |
| f"WARNING ⚠️ Insufficient free disk space {free:.1f} GB < {data * sf:.3f} GB required, " | |
| f"Please free {data * sf - free:.1f} GB additional disk space and try again." | |
| ) | |
| if hard: | |
| raise MemoryError(text) | |
| LOGGER.warning(text) | |
| return False | |
| def get_google_drive_file_info(link): | |
| """ | |
| Retrieves the direct download link and filename for a shareable Google Drive file link. | |
| Args: | |
| link (str): The shareable link of the Google Drive file. | |
| Returns: | |
| (str): Direct download URL for the Google Drive file. | |
| (str): Original filename of the Google Drive file. If filename extraction fails, returns None. | |
| Example: | |
| ```python | |
| from ultralytics.utils.downloads import get_google_drive_file_info | |
| link = "https://drive.google.com/file/d/1cqT-cJgANNrhIHCrEufUYhQ4RqiWG_lJ/view?usp=drive_link" | |
| url, filename = get_google_drive_file_info(link) | |
| ``` | |
| """ | |
| file_id = link.split("/d/")[1].split("/view")[0] | |
| drive_url = f"https://drive.google.com/uc?export=download&id={file_id}" | |
| filename = None | |
| # Start session | |
| with requests.Session() as session: | |
| response = session.get(drive_url, stream=True) | |
| if "quota exceeded" in str(response.content.lower()): | |
| raise ConnectionError( | |
| emojis( | |
| f"❌ Google Drive file download quota exceeded. " | |
| f"Please try again later or download this file manually at {link}." | |
| ) | |
| ) | |
| for k, v in response.cookies.items(): | |
| if k.startswith("download_warning"): | |
| drive_url += f"&confirm={v}" # v is token | |
| if cd := response.headers.get("content-disposition"): | |
| filename = re.findall('filename="(.+)"', cd)[0] | |
| return drive_url, filename | |
| def safe_download( | |
| url, | |
| file=None, | |
| dir=None, | |
| unzip=True, | |
| delete=False, | |
| curl=False, | |
| retry=3, | |
| min_bytes=1e0, | |
| exist_ok=False, | |
| progress=True, | |
| ): | |
| """ | |
| Downloads files from a URL, with options for retrying, unzipping, and deleting the downloaded file. | |
| Args: | |
| url (str): The URL of the file to be downloaded. | |
| file (str, optional): The filename of the downloaded file. | |
| If not provided, the file will be saved with the same name as the URL. | |
| dir (str, optional): The directory to save the downloaded file. | |
| If not provided, the file will be saved in the current working directory. | |
| unzip (bool, optional): Whether to unzip the downloaded file. Default: True. | |
| delete (bool, optional): Whether to delete the downloaded file after unzipping. Default: False. | |
| curl (bool, optional): Whether to use curl command line tool for downloading. Default: False. | |
| retry (int, optional): The number of times to retry the download in case of failure. Default: 3. | |
| min_bytes (float, optional): The minimum number of bytes that the downloaded file should have, to be considered | |
| a successful download. Default: 1E0. | |
| exist_ok (bool, optional): Whether to overwrite existing contents during unzipping. Defaults to False. | |
| progress (bool, optional): Whether to display a progress bar during the download. Default: True. | |
| Example: | |
| ```python | |
| from ultralytics.utils.downloads import safe_download | |
| link = "https://ultralytics.com/assets/bus.jpg" | |
| path = safe_download(link) | |
| ``` | |
| """ | |
| gdrive = url.startswith("https://drive.google.com/") # check if the URL is a Google Drive link | |
| if gdrive: | |
| url, file = get_google_drive_file_info(url) | |
| f = Path(dir or ".") / (file or url2file(url)) # URL converted to filename | |
| if "://" not in str(url) and Path(url).is_file(): # URL exists ('://' check required in Windows Python<3.10) | |
| f = Path(url) # filename | |
| elif not f.is_file(): # URL and file do not exist | |
| uri = (url if gdrive else clean_url(url)).replace( # cleaned and aliased url | |
| "https://github.com/ultralytics/assets/releases/download/v0.0.0/", | |
| "https://ultralytics.com/assets/", # assets alias | |
| ) | |
| desc = f"Downloading {uri} to '{f}'" | |
| LOGGER.info(f"{desc}...") | |
| f.parent.mkdir(parents=True, exist_ok=True) # make directory if missing | |
| check_disk_space(url, path=f.parent) | |
| for i in range(retry + 1): | |
| try: | |
| if curl or i > 0: # curl download with retry, continue | |
| s = "sS" * (not progress) # silent | |
| r = subprocess.run(["curl", "-#", f"-{s}L", url, "-o", f, "--retry", "3", "-C", "-"]).returncode | |
| assert r == 0, f"Curl return value {r}" | |
| else: # urllib download | |
| method = "torch" | |
| if method == "torch": | |
| torch.hub.download_url_to_file(url, f, progress=progress) | |
| else: | |
| with request.urlopen(url) as response, TQDM( | |
| total=int(response.getheader("Content-Length", 0)), | |
| desc=desc, | |
| disable=not progress, | |
| unit="B", | |
| unit_scale=True, | |
| unit_divisor=1024, | |
| ) as pbar: | |
| with open(f, "wb") as f_opened: | |
| for data in response: | |
| f_opened.write(data) | |
| pbar.update(len(data)) | |
| if f.exists(): | |
| if f.stat().st_size > min_bytes: | |
| break # success | |
| f.unlink() # remove partial downloads | |
| except Exception as e: | |
| if i == 0 and not is_online(): | |
| raise ConnectionError(emojis(f"❌ Download failure for {uri}. Environment is not online.")) from e | |
| elif i >= retry: | |
| raise ConnectionError(emojis(f"❌ Download failure for {uri}. Retry limit reached.")) from e | |
| LOGGER.warning(f"⚠️ Download failure, retrying {i + 1}/{retry} {uri}...") | |
| if unzip and f.exists() and f.suffix in {"", ".zip", ".tar", ".gz"}: | |
| from zipfile import is_zipfile | |
| unzip_dir = (dir or f.parent).resolve() # unzip to dir if provided else unzip in place | |
| if is_zipfile(f): | |
| unzip_dir = unzip_file(file=f, path=unzip_dir, exist_ok=exist_ok, progress=progress) # unzip | |
| elif f.suffix in {".tar", ".gz"}: | |
| LOGGER.info(f"Unzipping {f} to {unzip_dir}...") | |
| subprocess.run(["tar", "xf" if f.suffix == ".tar" else "xfz", f, "--directory", unzip_dir], check=True) | |
| if delete: | |
| f.unlink() # remove zip | |
| return unzip_dir | |
| def get_github_assets(repo="ultralytics/assets", version="latest", retry=False): | |
| """ | |
| Retrieve the specified version's tag and assets from a GitHub repository. If the version is not specified, the | |
| function fetches the latest release assets. | |
| Args: | |
| repo (str, optional): The GitHub repository in the format 'owner/repo'. Defaults to 'ultralytics/assets'. | |
| version (str, optional): The release version to fetch assets from. Defaults to 'latest'. | |
| retry (bool, optional): Flag to retry the request in case of a failure. Defaults to False. | |
| Returns: | |
| (tuple): A tuple containing the release tag and a list of asset names. | |
| Example: | |
| ```python | |
| tag, assets = get_github_assets(repo="ultralytics/assets", version="latest") | |
| ``` | |
| """ | |
| if version != "latest": | |
| version = f"tags/{version}" # i.e. tags/v6.2 | |
| url = f"https://api.github.com/repos/{repo}/releases/{version}" | |
| r = requests.get(url) # github api | |
| if r.status_code != 200 and r.reason != "rate limit exceeded" and retry: # failed and not 403 rate limit exceeded | |
| r = requests.get(url) # try again | |
| if r.status_code != 200: | |
| LOGGER.warning(f"⚠️ GitHub assets check failure for {url}: {r.status_code} {r.reason}") | |
| return "", [] | |
| data = r.json() | |
| return data["tag_name"], [x["name"] for x in data["assets"]] # tag, assets i.e. ['yolov8n.pt', 'yolov8s.pt', ...] | |
| def attempt_download_asset(file, repo="ultralytics/assets", release="v8.3.0", **kwargs): | |
| """ | |
| Attempt to download a file from GitHub release assets if it is not found locally. The function checks for the file | |
| locally first, then tries to download it from the specified GitHub repository release. | |
| Args: | |
| file (str | Path): The filename or file path to be downloaded. | |
| repo (str, optional): The GitHub repository in the format 'owner/repo'. Defaults to 'ultralytics/assets'. | |
| release (str, optional): The specific release version to be downloaded. Defaults to 'v8.3.0'. | |
| **kwargs (any): Additional keyword arguments for the download process. | |
| Returns: | |
| (str): The path to the downloaded file. | |
| Example: | |
| ```python | |
| file_path = attempt_download_asset("yolo11n.pt", repo="ultralytics/assets", release="latest") | |
| ``` | |
| """ | |
| from ultralytics.utils import SETTINGS # scoped for circular import | |
| if 'v12' in str(file): | |
| repo = "sunsmarterjie/yolov12" | |
| release = "v1.0" | |
| # YOLOv3/5u updates | |
| file = str(file) | |
| file = checks.check_yolov5u_filename(file) | |
| file = Path(file.strip().replace("'", "")) | |
| if file.exists(): | |
| return str(file) | |
| elif (SETTINGS["weights_dir"] / file).exists(): | |
| return str(SETTINGS["weights_dir"] / file) | |
| else: | |
| # URL specified | |
| name = Path(parse.unquote(str(file))).name # decode '%2F' to '/' etc. | |
| download_url = f"https://github.com/{repo}/releases/download" | |
| if str(file).startswith(("http:/", "https:/")): # download | |
| url = str(file).replace(":/", "://") # Pathlib turns :// -> :/ | |
| file = url2file(name) # parse authentication https://url.com/file.txt?auth... | |
| if Path(file).is_file(): | |
| LOGGER.info(f"Found {clean_url(url)} locally at {file}") # file already exists | |
| else: | |
| safe_download(url=url, file=file, min_bytes=1e5, **kwargs) | |
| elif repo == GITHUB_ASSETS_REPO and name in GITHUB_ASSETS_NAMES: | |
| safe_download(url=f"{download_url}/{release}/{name}", file=file, min_bytes=1e5, **kwargs) | |
| else: | |
| tag, assets = get_github_assets(repo, release) | |
| if not assets: | |
| tag, assets = get_github_assets(repo) # latest release | |
| if name in assets: | |
| safe_download(url=f"{download_url}/{tag}/{name}", file=file, min_bytes=1e5, **kwargs) | |
| return str(file) | |
| def download(url, dir=Path.cwd(), unzip=True, delete=False, curl=False, threads=1, retry=3, exist_ok=False): | |
| """ | |
| Downloads files from specified URLs to a given directory. Supports concurrent downloads if multiple threads are | |
| specified. | |
| Args: | |
| url (str | list): The URL or list of URLs of the files to be downloaded. | |
| dir (Path, optional): The directory where the files will be saved. Defaults to the current working directory. | |
| unzip (bool, optional): Flag to unzip the files after downloading. Defaults to True. | |
| delete (bool, optional): Flag to delete the zip files after extraction. Defaults to False. | |
| curl (bool, optional): Flag to use curl for downloading. Defaults to False. | |
| threads (int, optional): Number of threads to use for concurrent downloads. Defaults to 1. | |
| retry (int, optional): Number of retries in case of download failure. Defaults to 3. | |
| exist_ok (bool, optional): Whether to overwrite existing contents during unzipping. Defaults to False. | |
| Example: | |
| ```python | |
| download("https://ultralytics.com/assets/example.zip", dir="path/to/dir", unzip=True) | |
| ``` | |
| """ | |
| dir = Path(dir) | |
| dir.mkdir(parents=True, exist_ok=True) # make directory | |
| if threads > 1: | |
| with ThreadPool(threads) as pool: | |
| pool.map( | |
| lambda x: safe_download( | |
| url=x[0], | |
| dir=x[1], | |
| unzip=unzip, | |
| delete=delete, | |
| curl=curl, | |
| retry=retry, | |
| exist_ok=exist_ok, | |
| progress=threads <= 1, | |
| ), | |
| zip(url, repeat(dir)), | |
| ) | |
| pool.close() | |
| pool.join() | |
| else: | |
| for u in [url] if isinstance(url, (str, Path)) else url: | |
| safe_download(url=u, dir=dir, unzip=unzip, delete=delete, curl=curl, retry=retry, exist_ok=exist_ok) | |