Samplers
BaseSampler interface.
This is the base class for all samplers.
Source code in black_it/samplers/base.py
class BaseSampler(BaseSeedable, ABC):
"""BaseSampler interface.
This is the base class for all samplers.
"""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the internal state of the sampler, fixing this numbers the sampler behaves deterministically
max_deduplication_passes: maximum number of duplication passes done to avoid sampling repeated parameters
"""
BaseSeedable.__init__(self, random_state=random_state)
self.batch_size: int = batch_size
self.max_deduplication_passes = max_deduplication_passes
@abstractmethod
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample a number of new parameters fixed by the 'batch_size' attribute.
Args:
batch_size: number of samples to collect
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the new parameters
"""
def sample(
self,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample from the search space.
Args:
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters
"""
samples = self.sample_batch(
self.batch_size,
search_space,
existing_points,
existing_losses,
)
for n in range(self.max_deduplication_passes):
duplicates = self.find_and_get_duplicates(samples, existing_points)
num_duplicates = len(duplicates)
if num_duplicates == 0:
break
new_samples = self.sample_batch(
num_duplicates,
search_space,
existing_points,
existing_losses,
)
samples[duplicates] = new_samples
if n == self.max_deduplication_passes - 1:
print(
f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
" This is probably due to a small search space.",
)
return samples
@staticmethod
def find_and_get_duplicates(
new_points: NDArray[np.float64],
existing_points: NDArray[np.float64],
) -> list:
"""Find the points in 'new_points' that are already present in 'existing_points'.
Args:
new_points: candidates points for the sampler
existing_points: previously sampled points
Returns:
the location of the duplicates in 'new_points'
"""
all_points = np.concatenate((existing_points, new_points))
unq, count = np.unique(all_points, axis=0, return_counts=True)
repeated_groups = unq[count > 1]
repeated_pos = []
if len(repeated_groups) > 0:
for repeated_group in repeated_groups:
repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
for index in repeated_idx:
repeated_pos.append(index[0]) # noqa: PERF401
return repeated_pos
__init__(self, batch_size, random_state=None, max_deduplication_passes=5)
special
Initialize the sampler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the internal state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | maximum number of deduplication passes done to avoid sampling repeated parameters | 5 |
Source code in black_it/samplers/base.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the internal state of the sampler, fixing this numbers the sampler behaves deterministically
max_deduplication_passes: maximum number of duplication passes done to avoid sampling repeated parameters
"""
BaseSeedable.__init__(self, random_state=random_state)
self.batch_size: int = batch_size
self.max_deduplication_passes = max_deduplication_passes
find_and_get_duplicates(new_points, existing_points)
staticmethod
Find the points in 'new_points' that are already present in 'existing_points'.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| new_points | NDArray[np.float64] | candidate points for the sampler | required |
| existing_points | NDArray[np.float64] | previously sampled points | required |

Returns:

| Type | Description |
|---|---|
| list | the location of the duplicates in 'new_points' |
Source code in black_it/samplers/base.py
@staticmethod
def find_and_get_duplicates(
new_points: NDArray[np.float64],
existing_points: NDArray[np.float64],
) -> list:
"""Find the points in 'new_points' that are already present in 'existing_points'.
Args:
new_points: candidates points for the sampler
existing_points: previously sampled points
Returns:
the location of the duplicates in 'new_points'
"""
all_points = np.concatenate((existing_points, new_points))
unq, count = np.unique(all_points, axis=0, return_counts=True)
repeated_groups = unq[count > 1]
repeated_pos = []
if len(repeated_groups) > 0:
for repeated_group in repeated_groups:
repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
for index in repeated_idx:
repeated_pos.append(index[0]) # noqa: PERF401
return repeated_pos
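As a quick illustration of the duplicate detection above (a short sketch, not part of the library documentation): rows of a candidate batch that already occur among the existing points are reported by their row index in the candidate batch.

```python
# Illustrative call to the static helper above: rows of `new` that already
# appear in `existing` are returned by their row index within `new`.
import numpy as np

from black_it.samplers.base import BaseSampler

existing = np.array([[0.1, 0.2], [0.3, 0.4]])
new = np.array([[0.3, 0.4], [0.5, 0.6], [0.1, 0.2]])

duplicates = BaseSampler.find_and_get_duplicates(new, existing)
print(sorted(duplicates))  # [0, 2] -> rows 0 and 2 of `new` are already known
```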
sample(self, search_space, existing_points, existing_losses)
Sample from the search space.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| search_space | SearchSpace | an object containing the details of the parameter search space | required |
| existing_points | NDArray[np.float64] | the parameters already sampled | required |
| existing_losses | NDArray[np.float64] | the loss corresponding to the sampled parameters | required |

Returns:

| Type | Description |
|---|---|
| NDArray[np.float64] | the sampled parameters |
Source code in black_it/samplers/base.py
def sample(
self,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample from the search space.
Args:
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters
"""
samples = self.sample_batch(
self.batch_size,
search_space,
existing_points,
existing_losses,
)
for n in range(self.max_deduplication_passes):
duplicates = self.find_and_get_duplicates(samples, existing_points)
num_duplicates = len(duplicates)
if num_duplicates == 0:
break
new_samples = self.sample_batch(
num_duplicates,
search_space,
existing_points,
existing_losses,
)
samples[duplicates] = new_samples
if n == self.max_deduplication_passes - 1:
print(
f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
" This is probably due to a small search space.",
)
return samples
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample a number of new parameters fixed by the 'batch_size' attribute.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | number of samples to collect | required |
| search_space | SearchSpace | an object containing the details of the parameter search space | required |
| existing_points | NDArray[np.float64] | the parameters already sampled | required |
| existing_losses | NDArray[np.float64] | the loss corresponding to the sampled parameters | required |

Returns:

| Type | Description |
|---|---|
| NDArray[np.float64] | the new parameters |
Source code in black_it/samplers/base.py
@abstractmethod
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample a number of new parameters fixed by the 'batch_size' attribute.
Args:
batch_size: number of samples to collect
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the new parameters
"""
Random uniform sampling.
Source code in black_it/samplers/random_uniform.py
class RandomUniformSampler(BaseSampler):
"""Random uniform sampling."""
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64], # noqa: ARG002
existing_losses: NDArray[np.float64], # noqa: ARG002
) -> NDArray[np.float64]:
"""Sample uniformly from the search space.
Args:
batch_size: the number of points to sample
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
"""
candidates = np.zeros((batch_size, search_space.dims))
for i, params in enumerate(search_space.param_grid):
candidates[:, i] = self.random_generator.choice(params, size=(batch_size,))
return candidates
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample uniformly from the search space.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points to sample | required |
| search_space | SearchSpace | an object containing the details of the parameter search space | required |
| existing_points | ndarray | the parameters already sampled | required |
| existing_losses | ndarray | the loss corresponding to the sampled parameters | required |

Returns:

| Type | Description |
|---|---|
| ndarray | the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`) |
Source code in black_it/samplers/random_uniform.py
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64], # noqa: ARG002
existing_losses: NDArray[np.float64], # noqa: ARG002
) -> NDArray[np.float64]:
"""Sample uniformly from the search space.
Args:
batch_size: the number of points to sample
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
"""
candidates = np.zeros((batch_size, search_space.dims))
for i, params in enumerate(search_space.param_grid):
candidates[:, i] = self.random_generator.choice(params, size=(batch_size,))
return candidates
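A usage sketch follows; the SearchSpace constructor arguments are assumptions based on the attributes (parameters_bounds, param_grid, dims) used in the listings above. It draws a first batch before any model evaluations exist.

```python
# Usage sketch; the SearchSpace constructor arguments are assumed, not documented here.
import numpy as np

from black_it.samplers.random_uniform import RandomUniformSampler
from black_it.search_space import SearchSpace  # assumed module path

space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 10.0]]),  # row 0: lower bounds, row 1: upper bounds
    parameters_precision=np.array([0.01, 0.1]),             # grid step per parameter
    verbose=False,
)

sampler = RandomUniformSampler(batch_size=8, random_state=42)

# Nothing has been evaluated yet, so the "existing" arrays are empty.
params = sampler.sample(
    space,
    existing_points=np.zeros((0, space.dims)),
    existing_losses=np.zeros(0),
)
print(params.shape)  # (8, 2)
```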
Halton low discrepancy sequence.
This sampler implements the Halton sequence, obtained by generalizing the Van der Corput sequence to n dimensions.
Source code in black_it/samplers/halton.py
class HaltonSampler(BaseSampler):
"""Halton low discrepancy sequence.
This snippet implements the Halton sequence following the generalization of
a sequence of *Van der Corput* in n-dimensions.
"""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of sample deduplication passes.
"""
super().__init__(batch_size, random_state, max_deduplication_passes)
self._prime_number_generator = _CachedPrimesCalculator()
# drop first N entries to avoid linear correlation
self._reset_sequence_index()
def _set_random_state(self, random_state: int | None) -> None:
"""Set the random state (private use).
For the Halton sampler, it also resets the sequence index.
Args:
random_state: the random seed
"""
super()._set_random_state(random_state)
self._reset_sequence_index()
def _reset_sequence_index(self) -> None:
"""Reset the sequence index pointer."""
self._sequence_index = self.random_generator.integers(
_MIN_SEQUENCE_START_INDEX,
_MAX_SEQUENCE_START_INDEX,
)
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64], # noqa: ARG002
existing_losses: NDArray[np.float64], # noqa: ARG002
) -> NDArray[np.float64]:
"""Sample points using Halton sequence.
Args:
batch_size: the number of samples
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled (not used)
existing_losses: the loss corresponding to the sampled parameters (not used)
Returns:
the parameter sampled
"""
unit_cube_points: NDArray[np.float64] = self._halton(
batch_size,
search_space.dims,
)
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
return digitize_data(sampled_points, search_space.param_grid)
def _halton(self, nb_samples: int, dims: int) -> NDArray[np.float64]:
"""Get a Halton sequence.
It uses a simple prime number generator, which takes the first `dims` primes.
Args:
nb_samples: number of samples
dims: the number of dimensions of the space to sample from the unitary cube
Returns:
sequence of Halton.
"""
bases: NDArray[np.int64] = self._prime_number_generator.get_n_primes(dims)
# Generate a sample using a Halton sequence.
sample: NDArray[np.float64] = halton(
sample_size=nb_samples,
bases=bases,
n_start=self._sequence_index,
)
# increment sequence start index for the next sampling
self._sequence_index += nb_samples
return sample
__init__(self, batch_size, random_state=None, max_deduplication_passes=5)
special
Initialize the sampler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | the maximum number of sample deduplication passes. | 5 |
Source code in black_it/samplers/halton.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of sample deduplication passes.
"""
super().__init__(batch_size, random_state, max_deduplication_passes)
self._prime_number_generator = _CachedPrimesCalculator()
# drop first N entries to avoid linear correlation
self._reset_sequence_index()
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample points using Halton sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of samples | required |
| search_space | SearchSpace | an object containing the details of the parameter search space | required |
| existing_points | NDArray[np.float64] | the parameters already sampled (not used) | required |
| existing_losses | NDArray[np.float64] | the loss corresponding to the sampled parameters (not used) | required |

Returns:

| Type | Description |
|---|---|
| NDArray[np.float64] | the sampled parameters |
Source code in black_it/samplers/halton.py
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64], # noqa: ARG002
existing_losses: NDArray[np.float64], # noqa: ARG002
) -> NDArray[np.float64]:
"""Sample points using Halton sequence.
Args:
batch_size: the number of samples
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled (not used)
existing_losses: the loss corresponding to the sampled parameters (not used)
Returns:
the parameter sampled
"""
unit_cube_points: NDArray[np.float64] = self._halton(
batch_size,
search_space.dims,
)
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
return digitize_data(sampled_points, search_space.param_grid)
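The halton(...) helper used in _halton is based on the classic radical-inverse construction; the sketch below re-derives it in a few lines for intuition. This is illustrative code, not the library's implementation.

```python
# Illustrative radical-inverse (Van der Corput) construction behind a Halton
# sequence: one prime base per dimension. Not the library's implementation.
import numpy as np
from numpy.typing import NDArray


def van_der_corput(index: int, base: int) -> float:
    """Radical inverse of `index` in the given base."""
    result, denom = 0.0, 1.0
    while index > 0:
        index, remainder = divmod(index, base)
        denom *= base
        result += remainder / denom
    return result


def halton_sketch(sample_size: int, bases: list[int], n_start: int = 0) -> NDArray[np.float64]:
    """`sample_size` points in the unit cube, one Van der Corput sequence per base."""
    return np.array(
        [[van_der_corput(i, b) for b in bases] for i in range(n_start, n_start + sample_size)]
    )


print(halton_sketch(4, bases=[2, 3], n_start=1))  # four 2-D quasi-random points in [0, 1)^2
```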
The R-sequence sampler.
Source code in black_it/samplers/r_sequence.py
class RSequenceSampler(BaseSampler):
"""The R-sequence sampler."""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
after every batch sampling. Default: 0, i.e. no deduplication happens.
"""
super().__init__(batch_size, random_state, max_deduplication_passes)
self._sequence_index: int
self._sequence_start: float
self._reset()
@classmethod
def compute_phi(cls, nb_dims: int) -> float:
"""Get an approximation of phi^nb_dims.
Args:
nb_dims: the number of dimensions.
Returns:
phi^nb_dims
"""
check_arg(nb_dims >= 1, f"nb_dims should be greater than 0, got {nb_dims}")
phi: float = 2.0
old_phi = None
while old_phi != phi:
old_phi = phi
phi = pow(1 + phi, 1.0 / (nb_dims + 1))
return phi
def _set_random_state(self, random_state: int | None) -> None:
"""Set the random state (private use).
For the RSequence sampler, it also resets the sequence index and the sequence start.
Args:
random_state: the random seed
"""
super()._set_random_state(random_state)
self._reset()
def _reset(self) -> None:
"""Reset the index of the sequence."""
self._sequence_index = self.random_generator.integers(
_MIN_SEQUENCE_START_INDEX,
_MAX_SEQUENCE_START_INDEX,
)
self._sequence_start = self.random_generator.random()
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64], # noqa: ARG002
existing_losses: NDArray[np.float64], # noqa: ARG002
) -> NDArray[np.float64]:
"""Sample points using the R-sequence.
Args:
batch_size: the number of samples
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled (not used)
existing_losses: the loss corresponding to the sampled parameters (not used)
Returns:
the parameter sampled
"""
unit_cube_points: NDArray[np.float64] = self._r_sequence(
batch_size,
search_space.dims,
)
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
return digitize_data(sampled_points, search_space.param_grid)
def _r_sequence(self, nb_samples: int, dims: int) -> NDArray[np.float64]:
"""Compute the R-sequence (http://extremelearning.com.au/unreasonable-effectiveness-of-quasirandom-sequences/).
Args:
nb_samples: number of points to sample
dims: the number of dimensions
Returns:
Set of params uniformly placed in d-dimensional unit cube.
"""
phi = self.compute_phi(dims)
alpha: NDArray[np.float64] = np.power(1 / phi, np.arange(1, dims + 1)).reshape(
(1, -1),
)
end_index = self._sequence_index + nb_samples
indexes = np.arange(self._sequence_index, end_index).reshape((-1, 1))
points: NDArray[np.float64] = (self._sequence_start + indexes.dot(alpha)) % 1
self._sequence_index = end_index
return points
__init__(self, batch_size, random_state=None, max_deduplication_passes=5)
special
Initialize the sampler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | (non-negative integer) the maximum number of deduplication passes performed after every batch sampling; a value of 0 means no deduplication | 5 |
Source code in black_it/samplers/r_sequence.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
after every batch sampling. Default: 0, i.e. no deduplication happens.
"""
super().__init__(batch_size, random_state, max_deduplication_passes)
self._sequence_index: int
self._sequence_start: float
self._reset()
compute_phi(nb_dims)
classmethod
Get an approximation of phi^nb_dims.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| nb_dims | int | the number of dimensions. | required |

Returns:

| Type | Description |
|---|---|
| float | phi^nb_dims |
Source code in black_it/samplers/r_sequence.py
@classmethod
def compute_phi(cls, nb_dims: int) -> float:
"""Get an approximation of phi^nb_dims.
Args:
nb_dims: the number of dimensions.
Returns:
phi^nb_dims
"""
check_arg(nb_dims >= 1, f"nb_dims should be greater than 0, got {nb_dims}")
phi: float = 2.0
old_phi = None
while old_phi != phi:
old_phi = phi
phi = pow(1 + phi, 1.0 / (nb_dims + 1))
return phi
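A quick numerical check of the fixed-point iteration above (illustrative only): for one dimension the fixed point is the ordinary golden ratio, and in general it satisfies phi**(nb_dims + 1) == phi + 1.

```python
# Illustrative check of compute_phi: the fixed point of phi = (1 + phi)**(1/(d+1)).
from black_it.samplers.r_sequence import RSequenceSampler

phi_1 = RSequenceSampler.compute_phi(1)
print(phi_1, (1 + 5 ** 0.5) / 2)   # both ~1.6180339887... (the golden ratio)

phi_2 = RSequenceSampler.compute_phi(2)
print(phi_2 ** 3 - (phi_2 + 1))    # ~0: phi_2 solves phi**3 = phi + 1
```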
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample points using the R-sequence.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of samples | required |
| search_space | SearchSpace | an object containing the details of the parameter search space | required |
| existing_points | NDArray[np.float64] | the parameters already sampled (not used) | required |
| existing_losses | NDArray[np.float64] | the loss corresponding to the sampled parameters (not used) | required |

Returns:

| Type | Description |
|---|---|
| NDArray[np.float64] | the sampled parameters |
Source code in black_it/samplers/r_sequence.py
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64], # noqa: ARG002
existing_losses: NDArray[np.float64], # noqa: ARG002
) -> NDArray[np.float64]:
"""Sample points using the R-sequence.
Args:
batch_size: the number of samples
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled (not used)
existing_losses: the loss corresponding to the sampled parameters (not used)
Returns:
the parameter sampled
"""
unit_cube_points: NDArray[np.float64] = self._r_sequence(
batch_size,
search_space.dims,
)
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
return digitize_data(sampled_points, search_space.param_grid)
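Stripped of the sampler machinery, the recurrence implemented in _r_sequence is just an additive low-discrepancy sequence; the short standalone sketch below (illustrative, not library code) makes that explicit.

```python
# Standalone sketch of the R-sequence recurrence used above:
# x_n = (x_0 + n * alpha) mod 1, with alpha_j = phi_d ** -j for j = 1..dims.
import numpy as np


def r_sequence_sketch(n_samples: int, dims: int, start: float = 0.5) -> np.ndarray:
    phi = 2.0
    for _ in range(64):  # fixed-point iteration for the generalized golden ratio
        phi = (1 + phi) ** (1.0 / (dims + 1))
    alpha = 1.0 / phi ** np.arange(1, dims + 1)
    n = np.arange(n_samples).reshape(-1, 1)
    return (start + n * alpha) % 1.0


print(r_sequence_sketch(5, dims=2))  # five quasi-uniform points in the unit square
```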
This class implements the best-batch sampler.
The sampler is a bare-bones genetic algorithm that takes the parameters corresponding to the current lowest loss values and perturbs them slightly in a purely random fashion. The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution BetaBin(dims, a, b), where dims is the total number of dimensions in the search space; it then selects that many coordinates at random and perturbs each of them uniformly within the range specified by 'perturbation_range'.
Source code in black_it/samplers/best_batch.py
class BestBatchSampler(BaseSampler):
"""This class implements the best-batch sampler.
The sampler is a very essential type of genetic algorithm that takes the parameters corresponding
to the current lowest loss values and perturbs them slightly in a purely random fashion.
The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution
BetaBin(dims, a, b) --where dims is the total number of dimensions in the search space --, it then selects
that many coordinate randomly, and perturbs them uniformly within the range specified by 'perturbation_range'.
"""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
a: float = 3.0,
b: float = 1.0,
perturbation_range: int = 6,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
a: the a parameter of the beta-binomial distribution
b: the b parameter of the beta-binomial distribution
perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
plus/minus the perturbation_range times the precision of the specific parameter coordinate
"""
_assert(
a > 0,
"'a' should be greater than zero",
)
_assert(
b > 0,
"'b' should be greater than zero",
)
_assert(
perturbation_range > 1,
"'perturbation_range' should be greater than one",
)
super().__init__(batch_size, random_state, max_deduplication_passes)
self.a = a
self.b = b
self.perturbation_range = perturbation_range
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample from the search space using a genetic algorithm.
Args:
batch_size: the number of points to sample
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
"""
if len(existing_points) < batch_size:
msg = (
"best-batch sampler requires a number of existing points "
f"which is at least the batch size {batch_size}, got {len(existing_points)}"
)
raise ValueError(
msg,
)
# sort existing params
candidate_points: NDArray[np.float64] = existing_points[np.argsort(existing_losses)][:batch_size, :]
candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
0,
batch_size,
size=batch_size,
)
sampled_points: NDArray[np.float64] = np.copy(
candidate_points[candidate_point_indexes],
)
beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
beta_binom_rv.random_state = self.random_generator
for sampled_point in sampled_points:
num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
params_shocked: NDArray[np.int64] = self.random_generator.choice(
search_space.dims,
tuple(num_shocks),
replace=False,
)
for index in params_shocked:
shock_size: int = self.random_generator.integers(
1,
self.perturbation_range,
)
shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1
delta: float = search_space.parameters_precision[index]
shift: float = delta * shock_sign * shock_size
sampled_point[index] += shift
sampled_point[index] = np.clip(
sampled_point[index],
search_space.parameters_bounds[0][index],
search_space.parameters_bounds[1][index],
)
return sampled_points
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, a=3.0, b=1.0, perturbation_range=6)
special
Initialize the sampler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | the maximum number of deduplication passes that are made | 5 |
| a | float | the a parameter of the beta-binomial distribution | 3.0 |
| b | float | the b parameter of the beta-binomial distribution | 1.0 |
| perturbation_range | int | the range of the perturbation applied. The actual perturbation will be in the range plus/minus perturbation_range times the precision of the specific parameter coordinate | 6 |
Source code in black_it/samplers/best_batch.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
a: float = 3.0,
b: float = 1.0,
perturbation_range: int = 6,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
a: the a parameter of the beta-binomial distribution
b: the b parameter of the beta-binomial distribution
perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
plus/minus the perturbation_range times the precision of the specific parameter coordinate
"""
_assert(
a > 0,
"'a' should be greater than zero",
)
_assert(
b > 0,
"'b' should be greater than zero",
)
_assert(
perturbation_range > 1,
"'perturbation_range' should be greater than one",
)
super().__init__(batch_size, random_state, max_deduplication_passes)
self.a = a
self.b = b
self.perturbation_range = perturbation_range
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample from the search space using a genetic algorithm.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points to sample | required |
| search_space | SearchSpace | an object containing the details of the parameter search space | required |
| existing_points | NDArray[np.float64] | the parameters already sampled | required |
| existing_losses | NDArray[np.float64] | the loss corresponding to the sampled parameters | required |

Returns:

| Type | Description |
|---|---|
| NDArray[np.float64] | the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`) |
Source code in black_it/samplers/best_batch.py
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample from the search space using a genetic algorithm.
Args:
batch_size: the number of points to sample
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
"""
if len(existing_points) < batch_size:
msg = (
"best-batch sampler requires a number of existing points "
f"which is at least the batch size {batch_size}, got {len(existing_points)}"
)
raise ValueError(
msg,
)
# sort existing params
candidate_points: NDArray[np.float64] = existing_points[np.argsort(existing_losses)][:batch_size, :]
candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
0,
batch_size,
size=batch_size,
)
sampled_points: NDArray[np.float64] = np.copy(
candidate_points[candidate_point_indexes],
)
beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
beta_binom_rv.random_state = self.random_generator
for sampled_point in sampled_points:
num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
params_shocked: NDArray[np.int64] = self.random_generator.choice(
search_space.dims,
tuple(num_shocks),
replace=False,
)
for index in params_shocked:
shock_size: int = self.random_generator.integers(
1,
self.perturbation_range,
)
shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1
delta: float = search_space.parameters_precision[index]
shift: float = delta * shock_sign * shock_size
sampled_point[index] += shift
sampled_point[index] = np.clip(
sampled_point[index],
search_space.parameters_bounds[0][index],
search_space.parameters_bounds[1][index],
)
return sampled_points
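A hedged usage sketch of the perturbation scheme described above (the SearchSpace construction is assumed, as in the earlier sketches): a pool of already-evaluated points is passed in, and the sampler perturbs copies of the lowest-loss ones on the parameter grid.

```python
# Usage sketch for the best-batch sampler; SearchSpace arguments are assumed
# as in the earlier sketches. Requires at least `batch_size` existing points.
import numpy as np

from black_it.samplers.best_batch import BestBatchSampler
from black_it.search_space import SearchSpace  # assumed module path

space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 1.0]]),
    parameters_precision=np.array([0.01, 0.01]),
    verbose=False,
)

rng = np.random.default_rng(0)
existing_points = rng.uniform(size=(16, 2)).round(2)  # already-evaluated parameters (on the grid)
existing_losses = rng.uniform(size=16)                # their losses

sampler = BestBatchSampler(batch_size=4, random_state=0)
# Copies of the 4 lowest-loss points, each with a few coordinates shifted by
# +/- (1 .. perturbation_range - 1) grid steps and clipped to the bounds.
new_points = sampler.sample(space, existing_points, existing_losses)
print(new_points.shape)  # (4, 2)
```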
This class implements the Gaussian process-based sampler.
In particular, the sampling is based on a Gaussian Process interpolation of the loss function.
Note: this class is a wrapper of the GaussianProcessRegressor model of the scikit-learn package.
Source code in black_it/samplers/gaussian_process.py
class GaussianProcessSampler(MLSurrogateSampler):
"""This class implements the Gaussian process-based sampler.
In particular, the sampling is based on a Gaussian Process interpolation of the loss function.
Note: this class is a wrapper of the GaussianProcessRegressor model of the scikit-learn package.
"""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
candidate_pool_size: int | None = None,
optimize_restarts: int = 5,
acquisition: str = "expected_improvement",
jitter: float = 0.1,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
acquisition: type of acquisition function, it can be 'expected_improvement' of simply 'mean'
jitter: positive value to make the "expected_improvement" acquisition more explorative.
"""
self._validate_acquisition(acquisition)
super().__init__(
batch_size,
random_state,
max_deduplication_passes,
candidate_pool_size,
)
self.optimize_restarts = optimize_restarts
self.acquisition = acquisition
self.jitter = jitter
self._gpmodel: GaussianProcessRegressor | None = None
self._fmin: np.double | float | None = None
@staticmethod
def _validate_acquisition(acquisition: str) -> None:
"""Check that the required acquisition is among the supported ones.
Args:
acquisition: the acquisition provided as input of the constructor.
Raises:
ValueError: if the provided acquisition type is not among the allowed ones.
"""
try:
_AcquisitionTypes(acquisition)
except ValueError as e:
msg = (
"expected one of the following acquisition types: "
f"[{' '.join(map(str, _AcquisitionTypes))}], got {acquisition}"
)
raise ValueError(
msg,
) from e
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None: # noqa: N803
"""Fit a gaussian process surrogate model."""
y = np.atleast_2d(y).T
if X.shape[0] > _BIG_DATASET_SIZE_WARNING_THRESHOLD:
warnings.warn( # noqa: B028
"Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
RuntimeWarning,
)
# initialize GP class from scikit-learn with a Matern kernel
kernel = kernels.Matern(length_scale=1, length_scale_bounds=(1e-5, 1e5), nu=2.5)
noise_var = y.var() * 0.01
self._gpmodel = GaussianProcessRegressor(
kernel=kernel,
alpha=noise_var,
n_restarts_optimizer=self.optimize_restarts,
optimizer="fmin_l_bfgs_b",
random_state=self._get_random_seed(),
)
self._gpmodel.fit(X, y)
# store minimum
if self.acquisition == "expected_improvement":
m, _ = self._predict_mean_std(X)
self._fmin = np.min(m)
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]: # noqa: N803
"""Predict using a gaussian process surrogate model."""
# predict mean or expected improvement on the full sample set
if self.acquisition == _AcquisitionTypes.EI.value:
# minus sign needed for subsequent sorting
candidates_score = -self._predict_EI(X, self.jitter)
else: # acquisition is "mean"
candidates_score = self._predict_mean_std(X)[0]
return candidates_score
def _predict_mean_std(
self,
X: NDArray[np.float64], # noqa: N803
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Predict mean and standard deviation of a fitted GP.
Args:
X: the points on which the predictions should be performed
Returns:
The pair (mean, std).
"""
gpmodel = cast("GaussianProcessRegressor", self._gpmodel)
X = X[None, :] if X.ndim == 1 else X # noqa: N806
m, s = gpmodel.predict(X, return_std=True, return_cov=False)
s = np.clip(s, 1e-5, np.inf)
return m, s
def _predict_EI( # noqa: N802
self,
X: NDArray[np.float64], # noqa: N803
jitter: float = 0.1,
) -> NDArray[np.float64]:
"""Compute the Expected Improvement per unit of cost.
Args:
X: the points on which the predictions should be performed
jitter: positive value to make the acquisition more explorative.
Returns:
the expected improvement.
"""
m, s = self._predict_mean_std(X)
fmin = cast("float", self._fmin)
phi, Phi, u = self.get_quantiles(jitter, fmin, m, s) # noqa: N806
return s * (u * Phi + phi)
@staticmethod
def get_quantiles(
acquisition_par: float,
fmin: float,
m: NDArray[np.float64],
s: NDArray[np.float64],
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""Quantiles of the Gaussian distribution useful to determine the acquisition function values.
Args:
acquisition_par: parameter of the acquisition function
fmin: current minimum.
m: vector of means.
s: vector of standard deviations.
Returns:
the quantiles.
"""
# remove values of variance that are too small
s[s < _SMALL_VARIANCE_VALUES] = _SMALL_VARIANCE_VALUES
u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2)) # noqa: N806
return phi, Phi, u
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, optimize_restarts=5, acquisition='expected_improvement', jitter=0.1)
special
Initialize the sampler.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | the maximum number of deduplication passes that are made | 5 |
| candidate_pool_size | int \| None | number of randomly sampled points on which the surrogate model predictions are evaluated | None |
| optimize_restarts | int | number of independent random trials of the optimization of the GP hyperparameters | 5 |
| acquisition | str | type of acquisition function, it can be 'expected_improvement' or simply 'mean' | 'expected_improvement' |
| jitter | float | positive value to make the "expected_improvement" acquisition more explorative. | 0.1 |
Source code in black_it/samplers/gaussian_process.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
candidate_pool_size: int | None = None,
optimize_restarts: int = 5,
acquisition: str = "expected_improvement",
jitter: float = 0.1,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
acquisition: type of acquisition function, it can be 'expected_improvement' of simply 'mean'
jitter: positive value to make the "expected_improvement" acquisition more explorative.
"""
self._validate_acquisition(acquisition)
super().__init__(
batch_size,
random_state,
max_deduplication_passes,
candidate_pool_size,
)
self.optimize_restarts = optimize_restarts
self.acquisition = acquisition
self.jitter = jitter
self._gpmodel: GaussianProcessRegressor | None = None
self._fmin: np.double | float | None = None
fit(self, X, y)
Fit a gaussian process surrogate model.
Source code in black_it/samplers/gaussian_process.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None: # noqa: N803
"""Fit a gaussian process surrogate model."""
y = np.atleast_2d(y).T
if X.shape[0] > _BIG_DATASET_SIZE_WARNING_THRESHOLD:
warnings.warn( # noqa: B028
"Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
RuntimeWarning,
)
# initialize GP class from scikit-learn with a Matern kernel
kernel = kernels.Matern(length_scale=1, length_scale_bounds=(1e-5, 1e5), nu=2.5)
noise_var = y.var() * 0.01
self._gpmodel = GaussianProcessRegressor(
kernel=kernel,
alpha=noise_var,
n_restarts_optimizer=self.optimize_restarts,
optimizer="fmin_l_bfgs_b",
random_state=self._get_random_seed(),
)
self._gpmodel.fit(X, y)
# store minimum
if self.acquisition == "expected_improvement":
m, _ = self._predict_mean_std(X)
self._fmin = np.min(m)
get_quantiles(acquisition_par, fmin, m, s)
staticmethod
Quantiles of the Gaussian distribution useful to determine the acquisition function values.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| acquisition_par | float | parameter of the acquisition function | required |
| fmin | float | current minimum. | required |
| m | NDArray[np.float64] | vector of means. | required |
| s | NDArray[np.float64] | vector of standard deviations. | required |

Returns:

| Type | Description |
|---|---|
| tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]] | the quantiles. |
Source code in black_it/samplers/gaussian_process.py
@staticmethod
def get_quantiles(
acquisition_par: float,
fmin: float,
m: NDArray[np.float64],
s: NDArray[np.float64],
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""Quantiles of the Gaussian distribution useful to determine the acquisition function values.
Args:
acquisition_par: parameter of the acquisition function
fmin: current minimum.
m: vector of means.
s: vector of standard deviations.
Returns:
the quantiles.
"""
# remove values of variance that are too small
s[s < _SMALL_VARIANCE_VALUES] = _SMALL_VARIANCE_VALUES
u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2)) # noqa: N806
return phi, Phi, u
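As a sanity check on what get_quantiles returns (illustrative code, not part of the library): plugging its outputs into s * (u * Phi + phi) reproduces the textbook expected-improvement formula computed with scipy.stats.norm.

```python
# Illustrative check: the quantities returned by get_quantiles assemble into
# the closed-form expected improvement EI = s * (u * Phi(u) + phi(u)),
# with u = (fmin - m - jitter) / s.
import numpy as np
from scipy.stats import norm

from black_it.samplers.gaussian_process import GaussianProcessSampler

m = np.array([0.5, 0.2, 0.9])   # posterior means of the GP surrogate
s = np.array([0.1, 0.3, 0.2])   # posterior standard deviations
fmin, jitter = 0.25, 0.1

phi, Phi, u = GaussianProcessSampler.get_quantiles(jitter, fmin, m, s)
ei = s * (u * Phi + phi)

u_ref = (fmin - m - jitter) / s
ei_ref = s * (u_ref * norm.cdf(u_ref) + norm.pdf(u_ref))
print(np.allclose(ei, ei_ref))  # True
```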
predict(self, X)
Predict using a gaussian process surrogate model.
Source code in black_it/samplers/gaussian_process.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]: # noqa: N803
"""Predict using a gaussian process surrogate model."""
# predict mean or expected improvement on the full sample set
if self.acquisition == _AcquisitionTypes.EI.value:
# minus sign needed for subsequent sorting
candidates_score = -self._predict_EI(X, self.jitter)
else: # acquisition is "mean"
candidates_score = self._predict_mean_std(X)[0]
return candidates_score
This class implements random forest sampling.
Source code in black_it/samplers/random_forest.py
class RandomForestSampler(MLSurrogateSampler):
"""This class implements random forest sampling."""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
candidate_pool_size: int | None = None,
n_estimators: int = 500,
criterion: str = "gini",
n_classes: int = 10,
) -> None:
"""Random forest sampling.
Note: this class makes use of sklearn.ensemble.RandomForestClassifier.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
n_estimators: number of trees in the forest
criterion: the function to measure the quality of a split.
n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
of the distribution of loss values.
"""
_assert(
n_classes > _MIN_RF_CLASSES,
"'n_classes' should be at least 2 to provide meaningful results",
)
super().__init__(
batch_size,
random_state,
max_deduplication_passes,
candidate_pool_size,
)
self._n_estimators = n_estimators
self._criterion = criterion
self._n_classes = n_classes
self._classifier: RandomForestClassifier | None = None
@property
def n_estimators(self) -> int:
"""Get the number of estimators."""
return self._n_estimators
@property
def criterion(self) -> str:
"""Get the criterion."""
return self._criterion
@property
def n_classes(self) -> int:
"""Get the number of classes."""
return self._n_classes
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None: # noqa: N803
"""Fit a random forest surrogate model."""
# Train surrogate
(
X, # noqa: N806
y_cat,
_existing_points_quantiles,
) = self.prepare_data_for_classifier(X, y, self.n_classes)
self._classifier = RandomForestClassifier(
n_estimators=self.n_estimators,
criterion=self.criterion,
n_jobs=-1,
random_state=self._get_random_seed(),
)
self._classifier.fit(X, y_cat)
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]: # noqa: N803
"""Predict using a random forest surrogate model."""
# Predict quantiles
self._classifier = cast("RandomForestClassifier", self._classifier)
predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)
return predicted_points_quantiles
@staticmethod
def prepare_data_for_classifier(
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
num_bins: int,
) -> tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
"""Prepare data for the classifier.
Args:
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
num_bins: the number of bins
Returns:
A triple (x, y, quantiles), where
- x is the vector of training data
- y is the vector of targets
- the quantiles
"""
x: NDArray[np.float64] = existing_points
y: NDArray[np.float64] = existing_losses
cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)
for i in range(num_bins - 1):
quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])
quantiles[-1] = np.max(y)
y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
y_cat = y_cat - 1
return x, y_cat, quantiles
criterion: str
property
readonly
Get the criterion.
n_classes: int
property
readonly
Get the number of classes.
n_estimators: int
property
readonly
Get the number of estimators.
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, n_estimators=500, criterion='gini', n_classes=10)
special
Random forest sampling.
Note: this class makes use of sklearn.ensemble.RandomForestClassifier.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | the maximum number of deduplication passes | 5 |
| candidate_pool_size | int \| None | number of randomly sampled points on which the random forest predictions are evaluated | None |
| n_estimators | int | number of trees in the forest | 500 |
| criterion | str | the function to measure the quality of a split. | 'gini' |
| n_classes | int | the number of classes used in the random forest. The classes are selected as the quantiles of the distribution of loss values. | 10 |
Source code in black_it/samplers/random_forest.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
candidate_pool_size: int | None = None,
n_estimators: int = 500,
criterion: str = "gini",
n_classes: int = 10,
) -> None:
"""Random forest sampling.
Note: this class makes use of sklearn.ensemble.RandomForestClassifier.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
n_estimators: number of trees in the forest
criterion: the function to measure the quality of a split.
n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
of the distribution of loss values.
"""
_assert(
n_classes > _MIN_RF_CLASSES,
"'n_classes' should be at least 2 to provide meaningful results",
)
super().__init__(
batch_size,
random_state,
max_deduplication_passes,
candidate_pool_size,
)
self._n_estimators = n_estimators
self._criterion = criterion
self._n_classes = n_classes
self._classifier: RandomForestClassifier | None = None
fit(self, X, y)
Fit a random forest surrogate model.
Source code in black_it/samplers/random_forest.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None: # noqa: N803
"""Fit a random forest surrogate model."""
# Train surrogate
(
X, # noqa: N806
y_cat,
_existing_points_quantiles,
) = self.prepare_data_for_classifier(X, y, self.n_classes)
self._classifier = RandomForestClassifier(
n_estimators=self.n_estimators,
criterion=self.criterion,
n_jobs=-1,
random_state=self._get_random_seed(),
)
self._classifier.fit(X, y_cat)
predict(self, X)
Predict using a random forest surrogate model.
Source code in black_it/samplers/random_forest.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]: # noqa: N803
"""Predict using a random forest surrogate model."""
# Predict quantiles
self._classifier = cast("RandomForestClassifier", self._classifier)
predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)
return predicted_points_quantiles
prepare_data_for_classifier(existing_points, existing_losses, num_bins)
staticmethod
Prepare data for the classifier.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| existing_points | NDArray[np.float64] | the parameters already sampled | required |
| existing_losses | NDArray[np.float64] | the loss corresponding to the sampled parameters | required |
| num_bins | int | the number of bins | required |

Returns:

| Type | Description |
|---|---|
| tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]] | A triple (x, y, quantiles), where x is the vector of training data, y is the vector of targets, and quantiles are the loss quantiles used for binning. |
Source code in black_it/samplers/random_forest.py
@staticmethod
def prepare_data_for_classifier(
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
num_bins: int,
) -> tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
"""Prepare data for the classifier.
Args:
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
num_bins: the number of bins
Returns:
A triple (x, y, quantiles), where
- x is the vector of training data
- y is the vector of targets
- the quantiles
"""
x: NDArray[np.float64] = existing_points
y: NDArray[np.float64] = existing_losses
cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)
for i in range(num_bins - 1):
quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])
quantiles[-1] = np.max(y)
y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
y_cat = y_cat - 1
return x, y_cat, quantiles
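To see the quantile binning of prepare_data_for_classifier in isolation, here is an illustrative run: losses are mapped to integer class labels with roughly equally populated bins.

```python
# Illustrative run of the quantile binning used by the random-forest sampler:
# losses become integer class labels 0..num_bins-1 with roughly equal counts.
import numpy as np

from black_it.samplers.random_forest import RandomForestSampler

rng = np.random.default_rng(0)
points = rng.uniform(size=(20, 3))    # 20 sampled parameter vectors
losses = rng.exponential(size=20)     # their (positive) losses

x, y_cat, quantiles = RandomForestSampler.prepare_data_for_classifier(points, losses, num_bins=4)
print(np.bincount(y_cat))  # ~5 points per class
print(quantiles)           # edges: 0, the 25/50/75% loss quantiles, max(losses)
```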
This class implements xgboost sampling.
Source code in black_it/samplers/xgboost.py
class XGBoostSampler(MLSurrogateSampler):
"""This class implements xgboost sampling."""
def __init__( # noqa: PLR0913
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
candidate_pool_size: int | None = None,
colsample_bytree: float = 0.3,
learning_rate: float = 0.1,
max_depth: int = 5,
alpha: float = 1.0,
n_estimators: int = 10,
) -> None:
"""Sampler based on a xgboost surrogate model of the loss function.
Note: this class makes use of the xgboost library, for more information on the XGBoost parameters
visit https://xgboost.readthedocs.io/en/stable/parameter.html.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
colsample_bytree: subsample ratio of columns when constructing each tree
learning_rate: the learning rate
max_depth: maximum depth of XGBoost trees
alpha: L1 regularization term on weights
n_estimators: number of estimators
References:
Lamperti, Roventini, and Sani, "Agent-based model calibration using machine learning surrogates"
"""
super().__init__(
batch_size,
random_state,
max_deduplication_passes,
candidate_pool_size,
)
self._colsample_bytree = colsample_bytree
self._learning_rate = learning_rate
self._max_depth = max_depth
self._alpha = alpha
self._n_estimators = n_estimators
self._xg_regressor: xgb.XGBRegressor | None = None
@property
def colsample_bytree(self) -> float:
"""Get the colsample_bytree parameter."""
return self._colsample_bytree
@property
def learning_rate(self) -> float:
"""Get the learning rate."""
return self._learning_rate
@property
def max_depth(self) -> int:
"""Get the maximum tree depth."""
return self._max_depth
@property
def alpha(self) -> float:
"""Get the alpha regularisation parameter."""
return self._alpha
@property
def n_estimators(self) -> int:
"""Get the number of estimators."""
return self._n_estimators
@staticmethod
def _clip_losses(y: NDArray[np.float64]) -> NDArray[np.float64]:
"""Check that loss values fall within the float32 limits needed for XGBoost to work."""
large_floats = np.where(y >= MAX_FLOAT32)[0]
small_floats = np.where(y <= MIN_FLOAT32)[0]
if len(large_floats) == 0 and len(small_floats) == 0:
return y
warnings.warn( # noqa: B028
"Found loss values out of float32 limits, clipping them for XGBoost.",
RuntimeWarning,
)
if len(large_floats) > 0:
y[large_floats] = MAX_FLOAT32 - EPS_FLOAT32
if len(small_floats) > 0:
y[small_floats] = MIN_FLOAT32 + EPS_FLOAT32
return y
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None: # noqa: N803
"""Fit a xgboost surrogate model."""
# prepare data
y = self._clip_losses(y)
_ = xgb.DMatrix(data=X, label=y)
# train surrogate
self._xg_regressor = xgb.XGBRegressor(
objective="reg:squarederror", # original: objective ='reg:linear',
colsample_bytree=self.colsample_bytree,
learning_rate=self.learning_rate,
max_depth=self.max_depth, # original: 5
alpha=self.alpha,
n_estimators=self.n_estimators,
random_state=self._get_random_seed(),
) # original: 10
self._xg_regressor.fit(X, y)
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]: # noqa: N803
"""Predict using a xgboost surrogate model."""
# predict over large pool of candidates
_ = xgb.DMatrix(data=X)
self._xg_regressor = cast("xgb.XGBRegressor", self._xg_regressor)
return self._xg_regressor.predict(X)
alpha: float
property
readonly
Get the alpha regularisation parameter.
colsample_bytree: float
property
readonly
Get the colsample_bytree parameter.
learning_rate: float
property
readonly
Get the learning rate.
max_depth: int
property
readonly
Get the maximum tree depth.
n_estimators: int
property
readonly
Get the number of estimators.
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, colsample_bytree=0.3, learning_rate=0.1, max_depth=5, alpha=1.0, n_estimators=10)
special
Sampler based on an XGBoost surrogate model of the loss function.
Note: this class makes use of the xgboost library; for more information on the XGBoost parameters
visit https://xgboost.readthedocs.io/en/stable/parameter.html.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
| max_deduplication_passes | int | the maximum number of deduplication passes | 5 |
| candidate_pool_size | int \| None | number of randomly sampled points on which the surrogate model predictions are evaluated | None |
| colsample_bytree | float | subsample ratio of columns when constructing each tree | 0.3 |
| learning_rate | float | the learning rate | 0.1 |
| max_depth | int | maximum depth of XGBoost trees | 5 |
| alpha | float | L1 regularization term on weights | 1.0 |
| n_estimators | int | number of estimators | 10 |
References
Lamperti, Roventini, and Sani, "Agent-based model calibration using machine learning surrogates"
Source code in black_it/samplers/xgboost.py
def __init__( # noqa: PLR0913
self,
batch_size: int,
random_state: int | None = None,
max_deduplication_passes: int = 5,
candidate_pool_size: int | None = None,
colsample_bytree: float = 0.3,
learning_rate: float = 0.1,
max_depth: int = 5,
alpha: float = 1.0,
n_estimators: int = 10,
) -> None:
"""Sampler based on a xgboost surrogate model of the loss function.
Note: this class makes use of the xgboost library, for more information on the XGBoost parameters
visit https://xgboost.readthedocs.io/en/stable/parameter.html.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes
candidate_pool_size: number of randomly sampled points on which the surrogate-model predictions are evaluated
colsample_bytree: subsample ratio of columns when constructing each tree
learning_rate: the learning rate
max_depth: maximum depth of XGBoost trees
alpha: L1 regularization term on weights
n_estimators: number of estimators
References:
Lamperti, Roventini, and Sani, "Agent-based model calibration using machine learning surrogates"
"""
super().__init__(
batch_size,
random_state,
max_deduplication_passes,
candidate_pool_size,
)
self._colsample_bytree = colsample_bytree
self._learning_rate = learning_rate
self._max_depth = max_depth
self._alpha = alpha
self._n_estimators = n_estimators
self._xg_regressor: xgb.XGBRegressor | None = None
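As a usage sketch, the sampler can be constructed and asked for one batch given a small history of sampled points. The SearchSpace constructor arguments below (a (2, dims) array of lower/upper bounds plus a per-dimension precision) are an assumption based on how parameters_bounds is indexed elsewhere on this page; adjust them if the actual black_it.search_space API differs:
import numpy as np
from black_it.samplers.xgboost import XGBoostSampler
from black_it.search_space import SearchSpace  # assumed module path

space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 1.0]]),  # assumed: lower row, upper row
    parameters_precision=np.array([0.01, 0.01]),
    verbose=False,
)

sampler = XGBoostSampler(batch_size=8, random_state=0, n_estimators=10)

# The surrogate needs some history: previously sampled points and their losses.
rng = np.random.default_rng(0)
existing_points = rng.random((32, 2))
existing_losses = ((existing_points - 0.5) ** 2).sum(axis=1)

new_points = sampler.sample(space, existing_points, existing_losses)
print(new_points.shape)  # (8, 2)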
fit(self, X, y)
Fit a xgboost surrogate model.
Source code in black_it/samplers/xgboost.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None: # noqa: N803
"""Fit a xgboost surrogate model."""
# prepare data
y = self._clip_losses(y)
_ = xgb.DMatrix(data=X, label=y)
# train surrogate
self._xg_regressor = xgb.XGBRegressor(
objective="reg:squarederror", # original: objective ='reg:linear',
colsample_bytree=self.colsample_bytree,
learning_rate=self.learning_rate,
max_depth=self.max_depth, # original: 5
alpha=self.alpha,
n_estimators=self.n_estimators,
random_state=self._get_random_seed(),
) # original: 10
self._xg_regressor.fit(X, y)
predict(self, X)
Predict using a xgboost surrogate model.
Source code in black_it/samplers/xgboost.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]: # noqa: N803
"""Predict using a xgboost surrogate model."""
# predict over large pool of candidates
_ = xgb.DMatrix(data=X)
self._xg_regressor = cast("xgb.XGBRegressor", self._xg_regressor)
return self._xg_regressor.predict(X)
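Taken together, fit and predict implement a plain regression surrogate of the loss: train on the parameters sampled so far and their losses, then score a pool of candidates and keep the most promising ones. The snippet below is a standalone sketch of that pattern using xgboost directly; the candidate pool size, toy loss, and selection of the 8 best points are illustrative choices, not the sampler's internal code path:
import numpy as np
import xgboost as xgb

rng = np.random.default_rng(42)

# History: previously sampled 2-D parameters and their (toy) losses.
X_train = rng.random((64, 2))
y_train = ((X_train - 0.3) ** 2).sum(axis=1)

surrogate = xgb.XGBRegressor(
    objective="reg:squarederror",
    n_estimators=10,
    max_depth=5,
    learning_rate=0.1,
    colsample_bytree=0.3,
)
surrogate.fit(X_train, y_train)

# Score a large random pool of candidates and keep those with the lowest
# predicted loss: the core idea behind a surrogate-based sampler.
candidates = rng.random((1000, 2))
predicted = surrogate.predict(candidates)
best_candidates = candidates[np.argsort(predicted)[:8]]
print(best_candidates)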
Implementation of a particle swarm sampler.
This sampler implements the particle swarm sampling method commonly used in particle swarm optimization (PSO), introduced in:
Eberhart, Russell, and James Kennedy. "A new optimizer using particle swarm theory."
MHS'95. Proceedings of the sixth international symposium on micro machine and human science. IEEE, 1995.
In a particle swarm optimizer, there is a set of particles that are "evolved" by cooperation and competition among the individuals themselves through generations. Each particle adjusts its flight according to its own flying experience and its companions' flying experience; each particle, in fact, represents a potential solution to the problem and is treated as a point in a D-dimensional space.
The i-th particle is represented as Xi = (x_{i1},...,x_{iD}). The best previous position (the position giving the best fitness value) of any particle is recorded and represented as Pi = (p_{i1},...,p_{iD}), and the index of the best particle among all the particles in the population is denoted by the symbol g. The rate of position change (velocity) for particle i is represented as Vi = (v_{i1},...,v_{iD}). The particles are manipulated according to the following equations:
v_{id} = (ω * v_{id}) + (c1 * r1 * (p_{id} - x_{id})) + (c2 * r2 * (p_{gd} - x_{id}))
x_{id} = x_{id} + v_{id}
Where:
- ω is the inertia weight to control the influence of the previous velocity;
- c1 and c2 are positive values that represent the acceleration constants;
- r1 and r2 are two random numbers uniformly distributed in the range of (0, 1).
Note that p_{gd}, the global best position found across the dynamics, can optionally be computed by also considering the sampling performed by other samplers in order to let them interfere constructively with the Particle Swarm Sampler.
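To make the update rule concrete, here is a minimal NumPy sketch of a single velocity and position update for a 2-particle, 2-dimensional swarm, using the sampler's default constants (illustrative values only; the sampler's actual step is implemented in _do_step below):
import numpy as np

rng = np.random.default_rng(0)
inertia, c1, c2 = 0.9, 0.1, 0.1              # ω, c1, c2

x = np.array([[0.2, 0.8], [0.6, 0.4]])       # current positions, kept in [0, 1]
v = np.array([[0.05, -0.02], [0.01, 0.03]])  # current velocities
p_best = np.array([[0.25, 0.7], [0.5, 0.45]])  # best position seen by each particle
g_best = np.array([0.25, 0.7])               # best position seen by the whole swarm

r1 = rng.random(x.shape)                     # r1, r2 ~ U(0, 1), drawn per component
r2 = rng.random(x.shape)

v_new = inertia * v + c1 * r1 * (p_best - x) + c2 * r2 * (g_best - x)
x_new = np.clip(x + v_new, 0.0, 1.0)         # positions stay in the unit hypercube
print(v_new)
print(x_new)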
Source code in black_it/samplers/particle_swarm.py
class ParticleSwarmSampler(BaseSampler):
"""Implementation of a particle swarm sampler.
This sampler implements the particle swarm sampling method commonly used in particle swarm optimization (PSO),
introduced in:
Eberhart, Russell, and James Kennedy. "A new optimizer using particle swarm theory."
MHS'95. Proceedings of the sixth international symposium on micro machine and human science. IEEE, 1995.
In a particle swarm optimizer, there is a set of particles that are "evolved" by cooperation and competition
among the individuals themselves through generations. Each particle adjusts its flying according to its own
flying experience and its companions' flying experience. Each particle, in fact, represents a potential solution
to a problem. Each particle is treated as a point in a D-dimensional space. The ith particle is represented as
Xi = (x_{i1},...,x_{iD}). The best previous position (the position giving the best fitness value) of any particle
is recorded and represented as Pi = (p_{i1},...,p_{iD}). The index of the best particle among all the particles
in the population is represented by the symbol g. The rate of the position change (velocity) for particle i is
represented as Vi = (v_{i1},...,v_{iD}). The particles are manipulated according to the following equation:
v_{id} = (ω * v_{id}) + (c1 * r1 * (p_{id} - x_{id})) + (c2 * r2 * (p_{gd} - x_{id}))
x_{id} = x_{id} + v_{id}
Where:
- ω is the inertia weight to control the influence of the previous velocity;
- c1 and c2 are positive values that represent the acceleration constants;
- r1 and r2 are two random numbers uniformly distributed in the range of (0, 1).
Note that p_{gd}, the global best position found across the dynamics, can optionally be computed by also
considering the sampling performed by other samplers in order to let them interfere constructively with the
Particle Swarm Sampler.
"""
def __init__(
self,
batch_size: int,
random_state: int | None = None,
inertia: float = 0.9,
c1: float = 0.1,
c2: float = 0.1,
global_minimum_across_samplers: bool = False,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
inertia: the inertia of the particles' motion
c1: first acceleration constant
c2: second acceleration constant
global_minimum_across_samplers: if True, the global minimum attractor of the particles' dynamics is computed
taking into consideration also parameters sampled by other samplers, default is False
"""
# max_deduplication_passes must be zero because the sampler is stateful
super().__init__(
batch_size,
random_state=random_state,
max_deduplication_passes=0,
)
# The batch size is the number of sampled parameters per iteration. In a Black-it sampler, each call to
# sample_batch represents an iteration of the particle swarm sampler, so it seems natural to set the number of
# particles to the batch size, as at each iteration sample_batch returns the current positions of the
# particles.
self.nb_particles = batch_size
self._inertia = positive_float(inertia)
self._c1 = positive_float(c1)
self._c2 = positive_float(c2)
self._global_minimum_across_samplers = global_minimum_across_samplers
# all current particle positions; shape=(nb_particles, space dimensions)
self._curr_particle_positions: NDArray | None = None
# all current particle velocities; shape=(nb_particles, space dimensions)
self._curr_particle_velocities: NDArray | None = None
# best positions visited by each particle so far; shape=(nb_particles, space dimensions)
self._best_particle_positions: NDArray | None = None
# losses of the best positions
self._best_position_losses: NDArray | None = None
# particle id of the global best particle position
self._global_best_particle_id: int | None = None
# best point in parameter space - could be the best across samplers
self._best_point: NDArray | None = None
self._previous_batch_index_start: int | None = None
@property
def is_set_up(self) -> bool:
"""Return true iff the sampler is already set up."""
return self._curr_particle_positions is not None
@property
def inertia(self) -> float:
"""Get the inertia weight."""
return self._inertia
@property
def c1(self) -> float:
"""Get the c1 constant."""
return self._c1
@property
def c2(self) -> float:
"""Get the c2 constant."""
return self._c2
def _set_up(self, dims: int) -> None:
"""Set up the sampler."""
self._curr_particle_positions = self.random_generator.random(
size=(self.batch_size, dims),
)
self._curr_particle_velocities = (
self.random_generator.random(
size=cast("NDArray", self._curr_particle_positions).shape,
)
- 0.5
)
self._best_particle_positions = self._curr_particle_positions
# set losses to inf as we are interested in the minimum
self._best_position_losses = np.full(self.nb_particles, np.inf)
# we don't know yet which is the best index - initialize to 0
self._global_best_particle_id = 0
def _get_best_position(self) -> NDArray[np.float64]:
"""Get the position corresponding to the global optimum the particles should converge to.
If _global_minimum_across_samplers is False, then this method returns the current position
of the particle that in its history has sampled, so far, the best set of parameters.
Else, if _global_minimum_across_samplers is True, then this method returns the point
in parameter space that achieved the minimum loss. Note that this point could have been
sampled by a different sampler than "self".
Returns:
a Numpy array
"""
if not self._global_minimum_across_samplers:
best_particle_positions = cast("NDArray", self._best_particle_positions)
return best_particle_positions[self._global_best_particle_id]
return cast("NDArray", self._best_point)
def reset(self) -> None:
"""Reset the sampler."""
self._curr_particle_positions = None
self._curr_particle_velocities = None
self._best_particle_positions = None
self._best_position_losses = None
self._global_best_particle_id = None
self._previous_batch_index_start = None
_assert(
not self.is_set_up,
error_message="reset call did not work, sampler still set up",
exception_class=RuntimeError,
)
def sample_batch(
self,
batch_size: int, # noqa: ARG002
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample a batch of parameters."""
if not self.is_set_up:
self._set_up(search_space.dims)
self._previous_batch_index_start = len(existing_points)
return digitize_data(
cast("NDArray[np.float64]", self._best_particle_positions),
search_space.param_grid,
)
self._update_best(existing_points, existing_losses)
self._do_step()
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + self._curr_particle_positions * (p_bounds[1] - p_bounds[0])
self._previous_batch_index_start = len(existing_points)
return digitize_data(sampled_points, search_space.param_grid)
def _update_best(
self,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> None:
"""Update the best local and global positions."""
_assert(
self._previous_batch_index_start is not None,
exception_class=AssertionError,
error_message="should have been set",
)
# set best loss and best point
best_point_index = np.argmin(existing_losses)
self._best_point = existing_points[best_point_index]
# set best particle position
batch_index_start = cast("int", self._previous_batch_index_start)
batch_index_stop = batch_index_start + self.batch_size
previous_points = existing_points[batch_index_start:batch_index_stop]
previous_losses = existing_losses[batch_index_start:batch_index_stop]
for particle_id, (point, loss) in enumerate(
zip(previous_points, previous_losses),
):
best_particle_positions = cast("NDArray", self._best_particle_positions)
best_position_losses = cast("NDArray", self._best_position_losses)
if best_position_losses[particle_id] > loss:
best_particle_positions[particle_id] = point
best_position_losses[particle_id] = loss
# check if also the global best should be updated
best_global_loss = best_position_losses[self._global_best_particle_id]
if loss < best_global_loss:
self._global_best_particle_id = particle_id
def _do_step(self) -> None:
"""Do a step by updating particle positions and velocities."""
curr_particle_positions = cast("NDArray", self._curr_particle_positions)
curr_particle_velocities = cast("NDArray", self._curr_particle_velocities)
best_particle_positions = cast("NDArray", self._best_particle_positions)
r1_vec = self.random_generator.random(size=curr_particle_positions.shape)
r2_vec = self.random_generator.random(size=curr_particle_positions.shape)
new_particle_velocities = (
self.inertia * curr_particle_velocities
+ self.c1 * r1_vec * (best_particle_positions - self._curr_particle_positions)
+ self.c2 * r2_vec * (self._get_best_position() - self._curr_particle_positions) # type: ignore[operator]
)
self._curr_particle_positions = np.clip(
self._curr_particle_positions + new_particle_velocities,
a_min=0.0,
a_max=1.0,
)
self._curr_particle_velocities = new_particle_velocities
c1: float (property, readonly)
Get the c1 constant.
c2: float (property, readonly)
Get the c2 constant.
inertia: float (property, readonly)
Get the inertia weight.
is_set_up: bool (property, readonly)
Return true iff the sampler is already set up.
__init__(self, batch_size, random_state=None, inertia=0.9, c1=0.1, c2=0.1, global_minimum_across_samplers=False)
special
Initialize the sampler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | the number of points sampled every time the sampler is called | required |
| random_state | int \| None | the random state of the sampler, fixing this number the sampler behaves deterministically | None |
| inertia | float | the inertia of the particles' motion | 0.9 |
| c1 | float | first acceleration constant | 0.1 |
| c2 | float | second acceleration constant | 0.1 |
| global_minimum_across_samplers | bool | if True, the global minimum attractor of the particles' dynamics is computed taking into consideration also parameters sampled by other samplers | False |
Source code in black_it/samplers/particle_swarm.py
def __init__(
self,
batch_size: int,
random_state: int | None = None,
inertia: float = 0.9,
c1: float = 0.1,
c2: float = 0.1,
global_minimum_across_samplers: bool = False,
) -> None:
"""Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
inertia: the inertia of the particles' motion
c1: first acceleration constant
c2: second acceleration constant
global_minimum_across_samplers: if True, the global minimum attractor of the particles' dynamics is computed
taking into consideration also parameters sampled by other samplers, default is False
"""
# max_deduplication_passes must be zero because the sampler is stateful
super().__init__(
batch_size,
random_state=random_state,
max_deduplication_passes=0,
)
# The batch size is the number of sampled parameters per iteration. In a Black-it sampler, each call to
# sample_batch represents an iteration of the particle swarm sampler, so it seems natural to set the number of
# particles to the batch size, as at each iteration sample_batch returns the current positions of the
# particles.
self.nb_particles = batch_size
self._inertia = positive_float(inertia)
self._c1 = positive_float(c1)
self._c2 = positive_float(c2)
self._global_minimum_across_samplers = global_minimum_across_samplers
# all current particle positions; shape=(nb_particles, space dimensions)
self._curr_particle_positions: NDArray | None = None
# all current particle velocities; shape=(nb_particles, space dimensions)
self._curr_particle_velocities: NDArray | None = None
# best positions visited by each particle so far; shape=(nb_particles, space dimensions)
self._best_particle_positions: NDArray | None = None
# losses of the best positions
self._best_position_losses: NDArray | None = None
# particle id of the global best particle position
self._global_best_particle_id: int | None = None
# best point in parameter space - could be the best across samplers
self._best_point: NDArray | None = None
self._previous_batch_index_start: int | None = None
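Because the sampler is stateful, its behaviour only shows over repeated calls: the first call returns the (digitized) initial particle positions, and each later call performs one swarm update driven by the losses recorded so far. A hedged sketch of that loop; as in the XGBoost example above, the SearchSpace constructor arguments are an assumption about the black_it.search_space API:
import numpy as np
from black_it.samplers.particle_swarm import ParticleSwarmSampler
from black_it.search_space import SearchSpace  # assumed module path

space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 1.0]]),  # assumed: lower row, upper row
    parameters_precision=np.array([0.01, 0.01]),
    verbose=False,
)

sampler = ParticleSwarmSampler(batch_size=4, random_state=0)

points = np.zeros((0, 2))
losses = np.zeros(0)
for _ in range(5):
    batch = sampler.sample(space, points, losses)       # one swarm iteration
    batch_losses = ((batch - 0.3) ** 2).sum(axis=1)     # toy loss, optimum at (0.3, 0.3)
    points = np.concatenate((points, batch))
    losses = np.concatenate((losses, batch_losses))

print(points[np.argmin(losses)])  # over iterations the swarm drifts toward (0.3, 0.3)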
reset(self)
Reset the sampler.
Source code in black_it/samplers/particle_swarm.py
def reset(self) -> None:
"""Reset the sampler."""
self._curr_particle_positions = None
self._curr_particle_velocities = None
self._best_particle_positions = None
self._best_position_losses = None
self._global_best_particle_id = None
self._previous_batch_index_start = None
_assert(
not self.is_set_up,
error_message="reset call did not work, sampler still set up",
exception_class=RuntimeError,
)
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample a batch of parameters.
Source code in black_it/samplers/particle_swarm.py
def sample_batch(
self,
batch_size: int, # noqa: ARG002
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""Sample a batch of parameters."""
if not self.is_set_up:
self._set_up(search_space.dims)
self._previous_batch_index_start = len(existing_points)
return digitize_data(
cast("NDArray[np.float64]", self._best_particle_positions),
search_space.param_grid,
)
self._update_best(existing_points, existing_losses)
self._do_step()
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + self._curr_particle_positions * (p_bounds[1] - p_bounds[0])
self._previous_batch_index_start = len(existing_points)
return digitize_data(sampled_points, search_space.param_grid)
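The particle state is kept in the unit hypercube; the line p_bounds[0] + positions * (p_bounds[1] - p_bounds[0]) maps it back to the real parameter ranges with an affine rescaling. A small worked example with assumed bounds:
import numpy as np

p_bounds = np.array([[0.0, -1.0],    # row 0: lower bounds of the two parameters
                     [10.0, 1.0]])   # row 1: upper bounds
positions = np.array([[0.5, 0.25],   # particle positions in [0, 1]
                      [1.0, 0.0]])

sampled_points = p_bounds[0] + positions * (p_bounds[1] - p_bounds[0])
print(sampled_points)  # [[ 5.  -0.5]
                       #  [10.  -1. ]]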