Samplers
black_it.samplers.base.BaseSampler (ABC)
BaseSampler interface.
This is the base class for all samplers.
Source code in black_it/samplers/base.py
class BaseSampler(ABC):
"""
BaseSampler interface.
This is the base class for all samplers.
"""
def __init__(
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
) -> None:
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the internal state of the sampler, fixing this numbers the sampler behaves deterministically
max_deduplication_passes: maximum number of duplication passes done to avoid sampling repeated parameters
"""
self.random_state: Optional[int] = random_state
self.batch_size: int = batch_size
self.max_deduplication_passes = max_deduplication_passes
@property
def random_state(self) -> Optional[int]:
"""Get the random state."""
return self._random_state
@random_state.setter
def random_state(self, random_state: Optional[int]) -> None:
"""Set the random state."""
self._random_state = random_state
self._random_generator = default_rng(self.random_state)
@property
def random_generator(self) -> np.random.Generator:
"""Get the random generator."""
return self._random_generator
def _get_random_seed(self) -> int:
"""Get new random seed from the current random generator."""
return get_random_seed(self._random_generator)
@abstractmethod
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample a number of new parameters fixed by the 'batch_size' attribute.
Args:
batch_size: number of samples to collect
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the new parameters
"""
def sample(
self,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample from the search space.
Args:
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters
"""
samples = self.sample_batch(
self.batch_size, search_space, existing_points, existing_losses
)
for n in range(self.max_deduplication_passes):
duplicates = self.find_and_get_duplicates(samples, existing_points)
num_duplicates = len(duplicates)
if num_duplicates == 0:
break
new_samples = self.sample_batch(
num_duplicates, search_space, existing_points, existing_losses
)
samples[duplicates] = new_samples
if n == self.max_deduplication_passes - 1:
print(
f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
" This is probably due to a small search space."
)
return samples
@staticmethod
def find_and_get_duplicates(
new_points: NDArray[np.float64], existing_points: NDArray[np.float64]
) -> List:
"""Find the points in 'new_points' that are already present in 'existing_points'.
Args:
new_points: candidates points for the sampler
existing_points: previously sampled points
Returns:
the location of the duplicates in 'new_points'
"""
all_points = np.concatenate((existing_points, new_points))
unq, count = np.unique(all_points, axis=0, return_counts=True)
repeated_groups = unq[count > 1]
repeated_pos = []
if len(repeated_groups) > 0:
for repeated_group in repeated_groups:
repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
for index in repeated_idx:
repeated_pos.append(index[0])
return repeated_pos
random_generator: Generator
property
readonly
Get the random generator.
random_state: Optional[int]
property
writable
Get the random state.
__init__(self, batch_size, random_state=None, max_deduplication_passes=5)
special
Initialize the sampler.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points sampled every time the sampler is called | required |
random_state | Optional[int] | the internal state of the sampler; fixing this number makes the sampler behave deterministically | None |
max_deduplication_passes | int | maximum number of deduplication passes done to avoid sampling repeated parameters | 5 |
Source code in black_it/samplers/base.py
def __init__(
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
) -> None:
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the internal state of the sampler, fixing this numbers the sampler behaves deterministically
max_deduplication_passes: maximum number of duplication passes done to avoid sampling repeated parameters
"""
self.random_state: Optional[int] = random_state
self.batch_size: int = batch_size
self.max_deduplication_passes = max_deduplication_passes
find_and_get_duplicates(new_points, existing_points)
staticmethod
Find the points in 'new_points' that are already present in 'existing_points'.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
new_points | ndarray | candidate points for the sampler | required |
existing_points | ndarray | previously sampled points | required |

Returns:

Type | Description |
---|---|
List | the location of the duplicates in 'new_points' |
Source code in black_it/samplers/base.py
@staticmethod
def find_and_get_duplicates(
new_points: NDArray[np.float64], existing_points: NDArray[np.float64]
) -> List:
"""Find the points in 'new_points' that are already present in 'existing_points'.
Args:
new_points: candidates points for the sampler
existing_points: previously sampled points
Returns:
the location of the duplicates in 'new_points'
"""
all_points = np.concatenate((existing_points, new_points))
unq, count = np.unique(all_points, axis=0, return_counts=True)
repeated_groups = unq[count > 1]
repeated_pos = []
if len(repeated_groups) > 0:
for repeated_group in repeated_groups:
repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
for index in repeated_idx:
repeated_pos.append(index[0])
return repeated_pos
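To make the deduplication check concrete, here is a minimal, illustrative sketch that calls the static method directly on toy arrays (the values are made up for the example):

```python
import numpy as np

from black_it.samplers.base import BaseSampler

# Two previously sampled points and three new candidates; the second
# candidate coincides with an existing point and should be flagged.
existing = np.array([[0.1, 0.2], [0.3, 0.4]])
candidates = np.array([[0.5, 0.6], [0.3, 0.4], [0.7, 0.8]])

duplicate_positions = BaseSampler.find_and_get_duplicates(candidates, existing)
print(duplicate_positions)  # expected: [1]
```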
sample(self, search_space, existing_points, existing_losses)
Sample from the search space.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
search_space | SearchSpace | an object containing the details of the parameter search space | required |
existing_points | ndarray | the parameters already sampled | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters | required |

Returns:

Type | Description |
---|---|
ndarray | the sampled parameters |
Source code in black_it/samplers/base.py
def sample(
self,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample from the search space.
Args:
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters
"""
samples = self.sample_batch(
self.batch_size, search_space, existing_points, existing_losses
)
for n in range(self.max_deduplication_passes):
duplicates = self.find_and_get_duplicates(samples, existing_points)
num_duplicates = len(duplicates)
if num_duplicates == 0:
break
new_samples = self.sample_batch(
num_duplicates, search_space, existing_points, existing_losses
)
samples[duplicates] = new_samples
if n == self.max_deduplication_passes - 1:
print(
f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
" This is probably due to a small search space."
)
return samples
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample a number of new parameters fixed by the 'batch_size' attribute.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | number of samples to collect | required |
search_space | SearchSpace | an object containing the details of the parameter search space | required |
existing_points | ndarray | the parameters already sampled | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters | required |

Returns:

Type | Description |
---|---|
ndarray | the new parameters |
Source code in black_it/samplers/base.py
@abstractmethod
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample a number of new parameters fixed by the 'batch_size' attribute.
Args:
batch_size: number of samples to collect
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the new parameters
"""
black_it.samplers.random_uniform.RandomUniformSampler (BaseSampler)
Random uniform sampling.
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample uniformly from the search space.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points to sample | required |
search_space | SearchSpace | an object containing the details of the parameter search space | required |
existing_points | ndarray | the parameters already sampled | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters | required |

Returns:

Type | Description |
---|---|
ndarray | the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`) |
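A minimal end-to-end usage sketch. The `SearchSpace` module path and constructor arguments (bounds, precisions, verbosity) are assumptions here; check the library for the exact signature.

```python
import numpy as np

from black_it.samplers.random_uniform import RandomUniformSampler
from black_it.search_space import SearchSpace  # assumed module path

# Two parameters, each in [0, 1] with a grid precision of 0.01 (assumed signature).
search_space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 1.0]]),
    parameters_precision=np.array([0.01, 0.01]),
    verbose=False,
)

sampler = RandomUniformSampler(batch_size=8, random_state=0)
points = sampler.sample(
    search_space,
    existing_points=np.zeros((0, 2)),
    existing_losses=np.zeros(0),
)
print(points.shape)  # (8, 2)
```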
black_it.samplers.halton.HaltonSampler (BaseSampler)
Halton low discrepancy sequence.
This sampler implements the Halton sequence, which generalizes the Van der Corput sequence to n dimensions.
random_state: Optional[int]
property
writable
Get the random state.
__init__(self, batch_size, random_state=None, max_deduplication_passes=5)
special
Initialize the sampler.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points sampled every time the sampler is called | required |
random_state | Optional[int] | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
max_deduplication_passes | int | the maximum number of sample deduplication passes | 5 |
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample points using Halton sequence.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of samples | required |
search_space | SearchSpace | an object containing the details of the parameter search space | required |
existing_points | ndarray | the parameters already sampled (not used) | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters (not used) | required |

Returns:

Type | Description |
---|---|
ndarray | the sampled parameters |
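For intuition, here is a self-contained sketch of the underlying construction (illustrative code, not the library implementation): the Van der Corput sequence in base b reverses the base-b digits of the index around the radix point, and a Halton sequence pairs each dimension with a distinct prime base.

```python
import numpy as np


def van_der_corput(index: int, base: int) -> float:
    """index-th element of the Van der Corput sequence in the given base."""
    value, denom = 0.0, 1.0
    while index > 0:
        denom *= base
        index, digit = divmod(index, base)
        value += digit / denom
    return value


def halton(n_points: int, bases: tuple = (2, 3)) -> np.ndarray:
    """First n_points of a Halton sequence with one prime base per dimension."""
    return np.array(
        [[van_der_corput(i, b) for b in bases] for i in range(1, n_points + 1)]
    )


print(halton(4))  # low-discrepancy points in the unit square
```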
black_it.samplers.r_sequence.RSequenceSampler (BaseSampler)
The R-sequence sampler.
Source code in black_it/samplers/r_sequence.py
class RSequenceSampler(BaseSampler):
"""The R-sequence sampler."""
def __init__(
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
) -> None:
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
after every batch sampling. Default: 0, i.e. no deduplication happens.
"""
super().__init__(batch_size, random_state, max_deduplication_passes)
self._sequence_index: int
self._sequence_start: float
self._reset()
@classmethod
def compute_phi(cls, nb_dims: int) -> float:
"""
Get an approximation of phi^nb_dims.
Args:
nb_dims: the number of dimensions.
Returns:
phi^nb_dims
"""
check_arg(1 <= nb_dims, f"nb_dims should be greater than 0, got {nb_dims}")
phi: float = 2.0
old_phi = None
while old_phi != phi:
old_phi = phi
phi = pow(1 + phi, 1.0 / (nb_dims + 1))
return phi
@property
def random_state(self) -> Optional[int]:
"""Get the random state."""
return self._random_state
@random_state.setter
def random_state(self, random_state: Optional[int]) -> None:
"""Set the random state."""
self._random_state = random_state
self._random_generator = default_rng(self.random_state)
self._reset()
def _reset(self) -> None:
"""Reset the index of the sequence."""
self._sequence_index = self.random_generator.integers(
_MIN_SEQUENCE_START_INDEX, _MAX_SEQUENCE_START_INDEX
)
self._sequence_start = self.random_generator.random()
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample points using the R-sequence.
Args:
batch_size: the number of samples
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled (not used)
existing_losses: the loss corresponding to the sampled parameters (not used)
Returns:
the parameter sampled
"""
unit_cube_points: NDArray[np.float64] = self._r_sequence(
batch_size, search_space.dims
)
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
return digitize_data(sampled_points, search_space.param_grid)
def _r_sequence(self, nb_samples: int, dims: int) -> NDArray[np.float64]:
"""
Compute the R-sequence (http://extremelearning.com.au/unreasonable-effectiveness-of-quasirandom-sequences/).
Args:
nb_samples: number of points to sample
dims: the number of dimensions
Returns:
Set of params uniformly placed in d-dimensional unit cube.
"""
phi = self.compute_phi(dims)
alpha: NDArray[np.float64] = np.power(1 / phi, np.arange(1, dims + 1)).reshape(
(1, -1)
)
end_index = self._sequence_index + nb_samples
indexes = np.arange(self._sequence_index, end_index).reshape((-1, 1))
points: NDArray[np.float64] = (self._sequence_start + indexes.dot(alpha)) % 1
self._sequence_index = end_index
return points
random_state: Optional[int]
property
writable
Get the random state.
__init__(self, batch_size, random_state=None, max_deduplication_passes=5)
special
Initialize the sampler.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points sampled every time the sampler is called | required |
random_state | Optional[int] | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
max_deduplication_passes | int | (non-negative integer) the maximum number of deduplication passes made after every batch sampling; 0 means no deduplication | 5 |
Source code in black_it/samplers/r_sequence.py
def __init__(
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
) -> None:
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
after every batch sampling. Default: 0, i.e. no deduplication happens.
"""
super().__init__(batch_size, random_state, max_deduplication_passes)
self._sequence_index: int
self._sequence_start: float
self._reset()
compute_phi(nb_dims)
classmethod
Get an approximation of phi^nb_dims.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
nb_dims | int | the number of dimensions | required |

Returns:

Type | Description |
---|---|
float | phi^nb_dims |
Source code in black_it/samplers/r_sequence.py
@classmethod
def compute_phi(cls, nb_dims: int) -> float:
"""
Get an approximation of phi^nb_dims.
Args:
nb_dims: the number of dimensions.
Returns:
phi^nb_dims
"""
check_arg(1 <= nb_dims, f"nb_dims should be greater than 0, got {nb_dims}")
phi: float = 2.0
old_phi = None
while old_phi != phi:
old_phi = phi
phi = pow(1 + phi, 1.0 / (nb_dims + 1))
return phi
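The fixed-point iteration above converges to the "generalized golden ratio" used by the R-sequence, i.e. the positive root of phi^(d+1) = phi + 1. A quick, illustrative check:

```python
from black_it.samplers.r_sequence import RSequenceSampler

for dims in (1, 2, 5):
    phi = RSequenceSampler.compute_phi(dims)
    # phi should (approximately) solve phi**(dims + 1) == phi + 1
    print(dims, phi, abs(phi ** (dims + 1) - (phi + 1)))
```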
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample points using the R-sequence.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of samples | required |
search_space | SearchSpace | an object containing the details of the parameter search space | required |
existing_points | ndarray | the parameters already sampled (not used) | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters (not used) | required |

Returns:

Type | Description |
---|---|
ndarray | the sampled parameters |
Source code in black_it/samplers/r_sequence.py
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample points using the R-sequence.
Args:
batch_size: the number of samples
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled (not used)
existing_losses: the loss corresponding to the sampled parameters (not used)
Returns:
the parameter sampled
"""
unit_cube_points: NDArray[np.float64] = self._r_sequence(
batch_size, search_space.dims
)
p_bounds: NDArray[np.float64] = search_space.parameters_bounds
sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
return digitize_data(sampled_points, search_space.param_grid)
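The construction is short enough to sketch outside the library: an additive recurrence modulo 1 with frequencies derived from the generalized golden ratio, followed by an affine map onto the parameter bounds (illustrative code, not the black-it implementation).

```python
import numpy as np


def r_sequence(n_points: int, dims: int, start: float = 0.5) -> np.ndarray:
    """Additive-recurrence quasi-random points in the d-dimensional unit cube."""
    # Generalized golden ratio: positive root of phi**(dims + 1) = phi + 1.
    phi = 2.0
    for _ in range(50):
        phi = (1 + phi) ** (1.0 / (dims + 1))
    alpha = (1.0 / phi) ** np.arange(1, dims + 1)
    indexes = np.arange(1, n_points + 1).reshape(-1, 1)
    return (start + indexes * alpha) % 1.0


unit_points = r_sequence(5, dims=2)
# Map from the unit square onto per-coordinate bounds [low, high].
low, high = np.array([0.0, -1.0]), np.array([1.0, 1.0])
print(low + unit_points * (high - low))
```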
black_it.samplers.best_batch.BestBatchSampler (BaseSampler)
This class implements the best-batch sampler.
The sampler is a very basic type of genetic algorithm: it takes the parameters corresponding to the current lowest loss values and perturbs them slightly in a purely random fashion. The sampler first draws the total number of coordinates to perturb from a beta-binomial distribution BetaBin(dims, a, b), where dims is the total number of dimensions in the search space; it then selects that many coordinates at random and perturbs them uniformly within the range specified by 'perturbation_range'.
Source code in black_it/samplers/best_batch.py
class BestBatchSampler(BaseSampler):
"""This class implements the best-batch sampler.
The sampler is a very essential type of genetic algorithm that takes the parameters corresponding
to the current lowest loss values and perturbs them slightly in a purely random fashion.
The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution
BetaBin(dims, a, b) --where dims is the total number of dimensions in the search space --, it then selects
that many coordinate randomly, and perturbs them uniformly within the range specified by 'perturbation_range'.
"""
def __init__(
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
a: float = 3.0,
b: float = 1.0,
perturbation_range: int = 6,
):
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
a: the a parameter of the beta-binomial distribution
b: the b parameter of the beta-binomial distribution
perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
plus/minus the perturbation_range times the precision of the specific parameter coordinate
"""
_assert(
a > 0.0,
"'a' should be greater than zero",
)
_assert(
b > 0.0,
"'b' should be greater than zero",
)
_assert(
perturbation_range > 1,
"'perturbation_range' should be greater than one",
)
super().__init__(batch_size, random_state, max_deduplication_passes)
self.a = a
self.b = b
self.perturbation_range = perturbation_range
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample from the search space using a genetic algorithm.
Args:
batch_size: the number of points to sample
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
"""
if len(existing_points) < batch_size:
raise ValueError(
"best-batch sampler requires a number of existing points "
f"which is at least the batch size {batch_size}, "
f"got {len(existing_points)}"
)
# sort existing params
candidate_points: NDArray[np.float64] = existing_points[
np.argsort(existing_losses)
][:batch_size, :]
candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
0, batch_size, size=batch_size
)
sampled_points: NDArray[np.float64] = np.copy(
candidate_points[candidate_point_indexes]
)
beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
beta_binom_rv.random_state = self.random_generator
for sampled_point in sampled_points:
num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
params_shocked: NDArray[np.int64] = self.random_generator.choice(
search_space.dims, tuple(num_shocks), replace=False
)
for index in params_shocked:
shock_size: int = self.random_generator.integers(
1, self.perturbation_range
)
shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1
delta: float = search_space.parameters_precision[index]
shift: float = delta * shock_sign * shock_size
sampled_point[index] += shift
sampled_point[index] = np.clip(
sampled_point[index],
search_space.parameters_bounds[0][index],
search_space.parameters_bounds[1][index],
)
return sampled_points
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, a=3.0, b=1.0, perturbation_range=6)
special
Initialize the sampler.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points sampled every time the sampler is called | required |
random_state | Optional[int] | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
max_deduplication_passes | int | the maximum number of deduplication passes that are made | 5 |
a | float | the a parameter of the beta-binomial distribution | 3.0 |
b | float | the b parameter of the beta-binomial distribution | 1.0 |
perturbation_range | int | the range of the perturbation applied; the actual perturbation will be within plus/minus perturbation_range times the precision of the specific parameter coordinate | 6 |
Source code in black_it/samplers/best_batch.py
def __init__(
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
a: float = 3.0,
b: float = 1.0,
perturbation_range: int = 6,
):
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
a: the a parameter of the beta-binomial distribution
b: the b parameter of the beta-binomial distribution
perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
plus/minus the perturbation_range times the precision of the specific parameter coordinate
"""
_assert(
a > 0.0,
"'a' should be greater than zero",
)
_assert(
b > 0.0,
"'b' should be greater than zero",
)
_assert(
perturbation_range > 1,
"'perturbation_range' should be greater than one",
)
super().__init__(batch_size, random_state, max_deduplication_passes)
self.a = a
self.b = b
self.perturbation_range = perturbation_range
sample_batch(self, batch_size, search_space, existing_points, existing_losses)
Sample from the search space using a genetic algorithm.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points to sample | required |
search_space | SearchSpace | an object containing the details of the parameter search space | required |
existing_points | ndarray | the parameters already sampled | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters | required |

Returns:

Type | Description |
---|---|
ndarray | the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`) |
Source code in black_it/samplers/best_batch.py
def sample_batch(
self,
batch_size: int,
search_space: SearchSpace,
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
"""
Sample from the search space using a genetic algorithm.
Args:
batch_size: the number of points to sample
search_space: an object containing the details of the parameter search space
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
Returns:
the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
"""
if len(existing_points) < batch_size:
raise ValueError(
"best-batch sampler requires a number of existing points "
f"which is at least the batch size {batch_size}, "
f"got {len(existing_points)}"
)
# sort existing params
candidate_points: NDArray[np.float64] = existing_points[
np.argsort(existing_losses)
][:batch_size, :]
candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
0, batch_size, size=batch_size
)
sampled_points: NDArray[np.float64] = np.copy(
candidate_points[candidate_point_indexes]
)
beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
beta_binom_rv.random_state = self.random_generator
for sampled_point in sampled_points:
num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
params_shocked: NDArray[np.int64] = self.random_generator.choice(
search_space.dims, tuple(num_shocks), replace=False
)
for index in params_shocked:
shock_size: int = self.random_generator.integers(
1, self.perturbation_range
)
shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1
delta: float = search_space.parameters_precision[index]
shift: float = delta * shock_sign * shock_size
sampled_point[index] += shift
sampled_point[index] = np.clip(
sampled_point[index],
search_space.parameters_bounds[0][index],
search_space.parameters_bounds[1][index],
)
return sampled_points
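The perturbation step can be sketched in isolation: draw the number of coordinates to shock from a beta-binomial distribution, pick that many coordinates, and move each one by a signed multiple of its grid precision. The values below are made up for illustration; this is not the library code.

```python
import numpy as np
from scipy.stats import betabinom

rng = np.random.default_rng(0)
dims = 4
precision = np.array([0.01, 0.1, 0.5, 1.0])  # per-coordinate grid step
a, b, perturbation_range = 3.0, 1.0, 6

point = np.array([0.5, 1.0, 2.0, 3.0])

# Number of coordinates to perturb: 1 + BetaBin(dims - 1, a, b).
num_shocks = betabinom(n=dims - 1, a=a, b=b).rvs(random_state=rng) + 1
shocked = rng.choice(dims, size=num_shocks, replace=False)

for idx in shocked:
    size = rng.integers(1, perturbation_range)   # 1 .. perturbation_range - 1
    sign = rng.integers(0, 2) * 2 - 1            # -1 or +1
    point[idx] += sign * size * precision[idx]

print(point)
```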
black_it.samplers.gaussian_process.GaussianProcessSampler (MLSurrogateSampler)
This class implements the Gaussian process-based sampler.
In particular, the sampling is based on a Gaussian Process interpolation of the loss function.
Note: this class is a wrapper of the GPRegression model of the GPy package.
Source code in black_it/samplers/gaussian_process.py
class GaussianProcessSampler(MLSurrogateSampler):
"""
This class implements the Gaussian process-based sampler.
In particular, the sampling is based on a Gaussian Process interpolation of the loss function.
Note: this class is a wrapper of the GPRegression model of the GPy package.
"""
def __init__( # pylint: disable=too-many-arguments
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
candidate_pool_size: Optional[int] = None,
max_iters: int = 1000,
optimize_restarts: int = 5,
acquisition: str = "expected_improvement",
):
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
max_iters: maximum number of iteration in the optimization of the GP hyperparameters
optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
acquisition: type of acquisition function, it can be 'expected_improvement' of simply 'mean'
"""
self._validate_acquisition(acquisition)
super().__init__(
batch_size, random_state, max_deduplication_passes, candidate_pool_size
)
self.max_iters = max_iters
self.optimize_restarts = optimize_restarts
self.acquisition = acquisition
self._gpmodel: Optional[GPRegression] = None
@staticmethod
def _validate_acquisition(acquisition: str) -> None:
"""
Check that the required acquisition is among the supported ones.
Args:
acquisition: the acquisition provided as input of the constructor.
Raises
ValueError: if the provided acquisition type is not among the allowed ones.
"""
try:
_AcquisitionTypes(acquisition)
except ValueError as e:
raise ValueError(
"expected one of the following acquisition types: "
f"[{' '.join(map(str, _AcquisitionTypes))}], "
f"got {acquisition}"
) from e
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
"""Fit a gaussian process surrogate model."""
y = np.atleast_2d(y).T
if X.shape[0] > 500:
warnings.warn(
"Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
RuntimeWarning,
)
# initialize GP class from GPy with a Matern kernel by default
dims = X.shape[1]
kern = GPy.kern.Matern52(dims, variance=1.0, ARD=False)
noise_var = y.var() * 0.01
self._gpmodel = GPRegression(
X, y, kernel=kern, noise_var=noise_var, mean_function=None
)
# Make sure we do not get ridiculously small residual noise variance
self._gpmodel.Gaussian_noise.constrain_bounded(
1e-9, 1e6, warning=False
) # constrain_positive(warning=False)
# we need to set the seed globally for GPy optimisations
# to give reproducible results
np.random.seed(self._get_random_seed())
random.seed(self._get_random_seed())
if self.max_iters > 0:
# --- update the model maximizing the marginal likelihood.
if self.optimize_restarts == 1:
self._gpmodel.optimize(
optimizer="bfgs",
max_iters=self.max_iters,
messages=False,
ipython_notebook=False,
)
else:
self._gpmodel.optimize_restarts(
num_restarts=self.optimize_restarts,
optimizer="bfgs",
max_iters=self.max_iters,
verbose=False,
)
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
"""Predict using a gaussian process surrogate model."""
# predict mean or expected improvement on the full sample set
if self.acquisition == _AcquisitionTypes.EI.value:
# minus sign needed for subsequent sorting
candidates_score = -self._predict_EI(X)[:, 0]
else: # acquisition is "mean"
candidates_score = self._predict_mean_std(X)[0][:, 0]
return candidates_score
def _predict_mean_std(
self, X: NDArray[np.float64]
) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Predict mean and standard deviation of a fitted GP.
Args:
X: the points on which the predictions should be performed
Returns:
The pair (mean, std).
"""
gpmodel = cast(GPRegression, self._gpmodel)
X = X[None, :] if X.ndim == 1 else X
m, v = gpmodel.predict(X, full_cov=False, include_likelihood=True)
v = np.clip(v, 1e-10, np.inf)
return m, np.sqrt(v)
def _get_fmin(self) -> float:
"""Return the location where the posterior mean is takes its minimal value."""
gpmodel = cast(GPRegression, self._gpmodel)
return gpmodel.predict(gpmodel.X)[0].min()
def _predict_EI(
self, X: NDArray[np.float64], jitter: float = 0.1
) -> NDArray[np.float64]:
"""
Compute the Expected Improvement per unit of cost.
Args:
X: the points on which the predictions should be performed
jitter: positive value to make the acquisition more explorative.
Returns:
the expected improvement.
"""
m, s = self._predict_mean_std(X)
fmin = self._get_fmin()
phi, Phi, u = self.get_quantiles(jitter, fmin, m, s)
f_acqu = s * (u * Phi + phi)
return f_acqu
@staticmethod
def get_quantiles(
acquisition_par: float,
fmin: float,
m: NDArray[np.float64],
s: NDArray[np.float64],
) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""
Quantiles of the Gaussian distribution useful to determine the acquisition function values.
Args:
acquisition_par: parameter of the acquisition function
fmin: current minimum.
m: vector of means.
s: vector of standard deviations.
Returns:
the quantiles.
"""
# remove values of variance that are too small
s[s < 1e-10] = 1e-10
u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2))
return phi, Phi, u
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, max_iters=1000, optimize_restarts=5, acquisition='expected_improvement')
special
Initialize the sampler.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points sampled every time the sampler is called | required |
random_state | Optional[int] | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
max_deduplication_passes | int | the maximum number of deduplication passes that are made | 5 |
candidate_pool_size | Optional[int] | number of randomly sampled points on which the surrogate predictions are evaluated | None |
max_iters | int | maximum number of iterations in the optimization of the GP hyperparameters | 1000 |
optimize_restarts | int | number of independent random trials of the optimization of the GP hyperparameters | 5 |
acquisition | str | type of acquisition function; it can be 'expected_improvement' or simply 'mean' | 'expected_improvement' |
Source code in black_it/samplers/gaussian_process.py
def __init__( # pylint: disable=too-many-arguments
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
candidate_pool_size: Optional[int] = None,
max_iters: int = 1000,
optimize_restarts: int = 5,
acquisition: str = "expected_improvement",
):
"""
Initialize the sampler.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes that are made
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
max_iters: maximum number of iteration in the optimization of the GP hyperparameters
optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
acquisition: type of acquisition function, it can be 'expected_improvement' of simply 'mean'
"""
self._validate_acquisition(acquisition)
super().__init__(
batch_size, random_state, max_deduplication_passes, candidate_pool_size
)
self.max_iters = max_iters
self.optimize_restarts = optimize_restarts
self.acquisition = acquisition
self._gpmodel: Optional[GPRegression] = None
fit(self, X, y)
Fit a gaussian process surrogate model.
Source code in black_it/samplers/gaussian_process.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
"""Fit a gaussian process surrogate model."""
y = np.atleast_2d(y).T
if X.shape[0] > 500:
warnings.warn(
"Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
RuntimeWarning,
)
# initialize GP class from GPy with a Matern kernel by default
dims = X.shape[1]
kern = GPy.kern.Matern52(dims, variance=1.0, ARD=False)
noise_var = y.var() * 0.01
self._gpmodel = GPRegression(
X, y, kernel=kern, noise_var=noise_var, mean_function=None
)
# Make sure we do not get ridiculously small residual noise variance
self._gpmodel.Gaussian_noise.constrain_bounded(
1e-9, 1e6, warning=False
) # constrain_positive(warning=False)
# we need to set the seed globally for GPy optimisations
# to give reproducible results
np.random.seed(self._get_random_seed())
random.seed(self._get_random_seed())
if self.max_iters > 0:
# --- update the model maximizing the marginal likelihood.
if self.optimize_restarts == 1:
self._gpmodel.optimize(
optimizer="bfgs",
max_iters=self.max_iters,
messages=False,
ipython_notebook=False,
)
else:
self._gpmodel.optimize_restarts(
num_restarts=self.optimize_restarts,
optimizer="bfgs",
max_iters=self.max_iters,
verbose=False,
)
get_quantiles(acquisition_par, fmin, m, s)
staticmethod
Quantiles of the Gaussian distribution useful to determine the acquisition function values.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
acquisition_par | float | parameter of the acquisition function | required |
fmin | float | current minimum | required |
m | ndarray | vector of means | required |
s | ndarray | vector of standard deviations | required |

Returns:

Type | Description |
---|---|
Tuple[ndarray, ndarray, ndarray] | the quantiles |
Source code in black_it/samplers/gaussian_process.py
@staticmethod
def get_quantiles(
acquisition_par: float,
fmin: float,
m: NDArray[np.float64],
s: NDArray[np.float64],
) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""
Quantiles of the Gaussian distribution useful to determine the acquisition function values.
Args:
acquisition_par: parameter of the acquisition function
fmin: current minimum.
m: vector of means.
s: vector of standard deviations.
Returns:
the quantiles.
"""
# remove values of variance that are too small
s[s < 1e-10] = 1e-10
u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2))
return phi, Phi, u
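Putting the pieces together, the expected-improvement acquisition built from these quantiles is EI = s * (u * Phi(u) + phi(u)) with u = (fmin - m - jitter) / s, where phi and Phi are the standard normal density and CDF. A small numerical sketch using the static method above (importing the sampler requires the GPy dependency to be installed; the means and standard deviations below are hypothetical):

```python
import numpy as np

from black_it.samplers.gaussian_process import GaussianProcessSampler

# Hypothetical posterior means and standard deviations at three candidate points.
m = np.array([[0.8], [0.5], [0.2]])
s = np.array([[0.1], [0.2], [0.3]])
fmin, jitter = 0.4, 0.1

phi, Phi, u = GaussianProcessSampler.get_quantiles(jitter, fmin, m, s)
expected_improvement = s * (u * Phi + phi)
print(expected_improvement.ravel())  # larger values = more promising candidates
```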
predict(self, X)
Predict using a gaussian process surrogate model.
Source code in black_it/samplers/gaussian_process.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
"""Predict using a gaussian process surrogate model."""
# predict mean or expected improvement on the full sample set
if self.acquisition == _AcquisitionTypes.EI.value:
# minus sign needed for subsequent sorting
candidates_score = -self._predict_EI(X)[:, 0]
else: # acquisition is "mean"
candidates_score = self._predict_mean_std(X)[0][:, 0]
return candidates_score
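An illustrative sketch of the surrogate workflow on synthetic data (GPy must be installed; the loss function below is made up for the example):

```python
import numpy as np

from black_it.samplers.gaussian_process import GaussianProcessSampler

rng = np.random.default_rng(0)
X = rng.uniform(0, 1, size=(50, 2))        # previously sampled parameters
y = ((X - 0.5) ** 2).sum(axis=1)           # synthetic losses

sampler = GaussianProcessSampler(
    batch_size=4, random_state=0, max_iters=100, optimize_restarts=1
)
sampler.fit(X, y)

candidates = rng.uniform(0, 1, size=(10, 2))
scores = sampler.predict(candidates)        # lower score = more promising
print(candidates[np.argsort(scores)[:4]])   # the 4 best candidates
```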
black_it.samplers.random_forest.RandomForestSampler (MLSurrogateSampler)
This class implements random forest sampling.
Source code in black_it/samplers/random_forest.py
class RandomForestSampler(MLSurrogateSampler):
"""This class implements random forest sampling."""
def __init__( # pylint: disable=too-many-arguments
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
candidate_pool_size: Optional[int] = None,
n_estimators: int = 500,
criterion: str = "gini",
n_classes: int = 10,
) -> None:
"""
Random forest sampling.
Note: this class makes use of sklearn.ensemble.RandomForestClassifier.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
n_estimators: number of trees in the forest
criterion: the function to measure the quality of a split.
n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
of the distribution of loss values.
"""
_assert(
n_classes > 2,
"'n_classes' should be at least 2 to provide meaningful results",
)
super().__init__(
batch_size, random_state, max_deduplication_passes, candidate_pool_size
)
self._n_estimators = n_estimators
self._criterion = criterion
self._n_classes = n_classes
self._classifier: Optional[RandomForestClassifier] = None
@property
def n_estimators(self) -> int:
"""Get the number of estimators."""
return self._n_estimators
@property
def criterion(self) -> str:
"""Get the criterion."""
return self._criterion
@property
def n_classes(self) -> int:
"""Get the number of classes."""
return self._n_classes
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
"""Fit a random forest surrogate model."""
# Train surrogate
X, y_cat, _existing_points_quantiles = self.prepare_data_for_classifier(
X, y, self.n_classes
)
self._classifier = RandomForestClassifier(
n_estimators=self.n_estimators,
criterion=self.criterion,
n_jobs=-1,
random_state=self._get_random_seed(),
)
self._classifier.fit(X, y_cat)
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
"""Predict using a random forest surrogate model."""
# Predict quantiles
self._classifier = cast(RandomForestClassifier, self._classifier)
predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)
return predicted_points_quantiles
@staticmethod
def prepare_data_for_classifier(
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
num_bins: int,
) -> Tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
"""
Prepare data for the classifier.
Args:
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
num_bins: the number of bins
Returns:
A triple (x, y, quantiles), where
- x is the vector of training data
- y is the vector of targets
- the quantiles
"""
x: NDArray[np.float64] = existing_points
y: NDArray[np.float64] = existing_losses
cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)
for i in range(num_bins - 1):
quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])
quantiles[-1] = np.max(y)
y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
y_cat = y_cat - 1
return x, y_cat, quantiles
criterion: str
property
readonly
Get the criterion.
n_classes: int
property
readonly
Get the number of classes.
n_estimators: int
property
readonly
Get the number of estimators.
__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, n_estimators=500, criterion='gini', n_classes=10)
special
Random forest sampling.
Note: this class makes use of sklearn.ensemble.RandomForestClassifier.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
batch_size | int | the number of points sampled every time the sampler is called | required |
random_state | Optional[int] | the random state of the sampler; fixing this number makes the sampler behave deterministically | None |
max_deduplication_passes | int | the maximum number of deduplication passes | 5 |
candidate_pool_size | Optional[int] | number of randomly sampled points on which the random forest predictions are evaluated | None |
n_estimators | int | number of trees in the forest | 500 |
criterion | str | the function to measure the quality of a split | 'gini' |
n_classes | int | the number of classes used in the random forest; the classes are selected as the quantiles of the distribution of loss values | 10 |
Source code in black_it/samplers/random_forest.py
def __init__( # pylint: disable=too-many-arguments
self,
batch_size: int,
random_state: Optional[int] = None,
max_deduplication_passes: int = 5,
candidate_pool_size: Optional[int] = None,
n_estimators: int = 500,
criterion: str = "gini",
n_classes: int = 10,
) -> None:
"""
Random forest sampling.
Note: this class makes use of sklearn.ensemble.RandomForestClassifier.
Args:
batch_size: the number of points sampled every time the sampler is called
random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes: the maximum number of deduplication passes
candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
n_estimators: number of trees in the forest
criterion: the function to measure the quality of a split.
n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
of the distribution of loss values.
"""
_assert(
n_classes > 2,
"'n_classes' should be at least 2 to provide meaningful results",
)
super().__init__(
batch_size, random_state, max_deduplication_passes, candidate_pool_size
)
self._n_estimators = n_estimators
self._criterion = criterion
self._n_classes = n_classes
self._classifier: Optional[RandomForestClassifier] = None
fit(self, X, y)
Fit a random forest surrogate model.
Source code in black_it/samplers/random_forest.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
"""Fit a random forest surrogate model."""
# Train surrogate
X, y_cat, _existing_points_quantiles = self.prepare_data_for_classifier(
X, y, self.n_classes
)
self._classifier = RandomForestClassifier(
n_estimators=self.n_estimators,
criterion=self.criterion,
n_jobs=-1,
random_state=self._get_random_seed(),
)
self._classifier.fit(X, y_cat)
predict(self, X)
Predict using a random forest surrogate model.
Source code in black_it/samplers/random_forest.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
"""Predict using a random forest surrogate model."""
# Predict quantiles
self._classifier = cast(RandomForestClassifier, self._classifier)
predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)
return predicted_points_quantiles
prepare_data_for_classifier(existing_points, existing_losses, num_bins)
staticmethod
Prepare data for the classifier.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
existing_points | ndarray | the parameters already sampled | required |
existing_losses | ndarray | the loss corresponding to the sampled parameters | required |
num_bins | int | the number of bins | required |

Returns:

Type | Description |
---|---|
Tuple[ndarray, ndarray, ndarray] | a triple (x, y, quantiles), where x is the training data, y is the vector of integer class targets, and quantiles are the loss values delimiting the classes |
Source code in black_it/samplers/random_forest.py
@staticmethod
def prepare_data_for_classifier(
existing_points: NDArray[np.float64],
existing_losses: NDArray[np.float64],
num_bins: int,
) -> Tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
"""
Prepare data for the classifier.
Args:
existing_points: the parameters already sampled
existing_losses: the loss corresponding to the sampled parameters
num_bins: the number of bins
Returns:
A triple (x, y, quantiles), where
- x is the vector of training data
- y is the vector of targets
- the quantiles
"""
x: NDArray[np.float64] = existing_points
y: NDArray[np.float64] = existing_losses
cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)
for i in range(num_bins - 1):
quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])
quantiles[-1] = np.max(y)
y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
y_cat = y_cat - 1
return x, y_cat, quantiles
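To see how losses are discretized into classes, here is a small sketch calling the static method on synthetic data (the arrays are made up for the example):

```python
import numpy as np

from black_it.samplers.random_forest import RandomForestSampler

rng = np.random.default_rng(0)
existing_points = rng.uniform(0, 1, size=(20, 3))   # sampled parameters
existing_losses = rng.exponential(size=20)          # synthetic losses

x, y_cat, quantiles = RandomForestSampler.prepare_data_for_classifier(
    existing_points, existing_losses, num_bins=5
)
print(y_cat)       # integer class labels in {0, ..., 4}; lower class = smaller loss
print(quantiles)   # loss values delimiting the classes
```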