| """ | |
| Agent Tuning Module for Agent Tuning Optimization Framework | |
| This module provides functionality for efficiently tuning large language models | |
| into specialized agents using a combination of positive examples, negative examples, | |
| and synthetically generated interaction trajectories. | |
| """ | |

import os
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import torch
from datasets import Dataset
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

from data.trajectory_data import Trajectory


class AgentTuner:
    """Base class for agent tuning methods."""

    def __init__(self, name: str):
        """
        Initialize the agent tuner.

        Args:
            name: Name of the tuning method
        """
        self.name = name

    def tune(
        self,
        model_name: str,
        trajectories: List[Trajectory],
        **kwargs
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Tune a model into a specialized agent.

        Args:
            model_name: Name of the base model
            trajectories: List of training trajectories
            **kwargs: Additional tuning parameters

        Returns:
            Tuple of (tuned_model, training_metrics)
        """
        raise NotImplementedError("Subclasses must implement this method")

    def save_model(self, model: Any, path: str) -> None:
        """
        Save the tuned model.

        Args:
            model: Tuned model
            path: Path to save the model
        """
        raise NotImplementedError("Subclasses must implement this method")

    def load_model(self, path: str) -> Any:
        """
        Load a tuned model.

        Args:
            path: Path to the model

        Returns:
            Loaded model
        """
        raise NotImplementedError("Subclasses must implement this method")


class SupervisedFineTuner(AgentTuner):
    """Tune agents using supervised fine-tuning."""

    def __init__(self):
        """Initialize the supervised fine-tuner."""
        super().__init__("supervised_fine_tuning")

    def tune(
        self,
        model_name: str,
        trajectories: List[Trajectory],
        output_dir: str = "./tuned_model",
        num_train_epochs: int = 3,
        learning_rate: float = 5e-5,
        batch_size: int = 4,
        gradient_accumulation_steps: int = 4,
        max_seq_length: int = 512,
        format_type: str = "interleaved",
        positive_weight: float = 0.8,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        **kwargs
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Tune a model using supervised fine-tuning.

        Args:
            model_name: Name of the base model
            trajectories: List of training trajectories
            output_dir: Directory to save the model
            num_train_epochs: Number of training epochs
            learning_rate: Learning rate
            batch_size: Per-device batch size
            gradient_accumulation_steps: Gradient accumulation steps
            max_seq_length: Maximum sequence length
            format_type: Format type for trajectories
            positive_weight: Target fraction of positive examples in the training mix
            device: Device to use for training
            **kwargs: Additional tuning parameters

        Returns:
            Tuple of (tuned_model, training_metrics)
        """
        print(f"Starting supervised fine-tuning of {model_name}")

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Load model and tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)

        # Ensure the tokenizer has a pad token
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
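        # Note: decoder-only checkpoints such as GPT-2 often ship without a pad
        # token; reusing EOS lets the tokenizer pad fixed-length batches safely.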

        # Prepare training data
        print("Preparing training data...")

        # Separate positive and negative trajectories
        positive_trajectories = [t for t in trajectories if t.is_positive]
        negative_trajectories = [t for t in trajectories if not t.is_positive]
        print(f"Found {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories")

        # Calculate sample counts based on positive weight
        total_samples = len(trajectories)
        target_positive = int(total_samples * positive_weight)
        target_negative = total_samples - target_positive
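        # Example: 100 trajectories with positive_weight=0.8 gives targets of 80
        # positive and 20 negative; a class below its target is kept in full
        # rather than oversampled.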

        # Sample trajectories to achieve the desired ratio
        if len(positive_trajectories) > target_positive:
            positive_trajectories = np.random.choice(positive_trajectories, target_positive, replace=False).tolist()
        if len(negative_trajectories) > target_negative:
            negative_trajectories = np.random.choice(negative_trajectories, target_negative, replace=False).tolist()

        # Combine and shuffle trajectories
        sampled_trajectories = positive_trajectories + negative_trajectories
        np.random.shuffle(sampled_trajectories)
        print(f"Using {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories for training")

        # Format trajectories for training
        training_texts = []
        for trajectory in tqdm(sampled_trajectories, desc="Formatting trajectories"):
            formatted = trajectory.to_training_format(format_type)
            training_texts.append(formatted)

        # Tokenize training data
        def tokenize_function(examples):
            return tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=max_seq_length
            )

        # Create dataset
        dataset = Dataset.from_dict({"text": training_texts})
        tokenized_dataset = dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"]
        )

        # Set up training arguments
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            learning_rate=learning_rate,
            weight_decay=0.01,
            save_strategy="epoch",
            save_total_limit=2,
            logging_dir=f"{output_dir}/logs",
            logging_steps=10,
            report_to="none"
        )
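        # With the defaults, the effective batch size is
        # per_device_train_batch_size * gradient_accumulation_steps = 4 * 4 = 16.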

        # Create data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False
        )
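        # mlm=False selects the causal-LM objective: the collator copies input_ids
        # into labels (masking pad positions) instead of masking random tokens.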

        # Create trainer
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_dataset,
            data_collator=data_collator
        )

        # Train the model
        print("Starting training...")
        train_result = trainer.train()

        # Save the model and tokenizer
        print(f"Saving model to {output_dir}")
        trainer.save_model(output_dir)
        tokenizer.save_pretrained(output_dir)

        # Return the model and metrics
        metrics = {
            "train_loss": train_result.training_loss,
            "train_runtime": train_result.metrics["train_runtime"],
            "samples_per_second": train_result.metrics["train_samples_per_second"],
            "num_train_samples": len(tokenized_dataset)
        }
        return model, metrics

    def save_model(self, model: Any, path: str) -> None:
        """
        Save the tuned model.

        Args:
            model: Tuned model
            path: Path to save the model
        """
        model.save_pretrained(path)

    def load_model(self, path: str) -> Any:
        """
        Load a tuned model.

        Args:
            path: Path to the model

        Returns:
            Loaded model
        """
        return AutoModelForCausalLM.from_pretrained(path)
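
# Illustrative usage sketch (assumes `my_trajectories` is a list of Trajectory
# objects prepared elsewhere, exposing `is_positive` and `to_training_format`):
#
#   tuner = SupervisedFineTuner()
#   model, metrics = tuner.tune(
#       model_name="gpt2",             # any causal-LM checkpoint (assumption)
#       trajectories=my_trajectories,  # hypothetical, built by the data module
#       output_dir="./tuned_model",
#   )
#   print(f"Final training loss: {metrics['train_loss']:.4f}")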


class ParameterEfficientFineTuner(AgentTuner):
    """Tune agents using parameter-efficient fine-tuning methods."""

    def __init__(self):
        """Initialize the parameter-efficient fine-tuner."""
        super().__init__("parameter_efficient_fine_tuning")

    def tune(
        self,
        model_name: str,
        trajectories: List[Trajectory],
        output_dir: str = "./tuned_model",
        method: str = "lora",  # 'lora', 'prefix', or 'prompt_tuning'
        num_train_epochs: int = 3,
        learning_rate: float = 1e-4,
        batch_size: int = 4,
        gradient_accumulation_steps: int = 4,
        max_seq_length: int = 512,
        format_type: str = "interleaved",
        positive_weight: float = 0.8,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        **kwargs
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Tune a model using parameter-efficient methods.

        Args:
            model_name: Name of the base model
            trajectories: List of training trajectories
            output_dir: Directory to save the model
            method: PEFT method to use ('lora', 'prefix', or 'prompt_tuning')
            num_train_epochs: Number of training epochs
            learning_rate: Learning rate
            batch_size: Per-device batch size
            gradient_accumulation_steps: Gradient accumulation steps
            max_seq_length: Maximum sequence length
            format_type: Format type for trajectories
            positive_weight: Target fraction of positive examples in the training mix
            device: Device to use for training
            **kwargs: Additional tuning parameters

        Returns:
            Tuple of (tuned_model, training_metrics)
        """
        try:
            from peft import (
                get_peft_model, LoraConfig, PrefixTuningConfig,
                PromptTuningConfig, TaskType
            )
        except ImportError:
            raise ImportError(
                "The PEFT library is required for parameter-efficient fine-tuning. "
                "Install it with 'pip install peft'."
            )
| print(f"Starting parameter-efficient fine-tuning of {model_name} using {method}") | |
| # Create output directory | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Load model and tokenizer | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModelForCausalLM.from_pretrained(model_name) | |
| # Ensure the tokenizer has a pad token | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| # Configure PEFT method | |
| if method == "lora": | |
| peft_config = LoraConfig( | |
| task_type=TaskType.CAUSAL_LM, | |
| r=16, | |
| lora_alpha=32, | |
| lora_dropout=0.1, | |
| target_modules=["q_proj", "v_proj"] | |
| ) | |
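            # Note: "q_proj"/"v_proj" match LLaMA-style attention blocks; other
            # architectures (e.g. GPT-2's fused "c_attn") need different names.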
| elif method == "prefix": | |
| peft_config = PrefixTuningConfig( | |
| task_type=TaskType.CAUSAL_LM, | |
| num_virtual_tokens=20, | |
| prefix_projection=True | |
| ) | |
| elif method == "prompt_tuning": | |
| peft_config = PromptTuningConfig( | |
| task_type=TaskType.CAUSAL_LM, | |
| num_virtual_tokens=20, | |
| tokenizer_name_or_path=model_name | |
| ) | |
| else: | |
| raise ValueError(f"Unsupported PEFT method: {method}") | |
| # Create PEFT model | |
| model = get_peft_model(model, peft_config) | |
| model.print_trainable_parameters() | |

        # Prepare training data (same procedure as SupervisedFineTuner)
        print("Preparing training data...")

        # Separate positive and negative trajectories
        positive_trajectories = [t for t in trajectories if t.is_positive]
        negative_trajectories = [t for t in trajectories if not t.is_positive]
        print(f"Found {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories")

        # Calculate sample counts based on positive weight
        total_samples = len(trajectories)
        target_positive = int(total_samples * positive_weight)
        target_negative = total_samples - target_positive

        # Sample trajectories to achieve the desired ratio
        if len(positive_trajectories) > target_positive:
            positive_trajectories = np.random.choice(positive_trajectories, target_positive, replace=False).tolist()
        if len(negative_trajectories) > target_negative:
            negative_trajectories = np.random.choice(negative_trajectories, target_negative, replace=False).tolist()

        # Combine and shuffle trajectories
        sampled_trajectories = positive_trajectories + negative_trajectories
        np.random.shuffle(sampled_trajectories)
        print(f"Using {len(positive_trajectories)} positive and {len(negative_trajectories)} negative trajectories for training")

        # Format trajectories for training
        training_texts = []
        for trajectory in tqdm(sampled_trajectories, desc="Formatting trajectories"):
            formatted = trajectory.to_training_format(format_type)
            training_texts.append(formatted)

        # Tokenize training data
        def tokenize_function(examples):
            return tokenizer(
                examples["text"],
                padding="max_length",
                truncation=True,
                max_length=max_seq_length
            )

        # Create dataset
        dataset = Dataset.from_dict({"text": training_texts})
        tokenized_dataset = dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"]
        )

        # Set up training arguments
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            learning_rate=learning_rate,
            weight_decay=0.01,
            save_strategy="epoch",
            save_total_limit=2,
            logging_dir=f"{output_dir}/logs",
            logging_steps=10,
            report_to="none"
        )

        # Create data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False
        )

        # Create trainer
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_dataset,
            data_collator=data_collator
        )

        # Train the model
        print("Starting training...")
        train_result = trainer.train()

        # Save the adapter weights and tokenizer
        print(f"Saving model to {output_dir}")
        trainer.save_model(output_dir)
        tokenizer.save_pretrained(output_dir)

        # Return the model and metrics
        metrics = {
            "train_loss": train_result.training_loss,
            "train_runtime": train_result.metrics["train_runtime"],
            "samples_per_second": train_result.metrics["train_samples_per_second"],
            "num_train_samples": len(tokenized_dataset),
            "peft_method": method
        }
        return model, metrics

    def save_model(self, model: Any, path: str) -> None:
        """
        Save the tuned model.

        Args:
            model: Tuned model
            path: Path to save the model
        """
        model.save_pretrained(path)
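
    def load_model(self, path: str, base_model_name: Optional[str] = None) -> Any:
        """
        Load a tuned PEFT model (minimal sketch, mirroring
        SupervisedFineTuner.load_model).

        A PEFT checkpoint stores only adapter weights, so the base model is
        loaded first and the adapter is applied on top of it.

        Args:
            path: Path to the saved adapter
            base_model_name: Base model to attach the adapter to; if omitted,
                it is read from the adapter config (assumed parameter)

        Returns:
            Loaded model
        """
        from peft import PeftConfig, PeftModel

        if base_model_name is None:
            # The adapter config records the base checkpoint it was trained from
            base_model_name = PeftConfig.from_pretrained(path).base_model_name_or_path
        base_model = AutoModelForCausalLM.from_pretrained(base_model_name)
        return PeftModel.from_pretrained(base_model, path)


# Illustrative LoRA usage sketch (assumes `my_trajectories` is prepared elsewhere
# and the base checkpoint exposes q_proj/v_proj attention modules):
#
#   peft_tuner = ParameterEfficientFineTuner()
#   model, metrics = peft_tuner.tune(
#       model_name="meta-llama/Llama-2-7b-hf",  # hypothetical checkpoint
#       trajectories=my_trajectories,
#       method="lora",
#   )
#   peft_tuner.save_model(model, "./lora_adapter")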