Spaces:
Running
on
Zero
Running
on
Zero
| # Copyright (c) Alibaba, Inc. and its affiliates. | |
| import contextlib | |
| import os | |
| import tempfile | |
| from abc import ABCMeta, abstractmethod | |
| from pathlib import Path | |
| from typing import Generator, Union | |
| import requests | |
| from urllib.parse import urlparse | |
def download_from_url(url):
    """Download ``url`` into a fresh temporary directory.

    Args:
        url (str): Address of the file to fetch. It must carry a scheme
            (e.g. ``http://`` or ``https://``); the content is fetched
            through ``HTTPStorage``.

    Returns:
        str: Path of the downloaded file inside a newly created temporary
        directory. The directory is not removed by this function, so the
        caller is responsible for cleaning it up.

    Raises:
        AssertionError: If ``url`` has no scheme, so nothing can be
            downloaded.
    """
    result = urlparse(url)
    if not result.scheme:
        # Raised explicitly rather than via ``assert`` so that the check
        # survives ``python -O`` and the function never returns ``None``.
        raise AssertionError(f"failed to download: {url}")

    # bytes; fetched before any directory is created so that a failed
    # download leaves nothing behind on disk.
    data = HTTPStorage().read(url)

    # ``mkdtemp`` creates a directory that persists until it is removed
    # explicitly. ``TemporaryDirectory().name`` must not be used here: the
    # unreferenced object deletes its directory as soon as it is finalized.
    work_dir = tempfile.mkdtemp()

    # Name the file after the URL *path* so that query strings and fragments
    # (``file.wav?token=...``) do not end up in the file name; fall back to a
    # fixed name when the path has no final component (``http://host/``).
    file_name = os.path.basename(result.path) or "download"
    file_path = os.path.join(work_dir, file_name)
    with open(file_path, "wb") as fb:
        fb.write(data)
    return file_path
class Storage(metaclass=ABCMeta):
    """Abstract class of storage.

    All backends need to implement the read apis ``read()`` and
    ``read_text()`` as well as the write apis ``write()`` and
    ``write_text()``. ``read()`` reads the file as a byte stream and
    ``read_text()`` reads the file as texts. A backend that cannot write
    may raise ``NotImplementedError`` from the write apis.

    The methods are marked abstract so that the base class (or an incomplete
    backend) cannot be instantiated and then silently do nothing.
    """

    @abstractmethod
    def read(self, filepath: str):
        """Return the content stored at ``filepath`` as bytes."""

    @abstractmethod
    def read_text(self, filepath: str):
        """Return the content stored at ``filepath`` as text."""

    @abstractmethod
    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        """Store the bytes ``obj`` at ``filepath``."""

    @abstractmethod
    def write_text(
        self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        """Store the text ``obj`` at ``filepath`` using ``encoding``."""
class LocalStorage(Storage):
    """Local hard disk storage"""

    @staticmethod
    def _ensure_parent_dir(filepath: Union[str, Path]) -> None:
        """Create the directory that will contain ``filepath`` if it is missing."""
        dirname = os.path.dirname(filepath)
        # ``dirname`` is empty for a bare file name (current directory).
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname, exist_ok=True)

    def read(self, filepath: Union[str, Path]) -> bytes:
        """Read data from a given ``filepath`` with 'rb' mode.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: Expected bytes object.
        """
        with open(filepath, "rb") as f:
            return f.read()

    def read_text(self, filepath: Union[str, Path], encoding: str = "utf-8") -> str:
        """Read data from a given ``filepath`` with 'r' mode.

        Args:
            filepath (str or Path): Path to read data.
            encoding (str): The encoding format used to open the ``filepath``.
                Default: 'utf-8'.

        Returns:
            str: Expected text reading from ``filepath``.
        """
        with open(filepath, "r", encoding=encoding) as f:
            return f.read()

    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        """Write data to a given ``filepath`` with 'wb' mode.

        Note:
            ``write`` will create a directory if the directory of ``filepath``
            does not exist.

        Args:
            obj (bytes): Data to be written.
            filepath (str or Path): Path to write data.
        """
        self._ensure_parent_dir(filepath)
        with open(filepath, "wb") as f:
            f.write(obj)

    def write_text(
        self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        """Write data to a given ``filepath`` with 'w' mode.

        Note:
            ``write_text`` will create a directory if the directory of
            ``filepath`` does not exist.

        Args:
            obj (str): Data to be written.
            filepath (str or Path): Path to write data.
            encoding (str): The encoding format used to open the ``filepath``.
                Default: 'utf-8'.
        """
        self._ensure_parent_dir(filepath)
        with open(filepath, "w", encoding=encoding) as f:
            f.write(obj)

    @contextlib.contextmanager
    def as_local_path(
        self, filepath: Union[str, Path]
    ) -> Generator[Union[str, Path], None, None]:
        """Yield ``filepath`` unchanged; exists only for a unified API.

        The file is already local, so nothing is copied and nothing is
        removed when the ``with`` block exits. The method is a context
        manager so that it can be used in a ``with`` statement.

        Args:
            filepath (str or Path): Local path to hand back.

        Yields:
            str or Path: The same ``filepath`` that was passed in.
        """
        yield filepath
class HTTPStorage(Storage):
    """HTTP and HTTPS storage."""

    def read(self, url):
        """Fetch ``url`` with a GET request and return the body as bytes.

        Args:
            url (str): Address to download.

        Returns:
            bytes: The response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # TODO @wenmeng.zwm add progress bar if file is too large
        r = requests.get(url)
        r.raise_for_status()
        return r.content

    def read_text(self, url):
        """Fetch ``url`` with a GET request and return the body as text.

        Args:
            url (str): Address to download.

        Returns:
            str: The response body decoded by ``requests``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        r = requests.get(url)
        r.raise_for_status()
        return r.text

    @contextlib.contextmanager
    def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.

        ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
        can be called with ``with`` statement, and when exiting from the
        ``with`` statement, the temporary path will be released.

        Args:
            filepath (str): Download a file from ``filepath``.

        Yields:
            str: Path of a temporary local copy of the downloaded content.

        Examples:
            >>> storage = HTTPStorage()
            >>> # After exiting from the ``with`` clause,
            >>> # the path will be removed
            >>> with storage.as_local_path('http://path/to/file') as path:
            ...     # do something here
        """
        # Create the file before entering ``try`` so that ``tmp`` is always
        # bound when the cleanup in ``finally`` runs.
        tmp = tempfile.NamedTemporaryFile(delete=False)
        try:
            # The inner ``with`` closes the handle even when the download
            # fails, so the file can be removed on every platform.
            with tmp:
                tmp.write(self.read(filepath))
            yield tmp.name
        finally:
            os.remove(tmp.name)

    def write(self, obj: bytes, url: Union[str, Path]) -> None:
        """Writing is not supported over HTTP; always raises.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("write is not supported by HTTP Storage")

    def write_text(
        self, obj: str, url: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        """Writing text is not supported over HTTP; always raises.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("write_text is not supported by HTTP Storage")
class OSSStorage(Storage):
    """OSS storage.

    Placeholder backend: the constructor and every read/write api currently
    raise ``NotImplementedError``.
    """

    def __init__(self, oss_config_file=None):
        """Not implemented yet; always raises.

        Args:
            oss_config_file: Reserved for a future configuration file path.

        Raises:
            NotImplementedError: Always.
        """
        # read from config file or env var
        raise NotImplementedError("OSSStorage.__init__ to be implemented in the future")

    def read(self, filepath):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("OSSStorage.read to be implemented in the future")

    def read_text(self, filepath, encoding="utf-8"):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "OSSStorage.read_text to be implemented in the future"
        )

    @contextlib.contextmanager
    def as_local_path(self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.

        ``as_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
        can be called with ``with`` statement, and when exiting from the
        ``with`` statement, the temporary path will be released.

        Args:
            filepath (str): Download a file from ``filepath``.

        Yields:
            str: Path of a temporary local copy of the downloaded content.

        Examples:
            >>> storage = OSSStorage()
            >>> # After exiting from the ``with`` clause,
            >>> # the path will be removed
            >>> with storage.as_local_path('oss://path/to/file') as path:
            ...     # do something here
        """
        # Create the file before entering ``try`` so that ``tmp`` is always
        # bound when the cleanup in ``finally`` runs.
        tmp = tempfile.NamedTemporaryFile(delete=False)
        try:
            # The inner ``with`` closes the handle even when ``read`` raises,
            # so the file can be removed on every platform.
            with tmp:
                tmp.write(self.read(filepath))
            yield tmp.name
        finally:
            os.remove(tmp.name)

    def write(self, obj: bytes, filepath: Union[str, Path]) -> None:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("OSSStorage.write to be implemented in the future")

    def write_text(
        self, obj: str, filepath: Union[str, Path], encoding: str = "utf-8"
    ) -> None:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "OSSStorage.write_text to be implemented in the future"
        )
# Module-wide cache of storage backend instances, keyed by storage type
# (e.g. "local", "http"); ``File._get_storage`` fills it on first use.
G_STORAGES = dict()
class File(object):
    """Facade that dispatches file operations to a backend chosen by URI prefix.

    A URI without ``://`` is treated as a local path; otherwise the text
    before the first ``://`` selects the backend from ``_prefix_to_storage``.
    All methods are static, so the class is used without instantiation,
    e.g. ``File.read(uri)``.
    """

    # Maps a URI prefix to the storage class that handles it.
    _prefix_to_storage: dict = {
        "oss": OSSStorage,
        "http": HTTPStorage,
        "https": HTTPStorage,
        "local": LocalStorage,
    }

    @staticmethod
    def _get_storage(uri):
        """Return the (cached) storage backend instance responsible for ``uri``.

        Args:
            uri (str): Local path or ``<prefix>://...`` URI.

        Returns:
            Storage: Backend instance, created on first use and then reused
            from ``G_STORAGES``.

        Raises:
            AssertionError: If ``uri`` is not a ``str`` or its prefix is not
                a key of ``_prefix_to_storage``.
        """
        assert isinstance(uri, str), f"uri should be str type, but got {type(uri)}"
        if "://" not in uri:
            # local path
            storage_type = "local"
        else:
            # Split on the first separator only: a URI may legitimately
            # contain "://" again later (e.g. inside a query string).
            storage_type, _ = uri.split("://", 1)
        assert storage_type in File._prefix_to_storage, (
            f"Unsupported uri {uri}, valid prefixs: "
            f"{list(File._prefix_to_storage.keys())}"
        )
        if storage_type not in G_STORAGES:
            G_STORAGES[storage_type] = File._prefix_to_storage[storage_type]()
        return G_STORAGES[storage_type]

    @staticmethod
    def read(uri: str) -> bytes:
        """Read data from a given ``uri`` as bytes.

        Args:
            uri (str): Local path or URI to read data from.

        Returns:
            bytes: Expected bytes object.
        """
        storage = File._get_storage(uri)
        return storage.read(uri)

    @staticmethod
    def read_text(uri: Union[str, Path], encoding: str = "utf-8") -> str:
        """Read data from a given ``uri`` as text.

        Args:
            uri (str): Local path or URI to read data from.
            encoding (str): Accepted for API symmetry. It is currently not
                forwarded, because not every backend's ``read_text`` takes
                an ``encoding`` argument; backends use their own default.

        Returns:
            str: Expected text reading from ``uri``.
        """
        storage = File._get_storage(uri)
        return storage.read_text(uri)

    @staticmethod
    def write(obj: bytes, uri: Union[str, Path]) -> None:
        """Write bytes to a given ``uri``.

        Note:
            Whether missing parent directories are created is up to the
            selected backend.

        Args:
            obj (bytes): Data to be written.
            uri (str): Local path or URI to write data to.
        """
        storage = File._get_storage(uri)
        return storage.write(obj, uri)

    @staticmethod
    def write_text(obj: str, uri: str, encoding: str = "utf-8") -> None:
        """Write text to a given ``uri``.

        Note:
            Whether missing parent directories are created is up to the
            selected backend.

        Args:
            obj (str): Data to be written.
            uri (str): Local path or URI to write data to.
            encoding (str): The encoding format used to write ``obj``.
                Default: 'utf-8'.
        """
        storage = File._get_storage(uri)
        # Forward ``encoding`` so a non-default value is honoured instead of
        # being silently replaced by the backend default.
        return storage.write_text(obj, uri, encoding)

    @staticmethod
    @contextlib.contextmanager
    def as_local_path(uri: str) -> Generator[Union[str, Path], None, None]:
        """Provide a local filesystem path for ``uri`` inside a ``with`` block.

        Delegates to the ``as_local_path`` context manager of the backend
        selected for ``uri``; any cleanup when the block exits is performed
        by that backend.

        Args:
            uri (str): Local path or URI to expose as a local path.

        Yields:
            str or Path: The local path produced by the backend.
        """
        storage = File._get_storage(uri)
        with storage.as_local_path(uri) as local_path:
            yield local_path