from typing import Callable import pytest import torch pytest.importorskip("torch.cuda") from .test_setup import DTYPES, DTYPE_TO_TOLS, PARAMS, SEED from flex_sae import ( triton_hierarchical_sae_loss, hierarchical_sae_loss, triton_topk_sae_loss, topk_sae_loss, ) @pytest.fixture(autouse=True) def _set_cuda_default_device(): torch.set_default_device("cuda") def run_funcs(B, K, F, D, dtype, *, kernel_foo: Callable, ref_foo: Callable): if dtype is torch.bfloat16 and not torch.cuda.is_bf16_supported(): pytest.skip("BF16 not supported on this GPU") torch.manual_seed(SEED) indices = torch.randint(0, F, (B, K), dtype=torch.long, device="cuda") vals = torch.randn(B, K, dtype=dtype, device="cuda").abs().requires_grad_() decoder = torch.randn(F, D, dtype=dtype, device="cuda", requires_grad=True) bias = torch.randn(D, dtype=dtype, device="cuda", requires_grad=True) target = torch.randn(B, D, dtype=dtype, device="cuda") sv_ref = vals.clone().detach().requires_grad_() dec_ref = decoder.clone().detach().requires_grad_() bias_ref = bias.clone().detach().requires_grad_() loss_f = kernel_foo(indices, decoder, vals, bias, target) loss_r = ref_foo(indices, dec_ref, sv_ref, bias_ref, target) torch.testing.assert_close(loss_f, loss_r, **DTYPE_TO_TOLS[dtype]) grad_out = torch.randn((), device="cuda", dtype=torch.float32) loss_f.backward(grad_out) loss_r.backward(grad_out.clone()) torch.testing.assert_close(vals.grad, sv_ref.grad, **DTYPE_TO_TOLS[dtype]) torch.testing.assert_close(decoder.grad, dec_ref.grad, **DTYPE_TO_TOLS[dtype]) torch.testing.assert_close(bias.grad, bias_ref.grad, **DTYPE_TO_TOLS[dtype]) assert indices.grad is None @pytest.mark.parametrize("B, K, F, D", PARAMS) @pytest.mark.parametrize("dtype", DTYPES) def test_triton_hierarchical_sae_loss_and_grads(B, K, F, D, dtype): run_funcs(B, K, F, D, dtype, kernel_foo=triton_hierarchical_sae_loss, ref_foo=hierarchical_sae_loss) torch.cuda.empty_cache() @pytest.mark.parametrize("B, K, F, D", PARAMS) @pytest.mark.parametrize("dtype", DTYPES) def test_topk_sae_loss_and_grads(B, K, F, D, dtype): run_funcs( B, K, F, D, dtype, kernel_foo=triton_topk_sae_loss, ref_foo=topk_sae_loss ) torch.cuda.empty_cache()