# Adapted from https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/evaluate_from_local.py
import csv
import json
import argparse
import os
import torch
import spaces
import random
import transformers
import time
import re
from vllm import LLM, SamplingParams
from tqdm import tqdm
import logging
import sys
from datasets import load_dataset
import pandas as pd
import numpy as np

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Can be found at https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/cot_prompt_lib/initial_prompt.txt
initial_prompt = "The following are multiple choice questions (with answers) about {$}. Think step by step and then finish your answer with \"the answer is (X)\" where X is the correct letter choice."

choices = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P"]
max_model_length = 4096
max_new_tokens = 2048
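
# Prompt budget: a prompt must fit within max_model_length - max_new_tokens
# (4096 - 2048 = 2048 tokens); eval_cot() below drops few-shot examples one at a
# time until this holds.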

def preprocess(test_df):
    res_df = []
    for each in test_df:
        options = []
        for opt in each["options"]:
            if opt == "N/A":
                continue
            options.append(opt)
        each["options"] = options
        res_df.append(each)
    return res_df
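
# Illustrative behaviour (hypothetical record, not from the dataset): an entry
# with options ["Paris", "N/A", "London"] comes back as ["Paris", "London"], so
# "N/A" padding never shows up as an answer choice in the prompt.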

def load_mmlu_pro():
    dataset = load_dataset("TIGER-Lab/MMLU-Pro")
    test_df, val_df = dataset["test"], dataset["validation"]
    test_df = preprocess(test_df)
    val_df = preprocess(val_df)
    return test_df, val_df

def load_model(model_name, gpu_utilization=0.8):
    llm = LLM(model=model_name, gpu_memory_utilization=float(gpu_utilization),
              tensor_parallel_size=torch.cuda.device_count(),
              max_model_len=max_model_length,
              trust_remote_code=True)
    logger.info(f"Torch Device CUDA Count: {torch.cuda.device_count()}")
    sampling_params = SamplingParams(temperature=0, max_tokens=max_new_tokens,
                                     stop=["Question:"])
    tokenizer = transformers.AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    return (llm, sampling_params), tokenizer
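
# temperature=0 makes decoding greedy (deterministic), and the "Question:" stop
# string keeps the model from continuing into a new few-shot item of its own.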

def format_cot_example(example, including_answer=True):
    prompt = "Question:\n"
    question = example["question"]
    options = example["options"]
    prompt += question + "\n"
    prompt += "Options:\n"
    for i, opt in enumerate(options):
        prompt += "{}. {}\n".format(choices[i], opt)
    if including_answer:
        cot_content = example["cot_content"].replace("A: Let's think step by step.",
                                                     "Answer: Let's think step by step.")
        prompt += cot_content + "\n\n"
    else:
        prompt += "Answer: Let's think step by step."
    return prompt
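
# Resulting layout (contents abbreviated):
#   Question:
#   <question text>
#   Options:
#   A. <option 1>
#   B. <option 2>
#   ...
#   Answer: Let's think step by step. <chain of thought; included for few-shot examples only>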

def generate_cot_prompt(val_df, curr, k):
    prompt = initial_prompt
    subject = curr["category"]
    # Assert that all rows in val_df have 'category' equal to subject
    assert (val_df["category"] == subject).all(), "Not all rows in val_df have the correct category"
    val_df = val_df.head(k)
    prompt = prompt.replace("{$}", subject) + "\n"
    # val_df is a pandas DataFrame here, so iterate over rows, not column names
    for _, example in val_df.iterrows():
        prompt += format_cot_example(example, including_answer=True)
    prompt += format_cot_example(curr, including_answer=False)
    return prompt
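
# The assembled prompt is: the category-specific instruction, then k solved
# examples (with chain-of-thought answers), then the current question ending in
# "Answer: Let's think step by step." for the model to complete.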

def extract_answer(text):
    pattern = r"answer is \(?([A-J])\)?"
    match = re.search(pattern, text)
    if match:
        return match.group(1)
    else:
        print("1st answer extract failed\n" + text)
        return extract_again(text)


def extract_again(text):
    match = re.search(r'.*[aA]nswer:\s*([A-J])', text)
    if match:
        return match.group(1)
    else:
        return extract_final(text)


def extract_final(text):
    # Match the last standalone letter A-J anywhere in the text
    pattern = r"\b[A-J]\b(?!.*\b[A-J]\b)"
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(0)
    else:
        return None
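
# Illustrative fallback chain (made-up strings, not real model output):
#   extract_answer("... so the answer is (B).")  -> "B"   (primary pattern)
#   extract_answer("Final answer: C")            -> "C"   (via extract_again)
#   extract_answer("I would pick D here.")       -> "D"   (via extract_final)
# If no standalone A-J letter is found at all, None is returned and
# calculate_accuracy() falls back to a seeded random guess.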

def batch_inference(llm, sampling_params, inference_batch):
    start = time.time()
    outputs = llm.generate(inference_batch, sampling_params)
    logging.info("batch of size %d took %.2f s", len(inference_batch), time.time() - start)
    response_batch = []
    pred_batch = []
    for output in outputs:
        generated_text = output.outputs[0].text
        response_batch.append(generated_text)
        pred = extract_answer(generated_text)
        pred_batch.append(pred)
    logging.info("PRED BATCH: %s, RESPONSE BATCH: %s", pred_batch, response_batch)
    return pred_batch, response_batch
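
# All prompts for a subject are passed to vLLM in a single generate() call, so
# batching and scheduling happen inside vLLM rather than in this script.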

def calculate_accuracy(res):
    """
    Calculate accuracy and return an array of correctness (1 if correct, 0 if wrong)
    along with the overall accuracy.
    """
    correctness = []
    for each in res:
        if not each["pred"]:
            # If prediction is None, use random choice with fixed seed
            # This ensures reproducibility when handling missing predictions
            random.seed(12345)
            x = random.randint(0, len(each["options"]) - 1)
            is_correct = 1 if x == each["answer_index"] else 0
        else:
            is_correct = 1 if each["pred"] == each["answer"] else 0
        correctness.append(is_correct)
    # Calculate accuracy from correctness array
    if len(correctness) == 0:
        return [], 0.0
    accuracy = sum(correctness) / len(correctness)
    return correctness, accuracy
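
# Illustrative (made-up) case: three results whose pred/answer pairs are
# ("A", "A"), (None, "B"), ("C", "C") give correctness [1, x, 1], where x scores
# the seeded random guess, so the function returns ([1, x, 1], (2 + x) / 3).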

def eval_cot(subject, model, tokenizer, val_df, test_df, num_shots=5):
    llm, sampling_params = model
    logging.info("evaluating " + subject)
    inference_batches = []
    for i in tqdm(range(len(test_df))):
        # Reset the shot count for every question; it is only reduced below when
        # the prompt does not fit the context budget.
        k = num_shots
        curr = test_df.iloc[i]
        prompt_length_ok = False
        prompt = None
        while not prompt_length_ok:
            prompt = generate_cot_prompt(val_df, curr, k)
            inputs = tokenizer(prompt, return_tensors="pt")
            inputs = {key: value.cuda() for key, value in inputs.items()}
            length = len(inputs["input_ids"][0])
            if length < max_model_length - max_new_tokens:
                prompt_length_ok = True
            k -= 1
        inference_batches.append(prompt)

    pred_batch, response_batch = batch_inference(llm, sampling_params, inference_batches)

    results = []
    for j in range(len(test_df)):
        # test_df is a pandas DataFrame, so index rows positionally
        curr = test_df.iloc[j].to_dict()
        curr["pred"] = pred_batch[j]
        curr["model_outputs"] = response_batch[j]
        results.append(curr)

    # Get array of correctness and overall accuracy
    correctness, accuracy = calculate_accuracy(results)
    logging.info("This batch accuracy is: {}, correct samples: {}/{}\n".format(
        str(accuracy), str(sum(correctness)), str(len(correctness))))
    return correctness, accuracy
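
# Note: generation for the whole subject happens in one batch after all prompts
# are built, so only the batch time is logged, not per-question latency.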

# Extended to 3 minutes for larger evaluations
# (assumed ZeroGPU allocation via the imported `spaces` package: 3 min = 180 s)
@spaces.GPU(duration=180)
def evaluate_mmlu_pro(model_name, num_subjects=-1, num_questions=10, num_shots=5):
    model, tokenizer = load_model(model_name, gpu_utilization=0.8)
    # vLLM runs generation in inference mode internally, so no explicit eval()
    # call on the underlying model is needed (the LLM object does not expose one).
    test_df, val_df = load_mmlu_pro()
    test_df = pd.DataFrame(test_df)
    val_df = pd.DataFrame(val_df)
    test_df = test_df.sort_values(['category', 'question_id'])
    val_df = val_df.sort_values(['category', 'question_id'])

    # Get all unique subjects
    all_subjects = sorted(test_df['category'].unique())
    # Select subjects based on the num_subjects parameter
    if num_subjects == -1 or num_subjects >= len(all_subjects):
        selected_subjects = all_subjects
    else:
        # Take the first num_subjects subjects
        selected_subjects = all_subjects[:num_subjects]
    logging.info("selected subjects:\n" + "\n".join(selected_subjects))

    results = {}
    all_correctness = []
    results_table = []
    for subject in tqdm(selected_subjects, desc="Processing Selected Categories"):
        test_samples = test_df[test_df['category'] == subject].head(num_questions)
        val_samples = val_df[val_df['category'] == subject].head(num_shots)
        correctness, acc = eval_cot(subject, model, tokenizer, val_df=val_samples,
                                    test_df=test_samples, num_shots=num_shots)
        results[subject] = acc
        all_correctness.extend(correctness)
        results_table.append({
            'Subject': subject,
            'Num_samples': len(test_samples),
            'Num_correct': sum(correctness),
            'Accuracy': acc
        })

    weighted_acc = np.mean(all_correctness)
    min_acc_subject = min(results.items(), key=lambda x: x[1])[0]
    max_acc_subject = max(results.items(), key=lambda x: x[1])[0]
    return {
        "overall_accuracy": weighted_acc,
        "min_accuracy_subject": (min_acc_subject, results[min_acc_subject]),
        "max_accuracy_subject": (max_acc_subject, results[max_acc_subject]),
        "full_accuracy_table": results_table,
    }
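
# Minimal usage sketch (assumed entry point; the model name is only an example
# and is not part of the original script):
if __name__ == "__main__":
    report = evaluate_mmlu_pro("Qwen/Qwen2.5-0.5B-Instruct",
                               num_subjects=2, num_questions=5, num_shots=5)
    for row in report["full_accuracy_table"]:
        print(row)
    print("overall accuracy:", report["overall_accuracy"])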