# Copyright 2020-2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import tempfile
import unittest

import torch
from parameterized import parameterized
from transformers import AutoModelForCausalLM, AutoModelForSeq2SeqLM, GenerationConfig

from trl import AutoModelForCausalLMWithValueHead, AutoModelForSeq2SeqLMWithValueHead, create_reference_model
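

# Tiny test checkpoints hosted under the trl-internal-testing organization; they keep these tests fast and light on memory.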
ALL_CAUSAL_LM_MODELS = [
    "trl-internal-testing/tiny-BloomForCausalLM",
    "trl-internal-testing/tiny-CohereForCausalLM",
    "trl-internal-testing/tiny-DbrxForCausalLM",
    # "trl-internal-testing/tiny-FalconMambaForCausalLM",  # FalconMambaForCausalLM modeling seems to be broken for now
    "trl-internal-testing/tiny-Gemma2ForCausalLM",
    "trl-internal-testing/tiny-GemmaForCausalLM",
    "trl-internal-testing/tiny-GPT2LMHeadModel",
    "trl-internal-testing/tiny-GPTNeoXForCausalLM",
    "trl-internal-testing/tiny-LlamaForCausalLM-3.1",
    "trl-internal-testing/tiny-LlamaForCausalLM-3.2",
    "trl-internal-testing/tiny-LlamaForCausalLM-3",
    "trl-internal-testing/tiny-MistralForCausalLM-0.1",
    "trl-internal-testing/tiny-MistralForCausalLM-0.2",
    "trl-internal-testing/tiny-OPTForCausalLM",
    "trl-internal-testing/tiny-Phi3ForCausalLM",
    "trl-internal-testing/tiny-Qwen2ForCausalLM-2.5",
]

ALL_SEQ2SEQ_MODELS = [
    "trl-internal-testing/tiny-T5ForConditionalGeneration",
    "trl-internal-testing/tiny-BartModel",
]
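

# Nesting the shared tester inside a plain container class keeps unittest from collecting and running the base
# class on its own; only the concrete subclasses below (causal LM and seq2seq) are executed.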
class BaseTester:
    class VHeadModelTester(unittest.TestCase):
        all_model_names = None
        trl_model_class = None
        transformers_model_class = None

        def test_value_head(self):
            r"""
            Test if the v-head is added to the model successfully
            """
            for model_name in self.all_model_names:
                model = self.trl_model_class.from_pretrained(model_name)
                self.assertTrue(hasattr(model, "v_head"))

        def test_value_head_shape(self):
            r"""
            Test if the v-head has the correct shape
            """
            for model_name in self.all_model_names:
                model = self.trl_model_class.from_pretrained(model_name)
                self.assertEqual(model.v_head.summary.weight.shape[0], 1)

        def test_value_head_init_random(self):
            r"""
            Test if the v-head has been randomly initialized. We can check that by making sure the bias is
            different from zero by default.
            """
            for model_name in self.all_model_names:
                model = self.trl_model_class.from_pretrained(model_name)
                self.assertFalse(
                    torch.allclose(model.v_head.summary.bias, torch.zeros_like(model.v_head.summary.bias))
                )

        def test_value_head_not_str(self):
            r"""
            Test if the v-head is added to the model successfully when a model instance (rather than a model
            name string) is passed to `from_pretrained`.
            """
            for model_name in self.all_model_names:
                pretrained_model = self.transformers_model_class.from_pretrained(model_name)
                model = self.trl_model_class.from_pretrained(pretrained_model)
                self.assertTrue(hasattr(model, "v_head"))

        def test_from_save_trl(self):
            """
            Test if the model can be saved to and loaded from a directory with the same weights, including the
            additional modules (e.g. v_head).
            """
            for model_name in self.all_model_names:
                model = self.trl_model_class.from_pretrained(model_name)
                with tempfile.TemporaryDirectory() as tmp_dir:
                    model.save_pretrained(tmp_dir)
                    model_from_save = self.trl_model_class.from_pretrained(tmp_dir)
                    # Check if the weights are the same
                    for key in model_from_save.state_dict():
                        self.assertTrue(torch.allclose(model_from_save.state_dict()[key], model.state_dict()[key]))

        def test_from_save_trl_sharded(self):
            """
            Test if the model can be saved to and loaded from a directory with the same weights - sharded case
            """
            for model_name in self.all_model_names:
                model = self.trl_model_class.from_pretrained(model_name)
                with tempfile.TemporaryDirectory() as tmp_dir:
                    # Force a small shard size so the sharded save/load path is actually exercised
                    model.save_pretrained(tmp_dir, max_shard_size="1MB")
                    model_from_save = self.trl_model_class.from_pretrained(tmp_dir)
                    # Check if the weights are the same
                    for key in model_from_save.state_dict():
                        self.assertTrue(torch.allclose(model_from_save.state_dict()[key], model.state_dict()[key]))

        def test_from_save_transformers_sharded(self):
            """
            Test if the model can be saved and loaded using transformers and get the same weights - sharded case
            """
            for model_name in self.all_model_names:
                transformers_model = self.trl_model_class.transformers_parent_class.from_pretrained(model_name)
                trl_model = self.trl_model_class.from_pretrained(model_name)
                with tempfile.TemporaryDirectory() as tmp_dir:
                    trl_model.save_pretrained(tmp_dir, max_shard_size="1MB")
                    transformers_model_from_save = self.trl_model_class.transformers_parent_class.from_pretrained(
                        tmp_dir
                    )
                    # Check if the weights are the same
                    for key in transformers_model.state_dict():
                        self.assertTrue(
                            torch.allclose(
                                transformers_model_from_save.state_dict()[key], transformers_model.state_dict()[key]
                            )
                        )

        def test_from_save_transformers(self):
            """
            Test if the model can be saved and loaded using transformers and get the same weights. On top of the
            round-trip check, also verify that the trl model has the same state dict keys as the transformers
            model, except for the v_head.
            """
            for model_name in self.all_model_names:
                transformers_model = self.trl_model_class.transformers_parent_class.from_pretrained(model_name)
                trl_model = self.trl_model_class.from_pretrained(model_name)
                with tempfile.TemporaryDirectory() as tmp_dir:
                    trl_model.save_pretrained(tmp_dir)
                    transformers_model_from_save = self.trl_model_class.transformers_parent_class.from_pretrained(
                        tmp_dir
                    )
                    # Check if the weights are the same
                    for key in transformers_model.state_dict():
                        self.assertTrue(
                            torch.allclose(
                                transformers_model_from_save.state_dict()[key], transformers_model.state_dict()[key]
                            )
                        )
                    # Check if the trl model has the same keys as the transformers model, except the v_head
                    for key in trl_model.state_dict():
                        if "v_head" not in key:
                            self.assertIn(key, transformers_model.state_dict())
                            # check if the weights are the same
                            self.assertTrue(
                                torch.allclose(trl_model.state_dict()[key], transformers_model.state_dict()[key])
                            )
                    # check that the saved and original transformers models have the same state dict keys
                    self.assertEqual(
                        set(transformers_model_from_save.state_dict().keys()),
                        set(transformers_model.state_dict().keys()),
                    )


class CausalLMValueHeadModelTester(BaseTester.VHeadModelTester, unittest.TestCase):
    """
    Testing suite for causal LM v-head models.
    """

    all_model_names = ALL_CAUSAL_LM_MODELS
    trl_model_class = AutoModelForCausalLMWithValueHead
    transformers_model_class = AutoModelForCausalLM

    def tearDown(self):
        # free memory
        gc.collect()

    def test_inference(self):
        r"""
        Test if the model can be used for inference and outputs 3 values
        - logits, loss, and value states
        """
        EXPECTED_OUTPUT_SIZE = 3
        for model_name in self.all_model_names:
            model = self.trl_model_class.from_pretrained(model_name)
            input_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
            outputs = model(input_ids)
            # Check if the outputs are of the right size - here
            # we always output 3 values - logits, loss, and value states
            self.assertEqual(len(outputs), EXPECTED_OUTPUT_SIZE)

    def test_dropout_config(self):
        r"""
        Test that when `summary_dropout_prob` is set on the config of the pretrained model, the dropout
        probability is propagated to the v_head.
        """
        for model_name in self.all_model_names:
            pretrained_model = self.transformers_model_class.from_pretrained(model_name)
            pretrained_model.config.summary_dropout_prob = 0.5
            model = self.trl_model_class.from_pretrained(pretrained_model)
            # Check if v head of the model has the same dropout as the config
            self.assertEqual(model.v_head.dropout.p, pretrained_model.config.summary_dropout_prob)

    def test_dropout_kwargs(self):
        r"""
        Test that when `summary_dropout_prob` is passed as a keyword argument to `from_pretrained`, the dropout
        probability is propagated to the v_head.
        """
        for model_name in self.all_model_names:
            v_head_kwargs = {"summary_dropout_prob": 0.5}
            model = self.trl_model_class.from_pretrained(model_name, **v_head_kwargs)
            # Check if v head of the model has the same dropout as the kwargs
            self.assertEqual(model.v_head.dropout.p, 0.5)
            model = self.trl_model_class.from_pretrained(model_name, summary_dropout_prob=0.5)
            # Check if v head of the model has the same dropout as the kwargs
            self.assertEqual(model.v_head.dropout.p, 0.5)

    @parameterized.expand(ALL_CAUSAL_LM_MODELS)
    def test_generate(self, model_name):
        r"""
        Test if `generate` works for every model
        """
        generation_config = GenerationConfig(max_new_tokens=9)
        model = self.trl_model_class.from_pretrained(model_name)
        input_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
        # Just check if the generation works
        _ = model.generate(input_ids, generation_config=generation_config)

    def test_transformers_bf16_kwargs(self):
        r"""
        Test if the transformers kwargs are correctly passed. Here we check that loading a model in half
        precision works as expected, i.e. the weights of the `pretrained_model` attribute are loaded in half
        precision and a dummy forward pass runs without any issue.
        """
        for model_name in self.all_model_names:
            trl_model = self.trl_model_class.from_pretrained(model_name, torch_dtype=torch.bfloat16)
            lm_head_namings = ["lm_head", "embed_out", "output_layer"]
            self.assertTrue(
                any(hasattr(trl_model.pretrained_model, lm_head_naming) for lm_head_naming in lm_head_namings),
                "Can't test the model because it doesn't have any of the expected lm_head namings",
            )
            for lm_head_naming in lm_head_namings:
                if hasattr(trl_model.pretrained_model, lm_head_naming):
                    self.assertEqual(getattr(trl_model.pretrained_model, lm_head_naming).weight.dtype, torch.bfloat16)
            dummy_input = torch.LongTensor([[0, 1, 0, 1]])
            # check dummy forward pass works in half precision
            _ = trl_model(dummy_input)
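
    # Note: the push_to_hub tests below upload to the Hub, so they need network access and write credentials
    # (use_auth_token=True); they are not purely local like the tests above.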
    def test_push_to_hub(self):
        for model_name in self.all_model_names:
            model = AutoModelForCausalLMWithValueHead.from_pretrained(model_name)
            if "sharded" in model_name:
                model.push_to_hub(model_name + "-ppo", use_auth_token=True, max_shard_size="1MB")
            else:
                model.push_to_hub(model_name + "-ppo", use_auth_token=True)
            model_from_pretrained = AutoModelForCausalLMWithValueHead.from_pretrained(model_name + "-ppo")
            # check all keys
            self.assertEqual(model.state_dict().keys(), model_from_pretrained.state_dict().keys())
            for name, param in model.state_dict().items():
                self.assertTrue(
                    torch.allclose(param, model_from_pretrained.state_dict()[name]),
                    f"Parameter {name} is not the same after push_to_hub and from_pretrained",
                )


class Seq2SeqValueHeadModelTester(BaseTester.VHeadModelTester, unittest.TestCase):
    """
    Testing suite for seq2seq v-head models.
    """

    all_model_names = ALL_SEQ2SEQ_MODELS
    trl_model_class = AutoModelForSeq2SeqLMWithValueHead
    transformers_model_class = AutoModelForSeq2SeqLM

    def tearDown(self):
        # free memory
        gc.collect()

    def test_inference(self):
        r"""
        Test if the model can be used for inference and outputs 3 values
        - logits, loss, and value states
        """
        EXPECTED_OUTPUT_SIZE = 3
        for model_name in self.all_model_names:
            model = self.trl_model_class.from_pretrained(model_name)
            input_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
            decoder_input_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
            outputs = model(input_ids, decoder_input_ids=decoder_input_ids)
            # Check if the outputs are of the right size - here
            # we always output 3 values - logits, loss, and value states
            self.assertEqual(len(outputs), EXPECTED_OUTPUT_SIZE)

    def test_dropout_config(self):
        r"""
        Test that when `summary_dropout_prob` is set on the config of the pretrained model, the dropout
        probability is propagated to the v_head.
        """
        for model_name in self.all_model_names:
            pretrained_model = self.transformers_model_class.from_pretrained(model_name)
            pretrained_model.config.summary_dropout_prob = 0.5
            model = self.trl_model_class.from_pretrained(pretrained_model)
            # Check if v head of the model has the same dropout as the config
            self.assertEqual(model.v_head.dropout.p, pretrained_model.config.summary_dropout_prob)

    def test_dropout_kwargs(self):
        r"""
        Test that when `summary_dropout_prob` is passed as a keyword argument to `from_pretrained`, the dropout
        probability is propagated to the v_head.
        """
        for model_name in self.all_model_names:
            v_head_kwargs = {"summary_dropout_prob": 0.5}
            model = self.trl_model_class.from_pretrained(model_name, **v_head_kwargs)
            # Check if v head of the model has the same dropout as the kwargs
            self.assertEqual(model.v_head.dropout.p, 0.5)
            model = self.trl_model_class.from_pretrained(model_name, summary_dropout_prob=0.5)
            # Check if v head of the model has the same dropout as the kwargs
            self.assertEqual(model.v_head.dropout.p, 0.5)

    @parameterized.expand(ALL_SEQ2SEQ_MODELS)
    def test_generate(self, model_name):
        r"""
        Test if `generate` works for every model
        """
        generation_config = GenerationConfig(max_new_tokens=9)
        model = self.trl_model_class.from_pretrained(model_name)
        input_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
        decoder_input_ids = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
        # Just check if the generation works
        _ = model.generate(input_ids, decoder_input_ids=decoder_input_ids, generation_config=generation_config)

    def test_push_to_hub(self):
        for model_name in self.all_model_names:
            model = self.trl_model_class.from_pretrained(model_name)
            if "sharded" in model_name:
                model.push_to_hub(model_name + "-ppo", use_auth_token=True, max_shard_size="1MB")
            else:
                model.push_to_hub(model_name + "-ppo", use_auth_token=True)
            model_from_pretrained = self.trl_model_class.from_pretrained(model_name + "-ppo")
            # check all keys
            self.assertEqual(model.state_dict().keys(), model_from_pretrained.state_dict().keys())
            for name, param in model.state_dict().items():
                self.assertTrue(
                    torch.allclose(param, model_from_pretrained.state_dict()[name]),
                    f"Parameter {name} is not the same after push_to_hub and from_pretrained",
                )

    def test_transformers_bf16_kwargs(self):
        r"""
        Test if the transformers kwargs are correctly passed. Here we check that loading a model in half
        precision works as expected, i.e. the weights of the `pretrained_model` attribute are loaded in half
        precision and a dummy forward pass runs without any issue.
        """
        for model_name in self.all_model_names:
            trl_model = self.trl_model_class.from_pretrained(model_name, torch_dtype=torch.bfloat16)
            lm_head_namings = self.trl_model_class.lm_head_namings
            self.assertTrue(
                any(hasattr(trl_model.pretrained_model, lm_head_naming) for lm_head_naming in lm_head_namings)
            )
            for lm_head_naming in lm_head_namings:
                if hasattr(trl_model.pretrained_model, lm_head_naming):
                    self.assertEqual(getattr(trl_model.pretrained_model, lm_head_naming).weight.dtype, torch.bfloat16)
            dummy_input = torch.LongTensor([[0, 1, 0, 1]])
            # check dummy forward pass works in half precision
            _ = trl_model(input_ids=dummy_input, decoder_input_ids=dummy_input)


class ReferenceModelTest(unittest.TestCase):
    def setUp(self):
        self.model = AutoModelForCausalLMWithValueHead.from_pretrained("trl-internal-testing/tiny-GPT2LMHeadModel")
        self.test_input = torch.tensor([[0, 1, 2, 3]])
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=1)
        self.layer_format = "pretrained_model.transformer.h.{layer}.attn.c_attn.weight"
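
    # create_reference_model returns a frozen copy of the model, so optimizer updates to the original model
    # should leave the reference untouched; this test verifies exactly that.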
    def test_independent_reference(self):
        layer_0 = self.layer_format.format(layer=0)
        layer_1 = self.layer_format.format(layer=1)

        ref_model = create_reference_model(self.model)

        first_layer_before = self.model.get_parameter(layer_0).data.clone()
        last_layer_before = self.model.get_parameter(layer_1).data.clone()  # the model only has 2 layers
        first_ref_layer_before = ref_model.get_parameter(layer_0).data.clone()
        last_ref_layer_before = ref_model.get_parameter(layer_1).data.clone()

        output = self.model(input_ids=self.test_input, labels=self.test_input)
        output[1].backward()
        self.optimizer.step()

        first_layer_after = self.model.get_parameter(layer_0).data.clone()
        last_layer_after = self.model.get_parameter(layer_1).data.clone()
        first_ref_layer_after = ref_model.get_parameter(layer_0).data.clone()
        last_ref_layer_after = ref_model.get_parameter(layer_1).data.clone()

        # before optimization ref and model are identical
        self.assertTrue((first_layer_before == first_ref_layer_before).all())
        self.assertTrue((last_layer_before == last_ref_layer_before).all())
        # ref model stays identical after optimization
        self.assertTrue((first_ref_layer_before == first_ref_layer_after).all())
        self.assertTrue((last_ref_layer_before == last_ref_layer_after).all())
        # optimized model changes
        self.assertFalse((first_layer_before == first_layer_after).all())
        self.assertFalse((last_layer_before == last_layer_after).all())
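
    # num_shared_layers=1 shares the first transformer layer between the model and its reference; shared layers
    # are frozen, so the first layer of the optimized model is expected to stay unchanged while later layers are
    # updated as usual.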
    def test_shared_layers(self):
        layer_0 = self.layer_format.format(layer=0)
        layer_1 = self.layer_format.format(layer=1)

        ref_model = create_reference_model(self.model, num_shared_layers=1)

        first_layer_before = self.model.get_parameter(layer_0).data.clone()
        second_layer_before = self.model.get_parameter(layer_1).data.clone()
        first_ref_layer_before = ref_model.get_parameter(layer_0).data.clone()
        second_ref_layer_before = ref_model.get_parameter(layer_1).data.clone()

        output = self.model(input_ids=self.test_input, labels=self.test_input)
        output[1].backward()
        self.optimizer.step()

        first_layer_after = self.model.get_parameter(layer_0).data.clone()
        second_layer_after = self.model.get_parameter(layer_1).data.clone()
        first_ref_layer_after = ref_model.get_parameter(layer_0).data.clone()
        second_ref_layer_after = ref_model.get_parameter(layer_1).data.clone()

        # before optimization ref and model are identical
        self.assertTrue((first_layer_before == first_ref_layer_before).all())
        self.assertTrue((second_layer_before == second_ref_layer_before).all())
        # ref model stays identical after optimization
        self.assertTrue((first_ref_layer_before == first_ref_layer_after).all())
        self.assertTrue((second_ref_layer_before == second_ref_layer_after).all())
        # first layer of optimized model stays the same
        self.assertTrue((first_layer_before == first_layer_after).all())
        # other layers in optimized model change
        self.assertFalse((second_layer_before == second_layer_after).all())