import os
import re
import multiprocessing
from pathlib import Path
from typing import Dict, List

from datasets import load_dataset, Dataset
from transformers import AutoTokenizer

os.environ["TOKENIZERS_PARALLELISM"] = "false"

DATASET_NAME_PATTERN = re.compile(r"[^a-zA-Z0-9]")
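# Illustrative note (added, not in the original code): DATASET_NAME_PATTERN strips
# every non-alphanumeric character, so a name/config pair such as "wikipedia" +
# "20220301.en" becomes the folder name "wikipedia20220301en" used under /data by
# the save/load helpers below.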
def download_dataset(
    ds_name: str,
    ds_config: str = None,
    ds_split: str = "train",
):
    """
    Download a dataset from the HuggingFace Hub and save it to disk in raw chunks.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, defaults to `"train"`):
            The split of the dataset to load.

    Returns:
        len(ds) (`int`):
            The number of rows in the dataset.
    """
    if ds_name == "wikipedia":
        ds = load_wikipedia(ds_name, ds_config)
    else:
        if ds_config == "":
            ds_config = None
        ds = load_dataset(ds_name, ds_config, split=ds_split)

    chunk_and_save_dataset(
        ds, ds_name=ds_name, ds_config=ds_config, suffix=f"_{ds_split}_raw"
    )
    return len(ds)
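# Usage sketch (illustrative; "imdb" is a hypothetical dataset choice and /data must
# be a writable volume, e.g. a Space with persistent storage attached):
#
#     n_rows = download_dataset("imdb", ds_config="", ds_split="train")
#     # raw chunks now live under /data/imdb as chunk_0_train_raw, chunk_1_train_raw, ...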
def load_wikipedia(ds_name, ds_config):
    """
    Stream the wikipedia dataset from the HuggingFace Hub.

    Args:
        ds_name (`str`):
            The name of the dataset to load. Must be `"wikipedia"`.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.

    Returns:
        ds (`datasets.Dataset`):
            The materialized dataset containing only the `"text"` column.
    """
    ds = load_dataset(ds_name, ds_config, streaming=True, split="train")

    def gen():
        for example in ds:
            yield {"text": example["text"]}

    return Dataset.from_generator(gen)
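# Note (added for clarity): streaming avoids loading the full wikipedia dump before
# iteration starts, and Dataset.from_generator materializes only the "text" column,
# so the rest of the pipeline can treat the result like any other map-style dataset.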
def chunk_and_save_dataset(
    ds: Dataset,
    chunk_size: int = 20_000,
    ds_name: str = None,
    ds_config: str = None,
    suffix: str = "",
):
    """
    Chunk a dataset into smaller pieces of size `chunk_size` and save each piece as a
    parquet file. The name of the dataset is used to create a folder in `/data`.

    Args:
        ds (`Dataset`):
            The dataset to chunk.
        chunk_size (`int`, *optional*, defaults to `20_000`):
            The number of rows in each chunk.
        ds_name (`str`, *optional*, defaults to `None`):
            The name of the dataset, used to build the output folder name.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset, appended to the folder name.
        suffix (`str`, *optional*, defaults to `""`):
            The suffix to add to each chunk's file name.
    """
    if ds_config is None:
        ds_config = ""
    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    folder.mkdir(exist_ok=True, parents=True)

    for chunk_num, start_idx in enumerate(range(0, len(ds), chunk_size)):
        end_idx = min(start_idx + chunk_size, len(ds))
        temp = ds.select(range(start_idx, end_idx))
        temp.to_parquet(str(folder / f"chunk_{chunk_num}{suffix}"))
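# Example of the resulting layout (illustrative, assuming ds_name="imdb", no config,
# suffix="_train_raw", and 40,000 rows):
#
#     /data/imdb/chunk_0_train_raw   # rows 0-19,999
#     /data/imdb/chunk_1_train_raw   # rows 20,000-39,999
#
# Each file is a parquet file; the extension is omitted, which matches the
# `chunk_*_{ds_split}_raw` / `chunk_*_{ds_split}_tokenized` globs used below.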
def tokenize_dataset(
    ds_name: str,
    ds_config: str = None,
    ds_split: str = "train",
    model_name: str = None,
    opt_level: str = None,
    column_name: str = "text",
    num2skip: int = 0,
    num2embed: int = -1,
):
    """
    Tokenize the examples with the model's tokenizer, sort by length to minimize
    padding, and save the result to disk in tokenized chunks.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, defaults to `"train"`):
            The split of the dataset to load.
        model_name (`str`, *optional*, defaults to `None`):
            The name of the model whose tokenizer is used.
        opt_level (`str`, *optional*, defaults to `None`):
            The optimization level; `"O4"` pads every example to `max_length`.
        column_name (`str`, *optional*, defaults to `"text"`):
            The column to tokenize.
        num2skip (`int`, *optional*, defaults to `0`):
            The number of rows to skip.
        num2embed (`int`, *optional*, defaults to `-1`):
            The number of rows to embed. Defaults to `-1`, which means all rows.
    """
    # TODO: option for controlling length for models that can go shorter/longer than 512
    if ds_config is None:
        ds_config = ""
    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    files = list(map(str, folder.glob(f"chunk_*_{ds_split}_raw")))
    ds = load_dataset("parquet", data_files=files, split="train")

    if num2embed == -1:
        num2embed = len(ds)
    ds = ds.select(range(num2skip, num2skip + num2embed))

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    padding = "max_length" if opt_level == "O4" else False
    max_length = 512

    def tokenize(examples: Dict[str, List[str]]):
        tokenized = tokenizer(
            examples[column_name],
            truncation=True,
            padding=padding,
            max_length=max_length,
        )
        tokenized["length"] = [len(x) for x in tokenized["input_ids"]]
        return tokenized

    tds = ds.map(
        tokenize,
        batched=True,
        batch_size=1000,
        # drop every original column except the text column
        remove_columns=list(set(ds.column_names) - {column_name}),
        num_proc=multiprocessing.cpu_count(),
        desc="Tokenizing",
    )

    # sort to minimize padding
    if padding != "max_length":
        tds = tds.sort("length")

    chunk_and_save_dataset(
        tds, ds_name=ds_name, ds_config=ds_config, suffix=f"_{ds_split}_tokenized"
    )
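# Usage sketch (illustrative; the dataset and model names are hypothetical choices):
#
#     tokenize_dataset(
#         "imdb",
#         ds_config="",
#         ds_split="train",
#         model_name="bert-base-uncased",
#         opt_level=None,          # anything other than "O4" keeps padding=False and sorts by length
#         column_name="text",
#     )
#     # writes chunk_0_train_tokenized, ... next to the raw chunks under /data/imdb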
def load_tokenized_dataset(
    ds_name: str,
    ds_config: str = None,
    ds_split: str = "train",
):
    """
    Load a tokenized dataset from disk.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, defaults to `"train"`):
            The split of the dataset to load.

    Returns:
        ds (`Dataset`):
            The tokenized dataset reloaded from its parquet chunks.
    """
    if ds_config is None:
        ds_config = ""
    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    files = list(map(str, folder.glob(f"chunk_*_{ds_split}_tokenized")))
    return load_dataset("parquet", data_files=files, split="train")
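

if __name__ == "__main__":
    # Minimal end-to-end sketch (added for illustration): the dataset "imdb" and the
    # tokenizer "bert-base-uncased" are hypothetical choices, and /data must be a
    # writable volume for the parquet chunks to be saved.
    n_rows = download_dataset("imdb", ds_config="", ds_split="train")
    print(f"Saved {n_rows} raw rows")

    tokenize_dataset(
        "imdb",
        ds_config="",
        ds_split="train",
        model_name="bert-base-uncased",
        opt_level=None,
        column_name="text",
    )

    tokenized = load_tokenized_dataset("imdb", ds_config="", ds_split="train")
    print(tokenized)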