146 lines
6.3 KiB
Python
146 lines
6.3 KiB
Python
import numpy as np
|
|
from hypothesis import given, settings, HealthCheck, strategies as st
|
|
from returns.result import Success
|
|
|
|
from models.kohonen_network import Node, KohonenParams, train_kohonen_network, _find_best_matching_unit, _calculate_neighbourhood_radius, _calculate_learning_rate, _update_weights
|
|
from .common import generate_kohonen_weights, generate_kohonen_samples, MINIMUM_NETWORK_DIMENSION
|
|
|
|
@given(
    data=st.data(),
    width=st.integers(min_value=MINIMUM_NETWORK_DIMENSION, max_value=100),
    height=st.integers(min_value=MINIMUM_NETWORK_DIMENSION, max_value=100),
    feature_size=st.integers(min_value=1, max_value=10),
)
@settings(max_examples=5, deadline=None, suppress_health_check=(HealthCheck.too_slow,))
def test_find_best_matching_unit_success(data, width, height, feature_size):
    """Check that the BMU search returns the node whose weights equal the input.

    A weight grid is drawn, one grid position is picked, and a copy of that
    position's weight vector is used as the input sample. The best matching
    unit reported for that sample must be the picked position.
    """
    weights = np.array(data.draw(generate_kohonen_weights(width, height, feature_size)))
    row, col = data.draw(
        st.tuples(
            st.integers(min_value=0, max_value=height - 1),
            st.integers(min_value=0, max_value=width - 1),
        )
    )

    # Use a copy of the target node's weights as the input so that `x` is
    # exactly one of the weights.
    # NOTE(review): if the drawn grid can contain duplicate weight vectors,
    # another node may tie with the target -- confirm the strategy rules
    # this out.
    x = weights[row, col, :].copy()

    bmu = _find_best_matching_unit(weights, x, width, height)

    # Check if the returned node is correct
    assert bmu == Node(row, col), f"Expected Node({row}, {col}), got {bmu}"
|
|
|
|
def test_neighbourhood_radius_strictly_decreasing():
    """Verify the neighbourhood radius drops on each of the first 50 iterations."""
    params = KohonenParams(
        width=10,
        height=10,
        feature_size=3,
        num_iterations=100,
        initial_learning_rate=0.1,
        initial_neighbourhood_radius=5.0,
        exp_time_const=20.0,  # Affects rate of radius decrease
    )

    # Radii for iterations 0..49; each one is compared with its predecessor.
    radii = [_calculate_neighbourhood_radius(step, params) for step in range(50)]
    found_non_decreasing_step = any(
        later >= earlier for earlier, later in zip(radii, radii[1:])
    )

    assert not found_non_decreasing_step, "The neighbourhood radius is not strictly decreasing over iterations."
|
|
|
|
def test_learning_rate_strictly_decreasing():
    """Verify the learning rate drops on each of the first 50 iterations."""
    params = KohonenParams(
        width=10,
        height=10,
        feature_size=3,
        num_iterations=100,
        initial_learning_rate=0.1,
        initial_neighbourhood_radius=5.0,
        exp_time_const=20.0,  # Affects rate of learning rate decrease
    )

    # Learning rates for iterations 0..49; each one is compared with its predecessor.
    rates = [_calculate_learning_rate(step, params) for step in range(50)]
    found_non_decreasing_step = any(
        later >= earlier for earlier, later in zip(rates, rates[1:])
    )

    assert not found_non_decreasing_step, "The learning rate is not strictly decreasing as expected."
|
|
|
|
@given(
    data=st.data(),
    feature_size=st.integers(min_value=1, max_value=3),
    width=st.integers(min_value=MINIMUM_NETWORK_DIMENSION, max_value=50),
    height=st.integers(min_value=MINIMUM_NETWORK_DIMENSION, max_value=50),
    neighbourhood_radius=st.floats(min_value=0.01, max_value=1.0),
    learning_rate=st.floats(min_value=0.001, max_value=0.01),
)
@settings(max_examples=5, deadline=None, suppress_health_check=(HealthCheck.too_slow,))
def test_update_weights_properties(data, feature_size, width, height, neighbourhood_radius, learning_rate):
    """Check properties of a single `_update_weights` step.

    The first drawn sample is applied to a drawn weight grid. The test then
    asserts that the weights change (except when they already sit on the
    sample) and that no node ends up farther from the sample than it started.
    """
    X = data.draw(generate_kohonen_samples(feature_size))
    weights = data.draw(generate_kohonen_weights(width, height, feature_size))

    params = KohonenParams(
        initial_neighbourhood_radius=5.0,
        initial_learning_rate=0.1,
        exp_time_const=20.0,
        width=width,
        height=height,
        num_iterations=10,
        feature_size=feature_size,
    )

    x = X[0]
    updated_weights = _update_weights(x, weights, neighbourhood_radius, learning_rate, params)

    if np.all(x == 0) and np.all(weights == 0):
        assert np.all(updated_weights == 0), "Weights should be zero when x and initial weights are zero."
    elif np.all(weights == x):
        # Every node already coincides with x, so all original distances are
        # zero. The distance check below then forces the weights to stay put,
        # and demanding a change here would make the test contradict itself.
        assert np.array_equal(weights, updated_weights), "Weights should be unchanged when every weight already equals x."
    else:
        assert not np.array_equal(weights, updated_weights), "Weights should have changed after update"

    # Ensure all weights are updated towards the input vector x
    original_distances = np.linalg.norm(weights - x, axis=2)
    new_distances = np.linalg.norm(updated_weights - x, axis=2)
    assert np.all(new_distances <= original_distances), "All weights should move closer to input vector x"
|
|
|
|
@given(
    data=st.data(),
    initial_neighbourhood_radius=st.floats(min_value=1, max_value=10),
    initial_learning_rate=st.floats(min_value=0.01, max_value=10),
    exp_time_const=st.floats(min_value=10, max_value=100),
    width=st.integers(min_value=MINIMUM_NETWORK_DIMENSION, max_value=50),
    height=st.integers(min_value=MINIMUM_NETWORK_DIMENSION, max_value=50),
    num_iterations=st.integers(min_value=0, max_value=100),
    feature_size=st.integers(min_value=1, max_value=3),
)
@settings(max_examples=5, deadline=None, suppress_health_check=(HealthCheck.too_slow,))
def test_train_kohonen_network_valid_success(
    data,
    initial_neighbourhood_radius,
    initial_learning_rate,
    exp_time_const,
    width,
    height,
    num_iterations,
    feature_size,
):
    """Train on drawn samples and check the wrapper, dtype and shape of the result.

    The call must return a `Success` whose payload is a float32 numpy array
    of shape (height, width, feature_size).
    """
    samples = data.draw(generate_kohonen_samples(feature_size))
    expected_shape = (height, width, feature_size)

    params = KohonenParams(
        width=width,
        height=height,
        feature_size=feature_size,
        num_iterations=num_iterations,
        exp_time_const=exp_time_const,
        initial_learning_rate=initial_learning_rate,
        initial_neighbourhood_radius=initial_neighbourhood_radius,
    )

    outcome = train_kohonen_network(samples, params, use_mlflow=False)

    # The outcome must be a Success before its payload can be inspected.
    assert isinstance(outcome, Success), "Expected the result to be a Success instance"

    trained = outcome.unwrap()
    assert isinstance(trained, np.ndarray), "Expected the result content to be a numpy array"
    assert trained.dtype == np.float32, "Expected the numpy array to be of type float32"
    assert trained.shape == expected_shape, "Unexpected shape of result weights"