Skip to content

Schedulers

BaseScheduler interface.

This is the base class for all schedulers.

Source code in black_it/schedulers/base.py
class BaseScheduler(BaseSeedable, ABC):
    """BaseScheduler interface.

    This is the base class for all schedulers.
    """

    def __init__(
        self,
        samplers: Sequence[BaseSampler],
        random_state: int | None = None,
    ) -> None:
        """Initialize the scheduler.

        Args:
            samplers: the list of samplers to be scheduled
            random_state: the random seed for the scheduler behaviour
        """
        # need to set __samplers first because _set_random_state requires samplers to be set
        self._samplers = tuple(samplers)
        BaseSeedable.__init__(self, random_state)

    @property
    def samplers(self) -> tuple[BaseSampler, ...]:
        """Get the sequence of samplers."""
        return self._samplers

    def _set_random_state(self, random_state: int | None) -> None:
        """Set the random state (private use)."""
        super()._set_random_state(random_state)
        for sampler in self.samplers:
            sampler.random_state = self._get_random_seed()

    def start_session(self) -> None:
        """Set up the scheduler for a new session.

        The default is a no-op.
        """

    @abstractmethod
    def get_next_sampler(self) -> BaseSampler:
        """Get the sampler to use for the next batch."""

    @abstractmethod
    def update(
        self,
        batch_id: int,
        new_params: NDArray[np.float64],
        new_losses: NDArray[np.float64],
        new_simulated_data: NDArray[np.float64],
    ) -> None:
        """Update the state of the scheduler after each batch.

        Args:
            batch_id: the batch id of the . Must be an integer equal or greater than 0.
            new_params: the new set of parameters sampled in this batch.
            new_losses: the new set of losses corresponding to the batch.
            new_simulated_data: the new set of simulated data, one for each sampled parameter.
        """

    def end_session(self) -> None:
        """Tear down the scheduler at the end of the session.

        The default is a no-op.
        """

    @contextlib.contextmanager
    def session(self) -> Generator[None, None, None]:
        """Start the session of the scheduler with a context manager."""
        self.start_session()
        yield
        self.end_session()

samplers: tuple[BaseSampler, ...] property readonly

Get the sequence of samplers.

__init__(self, samplers, random_state=None) special

Initialize the scheduler.

Parameters:

Name Type Description Default
samplers Sequence[BaseSampler]

the list of samplers to be scheduled

required
random_state int | None

the random seed for the scheduler behaviour

None
Source code in black_it/schedulers/base.py
def __init__(
    self,
    samplers: Sequence[BaseSampler],
    random_state: int | None = None,
) -> None:
    """Initialize the scheduler.

    Args:
        samplers: the list of samplers to be scheduled
        random_state: the random seed for the scheduler behaviour
    """
    # need to set __samplers first because _set_random_state requires samplers to be set
    self._samplers = tuple(samplers)
    BaseSeedable.__init__(self, random_state)

end_session(self)

Tear down the scheduler at the end of the session.

The default is a no-op.

Source code in black_it/schedulers/base.py
def end_session(self) -> None:
    """Tear down the scheduler at the end of the session.

    The default is a no-op.
    """

get_next_sampler(self)

Get the sampler to use for the next batch.

Source code in black_it/schedulers/base.py
@abstractmethod
def get_next_sampler(self) -> BaseSampler:
    """Get the sampler to use for the next batch."""

session(self)

Start the session of the scheduler with a context manager.

Source code in black_it/schedulers/base.py
@contextlib.contextmanager
def session(self) -> Generator[None, None, None]:
    """Start the session of the scheduler with a context manager."""
    self.start_session()
    yield
    self.end_session()

start_session(self)

Set up the scheduler for a new session.

The default is a no-op.

Source code in black_it/schedulers/base.py
def start_session(self) -> None:
    """Set up the scheduler for a new session.

    The default is a no-op.
    """

update(self, batch_id, new_params, new_losses, new_simulated_data)

Update the state of the scheduler after each batch.

Parameters:

Name Type Description Default
batch_id int

the batch id of the . Must be an integer equal or greater than 0.

required
new_params NDArray[np.float64]

the new set of parameters sampled in this batch.

required
new_losses NDArray[np.float64]

the new set of losses corresponding to the batch.

required
new_simulated_data NDArray[np.float64]

the new set of simulated data, one for each sampled parameter.

required
Source code in black_it/schedulers/base.py
@abstractmethod
def update(
    self,
    batch_id: int,
    new_params: NDArray[np.float64],
    new_losses: NDArray[np.float64],
    new_simulated_data: NDArray[np.float64],
) -> None:
    """Update the state of the scheduler after each batch.

    Args:
        batch_id: the batch id of the . Must be an integer equal or greater than 0.
        new_params: the new set of parameters sampled in this batch.
        new_losses: the new set of losses corresponding to the batch.
        new_simulated_data: the new set of simulated data, one for each sampled parameter.
    """

This class implement a simple round-robin sampler scheduler.

The round-robin scheduler takes in input a list of samplers [S_0, S_1, ..., S_{n-1}], and, at batch i, it proposes the (i % n)-th sampler.

Source code in black_it/schedulers/round_robin.py
class RoundRobinScheduler(BaseScheduler):
    """This class implement a simple round-robin sampler scheduler.

    The round-robin scheduler takes in input a list of samplers [S_0, S_1, ..., S_{n-1}],
      and, at batch i, it proposes the (i % n)-th sampler.
    """

    def __init__(  # type: ignore[no-untyped-def]
        self,
        *args,  # noqa: ANN002
        **kwargs,  # noqa: ANN003
    ) -> None:
        """Initialize the round-robin scheduler."""
        super().__init__(*args, **kwargs)

        self._batch_id = 0

    def get_next_sampler(self) -> BaseSampler:
        """Get the next sampler."""
        return self.samplers[self._batch_id % len(self.samplers)]

    def update(
        self,
        batch_id: int,  # noqa: ARG002
        new_params: NDArray[np.float64],  # noqa: ARG002
        new_losses: NDArray[np.float64],  # noqa: ARG002
        new_simulated_data: NDArray[np.float64],  # noqa: ARG002
    ) -> None:
        """Update the state of the scheduler after each batch."""
        self._batch_id += 1

__init__(self, *args, **kwargs) special

Initialize the round-robin scheduler.

Source code in black_it/schedulers/round_robin.py
def __init__(  # type: ignore[no-untyped-def]
    self,
    *args,  # noqa: ANN002
    **kwargs,  # noqa: ANN003
) -> None:
    """Initialize the round-robin scheduler."""
    super().__init__(*args, **kwargs)

    self._batch_id = 0

get_next_sampler(self)

Get the next sampler.

Source code in black_it/schedulers/round_robin.py
def get_next_sampler(self) -> BaseSampler:
    """Get the next sampler."""
    return self.samplers[self._batch_id % len(self.samplers)]

update(self, batch_id, new_params, new_losses, new_simulated_data)

Update the state of the scheduler after each batch.

Source code in black_it/schedulers/round_robin.py
def update(
    self,
    batch_id: int,  # noqa: ARG002
    new_params: NDArray[np.float64],  # noqa: ARG002
    new_losses: NDArray[np.float64],  # noqa: ARG002
    new_simulated_data: NDArray[np.float64],  # noqa: ARG002
) -> None:
    """Update the state of the scheduler after each batch."""
    self._batch_id += 1

This class implement a RL-based scheduler.

It is agnostic wrt the RL algorithm being used.

Source code in black_it/schedulers/rl/rl_scheduler.py
class RLScheduler(BaseScheduler):
    """This class implement a RL-based scheduler.

    It is agnostic wrt the RL algorithm being used.
    """

    def __init__(
        self,
        samplers: Sequence[BaseSampler],
        agent: Agent,
        env: CalibrationEnv,
        random_state: int | None = None,
    ) -> None:
        """Initialize the scheduler."""
        self._original_samplers = samplers
        new_samplers, self._halton_sampler_id = self._add_or_get_bootstrap_sampler(
            samplers,
        )

        self._agent = agent
        self._env = env

        super().__init__(new_samplers, random_state)

        self._in_queue: Queue = self._env._out_queue  # noqa: SLF001
        self._out_queue: Queue = self._env._in_queue  # noqa: SLF001

        self._best_param: float | None = None
        self._best_loss: float | None = None

        self._agent_thread: threading.Thread | None = None
        self._stopped: bool = True

    def _set_random_state(self, random_state: int | None) -> None:
        """Set the random state (private use)."""
        super()._set_random_state(random_state)
        for sampler in self.samplers:
            sampler.random_state = self._get_random_seed()
        self._agent.random_state = self._get_random_seed()
        self._env.reset(seed=self._get_random_seed())

    @classmethod
    def _add_or_get_bootstrap_sampler(
        cls,
        samplers: Sequence[BaseSampler],
    ) -> tuple[Sequence[BaseSampler], int]:
        """Add or retrieve a sampler for bootstrapping.

        Many samplers do require some "bootstrapping" of the calibration process, i.e. a set of parameters
          whose loss has been already evaluated, e.g. samplers based on ML surrogates or on evolutionary approaches.
          Therefore, this scheduler must guarantee that the first proposed sampler is one that does not need previous
          model evaluations in input. One of such samplers is the Halton sampler

        Therefore, this function checks that the HaltonSampler is present in the set of samplers. If so, it returns
          the same set of samplers, and the index corresponding to that sampler in the sequence. Otherwise, a new
          instance of HaltonSampler is added to the list as first element.

        Args:
            samplers: the list of available samplers

        Returns:
            The pair (new_samplers, halton_sampler_id).
        """
        sampler_types = {type(s): i for i, s in enumerate(samplers)}
        if HaltonSampler in sampler_types:
            return samplers, sampler_types[HaltonSampler]

        new_sampler = HaltonSampler(batch_size=1)
        return tuple(list(samplers) + cast("list[BaseSampler]", [new_sampler])), len(
            samplers,
        )

    def _train(self) -> None:
        """Run the training loop."""
        state = self._env.reset()
        while not self._stopped:
            # Get the action chosen by the agent
            action = self._agent.policy(state)
            # Interact with the environment
            next_state, reward, _, _, _ = self._env.step(action)
            # Learn from interaction
            self._agent.learn(state, action, reward, next_state)
            state = next_state

    def start_session(self) -> None:
        """Set up the scheduler for a new session."""
        if not self._stopped:
            msg = "cannot start session: the session has already started"
            raise ValueError(msg)
        self._stopped = False
        self._agent_thread = threading.Thread(target=self._train)
        self._agent_thread.start()

    def get_next_sampler(self) -> BaseSampler:
        """Get the next sampler."""
        if self._best_loss is None:
            # first call, return halton sampler
            return self.samplers[self._halton_sampler_id]
        chosen_sampler_id = self._in_queue.get()
        return self.samplers[chosen_sampler_id]

    def update(
        self,
        batch_id: int,  # noqa: ARG002
        new_params: NDArray[np.float64],
        new_losses: NDArray[np.float64],
        new_simulated_data: NDArray[np.float64],  # noqa: ARG002
    ) -> None:
        """Update the RL scheduler."""
        best_new_loss = float(np.min(new_losses))
        if self._best_loss is None:
            self._best_loss = best_new_loss
            self._best_param = new_params[np.argmin(new_losses)]
            self._env._curr_best_loss = best_new_loss  # noqa: SLF001
            return
        if best_new_loss < cast("float", self._best_loss):
            self._best_loss = best_new_loss
            self._best_param = new_params[np.argmin(new_losses)]

        self._out_queue.put((self._best_param, self._best_loss))

    def end_session(self) -> None:
        """Tear down the scheduler at the end of the session."""
        if self._stopped:
            msg = "cannot start session: the session has not started yet"
            raise ValueError(msg)
        self._stopped = True
        self._out_queue.put(None)
        cast("threading.Thread", self._agent_thread).join()

__init__(self, samplers, agent, env, random_state=None) special

Initialize the scheduler.

Source code in black_it/schedulers/rl/rl_scheduler.py
def __init__(
    self,
    samplers: Sequence[BaseSampler],
    agent: Agent,
    env: CalibrationEnv,
    random_state: int | None = None,
) -> None:
    """Initialize the scheduler."""
    self._original_samplers = samplers
    new_samplers, self._halton_sampler_id = self._add_or_get_bootstrap_sampler(
        samplers,
    )

    self._agent = agent
    self._env = env

    super().__init__(new_samplers, random_state)

    self._in_queue: Queue = self._env._out_queue  # noqa: SLF001
    self._out_queue: Queue = self._env._in_queue  # noqa: SLF001

    self._best_param: float | None = None
    self._best_loss: float | None = None

    self._agent_thread: threading.Thread | None = None
    self._stopped: bool = True

end_session(self)

Tear down the scheduler at the end of the session.

Source code in black_it/schedulers/rl/rl_scheduler.py
def end_session(self) -> None:
    """Tear down the scheduler at the end of the session."""
    if self._stopped:
        msg = "cannot start session: the session has not started yet"
        raise ValueError(msg)
    self._stopped = True
    self._out_queue.put(None)
    cast("threading.Thread", self._agent_thread).join()

get_next_sampler(self)

Get the next sampler.

Source code in black_it/schedulers/rl/rl_scheduler.py
def get_next_sampler(self) -> BaseSampler:
    """Get the next sampler."""
    if self._best_loss is None:
        # first call, return halton sampler
        return self.samplers[self._halton_sampler_id]
    chosen_sampler_id = self._in_queue.get()
    return self.samplers[chosen_sampler_id]

start_session(self)

Set up the scheduler for a new session.

Source code in black_it/schedulers/rl/rl_scheduler.py
def start_session(self) -> None:
    """Set up the scheduler for a new session."""
    if not self._stopped:
        msg = "cannot start session: the session has already started"
        raise ValueError(msg)
    self._stopped = False
    self._agent_thread = threading.Thread(target=self._train)
    self._agent_thread.start()

update(self, batch_id, new_params, new_losses, new_simulated_data)

Update the RL scheduler.

Source code in black_it/schedulers/rl/rl_scheduler.py
def update(
    self,
    batch_id: int,  # noqa: ARG002
    new_params: NDArray[np.float64],
    new_losses: NDArray[np.float64],
    new_simulated_data: NDArray[np.float64],  # noqa: ARG002
) -> None:
    """Update the RL scheduler."""
    best_new_loss = float(np.min(new_losses))
    if self._best_loss is None:
        self._best_loss = best_new_loss
        self._best_param = new_params[np.argmin(new_losses)]
        self._env._curr_best_loss = best_new_loss  # noqa: SLF001
        return
    if best_new_loss < cast("float", self._best_loss):
        self._best_loss = best_new_loss
        self._best_param = new_params[np.argmin(new_losses)]

    self._out_queue.put((self._best_param, self._best_loss))

Implementation of a MAB eps-greedy algorithm.

Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
class MABEpsilonGreedy(Agent[int, int]):
    """Implementation of a MAB eps-greedy algorithm."""

    def __init__(
        self,
        n_actions: int,
        alpha: float,
        eps: float,
        initial_values: float = 0.0,
        random_state: int | None = None,
    ) -> None:
        """Initialize the agent object.

        Args:
            n_actions: the number of actions
            alpha: the learning rate
            eps: the epsilon parameter
            initial_values: the initial value for the Q-function
            random_state: the random state
        """
        super().__init__(random_state=random_state)
        self.n_actions = n_actions
        self.actions_count = [0] * self.n_actions

        self.Q = [initial_values] * self.n_actions
        self.alpha = alpha
        self.eps = eps
        self.initial_values = initial_values

    def get_step_size(self, action: int) -> float:
        """Get the step size."""
        return 1 / self.actions_count[action] if self.alpha == -1 else self.alpha

    def learn(
        self,
        state: int,  # noqa: ARG002
        action: int,
        reward: SupportsFloat,
        next_state: int,  # noqa: ARG002
    ) -> None:
        """Learn from an agent-environment interaction timestep."""
        self.actions_count[action] += 1

        step_size = self.get_step_size(action)

        # do the learning
        self.Q[action] += step_size * (cast("float", reward) - self.Q[action])

    def policy(self, _obs: int) -> int:
        """Get the action for this observation."""
        best_action = np.argmax(self.Q)

        random_e = self.random_generator.random()
        if not random_e < self.eps:
            action = best_action
        else:
            options = np.arange(self.n_actions)
            action = self.random_generator.choice(options, 1)[0]

        return int(action)

    def reset(self) -> None:
        """Reset the agent."""
        self.Q = [0.0] * self.n_actions
        self.actions_count = [0] * self.n_actions

__init__(self, n_actions, alpha, eps, initial_values=0.0, random_state=None) special

Initialize the agent object.

Parameters:

Name Type Description Default
n_actions int

the number of actions

required
alpha float

the learning rate

required
eps float

the epsilon parameter

required
initial_values float

the initial value for the Q-function

0.0
random_state int | None

the random state

None
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def __init__(
    self,
    n_actions: int,
    alpha: float,
    eps: float,
    initial_values: float = 0.0,
    random_state: int | None = None,
) -> None:
    """Initialize the agent object.

    Args:
        n_actions: the number of actions
        alpha: the learning rate
        eps: the epsilon parameter
        initial_values: the initial value for the Q-function
        random_state: the random state
    """
    super().__init__(random_state=random_state)
    self.n_actions = n_actions
    self.actions_count = [0] * self.n_actions

    self.Q = [initial_values] * self.n_actions
    self.alpha = alpha
    self.eps = eps
    self.initial_values = initial_values

get_step_size(self, action)

Get the step size.

Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def get_step_size(self, action: int) -> float:
    """Get the step size."""
    return 1 / self.actions_count[action] if self.alpha == -1 else self.alpha

learn(self, state, action, reward, next_state)

Learn from an agent-environment interaction timestep.

Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def learn(
    self,
    state: int,  # noqa: ARG002
    action: int,
    reward: SupportsFloat,
    next_state: int,  # noqa: ARG002
) -> None:
    """Learn from an agent-environment interaction timestep."""
    self.actions_count[action] += 1

    step_size = self.get_step_size(action)

    # do the learning
    self.Q[action] += step_size * (cast("float", reward) - self.Q[action])

policy(self, _obs)

Get the action for this observation.

Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def policy(self, _obs: int) -> int:
    """Get the action for this observation."""
    best_action = np.argmax(self.Q)

    random_e = self.random_generator.random()
    if not random_e < self.eps:
        action = best_action
    else:
        options = np.arange(self.n_actions)
        action = self.random_generator.choice(options, 1)[0]

    return int(action)

reset(self)

Reset the agent.

Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def reset(self) -> None:
    """Reset the agent."""
    self.Q = [0.0] * self.n_actions
    self.actions_count = [0] * self.n_actions