
Samplers

black_it.samplers.base.BaseSampler (ABC)

BaseSampler interface.

This is the base class for all samplers.

Source code in black_it/samplers/base.py
class BaseSampler(ABC):
    """
    BaseSampler interface.

    This is the base class for all samplers.
    """

    def __init__(
        self,
        batch_size: int,
        random_state: Optional[int] = None,
        max_deduplication_passes: int = 5,
    ) -> None:
        """
        Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler; fixing this number makes the sampler behave deterministically
            max_deduplication_passes: maximum number of deduplication passes done to avoid sampling repeated parameters
        """
        self.random_state: Optional[int] = random_state
        self.batch_size: int = batch_size
        self.max_deduplication_passes = max_deduplication_passes

    @property
    def random_state(self) -> Optional[int]:
        """Get the random state."""
        return self._random_state

    @random_state.setter
    def random_state(self, random_state: Optional[int]) -> None:
        """Set the random state."""
        self._random_state = random_state
        self._random_generator = default_rng(self.random_state)

    @property
    def random_generator(self) -> np.random.Generator:
        """Get the random generator."""
        return self._random_generator

    def _get_random_seed(self) -> int:
        """Get new random seed from the current random generator."""
        return get_random_seed(self._random_generator)

    @abstractmethod
    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """
        Sample a number of new parameters fixed by the 'batch_size' attribute.

        Args:
            batch_size: number of samples to collect
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the new parameters
        """

    def sample(
        self,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """
        Sample from the search space.

        Args:
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the sampled parameters
        """
        samples = self.sample_batch(
            self.batch_size, search_space, existing_points, existing_losses
        )

        for n in range(self.max_deduplication_passes):

            duplicates = self.find_and_get_duplicates(samples, existing_points)

            num_duplicates = len(duplicates)

            if num_duplicates == 0:
                break

            new_samples = self.sample_batch(
                num_duplicates, search_space, existing_points, existing_losses
            )
            samples[duplicates] = new_samples

            if n == self.max_deduplication_passes - 1:
                print(
                    f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
                    " This is probably due to a small search space."
                )

        return samples

    @staticmethod
    def find_and_get_duplicates(
        new_points: NDArray[np.float64], existing_points: NDArray[np.float64]
    ) -> List:
        """Find the points in 'new_points' that are already present in 'existing_points'.

        Args:
            new_points: candidate points for the sampler
            existing_points: previously sampled points

        Returns:
            the location of the duplicates in 'new_points'
        """
        all_points = np.concatenate((existing_points, new_points))
        unq, count = np.unique(all_points, axis=0, return_counts=True)
        repeated_groups = unq[count > 1]

        repeated_pos = []
        if len(repeated_groups) > 0:
            for repeated_group in repeated_groups:
                repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
                for index in repeated_idx:
                    repeated_pos.append(index[0])

        return repeated_pos

random_generator: Generator property readonly

Get the random generator.

random_state: Optional[int] property writable

Get the random state.

__init__(self, batch_size, random_state=None, max_deduplication_passes=5) special

Initialize the sampler.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (Optional[int], default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
max_deduplication_passes (int, default: 5): the maximum number of deduplication passes done to avoid sampling repeated parameters
Source code in black_it/samplers/base.py
def __init__(
    self,
    batch_size: int,
    random_state: Optional[int] = None,
    max_deduplication_passes: int = 5,
) -> None:
    """
    Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler; fixing this number makes the sampler behave deterministically
        max_deduplication_passes: maximum number of deduplication passes done to avoid sampling repeated parameters
    """
    self.random_state: Optional[int] = random_state
    self.batch_size: int = batch_size
    self.max_deduplication_passes = max_deduplication_passes

find_and_get_duplicates(new_points, existing_points) staticmethod

Find the points in 'new_points' that are already present in 'existing_points'.

Parameters:

new_points (ndarray, required): candidate points for the sampler
existing_points (ndarray, required): previously sampled points

Returns:

List: the locations of the duplicates in 'new_points'

Source code in black_it/samplers/base.py
@staticmethod
def find_and_get_duplicates(
    new_points: NDArray[np.float64], existing_points: NDArray[np.float64]
) -> List:
    """Find the points in 'new_points' that are already present in 'existing_points'.

    Args:
        new_points: candidate points for the sampler
        existing_points: previously sampled points

    Returns:
        the location of the duplicates in 'new_points'
    """
    all_points = np.concatenate((existing_points, new_points))
    unq, count = np.unique(all_points, axis=0, return_counts=True)
    repeated_groups = unq[count > 1]

    repeated_pos = []
    if len(repeated_groups) > 0:
        for repeated_group in repeated_groups:
            repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
            for index in repeated_idx:
                repeated_pos.append(index[0])

    return repeated_pos
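
As a toy illustration (hypothetical data, not taken from the library's documentation), the helper returns the row indices of 'new_points' that already occur among the previously sampled points:

import numpy as np

from black_it.samplers.base import BaseSampler

existing = np.array([[0.0, 0.0], [1.0, 1.0]])
new = np.array([[1.0, 1.0], [2.0, 2.0]])

# [1.0, 1.0] was already sampled, and it sits at row 0 of `new`
print(BaseSampler.find_and_get_duplicates(new, existing))  # [0]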

sample(self, search_space, existing_points, existing_losses)

Sample from the search space.

Parameters:

search_space (SearchSpace, required): an object containing the details of the parameter search space
existing_points (ndarray, required): the parameters already sampled
existing_losses (ndarray, required): the loss corresponding to the sampled parameters

Returns:

ndarray: the sampled parameters

Source code in black_it/samplers/base.py
def sample(
    self,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """
    Sample from the search space.

    Args:
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the sampled parameters
    """
    samples = self.sample_batch(
        self.batch_size, search_space, existing_points, existing_losses
    )

    for n in range(self.max_deduplication_passes):

        duplicates = self.find_and_get_duplicates(samples, existing_points)

        num_duplicates = len(duplicates)

        if num_duplicates == 0:
            break

        new_samples = self.sample_batch(
            num_duplicates, search_space, existing_points, existing_losses
        )
        samples[duplicates] = new_samples

        if n == self.max_deduplication_passes - 1:
            print(
                f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
                " This is probably due to a small search space."
            )

    return samples

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample a number of new parameters fixed by the 'batch_size' attribute.

Parameters:

batch_size (int, required): number of samples to collect
search_space (SearchSpace, required): an object containing the details of the parameter search space
existing_points (ndarray, required): the parameters already sampled
existing_losses (ndarray, required): the loss corresponding to the sampled parameters

Returns:

ndarray: the new parameters

Source code in black_it/samplers/base.py
@abstractmethod
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """
    Sample a number of new parameters fixed by the 'batch_size' attribute.

    Args:
        batch_size: number of samples to collect
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the new parameters
    """

black_it.samplers.random_uniform.RandomUniformSampler (BaseSampler)

Random uniform sampling.

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample uniformly from the search space.

Parameters:

batch_size (int, required): the number of points to sample
search_space (SearchSpace, required): an object containing the details of the parameter search space
existing_points (ndarray, required): the parameters already sampled
existing_losses (ndarray, required): the loss corresponding to the sampled parameters

Returns:

ndarray: the sampled parameters (an array of shape (self.batch_size, search_space.dims))
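
A minimal usage sketch (the `search_space` object is assumed to be an existing black_it SearchSpace instance built elsewhere; the empty arrays simply mean that nothing has been sampled yet):

import numpy as np

from black_it.samplers.random_uniform import RandomUniformSampler

sampler = RandomUniformSampler(batch_size=8, random_state=0)

# `search_space` is assumed to be an existing black_it SearchSpace instance
new_params = sampler.sample(
    search_space,
    existing_points=np.zeros((0, search_space.dims)),
    existing_losses=np.zeros(0),
)
print(new_params.shape)  # (8, search_space.dims)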

black_it.samplers.halton.HaltonSampler (BaseSampler)

Halton low discrepancy sequence.

This sampler implements the Halton low-discrepancy sequence, obtained as an n-dimensional generalization of the Van der Corput sequence.

random_state: Optional[int] property writable

Get the random state.

__init__(self, batch_size, random_state=None, max_deduplication_passes=5) special

Initialize the sampler.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (Optional[int], default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
max_deduplication_passes (int, default: 5): the maximum number of sample deduplication passes

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample points using Halton sequence.

Parameters:

batch_size (int, required): the number of samples
search_space (SearchSpace, required): an object containing the details of the parameter search space
existing_points (ndarray, required): the parameters already sampled (not used)
existing_losses (ndarray, required): the loss corresponding to the sampled parameters (not used)

Returns:

ndarray: the sampled parameters

black_it.samplers.r_sequence.RSequenceSampler (BaseSampler)

The R-sequence sampler.

Source code in black_it/samplers/r_sequence.py
class RSequenceSampler(BaseSampler):
    """The R-sequence sampler."""

    def __init__(
        self,
        batch_size: int,
        random_state: Optional[int] = None,
        max_deduplication_passes: int = 5,
    ) -> None:
        """
        Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
                after every batch sampling. Default: 5.
        """
        super().__init__(batch_size, random_state, max_deduplication_passes)

        self._sequence_index: int
        self._sequence_start: float
        self._reset()

    @classmethod
    def compute_phi(cls, nb_dims: int) -> float:
        """
        Get an approximation of phi^nb_dims.

        Args:
            nb_dims: the number of dimensions.

        Returns:
            phi^nb_dims
        """
        check_arg(1 <= nb_dims, f"nb_dims should be greater than 0, got {nb_dims}")
        phi: float = 2.0
        old_phi = None
        while old_phi != phi:
            old_phi = phi
            phi = pow(1 + phi, 1.0 / (nb_dims + 1))
        return phi

    @property
    def random_state(self) -> Optional[int]:
        """Get the random state."""
        return self._random_state

    @random_state.setter
    def random_state(self, random_state: Optional[int]) -> None:
        """Set the random state."""
        self._random_state = random_state
        self._random_generator = default_rng(self.random_state)
        self._reset()

    def _reset(self) -> None:
        """Reset the index of the sequence."""
        self._sequence_index = self.random_generator.integers(
            _MIN_SEQUENCE_START_INDEX, _MAX_SEQUENCE_START_INDEX
        )
        self._sequence_start = self.random_generator.random()

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """
        Sample points using the R-sequence.

        Args:
            batch_size: the number of samples
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled (not used)
            existing_losses: the loss corresponding to the sampled parameters (not used)

        Returns:
            the sampled parameters
        """
        unit_cube_points: NDArray[np.float64] = self._r_sequence(
            batch_size, search_space.dims
        )
        p_bounds: NDArray[np.float64] = search_space.parameters_bounds
        sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
        return digitize_data(sampled_points, search_space.param_grid)

    def _r_sequence(self, nb_samples: int, dims: int) -> NDArray[np.float64]:
        """
        Compute the R-sequence (http://extremelearning.com.au/unreasonable-effectiveness-of-quasirandom-sequences/).

        Args:
            nb_samples: number of points to sample
            dims: the number of dimensions

        Returns:
            Set of params uniformly placed in d-dimensional unit cube.
        """
        phi = self.compute_phi(dims)
        alpha: NDArray[np.float64] = np.power(1 / phi, np.arange(1, dims + 1)).reshape(
            (1, -1)
        )
        end_index = self._sequence_index + nb_samples
        indexes = np.arange(self._sequence_index, end_index).reshape((-1, 1))
        points: NDArray[np.float64] = (self._sequence_start + indexes.dot(alpha)) % 1
        self._sequence_index = end_index
        return points
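
The recurrence implemented by _r_sequence is compact enough to restate on its own; the following standalone sketch (illustrative only, independent of the class) reproduces the same construction:

import numpy as np

def r_sequence(n_samples: int, dims: int, start: float = 0.5) -> np.ndarray:
    """Return n_samples points of the dims-dimensional R-sequence in [0, 1)^dims."""
    # phi_d: unique positive root of x**(dims + 1) = x + 1, via fixed-point iteration
    phi = 2.0
    for _ in range(100):
        phi = (1.0 + phi) ** (1.0 / (dims + 1))
    alpha = 1.0 / phi ** np.arange(1, dims + 1)     # irrational step per dimension
    n = np.arange(1, n_samples + 1).reshape(-1, 1)  # sequence indices
    return (start + n * alpha) % 1.0                # wrap back into the unit cube

print(r_sequence(3, 2))  # three quasi-uniform points in the unit square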

random_state: Optional[int] property writable

Get the random state.

__init__(self, batch_size, random_state=None, max_deduplication_passes=5) special

Initialize the sampler.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (Optional[int], default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
max_deduplication_passes (int, default: 5): the maximum (non-negative) number of deduplication passes made after every batch sampling
Source code in black_it/samplers/r_sequence.py
def __init__(
    self,
    batch_size: int,
    random_state: Optional[int] = None,
    max_deduplication_passes: int = 5,
) -> None:
    """
    Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
            after every batch sampling. Default: 5.
    """
    super().__init__(batch_size, random_state, max_deduplication_passes)

    self._sequence_index: int
    self._sequence_start: float
    self._reset()

compute_phi(nb_dims) classmethod

Get an approximation of phi^nb_dims.

Parameters:

nb_dims (int, required): the number of dimensions

Returns:

float: phi^nb_dims

Source code in black_it/samplers/r_sequence.py
@classmethod
def compute_phi(cls, nb_dims: int) -> float:
    """
    Get an approximation of phi^nb_dims.

    Args:
        nb_dims: the number of dimensions.

    Returns:
        phi^nb_dims
    """
    check_arg(1 <= nb_dims, f"nb_dims should be greater than 0, got {nb_dims}")
    phi: float = 2.0
    old_phi = None
    while old_phi != phi:
        old_phi = phi
        phi = pow(1 + phi, 1.0 / (nb_dims + 1))
    return phi
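
For reference, the fixed point reached by compute_phi is the generalized golden ratio: for one dimension it is the golden ratio itself, for two dimensions the so-called plastic number.

from black_it.samplers.r_sequence import RSequenceSampler

print(RSequenceSampler.compute_phi(1))  # ~1.6180, the golden ratio
print(RSequenceSampler.compute_phi(2))  # ~1.3247, the plastic number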

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample points using the R-sequence.

Parameters:

batch_size (int, required): the number of samples
search_space (SearchSpace, required): an object containing the details of the parameter search space
existing_points (ndarray, required): the parameters already sampled (not used)
existing_losses (ndarray, required): the loss corresponding to the sampled parameters (not used)

Returns:

ndarray: the sampled parameters

Source code in black_it/samplers/r_sequence.py
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """
    Sample points using the R-sequence.

    Args:
        batch_size: the number of samples
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled (not used)
        existing_losses: the loss corresponding to the sampled parameters (not used)

    Returns:
        the sampled parameters
    """
    unit_cube_points: NDArray[np.float64] = self._r_sequence(
        batch_size, search_space.dims
    )
    p_bounds: NDArray[np.float64] = search_space.parameters_bounds
    sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
    return digitize_data(sampled_points, search_space.param_grid)

black_it.samplers.best_batch.BestBatchSampler (BaseSampler)

This class implements the best-batch sampler.

The sampler is a very simple type of genetic algorithm: it takes the parameters corresponding to the current lowest loss values and perturbs them slightly in a purely random fashion. The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution BetaBin(dims, a, b), where dims is the total number of dimensions in the search space; it then selects that many coordinates at random and perturbs them uniformly within the range specified by 'perturbation_range'.

Source code in black_it/samplers/best_batch.py
class BestBatchSampler(BaseSampler):
    """This class implements the best-batch sampler.

    The sampler is a very simple type of genetic algorithm: it takes the parameters corresponding
      to the current lowest loss values and perturbs them slightly in a purely random fashion.
    The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution
      BetaBin(dims, a, b), where dims is the total number of dimensions in the search space; it then selects
      that many coordinates at random and perturbs them uniformly within the range specified by 'perturbation_range'.

    """

    def __init__(
        self,
        batch_size: int,
        random_state: Optional[int] = None,
        max_deduplication_passes: int = 5,
        a: float = 3.0,
        b: float = 1.0,
        perturbation_range: int = 6,
    ):
        """
        Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes that are made
            a: the a parameter of the beta-binomial distribution
            b: the b parameter of the beta-binomial distribution
            perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
                plus/minus the perturbation_range times the precision of the specific parameter coordinate
        """
        _assert(
            a > 0.0,
            "'a' should be greater than zero",
        )
        _assert(
            b > 0.0,
            "'b' should be greater than zero",
        )
        _assert(
            perturbation_range > 1,
            "'perturbation_range' should be greater than one",
        )

        super().__init__(batch_size, random_state, max_deduplication_passes)
        self.a = a
        self.b = b
        self.perturbation_range = perturbation_range

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """
        Sample from the search space using a genetic algorithm.

        Args:
            batch_size: the number of points to sample
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
        """
        if len(existing_points) < batch_size:
            raise ValueError(
                "best-batch sampler requires a number of existing points "
                f"which is at least the batch size {batch_size}, "
                f"got {len(existing_points)}"
            )

        # sort existing params
        candidate_points: NDArray[np.float64] = existing_points[
            np.argsort(existing_losses)
        ][:batch_size, :]

        candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
            0, batch_size, size=batch_size
        )
        sampled_points: NDArray[np.float64] = np.copy(
            candidate_points[candidate_point_indexes]
        )

        beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
        beta_binom_rv.random_state = self.random_generator

        for sampled_point in sampled_points:
            num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
            params_shocked: NDArray[np.int64] = self.random_generator.choice(
                search_space.dims, tuple(num_shocks), replace=False
            )
            for index in params_shocked:
                shock_size: int = self.random_generator.integers(
                    1, self.perturbation_range
                )
                shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1

                delta: float = search_space.parameters_precision[index]
                shift: float = delta * shock_sign * shock_size
                sampled_point[index] += shift

                sampled_point[index] = np.clip(
                    sampled_point[index],
                    search_space.parameters_bounds[0][index],
                    search_space.parameters_bounds[1][index],
                )

        return sampled_points
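
A usage sketch with hypothetical data (the sampler requires at least batch_size previously evaluated points, which it sorts by loss before perturbing; `search_space` is assumed to be an existing SearchSpace instance):

import numpy as np

from black_it.samplers.best_batch import BestBatchSampler

sampler = BestBatchSampler(batch_size=4, random_state=0)

# toy history of 10 evaluated parameter vectors and their losses
# (in practice these come from earlier calibration batches and lie inside the search space)
rng = np.random.default_rng(0)
existing_points = rng.uniform(size=(10, search_space.dims))
existing_losses = rng.uniform(size=10)

new_params = sampler.sample(search_space, existing_points, existing_losses)
print(new_params.shape)  # (4, search_space.dims)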

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, a=3.0, b=1.0, perturbation_range=6) special

Initialize the sampler.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (Optional[int], default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
max_deduplication_passes (int, default: 5): the maximum number of deduplication passes that are made
a (float, default: 3.0): the a parameter of the beta-binomial distribution
b (float, default: 1.0): the b parameter of the beta-binomial distribution
perturbation_range (int, default: 6): the range of the perturbation applied; the actual perturbation will be within plus/minus perturbation_range times the precision of the specific parameter coordinate
Source code in black_it/samplers/best_batch.py
def __init__(
    self,
    batch_size: int,
    random_state: Optional[int] = None,
    max_deduplication_passes: int = 5,
    a: float = 3.0,
    b: float = 1.0,
    perturbation_range: int = 6,
):
    """
    Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes that are made
        a: the a parameter of the beta-binomial distribution
        b: the b parameter of the beta-binomial distribution
        perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
            plus/minus the perturbation_range times the precision of the specific parameter coordinate
    """
    _assert(
        a > 0.0,
        "'a' should be greater than zero",
    )
    _assert(
        b > 0.0,
        "'b' should be greater than zero",
    )
    _assert(
        perturbation_range > 1,
        "'perturbation_range' should be greater than one",
    )

    super().__init__(batch_size, random_state, max_deduplication_passes)
    self.a = a
    self.b = b
    self.perturbation_range = perturbation_range

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample from the search space using a genetic algorithm.

Parameters:

batch_size (int, required): the number of points to sample
search_space (SearchSpace, required): an object containing the details of the parameter search space
existing_points (ndarray, required): the parameters already sampled
existing_losses (ndarray, required): the loss corresponding to the sampled parameters

Returns:

ndarray: the sampled parameters (an array of shape (self.batch_size, search_space.dims))

Source code in black_it/samplers/best_batch.py
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """
    Sample from the search space using a genetic algorithm.

    Args:
        batch_size: the number of points to sample
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
    """
    if len(existing_points) < batch_size:
        raise ValueError(
            "best-batch sampler requires a number of existing points "
            f"which is at least the batch size {batch_size}, "
            f"got {len(existing_points)}"
        )

    # sort existing params
    candidate_points: NDArray[np.float64] = existing_points[
        np.argsort(existing_losses)
    ][:batch_size, :]

    candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
        0, batch_size, size=batch_size
    )
    sampled_points: NDArray[np.float64] = np.copy(
        candidate_points[candidate_point_indexes]
    )

    beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
    beta_binom_rv.random_state = self.random_generator

    for sampled_point in sampled_points:
        num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
        params_shocked: NDArray[np.int64] = self.random_generator.choice(
            search_space.dims, tuple(num_shocks), replace=False
        )
        for index in params_shocked:
            shock_size: int = self.random_generator.integers(
                1, self.perturbation_range
            )
            shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1

            delta: float = search_space.parameters_precision[index]
            shift: float = delta * shock_sign * shock_size
            sampled_point[index] += shift

            sampled_point[index] = np.clip(
                sampled_point[index],
                search_space.parameters_bounds[0][index],
                search_space.parameters_bounds[1][index],
            )

    return sampled_points

black_it.samplers.gaussian_process.GaussianProcessSampler (MLSurrogateSampler)

This class implements the Gaussian process-based sampler.

In particular, the sampling is based on a Gaussian Process interpolation of the loss function.

Note: this class is a wrapper of the GPRegression model of the GPy package.

Source code in black_it/samplers/gaussian_process.py
class GaussianProcessSampler(MLSurrogateSampler):
    """
    This class implements the Gaussian process-based sampler.

    In particular, the sampling is based on a Gaussian Process interpolation of the loss function.

    Note: this class is a wrapper of the GPRegression model of the GPy package.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        batch_size: int,
        random_state: Optional[int] = None,
        max_deduplication_passes: int = 5,
        candidate_pool_size: Optional[int] = None,
        max_iters: int = 1000,
        optimize_restarts: int = 5,
        acquisition: str = "expected_improvement",
    ):
        """
        Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes that are made
            candidate_pool_size: number of randomly sampled points on which the surrogate model predictions are evaluated
            max_iters: maximum number of iterations in the optimization of the GP hyperparameters
            optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
            acquisition: type of acquisition function, it can be 'expected_improvement' or simply 'mean'
        """
        self._validate_acquisition(acquisition)

        super().__init__(
            batch_size, random_state, max_deduplication_passes, candidate_pool_size
        )
        self.max_iters = max_iters
        self.optimize_restarts = optimize_restarts
        self.acquisition = acquisition
        self._gpmodel: Optional[GPRegression] = None

    @staticmethod
    def _validate_acquisition(acquisition: str) -> None:
        """
        Check that the required acquisition is among the supported ones.

        Args:
            acquisition: the acquisition provided as input of the constructor.

        Raises:
            ValueError: if the provided acquisition type is not among the allowed ones.
        """
        try:
            _AcquisitionTypes(acquisition)
        except ValueError as e:
            raise ValueError(
                "expected one of the following acquisition types: "
                f"[{' '.join(map(str, _AcquisitionTypes))}], "
                f"got {acquisition}"
            ) from e

    def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
        """Fit a gaussian process surrogate model."""
        y = np.atleast_2d(y).T

        if X.shape[0] > 500:
            warnings.warn(
                "Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
                RuntimeWarning,
            )

        # initialize GP class from GPy with a Matern kernel by default
        dims = X.shape[1]
        kern = GPy.kern.Matern52(dims, variance=1.0, ARD=False)
        noise_var = y.var() * 0.01

        self._gpmodel = GPRegression(
            X, y, kernel=kern, noise_var=noise_var, mean_function=None
        )

        # Make sure we do not get ridiculously small residual noise variance
        self._gpmodel.Gaussian_noise.constrain_bounded(
            1e-9, 1e6, warning=False
        )  # constrain_positive(warning=False)

        # we need to set the seed globally for GPy optimisations
        # to give reproducible results
        np.random.seed(self._get_random_seed())
        random.seed(self._get_random_seed())
        if self.max_iters > 0:
            # --- update the model maximizing the marginal likelihood.
            if self.optimize_restarts == 1:
                self._gpmodel.optimize(
                    optimizer="bfgs",
                    max_iters=self.max_iters,
                    messages=False,
                    ipython_notebook=False,
                )
            else:
                self._gpmodel.optimize_restarts(
                    num_restarts=self.optimize_restarts,
                    optimizer="bfgs",
                    max_iters=self.max_iters,
                    verbose=False,
                )

    def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
        """Predict using a gaussian process surrogate model."""
        # predict mean or expected improvement on the full sample set
        if self.acquisition == _AcquisitionTypes.EI.value:
            # minus sign needed for subsequent sorting
            candidates_score = -self._predict_EI(X)[:, 0]
        else:  # acquisition is "mean"
            candidates_score = self._predict_mean_std(X)[0][:, 0]

        return candidates_score

    def _predict_mean_std(
        self, X: NDArray[np.float64]
    ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
        """
        Predict mean and standard deviation of a fitted GP.

        Args:
            X: the points on which the predictions should be performed

        Returns:
            The pair (mean, std).
        """
        gpmodel = cast(GPRegression, self._gpmodel)
        X = X[None, :] if X.ndim == 1 else X
        m, v = gpmodel.predict(X, full_cov=False, include_likelihood=True)
        v = np.clip(v, 1e-10, np.inf)
        return m, np.sqrt(v)

    def _get_fmin(self) -> float:
        """Return the location where the posterior mean is takes its minimal value."""
        gpmodel = cast(GPRegression, self._gpmodel)
        return gpmodel.predict(gpmodel.X)[0].min()

    def _predict_EI(
        self, X: NDArray[np.float64], jitter: float = 0.1
    ) -> NDArray[np.float64]:
        """
        Compute the Expected Improvement per unit of cost.

        Args:
            X:  the points on which the predictions should be performed
            jitter: positive value to make the acquisition more explorative.

        Returns:
            the expected improvement.
        """
        m, s = self._predict_mean_std(X)

        fmin = self._get_fmin()

        phi, Phi, u = self.get_quantiles(jitter, fmin, m, s)

        f_acqu = s * (u * Phi + phi)

        return f_acqu

    @staticmethod
    def get_quantiles(
        acquisition_par: float,
        fmin: float,
        m: NDArray[np.float64],
        s: NDArray[np.float64],
    ) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
        """
        Quantiles of the Gaussian distribution useful to determine the acquisition function values.

        Args:
            acquisition_par: parameter of the acquisition function
            fmin: current minimum.
            m: vector of means.
            s: vector of standard deviations.

        Returns:
            the quantiles.
        """
        # remove values of variance that are too small
        s[s < 1e-10] = 1e-10

        u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
        phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
        Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2))

        return phi, Phi, u
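
Putting _predict_EI and get_quantiles together, the score assigned to a candidate point x is the standard expected improvement (written here with the jitter parameter \xi, which defaults to 0.1):

u(x) = \frac{f_{\min} - m(x) - \xi}{s(x)}, \qquad
\mathrm{EI}(x) = s(x)\,\bigl[\, u(x)\,\Phi\bigl(u(x)\bigr) + \varphi\bigl(u(x)\bigr) \,\bigr]

where m(x) and s(x) are the posterior mean and standard deviation of the Gaussian process, \varphi and \Phi are the standard normal pdf and cdf, and f_{\min} is the minimum of the posterior mean over the training inputs.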

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, max_iters=1000, optimize_restarts=5, acquisition='expected_improvement') special

Initialize the sampler.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (Optional[int], default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
max_deduplication_passes (int, default: 5): the maximum number of deduplication passes that are made
candidate_pool_size (Optional[int], default: None): number of randomly sampled points on which the surrogate model predictions are evaluated
max_iters (int, default: 1000): maximum number of iterations in the optimization of the GP hyperparameters
optimize_restarts (int, default: 5): number of independent random trials of the optimization of the GP hyperparameters
acquisition (str, default: 'expected_improvement'): type of acquisition function, either 'expected_improvement' or simply 'mean'
Source code in black_it/samplers/gaussian_process.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    batch_size: int,
    random_state: Optional[int] = None,
    max_deduplication_passes: int = 5,
    candidate_pool_size: Optional[int] = None,
    max_iters: int = 1000,
    optimize_restarts: int = 5,
    acquisition: str = "expected_improvement",
):
    """
    Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes that are made
        candidate_pool_size: number of randomly sampled points on which the surrogate model predictions are evaluated
        max_iters: maximum number of iterations in the optimization of the GP hyperparameters
        optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
        acquisition: type of acquisition function, it can be 'expected_improvement' or simply 'mean'
    """
    self._validate_acquisition(acquisition)

    super().__init__(
        batch_size, random_state, max_deduplication_passes, candidate_pool_size
    )
    self.max_iters = max_iters
    self.optimize_restarts = optimize_restarts
    self.acquisition = acquisition
    self._gpmodel: Optional[GPRegression] = None

fit(self, X, y)

Fit a gaussian process surrogate model.

Source code in black_it/samplers/gaussian_process.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
    """Fit a gaussian process surrogate model."""
    y = np.atleast_2d(y).T

    if X.shape[0] > 500:
        warnings.warn(
            "Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
            RuntimeWarning,
        )

    # initialize GP class from GPy with a Matern kernel by default
    dims = X.shape[1]
    kern = GPy.kern.Matern52(dims, variance=1.0, ARD=False)
    noise_var = y.var() * 0.01

    self._gpmodel = GPRegression(
        X, y, kernel=kern, noise_var=noise_var, mean_function=None
    )

    # Make sure we do not get ridiculously small residual noise variance
    self._gpmodel.Gaussian_noise.constrain_bounded(
        1e-9, 1e6, warning=False
    )  # constrain_positive(warning=False)

    # we need to set the seed globally for GPy optimisations
    # to give reproducible results
    np.random.seed(self._get_random_seed())
    random.seed(self._get_random_seed())
    if self.max_iters > 0:
        # --- update the model maximizing the marginal likelihood.
        if self.optimize_restarts == 1:
            self._gpmodel.optimize(
                optimizer="bfgs",
                max_iters=self.max_iters,
                messages=False,
                ipython_notebook=False,
            )
        else:
            self._gpmodel.optimize_restarts(
                num_restarts=self.optimize_restarts,
                optimizer="bfgs",
                max_iters=self.max_iters,
                verbose=False,
            )

get_quantiles(acquisition_par, fmin, m, s) staticmethod

Quantiles of the Gaussian distribution useful to determine the acquisition function values.

Parameters:

acquisition_par (float, required): parameter of the acquisition function
fmin (float, required): current minimum
m (ndarray, required): vector of means
s (ndarray, required): vector of standard deviations

Returns:

Tuple[ndarray, ndarray, ndarray]: the quantiles

Source code in black_it/samplers/gaussian_process.py
@staticmethod
def get_quantiles(
    acquisition_par: float,
    fmin: float,
    m: NDArray[np.float64],
    s: NDArray[np.float64],
) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """
    Quantiles of the Gaussian distribution useful to determine the acquisition function values.

    Args:
        acquisition_par: parameter of the acquisition function
        fmin: current minimum.
        m: vector of means.
        s: vector of standard deviations.

    Returns:
        the quantiles.
    """
    # remove values of variance that are too small
    s[s < 1e-10] = 1e-10

    u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
    phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
    Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2))

    return phi, Phi, u

predict(self, X)

Predict using a gaussian process surrogate model.

Source code in black_it/samplers/gaussian_process.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
    """Predict using a gaussian process surrogate model."""
    # predict mean or expected improvement on the full sample set
    if self.acquisition == _AcquisitionTypes.EI.value:
        # minus sign needed for subsequent sorting
        candidates_score = -self._predict_EI(X)[:, 0]
    else:  # acquisition is "mean"
        candidates_score = self._predict_mean_std(X)[0][:, 0]

    return candidates_score

black_it.samplers.random_forest.RandomForestSampler (MLSurrogateSampler)

This class implements random forest sampling.

Source code in black_it/samplers/random_forest.py
class RandomForestSampler(MLSurrogateSampler):
    """This class implements random forest sampling."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        batch_size: int,
        random_state: Optional[int] = None,
        max_deduplication_passes: int = 5,
        candidate_pool_size: Optional[int] = None,
        n_estimators: int = 500,
        criterion: str = "gini",
        n_classes: int = 10,
    ) -> None:
        """
        Random forest sampling.

        Note: this class makes use of sklearn.ensemble.RandomForestClassifier.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes
            candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
            n_estimators: number of trees in the forest
            criterion: the function to measure the quality of a split.
            n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
                of the distribution of loss values.
        """
        _assert(
            n_classes > 2,
            "'n_classes' should be at least 2 to provide meaningful results",
        )

        super().__init__(
            batch_size, random_state, max_deduplication_passes, candidate_pool_size
        )

        self._n_estimators = n_estimators
        self._criterion = criterion
        self._n_classes = n_classes
        self._classifier: Optional[RandomForestClassifier] = None

    @property
    def n_estimators(self) -> int:
        """Get the number of estimators."""
        return self._n_estimators

    @property
    def criterion(self) -> str:
        """Get the criterion."""
        return self._criterion

    @property
    def n_classes(self) -> int:
        """Get the number of classes."""
        return self._n_classes

    def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
        """Fit a random forest surrogate model."""
        # Train surrogate

        X, y_cat, _existing_points_quantiles = self.prepare_data_for_classifier(
            X, y, self.n_classes
        )

        self._classifier = RandomForestClassifier(
            n_estimators=self.n_estimators,
            criterion=self.criterion,
            n_jobs=-1,
            random_state=self._get_random_seed(),
        )
        self._classifier.fit(X, y_cat)

    def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
        """Predict using a random forest surrogate model."""
        # Predict quantiles
        self._classifier = cast(RandomForestClassifier, self._classifier)
        predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)

        return predicted_points_quantiles

    @staticmethod
    def prepare_data_for_classifier(
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
        num_bins: int,
    ) -> Tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
        """
        Prepare data for the classifier.

        Args:
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters
            num_bins: the number of bins

        Returns:
            A triple (x, y, quantiles), where
                - x is the vector of training data
                - y is the vector of targets
                - the quantiles
        """
        x: NDArray[np.float64] = existing_points
        y: NDArray[np.float64] = existing_losses

        cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
        quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)

        for i in range(num_bins - 1):
            quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])

        quantiles[-1] = np.max(y)

        y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
        y_cat = y_cat - 1

        return x, y_cat, quantiles
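
A toy illustration of the quantile binning performed by prepare_data_for_classifier (hypothetical numbers; num_bins=2 for brevity, i.e. losses below or above the median):

import numpy as np

from black_it.samplers.random_forest import RandomForestSampler

points = np.array([[0.0], [1.0], [2.0], [3.0]])
losses = np.array([0.1, 0.4, 0.2, 0.9])

x, y_cat, quantiles = RandomForestSampler.prepare_data_for_classifier(points, losses, 2)
print(y_cat)      # [0 1 0 1]: losses below / above the median
print(quantiles)  # [0.  0.3 0.9]: bin edges (0, median, and max of the losses)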

criterion: str property readonly

Get the criterion.

n_classes: int property readonly

Get the number of classes.

n_estimators: int property readonly

Get the number of estimators.

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, n_estimators=500, criterion='gini', n_classes=10) special

Random forest sampling.

Note: this class makes use of sklearn.ensemble.RandomForestClassifier.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (Optional[int], default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
max_deduplication_passes (int, default: 5): the maximum number of deduplication passes
candidate_pool_size (Optional[int], default: None): number of randomly sampled points on which the random forest predictions are evaluated
n_estimators (int, default: 500): number of trees in the forest
criterion (str, default: 'gini'): the function to measure the quality of a split
n_classes (int, default: 10): the number of classes used in the random forest; the classes are selected as the quantiles of the distribution of loss values
Source code in black_it/samplers/random_forest.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    batch_size: int,
    random_state: Optional[int] = None,
    max_deduplication_passes: int = 5,
    candidate_pool_size: Optional[int] = None,
    n_estimators: int = 500,
    criterion: str = "gini",
    n_classes: int = 10,
) -> None:
    """
    Random forest sampling.

    Note: this class makes use of sklearn.ensemble.RandomForestClassifier.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes
        candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
        n_estimators: number of trees in the forest
        criterion: the function to measure the quality of a split.
        n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
            of the distribution of loss values.
    """
    _assert(
        n_classes > 2,
        "'n_classes' should be at least 2 to provide meaningful results",
    )

    super().__init__(
        batch_size, random_state, max_deduplication_passes, candidate_pool_size
    )

    self._n_estimators = n_estimators
    self._criterion = criterion
    self._n_classes = n_classes
    self._classifier: Optional[RandomForestClassifier] = None

fit(self, X, y)

Fit a random forest surrogate model.

Source code in black_it/samplers/random_forest.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:
    """Fit a random forest surrogate model."""
    # Train surrogate

    X, y_cat, _existing_points_quantiles = self.prepare_data_for_classifier(
        X, y, self.n_classes
    )

    self._classifier = RandomForestClassifier(
        n_estimators=self.n_estimators,
        criterion=self.criterion,
        n_jobs=-1,
        random_state=self._get_random_seed(),
    )
    self._classifier.fit(X, y_cat)

predict(self, X)

Predict using a random forest surrogate model.

Source code in black_it/samplers/random_forest.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:
    """Predict using a random forest surrogate model."""
    # Predict quantiles
    self._classifier = cast(RandomForestClassifier, self._classifier)
    predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)

    return predicted_points_quantiles

prepare_data_for_classifier(existing_points, existing_losses, num_bins) staticmethod

Prepare data for the classifier.

Parameters:

existing_points (ndarray, required): the parameters already sampled
existing_losses (ndarray, required): the loss corresponding to the sampled parameters
num_bins (int, required): the number of bins

Returns:

Tuple[ndarray, ndarray, ndarray]: a triple (x, y, quantiles), where x is the vector of training data, y is the vector of targets, and quantiles are the bin edges

Source code in black_it/samplers/random_forest.py
@staticmethod
def prepare_data_for_classifier(
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
    num_bins: int,
) -> Tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
    """
    Prepare data for the classifier.

    Args:
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters
        num_bins: the number of bins

    Returns:
        A triple (x, y, quantiles), where
            - x is the vector of training data
            - y is the vector of targets
            - the quantiles
    """
    x: NDArray[np.float64] = existing_points
    y: NDArray[np.float64] = existing_losses

    cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
    quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)

    for i in range(num_bins - 1):
        quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])

    quantiles[-1] = np.max(y)

    y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
    y_cat = y_cat - 1

    return x, y_cat, quantiles