Samplers

BaseSampler interface.

This is the base class for all samplers.

Source code in black_it/samplers/base.py
class BaseSampler(BaseSeedable, ABC):
    """BaseSampler interface.

    This is the base class for all samplers.
    """

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
    ) -> None:
        """Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the internal state of the sampler; fixing this number makes the sampler behave deterministically
            max_deduplication_passes: maximum number of deduplication passes done to avoid sampling repeated parameters
        """
        BaseSeedable.__init__(self, random_state=random_state)
        self.batch_size: int = batch_size
        self.max_deduplication_passes = max_deduplication_passes

    @abstractmethod
    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Sample a number of new parameters fixed by the 'batch_size' attribute.

        Args:
            batch_size: number of samples to collect
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the new parameters
        """

    def sample(
        self,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Sample from the search space.

        Args:
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the sampled parameters
        """
        samples = self.sample_batch(
            self.batch_size,
            search_space,
            existing_points,
            existing_losses,
        )

        for n in range(self.max_deduplication_passes):
            duplicates = self.find_and_get_duplicates(samples, existing_points)

            num_duplicates = len(duplicates)

            if num_duplicates == 0:
                break

            new_samples = self.sample_batch(
                num_duplicates,
                search_space,
                existing_points,
                existing_losses,
            )
            samples[duplicates] = new_samples

            if n == self.max_deduplication_passes - 1:
                print(
                    f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
                    " This is probably due to a small search space.",
                )

        return samples

    @staticmethod
    def find_and_get_duplicates(
        new_points: NDArray[np.float64],
        existing_points: NDArray[np.float64],
    ) -> list:
        """Find the points in 'new_points' that are already present in 'existing_points'.

        Args:
            new_points: candidate points for the sampler
            existing_points: previously sampled points

        Returns:
            the location of the duplicates in 'new_points'
        """
        all_points = np.concatenate((existing_points, new_points))
        unq, count = np.unique(all_points, axis=0, return_counts=True)
        repeated_groups = unq[count > 1]

        repeated_pos = []
        if len(repeated_groups) > 0:
            for repeated_group in repeated_groups:
                repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
                for index in repeated_idx:
                    repeated_pos.append(index[0])  # noqa: PERF401

        return repeated_pos
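A concrete sampler only needs to implement sample_batch; batching and deduplication are handled by sample. The following is a minimal sketch of a custom subclass (the MidpointSampler name and its behaviour are purely illustrative, and the import paths are assumed from the module locations shown on this page):

import numpy as np
from numpy.typing import NDArray

from black_it.samplers.base import BaseSampler
from black_it.search_space import SearchSpace


class MidpointSampler(BaseSampler):
    """Toy sampler that always proposes the midpoint of every parameter range."""

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        # midpoint of each parameter's bounds, repeated batch_size times
        midpoints = 0.5 * (search_space.parameters_bounds[0] + search_space.parameters_bounds[1])
        return np.tile(midpoints, (batch_size, 1))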

__init__(self, batch_size, random_state=None, max_deduplication_passes=5) special

Initialize the sampler.

Parameters:

- batch_size (int, required): the number of points sampled every time the sampler is called
- random_state (int | None, default: None): the internal state of the sampler; fixing this number makes the sampler behave deterministically
- max_deduplication_passes (int, default: 5): maximum number of deduplication passes done to avoid sampling repeated parameters
Source code in black_it/samplers/base.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
) -> None:
    """Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the internal state of the sampler; fixing this number makes the sampler behave deterministically
        max_deduplication_passes: maximum number of deduplication passes done to avoid sampling repeated parameters
    """
    BaseSeedable.__init__(self, random_state=random_state)
    self.batch_size: int = batch_size
    self.max_deduplication_passes = max_deduplication_passes

find_and_get_duplicates(new_points, existing_points) staticmethod

Find the points in 'new_points' that are already present in 'existing_points'.

Parameters:

- new_points (NDArray[np.float64], required): candidate points for the sampler
- existing_points (NDArray[np.float64], required): previously sampled points

Returns:

- list: the locations of the duplicates in 'new_points'

Source code in black_it/samplers/base.py
@staticmethod
def find_and_get_duplicates(
    new_points: NDArray[np.float64],
    existing_points: NDArray[np.float64],
) -> list:
    """Find the points in 'new_points' that are already present in 'existing_points'.

    Args:
        new_points: candidate points for the sampler
        existing_points: previously sampled points

    Returns:
        the location of the duplicates in 'new_points'
    """
    all_points = np.concatenate((existing_points, new_points))
    unq, count = np.unique(all_points, axis=0, return_counts=True)
    repeated_groups = unq[count > 1]

    repeated_pos = []
    if len(repeated_groups) > 0:
        for repeated_group in repeated_groups:
            repeated_idx = np.argwhere(np.all(new_points == repeated_group, axis=1))
            for index in repeated_idx:
                repeated_pos.append(index[0])  # noqa: PERF401

    return repeated_pos
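For illustration, a quick call to find_and_get_duplicates on toy arrays (the returned indices refer to rows of new_points):

import numpy as np

from black_it.samplers.base import BaseSampler

existing = np.array([[0.1, 0.2], [0.3, 0.4]])
candidates = np.array([[0.5, 0.6], [0.1, 0.2], [0.3, 0.4]])

# rows 1 and 2 of `candidates` already appear among the existing points
print(BaseSampler.find_and_get_duplicates(candidates, existing))  # [1, 2]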

sample(self, search_space, existing_points, existing_losses)

Sample from the search space.

Parameters:

- search_space (SearchSpace, required): an object containing the details of the parameter search space
- existing_points (NDArray[np.float64], required): the parameters already sampled
- existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters

Returns:

- NDArray[np.float64]: the sampled parameters

Source code in black_it/samplers/base.py
def sample(
    self,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """Sample from the search space.

    Args:
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the sampled parameters
    """
    samples = self.sample_batch(
        self.batch_size,
        search_space,
        existing_points,
        existing_losses,
    )

    for n in range(self.max_deduplication_passes):
        duplicates = self.find_and_get_duplicates(samples, existing_points)

        num_duplicates = len(duplicates)

        if num_duplicates == 0:
            break

        new_samples = self.sample_batch(
            num_duplicates,
            search_space,
            existing_points,
            existing_losses,
        )
        samples[duplicates] = new_samples

        if n == self.max_deduplication_passes - 1:
            print(
                f"Warning: Repeated samples still found after {self.max_deduplication_passes} duplication passes."
                " This is probably due to a small search space.",
            )

    return samples

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample a number of new parameters fixed by the 'batch_size' attribute.

Parameters:

- batch_size (int, required): number of samples to collect
- search_space (SearchSpace, required): an object containing the details of the parameter search space
- existing_points (NDArray[np.float64], required): the parameters already sampled
- existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters

Returns:

- NDArray[np.float64]: the new parameters

Source code in black_it/samplers/base.py
@abstractmethod
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """Sample a number of new parameters fixed by the 'batch_size' attribute.

    Args:
        batch_size: number of samples to collect
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the new parameters
    """

Random uniform sampling.

Source code in black_it/samplers/random_uniform.py
class RandomUniformSampler(BaseSampler):
    """Random uniform sampling."""

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],  # noqa: ARG002
        existing_losses: NDArray[np.float64],  # noqa: ARG002
    ) -> NDArray[np.float64]:
        """Sample uniformly from the search space.

        Args:
            batch_size: the number of points to sample
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
        """
        candidates = np.zeros((batch_size, search_space.dims))
        for i, params in enumerate(search_space.param_grid):
            candidates[:, i] = self.random_generator.choice(params, size=(batch_size,))
        return candidates
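A hedged usage sketch: drawing one batch of uniform samples from a two-parameter search space. The SearchSpace constructor arguments (parameters_bounds, parameters_precision, verbose) are assumed from the black-it documentation and are not defined on this page:

import numpy as np

from black_it.samplers.random_uniform import RandomUniformSampler
from black_it.search_space import SearchSpace

space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 1.0]]),  # lower and upper bounds per parameter
    parameters_precision=np.array([0.01, 0.01]),  # grid step per parameter
    verbose=False,
)

sampler = RandomUniformSampler(batch_size=8, random_state=0)
new_params = sampler.sample(space, np.zeros((0, 2)), np.zeros(0))
print(new_params.shape)  # (8, 2)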

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample uniformly from the search space.

Parameters:

- batch_size (int, required): the number of points to sample
- search_space (SearchSpace, required): an object containing the details of the parameter search space
- existing_points (NDArray[np.float64], required): the parameters already sampled
- existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters

Returns:

- NDArray[np.float64]: the sampled parameters (an array of shape (self.batch_size, search_space.dims))

Source code in black_it/samplers/random_uniform.py
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],  # noqa: ARG002
    existing_losses: NDArray[np.float64],  # noqa: ARG002
) -> NDArray[np.float64]:
    """Sample uniformly from the search space.

    Args:
        batch_size: the number of points to sample
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
    """
    candidates = np.zeros((batch_size, search_space.dims))
    for i, params in enumerate(search_space.param_grid):
        candidates[:, i] = self.random_generator.choice(params, size=(batch_size,))
    return candidates

Halton low discrepancy sequence.

This sampler implements the Halton sequence, obtained by generalizing the Van der Corput sequence to n dimensions.

Source code in black_it/samplers/halton.py
class HaltonSampler(BaseSampler):
    """Halton low discrepancy sequence.

    This snippet implements the Halton sequence following the generalization of
    a sequence of *Van der Corput* in n-dimensions.
    """

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
    ) -> None:
        """Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of sample deduplication passes.
        """
        super().__init__(batch_size, random_state, max_deduplication_passes)
        self._prime_number_generator = _CachedPrimesCalculator()

        # drop first N entries to avoid linear correlation
        self._reset_sequence_index()

    def _set_random_state(self, random_state: int | None) -> None:
        """Set the random state (private use).

        For the Halton sampler, it also resets the sequence index.

        Args:
            random_state: the random seed
        """
        super()._set_random_state(random_state)
        self._reset_sequence_index()

    def _reset_sequence_index(self) -> None:
        """Reset the sequence index pointer."""
        self._sequence_index = self.random_generator.integers(
            _MIN_SEQUENCE_START_INDEX,
            _MAX_SEQUENCE_START_INDEX,
        )

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],  # noqa: ARG002
        existing_losses: NDArray[np.float64],  # noqa: ARG002
    ) -> NDArray[np.float64]:
        """Sample points using Halton sequence.

        Args:
            batch_size: the number of samples
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled (not used)
            existing_losses: the loss corresponding to the sampled parameters (not used)

        Returns:
            the parameter sampled
        """
        unit_cube_points: NDArray[np.float64] = self._halton(
            batch_size,
            search_space.dims,
        )
        p_bounds: NDArray[np.float64] = search_space.parameters_bounds
        sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
        return digitize_data(sampled_points, search_space.param_grid)

    def _halton(self, nb_samples: int, dims: int) -> NDArray[np.float64]:
        """Get a Halton sequence.

        It uses a simple prime number generator, which takes the first `dims` primes.

        Args:
            nb_samples: number of samples
            dims: the number of dimensions of the space to sample from the unitary cube

        Returns:
            sequence of Halton.
        """
        bases: NDArray[np.int64] = self._prime_number_generator.get_n_primes(dims)
        # Generate a sample using a Halton sequence.
        sample: NDArray[np.float64] = halton(
            sample_size=nb_samples,
            bases=bases,
            n_start=self._sequence_index,
        )

        # increment sequence start index for the next sampling
        self._sequence_index += nb_samples
        return sample
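For reference, a standalone sketch of the underlying construction (independent of black-it's internal halton helper): the Van der Corput radical inverse in a prime base, with one base per dimension giving a Halton point:

def van_der_corput(n: int, base: int) -> float:
    """Radical inverse of the integer n in the given base."""
    q, denom = 0.0, 1.0
    while n > 0:
        n, remainder = divmod(n, base)
        denom *= base
        q += remainder / denom
    return q


def halton_point(index: int, bases: list[int]) -> list[float]:
    """One point of the Halton sequence: a Van der Corput value per base."""
    return [van_der_corput(index, b) for b in bases]


print(halton_point(5, [2, 3]))  # approximately [0.625, 0.778]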

__init__(self, batch_size, random_state=None, max_deduplication_passes=5) special

Initialize the sampler.

Parameters:

- batch_size (int, required): the number of points sampled every time the sampler is called
- random_state (int | None, default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
- max_deduplication_passes (int, default: 5): the maximum number of sample deduplication passes
Source code in black_it/samplers/halton.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
) -> None:
    """Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of sample deduplication passes.
    """
    super().__init__(batch_size, random_state, max_deduplication_passes)
    self._prime_number_generator = _CachedPrimesCalculator()

    # drop first N entries to avoid linear correlation
    self._reset_sequence_index()

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample points using Halton sequence.

Parameters:

- batch_size (int, required): the number of samples
- search_space (SearchSpace, required): an object containing the details of the parameter search space
- existing_points (NDArray[np.float64], required): the parameters already sampled (not used)
- existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters (not used)

Returns:

- NDArray[np.float64]: the sampled parameters

Source code in black_it/samplers/halton.py
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],  # noqa: ARG002
    existing_losses: NDArray[np.float64],  # noqa: ARG002
) -> NDArray[np.float64]:
    """Sample points using Halton sequence.

    Args:
        batch_size: the number of samples
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled (not used)
        existing_losses: the loss corresponding to the sampled parameters (not used)

    Returns:
        the parameter sampled
    """
    unit_cube_points: NDArray[np.float64] = self._halton(
        batch_size,
        search_space.dims,
    )
    p_bounds: NDArray[np.float64] = search_space.parameters_bounds
    sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
    return digitize_data(sampled_points, search_space.param_grid)

The R-sequence sampler.

Source code in black_it/samplers/r_sequence.py
class RSequenceSampler(BaseSampler):
    """The R-sequence sampler."""

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
    ) -> None:
        """Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
                after every batch sampling. Default: 5.
        """
        super().__init__(batch_size, random_state, max_deduplication_passes)

        self._sequence_index: int
        self._sequence_start: float
        self._reset()

    @classmethod
    def compute_phi(cls, nb_dims: int) -> float:
        """Get an approximation of phi^nb_dims.

        Args:
            nb_dims: the number of dimensions.

        Returns:
            phi^nb_dims
        """
        check_arg(nb_dims >= 1, f"nb_dims should be greater than 0, got {nb_dims}")
        phi: float = 2.0
        old_phi = None
        while old_phi != phi:
            old_phi = phi
            phi = pow(1 + phi, 1.0 / (nb_dims + 1))
        return phi

    def _set_random_state(self, random_state: int | None) -> None:
        """Set the random state (private use).

        For the RSequence sampler, it also resets the sequence index and the sequence start.

        Args:
            random_state: the random seed
        """
        super()._set_random_state(random_state)
        self._reset()

    def _reset(self) -> None:
        """Reset the index of the sequence."""
        self._sequence_index = self.random_generator.integers(
            _MIN_SEQUENCE_START_INDEX,
            _MAX_SEQUENCE_START_INDEX,
        )
        self._sequence_start = self.random_generator.random()

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],  # noqa: ARG002
        existing_losses: NDArray[np.float64],  # noqa: ARG002
    ) -> NDArray[np.float64]:
        """Sample points using the R-sequence.

        Args:
            batch_size: the number of samples
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled (not used)
            existing_losses: the loss corresponding to the sampled parameters (not used)

        Returns:
            the parameter sampled
        """
        unit_cube_points: NDArray[np.float64] = self._r_sequence(
            batch_size,
            search_space.dims,
        )
        p_bounds: NDArray[np.float64] = search_space.parameters_bounds
        sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
        return digitize_data(sampled_points, search_space.param_grid)

    def _r_sequence(self, nb_samples: int, dims: int) -> NDArray[np.float64]:
        """Compute the R-sequence (http://extremelearning.com.au/unreasonable-effectiveness-of-quasirandom-sequences/).

        Args:
            nb_samples: number of points to sample
            dims: the number of dimensions

        Returns:
            Set of params uniformly placed in d-dimensional unit cube.
        """
        phi = self.compute_phi(dims)
        alpha: NDArray[np.float64] = np.power(1 / phi, np.arange(1, dims + 1)).reshape(
            (1, -1),
        )
        end_index = self._sequence_index + nb_samples
        indexes = np.arange(self._sequence_index, end_index).reshape((-1, 1))
        points: NDArray[np.float64] = (self._sequence_start + indexes.dot(alpha)) % 1
        self._sequence_index = end_index
        return points
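The recurrence can be written compactly as x_n = (s + n * alpha) mod 1 with alpha_i = phi_d^(-i), where phi_d is the fixed point computed by compute_phi. A standalone sketch of the same construction:

import numpy as np


def r_sequence(n_samples: int, dims: int, start: float = 0.5) -> np.ndarray:
    phi = 2.0
    old_phi = None
    while old_phi != phi:  # fixed-point iteration, as in compute_phi
        old_phi = phi
        phi = (1 + phi) ** (1.0 / (dims + 1))
    alpha = np.power(1 / phi, np.arange(1, dims + 1))
    indexes = np.arange(1, n_samples + 1).reshape(-1, 1)
    return (start + indexes * alpha) % 1


print(r_sequence(3, 2))  # three points in the 2-dimensional unit square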

__init__(self, batch_size, random_state=None, max_deduplication_passes=5) special

Initialize the sampler.

Parameters:

- batch_size (int, required): the number of points sampled every time the sampler is called
- random_state (int | None, default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
- max_deduplication_passes (int, default: 5): (non-negative integer) the maximum number of deduplication passes that are made after every batch sampling
Source code in black_it/samplers/r_sequence.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
) -> None:
    """Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: (non-negative integer) the maximum number of deduplication passes that are made
            after every batch sampling. Default: 5.
    """
    super().__init__(batch_size, random_state, max_deduplication_passes)

    self._sequence_index: int
    self._sequence_start: float
    self._reset()

compute_phi(nb_dims) classmethod

Get an approximation of phi^nb_dims.

Parameters:

- nb_dims (int, required): the number of dimensions.

Returns:

- float: phi^nb_dims

Source code in black_it/samplers/r_sequence.py
@classmethod
def compute_phi(cls, nb_dims: int) -> float:
    """Get an approximation of phi^nb_dims.

    Args:
        nb_dims: the number of dimensions.

    Returns:
        phi^nb_dims
    """
    check_arg(nb_dims >= 1, f"nb_dims should be greater than 0, got {nb_dims}")
    phi: float = 2.0
    old_phi = None
    while old_phi != phi:
        old_phi = phi
        phi = pow(1 + phi, 1.0 / (nb_dims + 1))
    return phi

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample points using the R-sequence.

Parameters:

- batch_size (int, required): the number of samples
- search_space (SearchSpace, required): an object containing the details of the parameter search space
- existing_points (NDArray[np.float64], required): the parameters already sampled (not used)
- existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters (not used)

Returns:

- NDArray[np.float64]: the sampled parameters

Source code in black_it/samplers/r_sequence.py
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],  # noqa: ARG002
    existing_losses: NDArray[np.float64],  # noqa: ARG002
) -> NDArray[np.float64]:
    """Sample points using the R-sequence.

    Args:
        batch_size: the number of samples
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled (not used)
        existing_losses: the loss corresponding to the sampled parameters (not used)

    Returns:
        the parameter sampled
    """
    unit_cube_points: NDArray[np.float64] = self._r_sequence(
        batch_size,
        search_space.dims,
    )
    p_bounds: NDArray[np.float64] = search_space.parameters_bounds
    sampled_points = p_bounds[0] + unit_cube_points * (p_bounds[1] - p_bounds[0])
    return digitize_data(sampled_points, search_space.param_grid)

This class implements the best-batch sampler.

The sampler is a very basic type of genetic algorithm that takes the parameters corresponding to the current lowest loss values and perturbs them slightly in a purely random fashion. The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution BetaBin(dims, a, b), where dims is the total number of dimensions in the search space; it then selects that many coordinates at random and perturbs them uniformly within the range specified by 'perturbation_range'.

Source code in black_it/samplers/best_batch.py
class BestBatchSampler(BaseSampler):
    """This class implements the best-batch sampler.

    The sampler is a very essential type of genetic algorithm that takes the parameters corresponding
      to the current lowest loss values and perturbs them slightly in a purely random fashion.
    The sampler first chooses the total number of coordinates to perturb via a beta-binomial distribution
      BetaBin(dims, a, b) --where dims is the total number of dimensions in the search space --, it then selects
      that many coordinates at random, and perturbs them uniformly within the range specified by 'perturbation_range'.

    """

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
        a: float = 3.0,
        b: float = 1.0,
        perturbation_range: int = 6,
    ) -> None:
        """Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes that are made
            a: the a parameter of the beta-binomial distribution
            b: the b parameter of the beta-binomial distribution
            perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
                plus/minus the perturbation_range times the precision of the specific parameter coordinate
        """
        _assert(
            a > 0,
            "'a' should be greater than zero",
        )
        _assert(
            b > 0,
            "'b' should be greater than zero",
        )
        _assert(
            perturbation_range > 1,
            "'perturbation_range' should be greater than one",
        )

        super().__init__(batch_size, random_state, max_deduplication_passes)
        self.a = a
        self.b = b
        self.perturbation_range = perturbation_range

    def sample_batch(
        self,
        batch_size: int,
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Sample from the search space using a genetic algorithm.

        Args:
            batch_size: the number of points to sample
            search_space: an object containing the details of the parameter search space
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters

        Returns:
            the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
        """
        if len(existing_points) < batch_size:
            msg = (
                "best-batch sampler requires a number of existing points "
                f"which is at least the batch size {batch_size}, got {len(existing_points)}"
            )
            raise ValueError(
                msg,
            )

        # sort existing params
        candidate_points: NDArray[np.float64] = existing_points[np.argsort(existing_losses)][:batch_size, :]

        candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
            0,
            batch_size,
            size=batch_size,
        )
        sampled_points: NDArray[np.float64] = np.copy(
            candidate_points[candidate_point_indexes],
        )

        beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
        beta_binom_rv.random_state = self.random_generator

        for sampled_point in sampled_points:
            num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
            params_shocked: NDArray[np.int64] = self.random_generator.choice(
                search_space.dims,
                tuple(num_shocks),
                replace=False,
            )
            for index in params_shocked:
                shock_size: int = self.random_generator.integers(
                    1,
                    self.perturbation_range,
                )
                shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1

                delta: float = search_space.parameters_precision[index]
                shift: float = delta * shock_sign * shock_size
                sampled_point[index] += shift

                sampled_point[index] = np.clip(
                    sampled_point[index],
                    search_space.parameters_bounds[0][index],
                    search_space.parameters_bounds[1][index],
                )

        return sampled_points
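A standalone sketch of the perturbation step described above, using made-up toy values for the bounds and precisions: the number of shocked coordinates is 1 + BetaBin(dims - 1, a, b) as in the source, each shock is an integer multiple (1 .. perturbation_range - 1) of the coordinate's precision with a random sign, and the result is clipped to the bounds:

import numpy as np
from scipy.stats import betabinom

rng = np.random.default_rng(0)
dims, a, b, perturbation_range = 4, 3.0, 1.0, 6
precision = np.array([0.1, 0.1, 0.01, 0.5])   # toy parameter precisions
lower, upper = np.zeros(dims), np.ones(dims) * 10

point = np.array([1.0, 2.0, 0.05, 5.0])       # a toy "best" point to perturb
num_shocks = betabinom(n=dims - 1, a=a, b=b).rvs(random_state=rng) + 1
shocked = rng.choice(dims, num_shocks, replace=False)
for i in shocked:
    size = rng.integers(1, perturbation_range)  # 1 .. perturbation_range - 1
    sign = rng.integers(0, 2) * 2 - 1           # -1 or +1
    point[i] = np.clip(point[i] + sign * size * precision[i], lower[i], upper[i])
print(point)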

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, a=3.0, b=1.0, perturbation_range=6) special

Initialize the sampler.

Parameters:

- batch_size (int, required): the number of points sampled every time the sampler is called
- random_state (int | None, default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
- max_deduplication_passes (int, default: 5): the maximum number of deduplication passes that are made
- a (float, default: 3.0): the a parameter of the beta-binomial distribution
- b (float, default: 1.0): the b parameter of the beta-binomial distribution
- perturbation_range (int, default: 6): the range of the perturbation applied. The actual perturbation will be in the range plus/minus the perturbation_range times the precision of the specific parameter coordinate
Source code in black_it/samplers/best_batch.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
    a: float = 3.0,
    b: float = 1.0,
    perturbation_range: int = 6,
) -> None:
    """Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes that are made
        a: the a parameter of the beta-binomial distribution
        b: the b parameter of the beta-binomial distribution
        perturbation_range: the range of the perturbation applied. The actual perturbation will be in the range
            plus/minus the perturbation_range times the precision of the specific parameter coordinate
    """
    _assert(
        a > 0,
        "'a' should be greater than zero",
    )
    _assert(
        b > 0,
        "'b' should be greater than zero",
    )
    _assert(
        perturbation_range > 1,
        "'perturbation_range' should be greater than one",
    )

    super().__init__(batch_size, random_state, max_deduplication_passes)
    self.a = a
    self.b = b
    self.perturbation_range = perturbation_range

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample from the search space using a genetic algorithm.

Parameters:

- batch_size (int, required): the number of points to sample
- search_space (SearchSpace, required): an object containing the details of the parameter search space
- existing_points (NDArray[np.float64], required): the parameters already sampled
- existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters

Returns:

- NDArray[np.float64]: the sampled parameters (an array of shape (self.batch_size, search_space.dims))

Source code in black_it/samplers/best_batch.py
def sample_batch(
    self,
    batch_size: int,
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """Sample from the search space using a genetic algorithm.

    Args:
        batch_size: the number of points to sample
        search_space: an object containing the details of the parameter search space
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters

    Returns:
        the sampled parameters (an array of shape `(self.batch_size, search_space.dims)`)
    """
    if len(existing_points) < batch_size:
        msg = (
            "best-batch sampler requires a number of existing points "
            f"which is at least the batch size {batch_size}, got {len(existing_points)}"
        )
        raise ValueError(
            msg,
        )

    # sort existing params
    candidate_points: NDArray[np.float64] = existing_points[np.argsort(existing_losses)][:batch_size, :]

    candidate_point_indexes: NDArray[np.int64] = self.random_generator.integers(
        0,
        batch_size,
        size=batch_size,
    )
    sampled_points: NDArray[np.float64] = np.copy(
        candidate_points[candidate_point_indexes],
    )

    beta_binom_rv = betabinom(n=search_space.dims - 1, a=self.a, b=self.b)
    beta_binom_rv.random_state = self.random_generator

    for sampled_point in sampled_points:
        num_shocks: NDArray[np.int64] = beta_binom_rv.rvs(size=1) + 1
        params_shocked: NDArray[np.int64] = self.random_generator.choice(
            search_space.dims,
            tuple(num_shocks),
            replace=False,
        )
        for index in params_shocked:
            shock_size: int = self.random_generator.integers(
                1,
                self.perturbation_range,
            )
            shock_sign: int = (self.random_generator.integers(0, 2) * 2) - 1

            delta: float = search_space.parameters_precision[index]
            shift: float = delta * shock_sign * shock_size
            sampled_point[index] += shift

            sampled_point[index] = np.clip(
                sampled_point[index],
                search_space.parameters_bounds[0][index],
                search_space.parameters_bounds[1][index],
            )

    return sampled_points

This class implements the Gaussian process-based sampler.

In particular, the sampling is based on a Gaussian Process interpolation of the loss function.

Note: this class is a wrapper of the GaussianProcessRegressor model of the scikit-learn package.

Source code in black_it/samplers/gaussian_process.py
class GaussianProcessSampler(MLSurrogateSampler):
    """This class implements the Gaussian process-based sampler.

    In particular, the sampling is based on a Gaussian Process interpolation of the loss function.

    Note: this class is a wrapper of the GaussianProcessRegressor model of the scikit-learn package.
    """

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
        candidate_pool_size: int | None = None,
        optimize_restarts: int = 5,
        acquisition: str = "expected_improvement",
        jitter: float = 0.1,
    ) -> None:
        """Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes that are made
            candidate_pool_size: number of randomly sampled points on which the surrogate model predictions are evaluated
            optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
            acquisition: type of acquisition function, it can be 'expected_improvement' or simply 'mean'
            jitter: positive value to make the "expected_improvement" acquisition more explorative.
        """
        self._validate_acquisition(acquisition)

        super().__init__(
            batch_size,
            random_state,
            max_deduplication_passes,
            candidate_pool_size,
        )
        self.optimize_restarts = optimize_restarts
        self.acquisition = acquisition
        self.jitter = jitter
        self._gpmodel: GaussianProcessRegressor | None = None
        self._fmin: np.double | float | None = None

    @staticmethod
    def _validate_acquisition(acquisition: str) -> None:
        """Check that the required acquisition is among the supported ones.

        Args:
            acquisition: the acquisition provided as input of the constructor.

        Raises:
            ValueError: if the provided acquisition type is not among the allowed ones.
        """
        try:
            _AcquisitionTypes(acquisition)
        except ValueError as e:
            msg = (
                "expected one of the following acquisition types: "
                f"[{' '.join(map(str, _AcquisitionTypes))}], got {acquisition}"
            )
            raise ValueError(
                msg,
            ) from e

    def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:  # noqa: N803
        """Fit a gaussian process surrogate model."""
        y = np.atleast_2d(y).T

        if X.shape[0] > _BIG_DATASET_SIZE_WARNING_THRESHOLD:
            warnings.warn(  # noqa: B028
                "Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
                RuntimeWarning,
            )

        # initialize GP class from scikit-learn with a Matern kernel
        kernel = kernels.Matern(length_scale=1, length_scale_bounds=(1e-5, 1e5), nu=2.5)

        noise_var = y.var() * 0.01

        self._gpmodel = GaussianProcessRegressor(
            kernel=kernel,
            alpha=noise_var,
            n_restarts_optimizer=self.optimize_restarts,
            optimizer="fmin_l_bfgs_b",
            random_state=self._get_random_seed(),
        )

        self._gpmodel.fit(X, y)

        # store minimum
        if self.acquisition == "expected_improvement":
            m, _ = self._predict_mean_std(X)
            self._fmin = np.min(m)

    def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:  # noqa: N803
        """Predict using a gaussian process surrogate model."""
        # predict mean or expected improvement on the full sample set
        if self.acquisition == _AcquisitionTypes.EI.value:
            # minus sign needed for subsequent sorting
            candidates_score = -self._predict_EI(X, self.jitter)
        else:  # acquisition is "mean"
            candidates_score = self._predict_mean_std(X)[0]

        return candidates_score

    def _predict_mean_std(
        self,
        X: NDArray[np.float64],  # noqa: N803
    ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """Predict mean and standard deviation of a fitted GP.

        Args:
            X: the points on which the predictions should be performed

        Returns:
            The pair (mean, std).
        """
        gpmodel = cast("GaussianProcessRegressor", self._gpmodel)
        X = X[None, :] if X.ndim == 1 else X  # noqa: N806
        m, s = gpmodel.predict(X, return_std=True, return_cov=False)
        s = np.clip(s, 1e-5, np.inf)
        return m, s

    def _predict_EI(  # noqa: N802
        self,
        X: NDArray[np.float64],  # noqa: N803
        jitter: float = 0.1,
    ) -> NDArray[np.float64]:
        """Compute the Expected Improvement per unit of cost.

        Args:
            X:  the points on which the predictions should be performed
            jitter: positive value to make the acquisition more explorative.

        Returns:
            the expected improvement.
        """
        m, s = self._predict_mean_std(X)

        fmin = cast("float", self._fmin)

        phi, Phi, u = self.get_quantiles(jitter, fmin, m, s)  # noqa: N806

        return s * (u * Phi + phi)

    @staticmethod
    def get_quantiles(
        acquisition_par: float,
        fmin: float,
        m: NDArray[np.float64],
        s: NDArray[np.float64],
    ) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
        """Quantiles of the Gaussian distribution useful to determine the acquisition function values.

        Args:
            acquisition_par: parameter of the acquisition function
            fmin: current minimum.
            m: vector of means.
            s: vector of standard deviations.

        Returns:
            the quantiles.
        """
        # remove values of variance that are too small
        s[s < _SMALL_VARIANCE_VALUES] = _SMALL_VARIANCE_VALUES

        u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
        phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
        Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2))  # noqa: N806

        return phi, Phi, u
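As a numerical sanity check, the acquisition s * (u * Phi + phi) built from get_quantiles is the standard expected-improvement expression; the snippet below (using scipy.stats.norm for the Gaussian pdf/cdf and made-up means and standard deviations) shows the equivalence:

import numpy as np
from scipy.stats import norm

m = np.array([0.8, 1.2, 0.5])   # predicted means (toy values)
s = np.array([0.2, 0.3, 0.1])   # predicted standard deviations (toy values)
fmin, jitter = 0.6, 0.1

u = (fmin - m - jitter) / s
ei_doc = s * (u * norm.cdf(u) + norm.pdf(u))                   # as in _predict_EI
ei_ref = (fmin - m - jitter) * norm.cdf(u) + s * norm.pdf(u)   # textbook EI form
print(np.allclose(ei_doc, ei_ref))  # True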

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, optimize_restarts=5, acquisition='expected_improvement', jitter=0.1) special

Initialize the sampler.

Parameters:

- batch_size (int, required): the number of points sampled every time the sampler is called
- random_state (int | None, default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
- max_deduplication_passes (int, default: 5): the maximum number of deduplication passes that are made
- candidate_pool_size (int | None, default: None): number of randomly sampled points on which the surrogate model predictions are evaluated
- optimize_restarts (int, default: 5): number of independent random trials of the optimization of the GP hyperparameters
- acquisition (str, default: 'expected_improvement'): type of acquisition function, it can be 'expected_improvement' or simply 'mean'
- jitter (float, default: 0.1): positive value to make the "expected_improvement" acquisition more explorative.
Source code in black_it/samplers/gaussian_process.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
    candidate_pool_size: int | None = None,
    optimize_restarts: int = 5,
    acquisition: str = "expected_improvement",
    jitter: float = 0.1,
) -> None:
    """Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes that are made
        candidate_pool_size: number of randomly sampled points on which the surrogate model predictions are evaluated
        optimize_restarts: number of independent random trials of the optimization of the GP hyperparameters
        acquisition: type of acquisition function, it can be 'expected_improvement' or simply 'mean'
        jitter: positive value to make the "expected_improvement" acquisition more explorative.
    """
    self._validate_acquisition(acquisition)

    super().__init__(
        batch_size,
        random_state,
        max_deduplication_passes,
        candidate_pool_size,
    )
    self.optimize_restarts = optimize_restarts
    self.acquisition = acquisition
    self.jitter = jitter
    self._gpmodel: GaussianProcessRegressor | None = None
    self._fmin: np.double | float | None = None

fit(self, X, y)

Fit a gaussian process surrogate model.

Source code in black_it/samplers/gaussian_process.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:  # noqa: N803
    """Fit a gaussian process surrogate model."""
    y = np.atleast_2d(y).T

    if X.shape[0] > _BIG_DATASET_SIZE_WARNING_THRESHOLD:
        warnings.warn(  # noqa: B028
            "Standard GP evaluations can be expensive for large datasets, consider implementing a sparse GP",
            RuntimeWarning,
        )

    # initialize GP class from scikit-learn with a Matern kernel
    kernel = kernels.Matern(length_scale=1, length_scale_bounds=(1e-5, 1e5), nu=2.5)

    noise_var = y.var() * 0.01

    self._gpmodel = GaussianProcessRegressor(
        kernel=kernel,
        alpha=noise_var,
        n_restarts_optimizer=self.optimize_restarts,
        optimizer="fmin_l_bfgs_b",
        random_state=self._get_random_seed(),
    )

    self._gpmodel.fit(X, y)

    # store minimum
    if self.acquisition == "expected_improvement":
        m, _ = self._predict_mean_std(X)
        self._fmin = np.min(m)

get_quantiles(acquisition_par, fmin, m, s) staticmethod

Quantiles of the Gaussian distribution useful to determine the acquisition function values.

Parameters:

- acquisition_par (float, required): parameter of the acquisition function
- fmin (float, required): current minimum.
- m (NDArray[np.float64], required): vector of means.
- s (NDArray[np.float64], required): vector of standard deviations.

Returns:

- tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]: the quantiles.

Source code in black_it/samplers/gaussian_process.py
@staticmethod
def get_quantiles(
    acquisition_par: float,
    fmin: float,
    m: NDArray[np.float64],
    s: NDArray[np.float64],
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
    """Quantiles of the Gaussian distribution useful to determine the acquisition function values.

    Args:
        acquisition_par: parameter of the acquisition function
        fmin: current minimum.
        m: vector of means.
        s: vector of standard deviations.

    Returns:
        the quantiles.
    """
    # remove values of variance that are too small
    s[s < _SMALL_VARIANCE_VALUES] = _SMALL_VARIANCE_VALUES

    u: NDArray[np.float64] = (fmin - m - acquisition_par) / s
    phi: NDArray[np.float64] = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
    Phi: NDArray[np.float64] = 0.5 * erfc(-u / np.sqrt(2))  # noqa: N806

    return phi, Phi, u

predict(self, X)

Predict using a gaussian process surrogate model.

Source code in black_it/samplers/gaussian_process.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:  # noqa: N803
    """Predict using a gaussian process surrogate model."""
    # predict mean or expected improvement on the full sample set
    if self.acquisition == _AcquisitionTypes.EI.value:
        # minus sign needed for subsequent sorting
        candidates_score = -self._predict_EI(X, self.jitter)
    else:  # acquisition is "mean"
        candidates_score = self._predict_mean_std(X)[0]

    return candidates_score

This class implements random forest sampling.

Source code in black_it/samplers/random_forest.py
class RandomForestSampler(MLSurrogateSampler):
    """This class implements random forest sampling."""

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
        candidate_pool_size: int | None = None,
        n_estimators: int = 500,
        criterion: str = "gini",
        n_classes: int = 10,
    ) -> None:
        """Random forest sampling.

        Note: this class makes use of sklearn.ensemble.RandomForestClassifier.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes
            candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
            n_estimators: number of trees in the forest
            criterion: the function to measure the quality of a split.
            n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
                of the distribution of loss values.
        """
        _assert(
            n_classes > _MIN_RF_CLASSES,
            "'n_classes' should be at least 2 to provide meaningful results",
        )

        super().__init__(
            batch_size,
            random_state,
            max_deduplication_passes,
            candidate_pool_size,
        )

        self._n_estimators = n_estimators
        self._criterion = criterion
        self._n_classes = n_classes
        self._classifier: RandomForestClassifier | None = None

    @property
    def n_estimators(self) -> int:
        """Get the number of estimators."""
        return self._n_estimators

    @property
    def criterion(self) -> str:
        """Get the criterion."""
        return self._criterion

    @property
    def n_classes(self) -> int:
        """Get the number of classes."""
        return self._n_classes

    def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:  # noqa: N803
        """Fit a random forest surrogate model."""
        # Train surrogate

        (
            X,  # noqa: N806
            y_cat,
            _existing_points_quantiles,
        ) = self.prepare_data_for_classifier(X, y, self.n_classes)

        self._classifier = RandomForestClassifier(
            n_estimators=self.n_estimators,
            criterion=self.criterion,
            n_jobs=-1,
            random_state=self._get_random_seed(),
        )
        self._classifier.fit(X, y_cat)

    def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:  # noqa: N803
        """Predict using a random forest surrogate model."""
        # Predict quantiles
        self._classifier = cast("RandomForestClassifier", self._classifier)
        predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)

        return predicted_points_quantiles

    @staticmethod
    def prepare_data_for_classifier(
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
        num_bins: int,
    ) -> tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
        """Prepare data for the classifier.

        Args:
            existing_points: the parameters already sampled
            existing_losses: the loss corresponding to the sampled parameters
            num_bins: the number of bins

        Returns:
            A triple (x, y, quantiles), where
                - x is the vector of training data
                - y is the vector of targets
                - the quantiles
        """
        x: NDArray[np.float64] = existing_points
        y: NDArray[np.float64] = existing_losses

        cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
        quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)

        for i in range(num_bins - 1):
            quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])

        quantiles[-1] = np.max(y)

        y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
        y_cat = y_cat - 1

        return x, y_cat, quantiles
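A small illustration of prepare_data_for_classifier: losses are mapped to quantile-based classes 0 .. num_bins - 1, which the random forest then learns to predict (toy data, import path as shown on this page):

import numpy as np

from black_it.samplers.random_forest import RandomForestSampler

points = np.random.default_rng(0).random((20, 3))  # toy sampled parameters
losses = np.linspace(1.0, 5.0, 20)                 # toy losses

x, y_cat, quantiles = RandomForestSampler.prepare_data_for_classifier(points, losses, num_bins=4)
print(sorted(set(y_cat.tolist())))  # classes drawn from {0, 1, 2, 3}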

criterion: str property readonly

Get the criterion.

n_classes: int property readonly

Get the number of classes.

n_estimators: int property readonly

Get the number of estimators.

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, n_estimators=500, criterion='gini', n_classes=10) special

Random forest sampling.

Note: this class makes use of sklearn.ensemble.RandomForestClassifier.

Parameters:

- batch_size (int, required): the number of points sampled every time the sampler is called
- random_state (int | None, default: None): the random state of the sampler; fixing this number makes the sampler behave deterministically
- max_deduplication_passes (int, default: 5): the maximum number of deduplication passes
- candidate_pool_size (int | None, default: None): number of randomly sampled points on which the random forest predictions are evaluated
- n_estimators (int, default: 500): number of trees in the forest
- criterion (str, default: 'gini'): the function to measure the quality of a split.
- n_classes (int, default: 10): the number of classes used in the random forest. The classes are selected as the quantiles of the distribution of loss values.
Source code in black_it/samplers/random_forest.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
    candidate_pool_size: int | None = None,
    n_estimators: int = 500,
    criterion: str = "gini",
    n_classes: int = 10,
) -> None:
    """Random forest sampling.

    Note: this class makes use of sklearn.ensemble.RandomForestClassifier.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes
        candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
        n_estimators: number of trees in the forest
        criterion: the function to measure the quality of a split.
        n_classes: the number of classes used in the random forest. The classes are selected as the quantiles
            of the distribution of loss values.
    """
    _assert(
        n_classes > _MIN_RF_CLASSES,
        "'n_classes' should be at least 2 to provide meaningful results",
    )

    super().__init__(
        batch_size,
        random_state,
        max_deduplication_passes,
        candidate_pool_size,
    )

    self._n_estimators = n_estimators
    self._criterion = criterion
    self._n_classes = n_classes
    self._classifier: RandomForestClassifier | None = None

fit(self, X, y)

Fit a random forest surrogate model.

Source code in black_it/samplers/random_forest.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:  # noqa: N803
    """Fit a random forest surrogate model."""
    # Train surrogate

    (
        X,  # noqa: N806
        y_cat,
        _existing_points_quantiles,
    ) = self.prepare_data_for_classifier(X, y, self.n_classes)

    self._classifier = RandomForestClassifier(
        n_estimators=self.n_estimators,
        criterion=self.criterion,
        n_jobs=-1,
        random_state=self._get_random_seed(),
    )
    self._classifier.fit(X, y_cat)

predict(self, X)

Predict using a random forest surrogate model.

Source code in black_it/samplers/random_forest.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:  # noqa: N803
    """Predict using a random forest surrogate model."""
    # Predict quantiles
    self._classifier = cast("RandomForestClassifier", self._classifier)
    predicted_points_quantiles: NDArray[np.float64] = self._classifier.predict(X)

    return predicted_points_quantiles
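
Together, fit and predict turn the forest into a surrogate of the loss: fit learns a map from parameters to loss-quantile classes, and predict scores unseen candidates. The sketch below shows that selection step in isolation; the toy loss, the candidate pool, and the final argsort-based selection are illustrative and not the sampler's internal code.

import numpy as np

from black_it.samplers.random_forest import RandomForestSampler

rng = np.random.default_rng(0)
sampler = RandomForestSampler(batch_size=8, random_state=0)

# Pretend 200 parameter vectors in a 2-D space have already been evaluated.
existing_points = rng.random((200, 2))
existing_losses = ((existing_points - 0.3) ** 2).sum(axis=1)

sampler.fit(existing_points, existing_losses)  # train on the loss-quantile classes

candidates = rng.random((1000, 2))             # an illustrative candidate pool
predicted = sampler.predict(candidates)        # predicted quantile class per candidate

# Keep the candidates with the lowest predicted class (lower class = lower loss).
best = candidates[np.argsort(predicted)[: sampler.batch_size]]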

prepare_data_for_classifier(existing_points, existing_losses, num_bins) staticmethod

Prepare data for the classifier.

Parameters:

existing_points (NDArray[np.float64], required): the parameters already sampled
existing_losses (NDArray[np.float64], required): the loss corresponding to the sampled parameters
num_bins (int, required): the number of bins

Returns:

tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]: a triple (x, y, quantiles), where x is the vector of training data, y is the vector of targets, and quantiles are the loss quantiles used as bin edges

Source code in black_it/samplers/random_forest.py
@staticmethod
def prepare_data_for_classifier(
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
    num_bins: int,
) -> tuple[NDArray[np.float64], NDArray[np.int64], NDArray[np.float64]]:
    """Prepare data for the classifier.

    Args:
        existing_points: the parameters already sampled
        existing_losses: the loss corresponding to the sampled parameters
        num_bins: the number of bins

    Returns:
        A triple (x, y, quantiles), where
            - x is the vector of training data
            - y is the vector of targets
            - the quantiles
    """
    x: NDArray[np.float64] = existing_points
    y: NDArray[np.float64] = existing_losses

    cutoffs: NDArray[np.float64] = np.linspace(0, 1, num_bins + 1)
    quantiles: NDArray[np.float64] = np.zeros(num_bins + 1)

    for i in range(num_bins - 1):
        quantiles[i + 1] = np.quantile(y, cutoffs[i + 1])

    quantiles[-1] = np.max(y)

    y_cat: NDArray[np.int64] = np.digitize(y, quantiles, right=True)
    y_cat = y_cat - 1

    return x, y_cat, quantiles
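
As a concrete illustration of the binning above (values chosen only for the example, and assuming the class is named RandomForestSampler): with num_bins = 4 the interior bin edges are the 25th, 50th and 75th percentiles of the losses, the last edge is the maximum loss, and each loss is replaced by the index of the bin it falls into.

import numpy as np

from black_it.samplers.random_forest import RandomForestSampler

points = np.arange(8, dtype=float).reshape(8, 1)              # dummy 1-D parameters
losses = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8])

x, y_cat, quantiles = RandomForestSampler.prepare_data_for_classifier(
    points, losses, num_bins=4,
)

print(quantiles)  # approx. [0. 0.275 0.45 0.625 0.8] -- edges; quantiles[0] keeps its initial 0
print(y_cat)      # [0 0 1 1 2 2 3 3]                 -- loss-quantile class of each point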

This class implements xgboost sampling.

Source code in black_it/samplers/xgboost.py
class XGBoostSampler(MLSurrogateSampler):
    """This class implements xgboost sampling."""

    def __init__(  # noqa: PLR0913
        self,
        batch_size: int,
        random_state: int | None = None,
        max_deduplication_passes: int = 5,
        candidate_pool_size: int | None = None,
        colsample_bytree: float = 0.3,
        learning_rate: float = 0.1,
        max_depth: int = 5,
        alpha: float = 1.0,
        n_estimators: int = 10,
    ) -> None:
        """Sampler based on a xgboost surrogate model of the loss function.

        Note: this class makes use of the xgboost library, for more information on the XGBoost parameters
            visit https://xgboost.readthedocs.io/en/stable/parameter.html.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            max_deduplication_passes: the maximum number of deduplication passes
            candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
            colsample_bytree: subsample ratio of columns when constructing each tree
            learning_rate: the learning rate
            max_depth: maximum depth of XGBoost trees
            alpha: L1 regularization term on weights
            n_estimators: number of estimators

        References:
            Lamperti, Roventini, and Sani, "Agent-based model calibration using machine learning surrogates"
        """
        super().__init__(
            batch_size,
            random_state,
            max_deduplication_passes,
            candidate_pool_size,
        )

        self._colsample_bytree = colsample_bytree
        self._learning_rate = learning_rate
        self._max_depth = max_depth
        self._alpha = alpha
        self._n_estimators = n_estimators
        self._xg_regressor: xgb.XGBRegressor | None = None

    @property
    def colsample_bytree(self) -> float:
        """Get the colsample_bytree parameter."""
        return self._colsample_bytree

    @property
    def learning_rate(self) -> float:
        """Get the learning rate."""
        return self._learning_rate

    @property
    def max_depth(self) -> int:
        """Get the maximum tree depth."""
        return self._max_depth

    @property
    def alpha(self) -> float:
        """Get the alpha regularisation parameter."""
        return self._alpha

    @property
    def n_estimators(self) -> int:
        """Get the number of estimators."""
        return self._n_estimators

    @staticmethod
    def _clip_losses(y: NDArray[np.float64]) -> NDArray[np.float64]:
        """Check that loss values fall within the float32 limits needed for XGBoost to work."""
        large_floats = np.where(y >= MAX_FLOAT32)[0]
        small_floats = np.where(y <= MIN_FLOAT32)[0]

        if len(large_floats) == 0 and len(small_floats) == 0:
            return y

        warnings.warn(  # noqa: B028
            "Found loss values out of float32 limits, clipping them for XGBoost.",
            RuntimeWarning,
        )
        if len(large_floats) > 0:
            y[large_floats] = MAX_FLOAT32 - EPS_FLOAT32

        if len(small_floats) > 0:
            y[small_floats] = MIN_FLOAT32 + EPS_FLOAT32

        return y

    def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:  # noqa: N803
        """Fit a xgboost surrogate model."""
        # prepare data
        y = self._clip_losses(y)
        _ = xgb.DMatrix(data=X, label=y)

        # train surrogate
        self._xg_regressor = xgb.XGBRegressor(
            objective="reg:squarederror",  # original: objective ='reg:linear',
            colsample_bytree=self.colsample_bytree,
            learning_rate=self.learning_rate,
            max_depth=self.max_depth,  # original: 5
            alpha=self.alpha,
            n_estimators=self.n_estimators,
            random_state=self._get_random_seed(),
        )  # original: 10

        self._xg_regressor.fit(X, y)

    def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:  # noqa: N803
        """Predict using a xgboost surrogate model."""
        # predict over large pool of candidates
        _ = xgb.DMatrix(data=X)

        self._xg_regressor = cast("xgb.XGBRegressor", self._xg_regressor)
        return self._xg_regressor.predict(X)

alpha: float property readonly

Get the alpha regularisation parameter.

colsample_bytree: float property readonly

Get the colsample_bytree parameter.

learning_rate: float property readonly

Get the learning rate.

max_depth: int property readonly

Get the maximum tree depth.

n_estimators: int property readonly

Get the number of estimators.

__init__(self, batch_size, random_state=None, max_deduplication_passes=5, candidate_pool_size=None, colsample_bytree=0.3, learning_rate=0.1, max_depth=5, alpha=1.0, n_estimators=10) special

Sampler based on a xgboost surrogate model of the loss function.

Note: this class makes use of the xgboost library; for more information on the XGBoost parameters, visit https://xgboost.readthedocs.io/en/stable/parameter.html.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (int | None, default None): the random state of the sampler, fixing this number the sampler behaves deterministically
max_deduplication_passes (int, default 5): the maximum number of deduplication passes
candidate_pool_size (int | None, default None): number of randomly sampled points on which the random forest predictions are evaluated
colsample_bytree (float, default 0.3): subsample ratio of columns when constructing each tree
learning_rate (float, default 0.1): the learning rate
max_depth (int, default 5): maximum depth of XGBoost trees
alpha (float, default 1.0): L1 regularization term on weights
n_estimators (int, default 10): number of estimators

References

Lamperti, Roventini, and Sani, "Agent-based model calibration using machine learning surrogates"

Source code in black_it/samplers/xgboost.py
def __init__(  # noqa: PLR0913
    self,
    batch_size: int,
    random_state: int | None = None,
    max_deduplication_passes: int = 5,
    candidate_pool_size: int | None = None,
    colsample_bytree: float = 0.3,
    learning_rate: float = 0.1,
    max_depth: int = 5,
    alpha: float = 1.0,
    n_estimators: int = 10,
) -> None:
    """Sampler based on a xgboost surrogate model of the loss function.

    Note: this class makes use of the xgboost library, for more information on the XGBoost parameters
        visit https://xgboost.readthedocs.io/en/stable/parameter.html.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        max_deduplication_passes: the maximum number of deduplication passes
        candidate_pool_size: number of randomly sampled points on which the random forest predictions are evaluated
        colsample_bytree: subsample ratio of columns when constructing each tree
        learning_rate: the learning rate
        max_depth: maximum depth of XGBoost trees
        alpha: L1 regularization term on weights
        n_estimators: number of estimators

    References:
        Lamperti, Roventini, and Sani, "Agent-based model calibration using machine learning surrogates"
    """
    super().__init__(
        batch_size,
        random_state,
        max_deduplication_passes,
        candidate_pool_size,
    )

    self._colsample_bytree = colsample_bytree
    self._learning_rate = learning_rate
    self._max_depth = max_depth
    self._alpha = alpha
    self._n_estimators = n_estimators
    self._xg_regressor: xgb.XGBRegressor | None = None
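
A short usage sketch of XGBoostSampler follows; the parameter values and toy data are illustrative, and the fit/predict calls are the ones documented below.

import numpy as np

from black_it.samplers.xgboost import XGBoostSampler

sampler = XGBoostSampler(
    batch_size=8,
    random_state=0,
    colsample_bytree=0.3,  # column subsampling per tree
    learning_rate=0.1,
    max_depth=5,
    alpha=1.0,             # L1 regularisation on the weights
    n_estimators=10,
)

rng = np.random.default_rng(0)
X = rng.random((100, 3))
y = (X ** 2).sum(axis=1)                     # toy loss values

sampler.fit(X, y)                            # trains the XGBRegressor surrogate
preds = sampler.predict(rng.random((5, 3)))  # predicted losses for new candidates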

fit(self, X, y)

Fit a xgboost surrogate model.

Source code in black_it/samplers/xgboost.py
def fit(self, X: NDArray[np.float64], y: NDArray[np.float64]) -> None:  # noqa: N803
    """Fit a xgboost surrogate model."""
    # prepare data
    y = self._clip_losses(y)
    _ = xgb.DMatrix(data=X, label=y)

    # train surrogate
    self._xg_regressor = xgb.XGBRegressor(
        objective="reg:squarederror",  # original: objective ='reg:linear',
        colsample_bytree=self.colsample_bytree,
        learning_rate=self.learning_rate,
        max_depth=self.max_depth,  # original: 5
        alpha=self.alpha,
        n_estimators=self.n_estimators,
        random_state=self._get_random_seed(),
    )  # original: 10

    self._xg_regressor.fit(X, y)

predict(self, X)

Predict using a xgboost surrogate model.

Source code in black_it/samplers/xgboost.py
def predict(self, X: NDArray[np.float64]) -> NDArray[np.float64]:  # noqa: N803
    """Predict using a xgboost surrogate model."""
    # predict over large pool of candidates
    _ = xgb.DMatrix(data=X)

    self._xg_regressor = cast("xgb.XGBRegressor", self._xg_regressor)
    return self._xg_regressor.predict(X)

Implementation of a particle swarm sampler.

This sampler implements the particle swarm sampling method commonly used in particle swarm optimization (PSO), introduced in:

Eberhart, Russell, and James Kennedy. "A new optimizer using particle swarm theory."
  MHS'95. Proceedings of the sixth international symposium on micro machine and human science. IEEE, 1995.

In a particle swarm optimizer, there is a set of particles that are "evolved" by cooperation and competition among the individuals themselves through generations. Each particle adjusts its flying according to its own flying experience and its companions' flying experience. Each particle, in fact, represents a potential solution to a problem and is treated as a point in a D-dimensional space.

The ith particle is represented as Xi = (x_{i1},...,x_{iD}). The best previous position (the position giving the best fitness value) of any particle is recorded and represented as Pi = (p_{i1},...,p_{iD}). The index of the best particle among all the particles in the population is represented by the symbol g. The rate of the position change (velocity) for particle i is represented as Vi = (v_{i1},...,v_{iD}). The particles are manipulated according to the following equations:

v_{id} = (ω * v_{id}) + (c1 * r1 * (p_{id} - x_{id})) + (c2 * r2 * (p_{gd} - x_{id}))
x_{id} = x_{id} + v_{id}

Where

  • ω is the inertia weight to control the influence of the previous velocity;
  • c1 and c2 are positive values that represent the acceleration constants;
  • r1 and r2 are two random numbers uniformly distributed in the range of (0, 1).

Note that p_{gd}, the global best position found across the dynamics, can optionally be computed by also considering the sampling performed by other samplers in order to let them interfere constructively with the Particle Swarm Sampler.
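
The two update rules translate directly into NumPy. The fragment below is a standalone transcription with illustrative constants; it is not the sampler's internal implementation, which additionally rescales positions to the parameter bounds and digitizes them onto the search grid (see sample_batch below).

import numpy as np

rng = np.random.default_rng(0)
n_particles, dims = 4, 2
omega, c1, c2 = 0.9, 0.1, 0.1            # inertia weight and acceleration constants

x = rng.random((n_particles, dims))      # current positions, kept in the unit hypercube
v = rng.random((n_particles, dims)) - 0.5
p_best = x.copy()                        # best position visited by each particle
g_best = p_best[0]                       # global best position (index 0 is arbitrary here)

r1 = rng.random((n_particles, dims))
r2 = rng.random((n_particles, dims))

# v_{id} = ω v_{id} + c1 r1 (p_{id} - x_{id}) + c2 r2 (p_{gd} - x_{id})
v = omega * v + c1 * r1 * (p_best - x) + c2 * r2 * (g_best - x)
# x_{id} = x_{id} + v_{id}, clipped to [0, 1] as the sampler does
x = np.clip(x + v, 0.0, 1.0)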

Source code in black_it/samplers/particle_swarm.py
class ParticleSwarmSampler(BaseSampler):
    """Implementation of a particle swarm sampler.

    This sampler implements the particle swarm sampling method commonly used in particle swarm optimization (PSO),
    introduced in:

        Eberhart, Russell, and James Kennedy. "A new optimizer using particle swarm theory."
          MHS'95. Proceedings of the sixth international symposium on micro machine and human science. IEEE, 1995.

    In a particle swarm optimizer, there is a set of particles that are "evolved" by cooperation and competition
    among the individuals themselves through generations. Each particle adjusts its flying  according to its own
    flying experience and its companions' flying experience. Each particle, in fact, represents a potential solution
    to a problem. Each particle is treated as a point in a D-dimensional space.  The ith particle is represented as
    Xi = (x_{i1},...,x_{iD}). The best previous position (the position giving the best fitness value) of any particle
    is recorded and represented as Pi = (p_{i1},...,p_{iD}). The index of the best particle among all the particles
    in the population is represented by the symbol g. The rate of the position change (velocity) for particle i is
    represented as Vi = (v_{i1},...,v_{iD}). The particles are manipulated according to the following equation:

        v_{id} = (ω * v_{id}) + (c1 * r1 * (p_{id} - x_{id})) + (c2 * r2 * (p_{gd} - x_{id}))
        x_{id} = x_{id} + v_{id}

    Where:
        - ω is the inertia weight  to control the influence of the previous velocity;
        - c1 and c2 are positive values that represent the acceleration constants;
        - r1 and r2 are two random numbers uniformly distributed in the range of (0, 1).

    Note that p_{gd}, the global best position found across the dynamics, can optionally be computed by also
    considering the sampling performed by other samplers in order to let them interfere constructively with the
    Particle Swarm Sampler.
    """

    def __init__(
        self,
        batch_size: int,
        random_state: int | None = None,
        inertia: float = 0.9,
        c1: float = 0.1,
        c2: float = 0.1,
        global_minimum_across_samplers: bool = False,
    ) -> None:
        """Initialize the sampler.

        Args:
            batch_size: the number of points sampled every time the sampler is called
            random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
            inertia: the inertia of the particles' motion
            c1: first acceleration constant
            c2: second acceleration constant
            global_minimum_across_samplers: if True, the global minimum attractor of the particles' dynamics is computed
                taking into consideration also parameters sampled by other samplers, default is False
        """
        # max_deduplication_passes must be zero because the sampler is stateful
        super().__init__(
            batch_size,
            random_state=random_state,
            max_deduplication_passes=0,
        )

        # The batch size is the number of sampled parameters per iteration. In a Black-it sampler, each call to
        # sample_batch represents an iteration of the particle swarm sampler, so it seems natural to set the number of
        # particles to the batch size, as at each iteration sample_batch returns the current positions of the
        # particles.
        self.nb_particles = batch_size

        self._inertia = positive_float(inertia)
        self._c1 = positive_float(c1)
        self._c2 = positive_float(c2)
        self._global_minimum_across_samplers = global_minimum_across_samplers

        # all current particle positions; shape=(nb_particles, space dimensions)
        self._curr_particle_positions: NDArray | None = None
        # all current particle velocities; shape=(nb_particles, space dimensions)
        self._curr_particle_velocities: NDArray | None = None
        # best position visited by each particle; shape=(nb_particles, space dimensions)
        self._best_particle_positions: NDArray | None = None
        # losses of the best positions
        self._best_position_losses: NDArray | None = None
        # particle id of the global best particle position
        self._global_best_particle_id: int | None = None

        # best point in parameter space - could be the best across samplers
        self._best_point: NDArray | None = None

        self._previous_batch_index_start: int | None = None

    @property
    def is_set_up(self) -> bool:
        """Return true iff the sampler is already set up."""
        return self._curr_particle_positions is not None

    @property
    def inertia(self) -> float:
        """Get the inertia weight."""
        return self._inertia

    @property
    def c1(self) -> float:
        """Get the c1 constant."""
        return self._c1

    @property
    def c2(self) -> float:
        """Get the c2 constant."""
        return self._c2

    def _set_up(self, dims: int) -> None:
        """Set up the sampler."""
        self._curr_particle_positions = self.random_generator.random(
            size=(self.batch_size, dims),
        )
        self._curr_particle_velocities = (
            self.random_generator.random(
                size=cast("NDArray", self._curr_particle_positions).shape,
            )
            - 0.5
        )
        self._best_particle_positions = self._curr_particle_positions
        # set losses to inf as we are interested to the min
        self._best_position_losses = np.full(self.nb_particles, np.inf)
        # we don't know yet which is the best index - initialize to 0
        self._global_best_particle_id = 0

    def _get_best_position(self) -> NDArray[np.float64]:
        """Get the position corresponding to the global optimum the particles should converge to.

        If _global_minimum_across_samplers is False, then this method returns the current position
        of the particle that in its history has sampled, so far, the best set of parameters.

        Else, if _global_minimum_across_samplers is True, then this method returns the point
        in parameter space that achieved the minimum loss. Note that this point could have been
        sampled by a different sampler than "self".

        Returns:
            a Numpy array
        """
        if not self._global_minimum_across_samplers:
            best_particle_positions = cast("NDArray", self._best_particle_positions)
            return best_particle_positions[self._global_best_particle_id]
        return cast("NDArray", self._best_point)

    def reset(self) -> None:
        """Reset the sampler."""
        self._curr_particle_positions = None
        self._curr_particle_velocities = None
        self._best_particle_positions = None
        self._best_position_losses = None
        self._global_best_particle_id = None
        self._previous_batch_index_start = None
        _assert(
            not self.is_set_up,
            error_message="reset call did not work, sampler still set up",
            exception_class=RuntimeError,
        )

    def sample_batch(
        self,
        batch_size: int,  # noqa: ARG002
        search_space: SearchSpace,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Sample a batch of parameters."""
        if not self.is_set_up:
            self._set_up(search_space.dims)
            self._previous_batch_index_start = len(existing_points)
            return digitize_data(
                cast("NDArray[np.float64]", self._best_particle_positions),
                search_space.param_grid,
            )

        self._update_best(existing_points, existing_losses)
        self._do_step()

        p_bounds: NDArray[np.float64] = search_space.parameters_bounds
        sampled_points = p_bounds[0] + self._curr_particle_positions * (p_bounds[1] - p_bounds[0])
        self._previous_batch_index_start = len(existing_points)

        return digitize_data(sampled_points, search_space.param_grid)

    def _update_best(
        self,
        existing_points: NDArray[np.float64],
        existing_losses: NDArray[np.float64],
    ) -> None:
        """Update the best local and global positions."""
        _assert(
            self._previous_batch_index_start is not None,
            exception_class=AssertionError,
            error_message="should have been set",
        )

        # set best loss and best point
        best_point_index = np.argmin(existing_losses)
        self._best_point = existing_points[best_point_index]

        # set best particle position
        batch_index_start = cast("int", self._previous_batch_index_start)
        batch_index_stop = batch_index_start + self.batch_size
        previous_points = existing_points[batch_index_start:batch_index_stop]
        previous_losses = existing_losses[batch_index_start:batch_index_stop]
        for particle_id, (point, loss) in enumerate(
            zip(previous_points, previous_losses),
        ):
            best_particle_positions = cast("NDArray", self._best_particle_positions)
            best_position_losses = cast("NDArray", self._best_position_losses)
            if best_position_losses[particle_id] > loss:
                best_particle_positions[particle_id] = point
                best_position_losses[particle_id] = loss

                # check if also the global best should be updated
                best_global_loss = best_position_losses[self._global_best_particle_id]
                if loss < best_global_loss:
                    self._global_best_particle_id = particle_id

    def _do_step(self) -> None:
        """Do a step by updating particle positions and velocities."""
        curr_particle_positions = cast("NDArray", self._curr_particle_positions)
        curr_particle_velocities = cast("NDArray", self._curr_particle_velocities)
        best_particle_positions = cast("NDArray", self._best_particle_positions)
        r1_vec = self.random_generator.random(size=curr_particle_positions.shape)
        r2_vec = self.random_generator.random(size=curr_particle_positions.shape)
        new_particle_velocities = (
            self.inertia * curr_particle_velocities
            + self.c1 * r1_vec * (best_particle_positions - self._curr_particle_positions)
            + self.c2 * r2_vec * (self._get_best_position() - self._curr_particle_positions)  # type: ignore[operator]
        )

        self._curr_particle_positions = np.clip(
            self._curr_particle_positions + new_particle_velocities,
            a_min=0.0,
            a_max=1.0,
        )
        self._curr_particle_velocities = new_particle_velocities

c1: float property readonly

Get the c1 constant.

c2: float property readonly

Get the c2 constant.

inertia: float property readonly

Get the inertia weight.

is_set_up: bool property readonly

Return true iff the sampler is already set up.

__init__(self, batch_size, random_state=None, inertia=0.9, c1=0.1, c2=0.1, global_minimum_across_samplers=False) special

Initialize the sampler.

Parameters:

batch_size (int, required): the number of points sampled every time the sampler is called
random_state (int | None, default None): the random state of the sampler, fixing this number the sampler behaves deterministically
inertia (float, default 0.9): the inertia of the particles' motion
c1 (float, default 0.1): first acceleration constant
c2 (float, default 0.1): second acceleration constant
global_minimum_across_samplers (bool, default False): if True, the global minimum attractor of the particles' dynamics is computed taking into consideration also parameters sampled by other samplers
Source code in black_it/samplers/particle_swarm.py
def __init__(
    self,
    batch_size: int,
    random_state: int | None = None,
    inertia: float = 0.9,
    c1: float = 0.1,
    c2: float = 0.1,
    global_minimum_across_samplers: bool = False,
) -> None:
    """Initialize the sampler.

    Args:
        batch_size: the number of points sampled every time the sampler is called
        random_state: the random state of the sampler, fixing this number the sampler behaves deterministically
        inertia: the inertia of the particles' motion
        c1: first acceleration constant
        c2: second acceleration constant
        global_minimum_across_samplers: if True, the global minimum attractor of the particles' dynamics is computed
            taking into consideration also parameters sampled by other samplers, default is False
    """
    # max_deduplication_passes must be zero because the sampler is stateful
    super().__init__(
        batch_size,
        random_state=random_state,
        max_deduplication_passes=0,
    )

    # The batch size is the number of sampled parameters per iteration. In a Black-it sampler, each call to
    # sample_batch represents an iteration of the particle swarm sampler, so it seems natural to set the number of
    # particles to the batch size, as at each iteration sample_batch returns the current positions of the
    # particles.
    self.nb_particles = batch_size

    self._inertia = positive_float(inertia)
    self._c1 = positive_float(c1)
    self._c2 = positive_float(c2)
    self._global_minimum_across_samplers = global_minimum_across_samplers

    # all current particle positions; shape=(nb_particles, space dimensions)
    self._curr_particle_positions: NDArray | None = None
    # all current particle velocities; shape=(nb_particles, space dimensions)
    self._curr_particle_velocities: NDArray | None = None
    # best position visited by each particle; shape=(nb_particles, space dimensions)
    self._best_particle_positions: NDArray | None = None
    # losses of the best positions
    self._best_position_losses: NDArray | None = None
    # particle id of the global best particle position
    self._global_best_particle_id: int | None = None

    # best point in parameter space - could be the best across samplers
    self._best_point: NDArray | None = None

    self._previous_batch_index_start: int | None = None

reset(self)

Reset the sampler.

Source code in black_it/samplers/particle_swarm.py
def reset(self) -> None:
    """Reset the sampler."""
    self._curr_particle_positions = None
    self._curr_particle_velocities = None
    self._best_particle_positions = None
    self._best_position_losses = None
    self._global_best_particle_id = None
    self._previous_batch_index_start = None
    _assert(
        not self.is_set_up,
        error_message="reset call did not work, sampler still set up",
        exception_class=RuntimeError,
    )

sample_batch(self, batch_size, search_space, existing_points, existing_losses)

Sample a batch of parameters.

Source code in black_it/samplers/particle_swarm.py
def sample_batch(
    self,
    batch_size: int,  # noqa: ARG002
    search_space: SearchSpace,
    existing_points: NDArray[np.float64],
    existing_losses: NDArray[np.float64],
) -> NDArray[np.float64]:
    """Sample a batch of parameters."""
    if not self.is_set_up:
        self._set_up(search_space.dims)
        self._previous_batch_index_start = len(existing_points)
        return digitize_data(
            cast("NDArray[np.float64]", self._best_particle_positions),
            search_space.param_grid,
        )

    self._update_best(existing_points, existing_losses)
    self._do_step()

    p_bounds: NDArray[np.float64] = search_space.parameters_bounds
    sampled_points = p_bounds[0] + self._curr_particle_positions * (p_bounds[1] - p_bounds[0])
    self._previous_batch_index_start = len(existing_points)

    return digitize_data(sampled_points, search_space.param_grid)
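
Finally, a sketch of how the sampler might be driven iteratively against a toy loss. The SearchSpace import path and constructor arguments (parameters_bounds, parameters_precision, verbose) are assumed here rather than documented on this page, so treat this as an outline of the call pattern, not a verbatim recipe.

import numpy as np

from black_it.samplers.particle_swarm import ParticleSwarmSampler
from black_it.search_space import SearchSpace

def loss(theta: np.ndarray) -> float:
    """Toy quadratic loss with minimum at (0.3, 0.3)."""
    return float(((theta - 0.3) ** 2).sum())

space = SearchSpace(
    parameters_bounds=np.array([[0.0, 0.0], [1.0, 1.0]]),  # [lower bounds, upper bounds]
    parameters_precision=np.array([0.01, 0.01]),
    verbose=False,
)

sampler = ParticleSwarmSampler(batch_size=8, random_state=0)

points = np.zeros((0, 2))
losses = np.zeros(0)
for _ in range(10):
    batch = sampler.sample(space, points, losses)       # one swarm iteration per call
    batch_losses = np.array([loss(p) for p in batch])
    points = np.vstack([points, batch])
    losses = np.concatenate([losses, batch_losses])

print(points[np.argmin(losses)])                        # best parameters found so far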