import logging
import pickle
import re
from pathlib import Path
from typing import Optional

import numpy as np
import torch
import yaml
from huggingface_hub import hf_hub_download
from matplotlib import pyplot as plt
from PIL import Image
from pydantic import BaseModel
from sklearn.cluster import KMeans
from transformers import (
    Idefics2ForConditionalGeneration,
    Idefics2Processor,
    PreTrainedModel,
    ProcessorMixin,
)
class BaseModelYamlJsonMixin:
    """
    BaseModel helper methods for loading from and saving to YAML/JSON.
    """

    @classmethod
    def from_yaml(cls, path: Path):
        with open(path, "r", encoding="utf-8") as f:
            return cls(**yaml.safe_load(f))

    def to_yaml(self: BaseModel, path: Path):
        with open(path, "w", encoding="utf-8") as f:
            yaml.safe_dump(self.model_dump(), f)

    @classmethod
    def from_json(cls, path: Path):
        with open(path, "r", encoding="utf-8") as f:
            return cls.model_validate_json(f.read())

    def to_json(self: BaseModel, path: Path, indent: int = 4, *args, **kwargs):
        with open(path, "w", encoding="utf-8") as f:
            f.write(self.model_dump_json(*args, indent=indent, **kwargs))


class BaseModelWithYamlJsonFromTo(BaseModelYamlJsonMixin, BaseModel):
    pass
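
# Illustrative sketch (not part of the original file): round-tripping a config
# through the YAML/JSON helpers above. `DemoConfig` and the /tmp paths are
# hypothetical, used only for this example.
def _demo_yaml_json_roundtrip():
    class DemoConfig(BaseModelWithYamlJsonFromTo):
        name: str = "idefics2"
        num_actions: int = 8

    cfg = DemoConfig()
    cfg.to_yaml(Path("/tmp/demo_config.yml"))   # serialized via yaml.safe_dump
    cfg.to_json(Path("/tmp/demo_config.json"))  # serialized via model_dump_json
    # Both loaders validate against the pydantic schema on the way back in.
    assert DemoConfig.from_yaml(Path("/tmp/demo_config.yml")) == cfg
    assert DemoConfig.from_json(Path("/tmp/demo_config.json")) == cfg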

class Idefics2TrainAdditionalConfig(BaseModel):
    """
    num_action_tokens (`int`, defaults to `-1`):
        Number of action tokens to add to the tokenizer vocabulary. Overwritten from
        `processor_config.yml`.
    num_actions (`int`, defaults to `-1`):
        Number of action tokens generated per call. Overwritten from `processor_config.yml`.
    do_image_splitting (`bool`, *optional*, defaults to `True`):
        Whether to split the image into a sequence of 4 equal sub-images concatenated with
        the original image. That strategy was first introduced in
        https://arxiv.org/abs/2311.06607.
    lora_config (`dict`, defaults to the recommended config from
    https://x.com/danielhanchen/status/1791900967472140583):
        Configuration for the LoRA model. If it is `None`, the model will not use LoRA.
    """

    # must be set to extend the vocabulary of the model + tokenizer
    num_action_tokens: int = -1  # overwritten by processor_config.yml
    # must be set to be used in the pipeline
    num_actions: int = -1  # overwritten by processor_config.yml
    do_image_splitting: bool = True
    freeze_original_vocab: bool = False
    freeze_vision_model: bool = False
    freeze_connector: bool = False
    torch_dtype: str = "bfloat16"
    lora_config: dict | None = dict(
        r=256,
        lora_alpha=512,
        lora_dropout=0.1,
        target_modules="all-linear",
        use_rslora=True,
        init_lora_weights="gaussian",
        modules_to_save=["lm_head", "embed_tokens"],
    )
    model_name_or_path: str = "HuggingFaceM4/idefics2-8b"
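
# Illustrative sketch (an assumption, not shown in the original file): the
# `lora_config` dict above matches the keyword arguments of `peft.LoraConfig`,
# so wrapping the model would look like this.
def _demo_build_lora_model(model: PreTrainedModel, cfg: Idefics2TrainAdditionalConfig):
    from peft import LoraConfig, get_peft_model

    if cfg.lora_config is None:
        return model  # LoRA disabled
    # `modules_to_save` keeps lm_head/embed_tokens trainable so the newly added
    # action tokens can actually be learned.
    return get_peft_model(model, LoraConfig(**cfg.lora_config))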

class KMeansActionTokenizer:
    def __init__(self, action_count: int = 128):
        self.action_count = action_count
        self.kmeans = KMeans(n_clusters=self.action_count, random_state=np.random.RandomState(seed=42))

    def token_count(self):
        return self.action_count

    @classmethod
    def from_pretrained(cls, model_path: str | Path):
        model_path = Path(model_path)
        self = cls()
        action_tokenizer_path = hf_hub_download(repo_id=str(model_path), filename="tokenizer.pkl")
        with open(action_tokenizer_path, "rb") as file:
            self.kmeans = pickle.load(file)
        self.action_count = self.kmeans.n_clusters
        return self

    def save_pretrained(self, model_path: str | Path):
        model_path = Path(model_path)
        model_path.mkdir(parents=True, exist_ok=True)
        with open(model_path / "tokenizer.pkl", "wb") as file:
            pickle.dump(self.kmeans, file)

    def train(self, actions):
        self.kmeans.fit(actions)

    def tokenize(self, action, padding=False, max_length=-1, truncation=False):
        # action: (K, 3) array of adjusted (delta_x, delta_y, delta_yaw)
        return list(self.kmeans.predict(action))

    def detokenize(self, tokens):
        # Validate that every token is a known cluster id.
        check = np.asarray(tokens)
        in_valid_range = (0 <= check) & (check < self.action_count)
        if not in_valid_range.all():
            logging.warning(f"Invalid tokens occur: {tokens}")
            # On invalid tokens, return the stop action (zero motion) for every step.
            return np.asarray([[0.0, 0.0, 0.0] for _ in range(len(tokens))])
        return np.asarray([self.kmeans.cluster_centers_[t] for t in tokens])
    def visualize(self, figset=None):
        if figset is None:
            fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 16), dpi=300)
        else:
            fig, axes = figset
        FONT = {"fontsize": 20}
        axes[0].set_title("Center", fontdict=FONT)
        axes[1].set_title("Center_Rot", fontdict=FONT)
        centers = self.kmeans.cluster_centers_
        # Each center is (x, y, yaw): plot the point (x, y) and an arrow from it
        # in the direction of yaw, scaled to the (x, y) axis range.
        scale_factor = 0.05
        for i, center in enumerate(centers):
            x, y, yaw = center
            axes[0].plot(x, y, "ro")
            axes[0].arrow(
                x,
                y,
                np.cos(yaw) * scale_factor,
                np.sin(yaw) * scale_factor,
                head_width=scale_factor * 0.3,
                head_length=scale_factor * 0.3,
                fc="k",
                ec="k",
            )
            axes[0].text(x, y, f"{i}", fontsize=10)
        axes[0].axis("equal")
        axes[0].grid(True)
        # Zoom in on centers within distance 0.05 of the origin (near-stationary actions).
        _centers = centers[np.linalg.norm(centers[:, :2], axis=1) < 0.05]
        scale_factor = 0.1
        for center in _centers:
            x, y, yaw = center
            axes[1].plot(x, y, "ro")
            axes[1].arrow(
                x,
                y,
                np.cos(yaw) * scale_factor,
                np.sin(yaw) * scale_factor,
                head_width=scale_factor * 0.3,
                head_length=scale_factor * 0.3,
                fc="k",
                ec="k",
            )
        axes[1].axis("equal")
        axes[1].grid(True)
        return fig, axes
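
# Illustrative sketch (not part of the original file): fitting the action
# tokenizer on random (delta_x, delta_y, delta_yaw) samples and round-tripping
# them through tokenize/detokenize.
def _demo_action_tokenizer():
    rng = np.random.default_rng(0)
    actions = rng.uniform(-1.0, 1.0, size=(1000, 3))  # (N, 3) training actions
    tokenizer = KMeansActionTokenizer(action_count=32)
    tokenizer.train(actions)
    tokens = tokenizer.tokenize(actions[:8])  # 8 discrete cluster ids
    recovered = tokenizer.detokenize(tokens)  # (8, 3) cluster centers
    assert recovered.shape == (8, 3)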

class Idefics2PipelineConfig(BaseModelWithYamlJsonFromTo):
    pipeline_class: str = "Idefics2Pipeline"
    train_additional_cfg: Idefics2TrainAdditionalConfig

class Idefics2Pipeline:
    def __init__(
        self,
        model: PreTrainedModel,
        processor: ProcessorMixin,
        action_tokenizer: KMeansActionTokenizer,
        config: Idefics2PipelineConfig,
    ):
        self.model = model
        self.processor = processor
        self.action_tokenizer = action_tokenizer
        self.config = config

    def save_pretrained(
        self,
        save_directory: str | Path,
    ):
        if not isinstance(save_directory, Path):
            save_directory = Path(save_directory)
        self.model.save_pretrained(save_directory)
        self.processor.save_pretrained(save_directory)
        self.action_tokenizer.save_pretrained(save_directory)
        self.config.to_json(save_directory / "pipeline_config.json")

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str):
        pipeline_config_path = Path(
            hf_hub_download(repo_id=pretrained_model_name_or_path, filename="pipeline_config.json")
        )
        config = Idefics2PipelineConfig.model_validate_json(pipeline_config_path.read_text())
        model = Idefics2ForConditionalGeneration.from_pretrained(pretrained_model_name_or_path)
        processor = Idefics2Processor.from_pretrained(pretrained_model_name_or_path)
        model.eval()
        action_tokenizer = KMeansActionTokenizer.from_pretrained(pretrained_model_name_or_path)
        return cls(model, processor, action_tokenizer, config)

    def to(self, device):
        return self.model.to(device)
    # Draft batch API, kept for reference. It was originally a second `__call__`
    # definition that Python silently shadowed; renamed so it no longer collides
    # with the single-example `__call__` below.
    def _call_batch(
        self,
        examples: list[dict],
        return_traj: Optional[bool] = False,
    ):
        """
        Call the model with a batch of examples.

        Args:
            examples: list of examples, [B, example]
            return_traj: return the detokenized trajectory if True
        """
        raise NotImplementedError("Not implemented yet")
        # same as the idefics2 data collator
        texts = []
        images = []
        for example in examples:
            image = example["images"]
            messages = example["messages"]
            text = self.processor.apply_chat_template(messages, add_generation_prompt=False)
            texts.append(text.strip())
            images.append(image)
        inputs = self.processor(text=texts, images=images, return_tensors="pt", padding=True)
        generate_ids = self.model.generate(
            **inputs, max_new_tokens=self.config.train_additional_cfg.num_actions
        )
        generated_text = self.processor.batch_decode(generate_ids, skip_special_tokens=True)
        if return_traj:
            return self.action_tokenizer.detokenize(generated_text)
        else:
            return generated_text
    def __call__(
        self,
        message_list: list[list[dict]],
        images_list: list[list[Image.Image]],
        return_traj: Optional[bool] = False,
    ):
        """
        Call the model with messages and images.

        Args:
            message_list: list of messages, [B, messages]
            images_list: list of images, [B, images]
            return_traj: return the detokenized trajectory if True
        """
        # batch inference is not used for the model worker
        if len(message_list) != 1:
            raise ValueError("No batch api call allowed for Idefics2Pipeline")
        message = message_list[0]
        images = images_list[0]
        prompt = self.processor.apply_chat_template(message, add_generation_prompt=True)
        prompt = prompt.replace("<end_of_utterance>", "")
        # add a trailing space to match the training data
        prompt = prompt + " "
        inputs = self.processor(text=prompt, images=images, return_tensors="pt", padding=True)
        device = self.model.device
        inputs = {k: v.to(device) for k, v in inputs.items()}
        generate_ids = self.model.generate(
            **inputs, max_new_tokens=self.config.train_additional_cfg.num_actions, top_k=1
        )
        generated_texts = self.processor.batch_decode(generate_ids, skip_special_tokens=True)
        if return_traj:
            # parse the generated "<ACTION_k>" tokens back into cluster ids
            pred_action = re.findall(r"<ACTION_(\d+)>", generated_texts[0])
            pred_action = np.array(pred_action, dtype=np.int64)
            return self.action_tokenizer.detokenize(pred_action).tolist()
        else:
            return generated_texts
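
# Illustrative sketch (not part of the original file): loading the pipeline and
# requesting a trajectory for one image. The repo id, instruction text, and
# image path are hypothetical placeholders; the message layout follows the
# standard Idefics2 chat-template format.
def _demo_pipeline_inference():
    pipeline = Idefics2Pipeline.from_pretrained("your-org/your-idefics2-vla-checkpoint")
    pipeline.to("cuda")
    image = Image.open("frame.png")
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": "Go to the door."},
            ],
        }
    ]
    # Batch size must be 1; returns a list of (delta_x, delta_y, delta_yaw) steps.
    traj = pipeline([messages], [[image]], return_traj=True)
    print(traj)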