import os
import re
import traceback
import unicodedata

import tiktoken
from transformers import AutoTokenizer, XGLMTokenizerFast

from mappings import MODEL_MAP, TOKENIZER_INFO

TOKENIZER_CACHE = {}


class TokenMonsterTokenizer:
    def __init__(self, name):
        import tokenmonster

        self.name = name
        self.vocab = tokenmonster.load(name.split("/")[-1])

    def __call__(self, text, **kwargs):
        ids = list(self.vocab.tokenize(text))
        return {"input_ids": ids}

    def convert_ids_to_tokens(self, ids):
        return [self.vocab.decode(id_) for id_ in ids]


def get_token_type(token_text):
    if re.match(r"^\s+$", token_text):
        return "whitespace"
    elif re.match(r"^[a-zA-Z]+$", token_text):
        return "word"
    elif re.match(r"^\d+$", token_text):
        return "number"
    elif re.match(r"^[^\w\s]+$", token_text):
        return "punctuation"
    elif token_text.startswith("<") and token_text.endswith(">"):
        return "special"
    else:
        return "mixed"


def is_subword(token_text, model, is_first):
    if not token_text or token_text.isspace():
        return False
    if token_text.startswith("<") and token_text.endswith(">"):
        return False  # special token
    if model in {
        "llama-2",
        "llama-3",
        "gemma-2",
        "bloom",
        "aya-expanse",
        "comma",
    }:
        return (
            not (token_text.startswith("▁") or token_text.startswith("Ġ"))
            and not is_first
        )
    elif model == "bert":
        return token_text.startswith("##")
    elif model in {"qwen3", "qwen2.5"}:
        return (
            not (token_text.startswith("▁") or token_text.startswith("Ġ"))
            and not is_first
        )
    elif model in {"gpt-4", "gpt-2", "byt5"}:
        return not token_text.startswith(" ") and not is_first
    else:
        return not is_first


def tokenize_with_tiktoken(text, model):
    enc = tiktoken.encoding_for_model(model)
    # Process the entire text at once, not line by line
    token_ids = enc.encode(text)
    token_data = []
    current_text_pos = 0

    # Build character-to-token mapping
    char_to_tokens = {}

    # Decode each token and find its position in the original text
    for i, token_id in enumerate(token_ids):
        token_text = enc.decode([token_id])
        # Find where this token appears in the remaining text
        remaining_text = text[current_text_pos:]
        if token_text in remaining_text:
            # Find the position of this token in the original text
            local_pos = remaining_text.find(token_text)
            actual_start = current_text_pos + local_pos
            actual_end = actual_start + len(token_text)
            # Map each character position to this token
            for char_pos in range(actual_start, actual_end):
                if char_pos not in char_to_tokens:
                    char_to_tokens[char_pos] = []
                char_to_tokens[char_pos].append(token_id)
            current_text_pos = actual_end

    # Group consecutive characters that have the same token ID sets
    processed_chars = set()
    text_pos = 0
    while text_pos < len(text):
        if text_pos in processed_chars:
            text_pos += 1
            continue

        # Get tokens for current character
        current_tokens = char_to_tokens.get(text_pos, [])
        if not current_tokens:
            # Handle characters not covered by any token
            token_data.append(
                {
                    "text": text[text_pos],
                    "id": None,
                    "type": get_token_type(text[text_pos]),
                    "is_subword": False,
                    "bytes": len(text[text_pos].encode("utf-8")),
                    "position": len(token_data),
                }
            )
            processed_chars.add(text_pos)
            text_pos += 1
            continue

        # Find the span of characters that share the same token ID set
        span_start = text_pos
        span_end = text_pos + 1
        # Extend span while characters have the same token set
        while (
            span_end < len(text)
            and span_end in char_to_tokens
            and char_to_tokens[span_end] == current_tokens
        ):
            span_end += 1

        # Get the text for this span
        span_text = text[span_start:span_end]

        # Create token data entry
        token_data.append(
            {
                "text": span_text,
                "id": current_tokens if len(current_tokens) > 1 else current_tokens[0],
                "type": get_token_type(span_text),
                "is_subword": is_subword(span_text, model, len(token_data) == 0),
                "bytes": len(span_text.encode("utf-8")),
                "position": len(token_data),
            }
        )

        # Mark all characters in this span as processed
        for pos in range(span_start, span_end):
            processed_chars.add(pos)
        text_pos = span_end

    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": len(token_ids),
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }

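# Illustrative usage sketch (an assumption, not part of the original module):
# it presumes mappings.py defines a "gpt-4" entry in MODEL_MAP / TOKENIZER_INFO,
# since TOKENIZER_INFO[model] is indexed for every model passed in.
#
#     result = tokenize_with_tiktoken("Hello, world!", "gpt-4")
#     result["token_count"]          # number of BPE token ids produced
#     result["tokens"][0]["text"]    # text span covered by the first entry
#     result["compression_ratio"]    # characters per displayed token entry
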
# Legacy line-by-line tiktoken variant, kept for reference.
def tokenize_with_tiktoke1n(text, model):
    encoding_name = "cl100k_base" if model == "gpt-4" else "gpt2"
    enc = tiktoken.get_encoding(encoding_name)
    token_data = []
    position = 0
    for line in text.split("\n"):
        token_ids = enc.encode(line + "\n")
        for token_id in token_ids:
            token_text = enc.decode([token_id])
            token_data.append(
                {
                    "text": token_text,
                    "id": token_id,
                    "type": get_token_type(token_text),
                    "is_subword": is_subword(token_text, model, position == 0),
                    "bytes": len(token_text.encode("utf-8")),
                    "position": position,
                }
            )
            position += 1
    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": len(token_data),
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }


def get_hf_tokenizer(model):
    model_name = MODEL_MAP.get(model, "gpt2")
    if model_name in TOKENIZER_CACHE:
        return TOKENIZER_CACHE[model_name]
    # Get token from environment
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": 0,
            "tokens": [],
            "error": "HF_TOKEN not found in environment. Please add your HuggingFace token to Space secrets.",
        }
    if "tokenmonster" in model_name:
        tokenizer = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
    else:
        tokenizer = AutoTokenizer.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
    TOKENIZER_CACHE[model_name] = tokenizer
    return tokenizer


def get_tokenizer(model):
    model_name = MODEL_MAP.get(model, None)
    if model_name is None:
        raise ValueError(f"Unknown tokenizer code {model!r}")
    if model_name in TOKENIZER_CACHE:
        return TOKENIZER_CACHE[model_name]
    # Get token from environment
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": 0,
            "tokens": [],
            "error": "HF_TOKEN not found in environment. Please add your HuggingFace token to Space secrets.",
        }
    if "tekken" in model_name:
        from mistral_common.tokens.tokenizers.mistral import MistralTokenizer

        tok = MistralTokenizer.v3(is_tekken=True)
        tokenizer = tok.instruct_tokenizer.tokenizer
    elif "tokenmonster" in model_name:
        tokenizer = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
    elif "xglm" in model_name.lower():
        # XGLM is loaded with its dedicated fast tokenizer class
        tokenizer = XGLMTokenizerFast.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
    else:
        tokenizer = AutoTokenizer.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
    TOKENIZER_CACHE[model_name] = tokenizer
    return tokenizer

def tokenize_w_tekken(text, model):
    tokenizer = get_tokenizer(model)
    # Process the entire text at once, not line by line
    token_ids = tokenizer.encode(text, bos=False, eos=False)
    token_data = []
    current_text_pos = 0

    # Build character-to-token mapping
    char_to_tokens = {}

    # Decode each token and find its position in the original text
    for i, token_id in enumerate(token_ids):
        token_text = tokenizer.decode([token_id])
        # Find where this token appears in the remaining text
        remaining_text = text[current_text_pos:]
        if token_text in remaining_text:
            # Find the position of this token in the original text
            local_pos = remaining_text.find(token_text)
            actual_start = current_text_pos + local_pos
            actual_end = actual_start + len(token_text)
            # Map each character position to this token
            for char_pos in range(actual_start, actual_end):
                if char_pos not in char_to_tokens:
                    char_to_tokens[char_pos] = []
                char_to_tokens[char_pos].append(token_id)
            current_text_pos = actual_end

    # Group consecutive characters that have the same token ID sets
    processed_chars = set()
    text_pos = 0
    while text_pos < len(text):
        if text_pos in processed_chars:
            text_pos += 1
            continue

        # Get tokens for current character
        current_tokens = char_to_tokens.get(text_pos, [])
        if not current_tokens:
            # Handle characters not covered by any token
            token_data.append(
                {
                    "text": text[text_pos],
                    "id": None,
                    "type": get_token_type(text[text_pos]),
                    "is_subword": False,
                    "bytes": len(text[text_pos].encode("utf-8")),
                    "position": len(token_data),
                }
            )
            processed_chars.add(text_pos)
            text_pos += 1
            continue

        # Find the span of characters that share the same token ID set
        span_start = text_pos
        span_end = text_pos + 1
        # Extend span while characters have the same token set
        while (
            span_end < len(text)
            and span_end in char_to_tokens
            and char_to_tokens[span_end] == current_tokens
        ):
            span_end += 1

        # Get the text for this span
        span_text = text[span_start:span_end]

        # Create token data entry
        token_data.append(
            {
                "text": span_text,
                "id": current_tokens if len(current_tokens) > 1 else current_tokens[0],
                "type": get_token_type(span_text),
                "is_subword": is_subword(span_text, model, len(token_data) == 0),
                "bytes": len(span_text.encode("utf-8")),
                "position": len(token_data),
            }
        )

        # Mark all characters in this span as processed
        for pos in range(span_start, span_end):
            processed_chars.add(pos)
        text_pos = span_end

    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": len(token_ids),
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }

# Legacy line-by-line variant of tokenize_w_tekken, kept for reference.
def tokenize_w_tekken1(text, model):
    try:
        tokenizer = get_tokenizer(model)
        index = 0
        token_data = []
        for line in text.split("\n"):
            line += "\n"
            token_ids = tokenizer.encode(line, bos=False, eos=False)
            tokens = [tokenizer.decode([tok]) for tok in token_ids]
            for i, tok in enumerate(tokens):
                token_data.append(
                    {
                        "text": tok,
                        "id": token_ids[i],
                        "type": get_token_type(tok),
                        "is_subword": is_subword(tok, model, index == 0),
                        "bytes": len(tok.encode("utf-8")),
                        "position": index,
                    }
                )
                index += 1
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": index,
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        return None


# Alternative version if you really need line-by-line processing:
def tokenize_with_hf(text, model):
    try:
        tokenizer = get_tokenizer(model)
        all_token_data = []
        global_position = 0
        text_offset = 0
        # Process line by line but accumulate results
        for line in text.split("\n"):
            line_with_newline = line + "\n"
            encoding = tokenizer(
                line_with_newline,
                return_offsets_mapping=True,
                return_tensors=None,
                add_special_tokens=False,
            )
            token_ids = encoding["input_ids"]
            tokens = tokenizer.convert_ids_to_tokens(token_ids)
            offsets = encoding.get("offset_mapping", [])

            # Process tokens for this line
            for i in range(len(token_ids)):
                if i < len(offsets) and offsets[i] is not None:
                    start, end = offsets[i]
                    actual_text = line_with_newline[start:end]
                else:
                    actual_text = tokens[i] if i < len(tokens) else ""
                if not actual_text:
                    continue
                token_type = get_token_type(actual_text)
                subword = is_subword(actual_text, model, global_position == 0)
                all_token_data.append(
                    {
                        # "text": actual_text,
                        "text": tokens[i],
                        "id": [token_ids[i]],
                        "type": token_type,
                        "is_subword": subword,
                        "bytes": len(actual_text.encode("utf-8")),
                        "position": global_position,
                    }
                )
                global_position += 1
            text_offset += len(line_with_newline)

        # Calculate total token count over the whole text in one pass
        total_tokens = len(
            tokenizer(text, return_tensors=None, add_special_tokens=False)["input_ids"]
        )

        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": total_tokens,
            "tokens": all_token_data,
            "compression_ratio": len(text) / len(all_token_data) if all_token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None

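# Illustrative usage sketch for the HuggingFace path (an assumption, not part of
# the original module): it requires HF_TOKEN in the environment and a model key
# that mappings.py actually maps, e.g. "llama-3" if MODEL_MAP defines it.
#
#     result = tokenize_with_hf("Tokenizers split words into subwords.", "llama-3")
#     if result is not None:
#         for tok in result["tokens"]:
#             print(tok["position"], repr(tok["text"]), tok["id"], tok["type"])
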
# Legacy offset-mapping variant of tokenize_with_hf, kept for reference.
def tokenize_with_hfold(text, model):
    try:
        tokenizer = get_hf_tokenizer(model)
        # Process the ENTIRE text at once, not line by line
        token_data = []
        encoding = tokenizer(
            text,
            return_offsets_mapping=True,
            return_tensors=None,
            add_special_tokens=False,
        )
        token_ids = encoding["input_ids"]
        tokens = tokenizer.convert_ids_to_tokens(token_ids)
        # Offset in the text for each token, i.e. token i covers text[offsets[i][0]:offsets[i][1]]
        offsets = encoding.get("offset_mapping", [])

        curr_tok_id = 0
        current_text_pos = 0
        token_id = []
        while curr_tok_id < len(token_ids) and curr_tok_id < len(tokens):
            if offsets and curr_tok_id < len(offsets):
                start, end = offsets[curr_tok_id]
                actual_text = text[start:end]
                # Tokens whose spans end at the same offset are grouped into one entry
                if current_text_pos == end:
                    token_id.append(token_ids[curr_tok_id])
                else:
                    token_id = [token_ids[curr_tok_id]]
                token_type = get_token_type(actual_text)
                subword = is_subword(actual_text, model, curr_tok_id == 0)
                if current_text_pos != end:
                    token_data.append(
                        {
                            "text": actual_text,
                            "id": token_id,
                            "type": token_type,
                            "is_subword": subword,
                            "bytes": len(actual_text.encode("utf-8")),
                            "position": curr_tok_id,
                        }
                    )
                current_text_pos = end
            else:
                token_data.append(
                    {
                        "text": tokens[curr_tok_id],
                        "id": [token_ids[curr_tok_id]],
                        "type": get_token_type(tokens[curr_tok_id]),
                        "is_subword": is_subword(tokens[curr_tok_id], model, curr_tok_id == 0),
                        "bytes": len(tokens[curr_tok_id].encode("utf-8")),
                        "position": curr_tok_id,
                    }
                )
            curr_tok_id += 1

        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": len(token_ids),
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        return None


def tokenize_with_byt5(text, model):
    """Special handling for the ByT5 byte-level tokenizer."""
    try:
        tokenizer = get_hf_tokenizer(model)
        # ByT5 doesn't support offset_mapping, so we handle it differently
        encoding = tokenizer(
            text,
            return_tensors=None,
            add_special_tokens=False,
        )
        token_ids = encoding["input_ids"]

        # For ByT5, each token represents a byte
        text_bytes = text.encode("utf-8")
        token_data = []
        for i, token_id in enumerate(token_ids):
            # Decode individual token
            try:
                token_text = tokenizer.decode([token_id])
                # For ByT5, tokens often correspond to individual bytes/characters
                if i < len(text_bytes):
                    # Get the actual byte this token represents
                    byte_val = text_bytes[i]
                    actual_char = (
                        chr(byte_val)
                        if byte_val < 128
                        else text_bytes[i : i + 1].decode("utf-8", errors="replace")
                    )
                else:
                    actual_char = token_text
                token_type = get_token_type(actual_char)
                subword = is_subword(actual_char, model, i == 0)
                token_data.append(
                    {
                        "text": actual_char,
                        "id": [token_id],
                        "type": token_type,
                        "is_subword": subword,
                        "bytes": len(actual_char.encode("utf-8")),
                        "position": i,
                    }
                )
            except Exception:
                # Handle special tokens or decoding issues
                token_data.append(
                    {
                        "text": "",
                        "id": [token_id],
                        "type": "special",
                        "is_subword": False,
                        "bytes": 0,
                        "position": i,
                    }
                )

        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": len(token_ids),
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error in ByT5 tokenization: {e}")
        return None


def normalize_text(text, method):
    """Apply normalization method to text"""
    if method == "none":
        return text
    elif method == "lowercase":
        return text.lower()
    elif method == "nfc":
        return unicodedata.normalize("NFC", text)
    elif method == "nfd":
        return unicodedata.normalize("NFD", text)
    elif method == "nfkc":
        return unicodedata.normalize("NFKC", text)
    elif method == "nfkd":
        return unicodedata.normalize("NFKD", text)
    elif method == "strip_accents":
        return "".join(
            c for c in unicodedata.normalize("NFD", text) if unicodedata.category(c) != "Mn"
        )
    elif method == "strip_punctuation":
        return re.sub(r"[^\w\s]", "", text)
    elif method == "whitespace_normalize":
        return " ".join(text.split())
    return text

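# Examples of what the normalization methods above produce:
#
#     normalize_text("Café", "strip_accents")              -> "Cafe"  (NFD, then drop Mn marks)
#     normalize_text("ﬁle", "nfkc")                         -> "file"  (compatibility ligature folded)
#     normalize_text("a   b\tc", "whitespace_normalize")    -> "a b c"
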
def get_normalization_methods():
    """Return available normalization methods"""
    return [
        ("none", "No normalization"),
        ("lowercase", "Lowercase"),
        ("nfc", "Unicode NFC (Canonical)"),
        ("nfd", "Unicode NFD (Decomposed)"),
        ("nfkc", "Unicode NFKC (Compatible)"),
        ("nfkd", "Unicode NFKD (Compatible Decomposed)"),
        ("strip_accents", "Remove Accents"),
        ("strip_punctuation", "Remove Punctuation"),
        ("whitespace_normalize", "Normalize Whitespace"),
    ]


def clean_token_display(token_text, tokenizer=None):
    """Clean up token display to avoid ? characters"""
    if token_text == "\n" or token_text == " ":
        return ""

    # Handle common prefixes
    if token_text.startswith("Ġ"):  # GPT-2 style
        return " " + token_text[1:]
    elif token_text.startswith("▁"):  # SentencePiece style
        return " " + token_text[1:]

    # Handle byte-level representations
    if token_text.startswith("<0x") and token_text.endswith(">"):
        try:
            # Convert hex byte to character
            hex_val = token_text[3:-1]
            byte_val = int(hex_val, 16)
            return chr(byte_val) if 32 <= byte_val <= 126 else f"[{hex_val}]"
        except ValueError:
            return token_text

    # Handle other special cases
    if "�" in token_text:  # Unicode replacement character
        return token_text.replace("�", "?")

    return token_text
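
# Minimal smoke-test sketch (an assumption, not part of the original module).
# It presumes mappings.py provides a "gpt-4" entry in TOKENIZER_INFO; the
# HF-backed paths would additionally need HF_TOKEN set in the environment.
if __name__ == "__main__":
    sample = normalize_text("Café  visitors bought 42 tokens?", "whitespace_normalize")
    result = tokenize_with_tiktoken(sample, "gpt-4")
    print(f"{result['model']}: {result['token_count']} tokens")
    for tok in result["tokens"]:
        print(tok["position"], repr(clean_token_display(tok["text"])), tok["id"])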