| from typing import Dict, List, Any | |
| # import transformers | |
| # from transformers import AutoTokenizer | |
| # import torch | |
| from datetime import datetime | |
| import torch | |
| # torch.backends.cuda.matmul.allow_tf32 = True | |
| from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler | |
class EndpointHandler:
    """Endpoint handler that times batched Stable Diffusion generation.

    On construction it loads a ``StableDiffusionPipeline`` in float16, swaps in
    the DPM-Solver++ multistep scheduler, moves the pipeline to CUDA and turns
    on xformers memory-efficient attention. Each call renders a fixed prompt
    ``batch_size`` times and reports how long the pipeline call took; the
    generated images themselves are discarded.
    """

    # Fixed prompt rendered for every element of the batch.
    _BENCHMARK_PROMPT = "a photo of an astronaut riding a horse on mars"
    # Number of denoising steps passed to the pipeline on every call.
    _NUM_INFERENCE_STEPS = 20

    def __init__(self, path=""):
        """Load the pipeline found at ``path`` and prepare it for GPU inference.

        Args:
            path: Model location forwarded to
                ``StableDiffusionPipeline.from_pretrained``.
        """
        self.pipe = StableDiffusionPipeline.from_pretrained(
            path, torch_dtype=torch.float16
        )
        # Use the DPMSolverMultistepScheduler (DPM-Solver++) instead of the
        # scheduler shipped in the checkpoint, reusing its configuration.
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe = self.pipe.to("cuda")
        self.pipe.enable_xformers_memory_efficient_attention()

    def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Run one timed batch of image generation.

        Args:
            data: Request payload. The only key read is ``"batch_size"``
                (optional, defaults to 1): how many copies of the fixed
                prompt to render in a single pipeline call. The payload is
                not modified.

        Returns:
            A one-element list holding a dict with ``"batch_size"`` (the value
            used) and ``"time_elapsed"`` (the wall-clock duration of the
            pipeline call, formatted as ``str(timedelta)``).

        Raises:
            TypeError: If ``batch_size`` is not an integer.
            ValueError: If ``batch_size`` is smaller than 1.
        """
        # Default to a single image. The previous default was the request dict
        # itself, which made every request without "batch_size" fail when the
        # prompt list was multiplied by it.
        batch_size = data.get("batch_size", 1)
        if not isinstance(batch_size, int):
            raise TypeError(
                f"batch_size must be an integer, got {type(batch_size).__name__}"
            )
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        prompts = [self._BENCHMARK_PROMPT] * batch_size

        started = datetime.now()
        with torch.inference_mode():
            # The pipeline output is intentionally dropped: only the elapsed
            # time is reported back to the caller.
            self.pipe(prompts, num_inference_steps=self._NUM_INFERENCE_STEPS)
        finished = datetime.now()

        return [{"batch_size": batch_size, "time_elapsed": str(finished - started)}]