|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import pytest |
|
|
import torch |
|
|
from datasets import load_dataset |
|
|
from torch.testing import assert_close |
|
|
|
|
|
from peft.utils.incremental_pca import IncrementalPCA |
|
|
|
|
|
|
|
|
torch.manual_seed(1999) |
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module") |
|
|
def iris(): |
|
|
return load_dataset("scikit-learn/iris", split="train") |
|
|
|
|
|
|
|
|
def test_incremental_pca(iris): |
|
|
|
|
|
n_components = 2 |
|
|
X = torch.tensor([iris["SepalLengthCm"], iris["SepalWidthCm"], iris["PetalLengthCm"], iris["PetalWidthCm"]]).T |
|
|
batch_size = X.shape[0] // 3 |
|
|
ipca = IncrementalPCA(n_components=n_components, batch_size=batch_size) |
|
|
ipca.fit(X) |
|
|
X_transformed = ipca.transform(X) |
|
|
|
|
|
|
|
|
U, S, Vh = torch.linalg.svd(X - torch.mean(X, dim=0)) |
|
|
max_abs_rows = torch.argmax(torch.abs(Vh), dim=1) |
|
|
signs = torch.sign(Vh[range(Vh.shape[0]), max_abs_rows]) |
|
|
Vh *= signs.view(-1, 1) |
|
|
explained_variance = S**2 / (X.size(0) - 1) |
|
|
explained_variance_ratio = explained_variance / explained_variance.sum() |
|
|
|
|
|
assert X_transformed.shape == (X.shape[0], 2) |
|
|
assert_close( |
|
|
ipca.explained_variance_ratio_.sum().item(), |
|
|
explained_variance_ratio[:n_components].sum().item(), |
|
|
rtol=1e-3, |
|
|
atol=1e-3, |
|
|
) |
|
|
|
|
|
|
|
|
def test_incremental_pca_check_projection(): |
|
|
|
|
|
n, p = 100, 3 |
|
|
X = torch.randn(n, p, dtype=torch.float64) * 0.1 |
|
|
X[:10] += torch.tensor([3, 4, 5]) |
|
|
Xt = 0.1 * torch.randn(1, p, dtype=torch.float64) + torch.tensor([3, 4, 5]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Yt = IncrementalPCA(n_components=2).fit(X).transform(Xt) |
|
|
|
|
|
|
|
|
Yt /= torch.sqrt((Yt**2).sum()) |
|
|
|
|
|
|
|
|
|
|
|
assert_close(torch.abs(Yt[0][0]).item(), 1.0, atol=1e-1, rtol=1e-1) |
|
|
|
|
|
|
|
|
def test_incremental_pca_validation(): |
|
|
|
|
|
X = torch.tensor([[0, 1, 0], [1, 0, 0]]) |
|
|
n_samples, n_features = X.shape |
|
|
n_components = 4 |
|
|
with pytest.raises( |
|
|
ValueError, |
|
|
match=( |
|
|
f"n_components={n_components} invalid" |
|
|
f" for n_features={n_features}, need more rows than" |
|
|
" columns for IncrementalPCA" |
|
|
" processing" |
|
|
), |
|
|
): |
|
|
IncrementalPCA(n_components, batch_size=10).fit(X) |
|
|
|
|
|
|
|
|
n_components = 3 |
|
|
with pytest.raises( |
|
|
ValueError, |
|
|
match=(f"n_components={n_components} must be less or equal to the batch number of samples {n_samples}"), |
|
|
): |
|
|
IncrementalPCA(n_components=n_components).partial_fit(X) |
|
|
|
|
|
|
|
|
def test_n_components_none(): |
|
|
|
|
|
for n_samples, n_features in [(50, 10), (10, 50)]: |
|
|
X = torch.rand(n_samples, n_features) |
|
|
ipca = IncrementalPCA(n_components=None) |
|
|
|
|
|
|
|
|
|
|
|
ipca.partial_fit(X) |
|
|
assert ipca.n_components == min(X.shape) |
|
|
|
|
|
|
|
|
def test_incremental_pca_num_features_change(): |
|
|
|
|
|
n_samples = 100 |
|
|
X = torch.randn(n_samples, 20) |
|
|
X2 = torch.randn(n_samples, 50) |
|
|
ipca = IncrementalPCA(n_components=None) |
|
|
ipca.fit(X) |
|
|
with pytest.raises(ValueError): |
|
|
ipca.partial_fit(X2) |
|
|
|
|
|
|
|
|
def test_incremental_pca_batch_signs(): |
|
|
|
|
|
n_samples = 100 |
|
|
n_features = 3 |
|
|
X = torch.randn(n_samples, n_features) |
|
|
all_components = [] |
|
|
batch_sizes = torch.arange(10, 20) |
|
|
for batch_size in batch_sizes: |
|
|
ipca = IncrementalPCA(n_components=None, batch_size=batch_size).fit(X) |
|
|
all_components.append(ipca.components_) |
|
|
|
|
|
for i, j in zip(all_components[:-1], all_components[1:]): |
|
|
assert_close(torch.sign(i), torch.sign(j), rtol=1e-6, atol=1e-6) |
|
|
|
|
|
|
|
|
def test_incremental_pca_batch_values(): |
|
|
|
|
|
n_samples = 100 |
|
|
n_features = 3 |
|
|
X = torch.randn(n_samples, n_features) |
|
|
all_components = [] |
|
|
batch_sizes = torch.arange(20, 40, 3) |
|
|
for batch_size in batch_sizes: |
|
|
ipca = IncrementalPCA(n_components=None, batch_size=batch_size).fit(X) |
|
|
all_components.append(ipca.components_) |
|
|
|
|
|
for i, j in zip(all_components[:-1], all_components[1:]): |
|
|
assert_close(i, j, rtol=1e-1, atol=1e-1) |
|
|
|
|
|
|
|
|
def test_incremental_pca_partial_fit(): |
|
|
|
|
|
n, p = 50, 3 |
|
|
X = torch.randn(n, p) |
|
|
X[:, 1] *= 0.00001 |
|
|
X += torch.tensor([5, 4, 3]) |
|
|
|
|
|
|
|
|
|
|
|
batch_size = 10 |
|
|
ipca = IncrementalPCA(n_components=2, batch_size=batch_size).fit(X) |
|
|
pipca = IncrementalPCA(n_components=2, batch_size=batch_size) |
|
|
|
|
|
batch_itr = torch.arange(0, n + 1, batch_size) |
|
|
for i, j in zip(batch_itr[:-1], batch_itr[1:]): |
|
|
pipca.partial_fit(X[i:j, :]) |
|
|
assert_close(ipca.components_, pipca.components_, rtol=1e-3, atol=1e-3) |
|
|
|
|
|
|
|
|
def test_incremental_pca_lowrank(iris): |
|
|
|
|
|
n_components = 2 |
|
|
X = torch.tensor([iris["SepalLengthCm"], iris["SepalWidthCm"], iris["PetalLengthCm"], iris["PetalWidthCm"]]).T |
|
|
batch_size = X.shape[0] // 3 |
|
|
|
|
|
ipca = IncrementalPCA(n_components=n_components, batch_size=batch_size) |
|
|
ipca.fit(X) |
|
|
|
|
|
ipcalr = IncrementalPCA(n_components=n_components, batch_size=batch_size, lowrank=True) |
|
|
ipcalr.fit(X) |
|
|
|
|
|
assert_close(ipca.components_, ipcalr.components_, rtol=1e-7, atol=1e-7) |
|
|
|