Schedulers
BaseScheduler interface.
This is the base class for all schedulers.
Source code in black_it/schedulers/base.py
class BaseScheduler(BaseSeedable, ABC):
"""BaseScheduler interface.
This is the base class for all schedulers.
"""
def __init__(
self,
samplers: Sequence[BaseSampler],
random_state: int | None = None,
) -> None:
"""Initialize the scheduler.
Args:
samplers: the list of samplers to be scheduled
random_state: the random seed for the scheduler behaviour
"""
# need to set _samplers first because _set_random_state requires samplers to be set
self._samplers = tuple(samplers)
BaseSeedable.__init__(self, random_state)
@property
def samplers(self) -> tuple[BaseSampler, ...]:
"""Get the sequence of samplers."""
return self._samplers
def _set_random_state(self, random_state: int | None) -> None:
"""Set the random state (private use)."""
super()._set_random_state(random_state)
for sampler in self.samplers:
sampler.random_state = self._get_random_seed()
def start_session(self) -> None:
"""Set up the scheduler for a new session.
The default is a no-op.
"""
@abstractmethod
def get_next_sampler(self) -> BaseSampler:
"""Get the sampler to use for the next batch."""
@abstractmethod
def update(
self,
batch_id: int,
new_params: NDArray[np.float64],
new_losses: NDArray[np.float64],
new_simulated_data: NDArray[np.float64],
) -> None:
"""Update the state of the scheduler after each batch.
Args:
batch_id: the batch id of the current batch. Must be an integer greater than or equal to 0.
new_params: the new set of parameters sampled in this batch.
new_losses: the new set of losses corresponding to the batch.
new_simulated_data: the new set of simulated data, one for each sampled parameter.
"""
def end_session(self) -> None:
"""Tear down the scheduler at the end of the session.
The default is a no-op.
"""
@contextlib.contextmanager
def session(self) -> Generator[None, None, None]:
"""Start the session of the scheduler with a context manager."""
self.start_session()
yield
self.end_session()
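To make the interface concrete, here is a minimal sketch of a custom scheduler that always proposes its first sampler. The scheduler itself is purely illustrative, and the sampler import paths are assumptions based on the package layout shown on this page.

```python
import numpy as np
from numpy.typing import NDArray

from black_it.samplers.base import BaseSampler
from black_it.samplers.halton import HaltonSampler
from black_it.schedulers.base import BaseScheduler


class FirstSamplerScheduler(BaseScheduler):
    """Toy scheduler: always propose the first sampler in the list."""

    def get_next_sampler(self) -> BaseSampler:
        # a concrete scheduler must decide which sampler runs next
        return self.samplers[0]

    def update(
        self,
        batch_id: int,
        new_params: NDArray[np.float64],
        new_losses: NDArray[np.float64],
        new_simulated_data: NDArray[np.float64],
    ) -> None:
        # a stateless scheduler has nothing to learn from the batch
        pass


scheduler = FirstSamplerScheduler([HaltonSampler(batch_size=4)], random_state=0)
with scheduler.session():  # calls start_session() on entry and end_session() on exit
    sampler = scheduler.get_next_sampler()
```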
samplers: tuple[BaseSampler, ...]
property
readonly
Get the sequence of samplers.
__init__(self, samplers, random_state=None)
special
Initialize the scheduler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samplers | Sequence[BaseSampler] | the list of samplers to be scheduled | required |
| random_state | int \| None | the random seed for the scheduler behaviour | None |
Source code in black_it/schedulers/base.py
def __init__(
self,
samplers: Sequence[BaseSampler],
random_state: int | None = None,
) -> None:
"""Initialize the scheduler.
Args:
samplers: the list of samplers to be scheduled
random_state: the random seed for the scheduler behaviour
"""
# need to set _samplers first because _set_random_state requires samplers to be set
self._samplers = tuple(samplers)
BaseSeedable.__init__(self, random_state)
end_session(self)
Tear down the scheduler at the end of the session.
The default is a no-op.
Source code in black_it/schedulers/base.py
def end_session(self) -> None:
"""Tear down the scheduler at the end of the session.
The default is a no-op.
"""
get_next_sampler(self)
Get the sampler to use for the next batch.
Source code in black_it/schedulers/base.py
@abstractmethod
def get_next_sampler(self) -> BaseSampler:
"""Get the sampler to use for the next batch."""
session(self)
Start the session of the scheduler with a context manager.
Source code in black_it/schedulers/base.py
@contextlib.contextmanager
def session(self) -> Generator[None, None, None]:
"""Start the session of the scheduler with a context manager."""
self.start_session()
yield
self.end_session()
start_session(self)
Set up the scheduler for a new session.
The default is a no-op.
Source code in black_it/schedulers/base.py
def start_session(self) -> None:
"""Set up the scheduler for a new session.
The default is a no-op.
"""
update(self, batch_id, new_params, new_losses, new_simulated_data)
Update the state of the scheduler after each batch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_id | int | the batch id of the current batch. Must be an integer greater than or equal to 0. | required |
| new_params | NDArray[np.float64] | the new set of parameters sampled in this batch. | required |
| new_losses | NDArray[np.float64] | the new set of losses corresponding to the batch. | required |
| new_simulated_data | NDArray[np.float64] | the new set of simulated data, one for each sampled parameter. | required |
Source code in black_it/schedulers/base.py
@abstractmethod
def update(
self,
batch_id: int,
new_params: NDArray[np.float64],
new_losses: NDArray[np.float64],
new_simulated_data: NDArray[np.float64],
) -> None:
"""Update the state of the scheduler after each batch.
Args:
batch_id: the batch id of the current batch. Must be an integer greater than or equal to 0.
new_params: the new set of parameters sampled in this batch.
new_losses: the new set of losses corresponding to the batch.
new_simulated_data: the new set of simulated data, one for each sampled parameter.
"""
RoundRobinScheduler
This class implements a simple round-robin sampler scheduler.
The round-robin scheduler takes as input a list of samplers [S_0, S_1, ..., S_{n-1}] and, at batch i, proposes the (i % n)-th sampler.
Source code in black_it/schedulers/round_robin.py
class RoundRobinScheduler(BaseScheduler):
"""This class implement a simple round-robin sampler scheduler.
The round-robin scheduler takes in input a list of samplers [S_0, S_1, ..., S_{n-1}],
and, at batch i, it proposes the (i % n)-th sampler.
"""
def __init__( # type: ignore[no-untyped-def]
self,
*args, # noqa: ANN002
**kwargs, # noqa: ANN003
) -> None:
"""Initialize the round-robin scheduler."""
super().__init__(*args, **kwargs)
self._batch_id = 0
def get_next_sampler(self) -> BaseSampler:
"""Get the next sampler."""
return self.samplers[self._batch_id % len(self.samplers)]
def update(
self,
batch_id: int, # noqa: ARG002
new_params: NDArray[np.float64], # noqa: ARG002
new_losses: NDArray[np.float64], # noqa: ARG002
new_simulated_data: NDArray[np.float64], # noqa: ARG002
) -> None:
"""Update the state of the scheduler after each batch."""
self._batch_id += 1
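A quick illustrative run of the round-robin policy; the RandomUniformSampler import path is an assumption, and any two samplers would do:

```python
import numpy as np

from black_it.samplers.halton import HaltonSampler
from black_it.samplers.random_uniform import RandomUniformSampler
from black_it.schedulers.round_robin import RoundRobinScheduler

scheduler = RoundRobinScheduler(
    samplers=[HaltonSampler(batch_size=4), RandomUniformSampler(batch_size=4)],
    random_state=42,
)

dummy = np.empty(0)  # update() ignores its arguments for this scheduler
for batch_id in range(4):
    print(batch_id, type(scheduler.get_next_sampler()).__name__)
    scheduler.update(batch_id, dummy, dummy, dummy)
# 0 HaltonSampler
# 1 RandomUniformSampler
# 2 HaltonSampler
# 3 RandomUniformSampler
```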
__init__(self, *args, **kwargs)
special
Initialize the round-robin scheduler.
Source code in black_it/schedulers/round_robin.py
def __init__( # type: ignore[no-untyped-def]
self,
*args, # noqa: ANN002
**kwargs, # noqa: ANN003
) -> None:
"""Initialize the round-robin scheduler."""
super().__init__(*args, **kwargs)
self._batch_id = 0
get_next_sampler(self)
Get the next sampler.
Source code in black_it/schedulers/round_robin.py
def get_next_sampler(self) -> BaseSampler:
"""Get the next sampler."""
return self.samplers[self._batch_id % len(self.samplers)]
update(self, batch_id, new_params, new_losses, new_simulated_data)
Update the state of the scheduler after each batch.
Source code in black_it/schedulers/round_robin.py
def update(
self,
batch_id: int, # noqa: ARG002
new_params: NDArray[np.float64], # noqa: ARG002
new_losses: NDArray[np.float64], # noqa: ARG002
new_simulated_data: NDArray[np.float64], # noqa: ARG002
) -> None:
"""Update the state of the scheduler after each batch."""
self._batch_id += 1
RLScheduler
This class implements an RL-based scheduler.
It is agnostic with respect to the RL algorithm being used.
Source code in black_it/schedulers/rl/rl_scheduler.py
class RLScheduler(BaseScheduler):
"""This class implement a RL-based scheduler.
It is agnostic wrt the RL algorithm being used.
"""
def __init__(
self,
samplers: Sequence[BaseSampler],
agent: Agent,
env: CalibrationEnv,
random_state: int | None = None,
) -> None:
"""Initialize the scheduler."""
self._original_samplers = samplers
new_samplers, self._halton_sampler_id = self._add_or_get_bootstrap_sampler(
samplers,
)
self._agent = agent
self._env = env
super().__init__(new_samplers, random_state)
self._in_queue: Queue = self._env._out_queue # noqa: SLF001
self._out_queue: Queue = self._env._in_queue # noqa: SLF001
self._best_param: NDArray[np.float64] | None = None
self._best_loss: float | None = None
self._agent_thread: threading.Thread | None = None
self._stopped: bool = True
def _set_random_state(self, random_state: int | None) -> None:
"""Set the random state (private use)."""
super()._set_random_state(random_state)
for sampler in self.samplers:
sampler.random_state = self._get_random_seed()
self._agent.random_state = self._get_random_seed()
self._env.reset(seed=self._get_random_seed())
@classmethod
def _add_or_get_bootstrap_sampler(
cls,
samplers: Sequence[BaseSampler],
) -> tuple[Sequence[BaseSampler], int]:
"""Add or retrieve a sampler for bootstrapping.
Many samplers require some "bootstrapping" of the calibration process, i.e. a set of parameters
whose losses have already been evaluated, e.g. samplers based on ML surrogates or on evolutionary approaches.
Therefore, this scheduler must guarantee that the first proposed sampler is one that does not need previous
model evaluations as input. One such sampler is the Halton sampler.
This function checks whether a HaltonSampler is present in the set of samplers. If so, it returns
the same set of samplers and the index of that sampler in the sequence. Otherwise, a new
instance of HaltonSampler is appended to the list.
Args:
samplers: the list of available samplers
Returns:
The pair (new_samplers, halton_sampler_id).
"""
sampler_types = {type(s): i for i, s in enumerate(samplers)}
if HaltonSampler in sampler_types:
return samplers, sampler_types[HaltonSampler]
new_sampler = HaltonSampler(batch_size=1)
return tuple(list(samplers) + cast("list[BaseSampler]", [new_sampler])), len(
samplers,
)
def _train(self) -> None:
"""Run the training loop."""
state = self._env.reset()
while not self._stopped:
# Get the action chosen by the agent
action = self._agent.policy(state)
# Interact with the environment
next_state, reward, _, _, _ = self._env.step(action)
# Learn from interaction
self._agent.learn(state, action, reward, next_state)
state = next_state
def start_session(self) -> None:
"""Set up the scheduler for a new session."""
if not self._stopped:
msg = "cannot start session: the session has already started"
raise ValueError(msg)
self._stopped = False
self._agent_thread = threading.Thread(target=self._train)
self._agent_thread.start()
def get_next_sampler(self) -> BaseSampler:
"""Get the next sampler."""
if self._best_loss is None:
# first call, return halton sampler
return self.samplers[self._halton_sampler_id]
chosen_sampler_id = self._in_queue.get()
return self.samplers[chosen_sampler_id]
def update(
self,
batch_id: int, # noqa: ARG002
new_params: NDArray[np.float64],
new_losses: NDArray[np.float64],
new_simulated_data: NDArray[np.float64], # noqa: ARG002
) -> None:
"""Update the RL scheduler."""
best_new_loss = float(np.min(new_losses))
if self._best_loss is None:
self._best_loss = best_new_loss
self._best_param = new_params[np.argmin(new_losses)]
self._env._curr_best_loss = best_new_loss # noqa: SLF001
return
if best_new_loss < cast("float", self._best_loss):
self._best_loss = best_new_loss
self._best_param = new_params[np.argmin(new_losses)]
self._out_queue.put((self._best_param, self._best_loss))
def end_session(self) -> None:
"""Tear down the scheduler at the end of the session."""
if self._stopped:
msg = "cannot start session: the session has not started yet"
raise ValueError(msg)
self._stopped = True
self._out_queue.put(None)
cast("threading.Thread", self._agent_thread).join()
__init__(self, samplers, agent, env, random_state=None)
special
Initialize the scheduler.
Source code in black_it/schedulers/rl/rl_scheduler.py
def __init__(
self,
samplers: Sequence[BaseSampler],
agent: Agent,
env: CalibrationEnv,
random_state: int | None = None,
) -> None:
"""Initialize the scheduler."""
self._original_samplers = samplers
new_samplers, self._halton_sampler_id = self._add_or_get_bootstrap_sampler(
samplers,
)
self._agent = agent
self._env = env
super().__init__(new_samplers, random_state)
self._in_queue: Queue = self._env._out_queue # noqa: SLF001
self._out_queue: Queue = self._env._in_queue # noqa: SLF001
self._best_param: NDArray[np.float64] | None = None
self._best_loss: float | None = None
self._agent_thread: threading.Thread | None = None
self._stopped: bool = True
end_session(self)
Tear down the scheduler at the end of the session.
Source code in black_it/schedulers/rl/rl_scheduler.py
def end_session(self) -> None:
"""Tear down the scheduler at the end of the session."""
if self._stopped:
msg = "cannot start session: the session has not started yet"
raise ValueError(msg)
self._stopped = True
self._out_queue.put(None)
cast("threading.Thread", self._agent_thread).join()
get_next_sampler(self)
Get the next sampler.
Source code in black_it/schedulers/rl/rl_scheduler.py
def get_next_sampler(self) -> BaseSampler:
"""Get the next sampler."""
if self._best_loss is None:
# first call, return halton sampler
return self.samplers[self._halton_sampler_id]
chosen_sampler_id = self._in_queue.get()
return self.samplers[chosen_sampler_id]
start_session(self)
Set up the scheduler for a new session.
Source code in black_it/schedulers/rl/rl_scheduler.py
def start_session(self) -> None:
"""Set up the scheduler for a new session."""
if not self._stopped:
msg = "cannot start session: the session has already started"
raise ValueError(msg)
self._stopped = False
self._agent_thread = threading.Thread(target=self._train)
self._agent_thread.start()
update(self, batch_id, new_params, new_losses, new_simulated_data)
Update the RL scheduler.
Source code in black_it/schedulers/rl/rl_scheduler.py
def update(
self,
batch_id: int, # noqa: ARG002
new_params: NDArray[np.float64],
new_losses: NDArray[np.float64],
new_simulated_data: NDArray[np.float64], # noqa: ARG002
) -> None:
"""Update the RL scheduler."""
best_new_loss = float(np.min(new_losses))
if self._best_loss is None:
self._best_loss = best_new_loss
self._best_param = new_params[np.argmin(new_losses)]
self._env._curr_best_loss = best_new_loss # noqa: SLF001
return
if best_new_loss < cast("float", self._best_loss):
self._best_loss = best_new_loss
self._best_param = new_params[np.argmin(new_losses)]
self._out_queue.put((self._best_param, self._best_loss))
MABEpsilonGreedy
Implementation of a multi-armed bandit (MAB) epsilon-greedy algorithm.
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
class MABEpsilonGreedy(Agent[int, int]):
"""Implementation of a MAB eps-greedy algorithm."""
def __init__(
self,
n_actions: int,
alpha: float,
eps: float,
initial_values: float = 0.0,
random_state: int | None = None,
) -> None:
"""Initialize the agent object.
Args:
n_actions: the number of actions
alpha: the learning rate
eps: the epsilon parameter
initial_values: the initial value for the Q-function
random_state: the random state
"""
super().__init__(random_state=random_state)
self.n_actions = n_actions
self.actions_count = [0] * self.n_actions
self.Q = [initial_values] * self.n_actions
self.alpha = alpha
self.eps = eps
self.initial_values = initial_values
def get_step_size(self, action: int) -> float:
"""Get the step size."""
return 1 / self.actions_count[action] if self.alpha == -1 else self.alpha
def learn(
self,
state: int, # noqa: ARG002
action: int,
reward: SupportsFloat,
next_state: int, # noqa: ARG002
) -> None:
"""Learn from an agent-environment interaction timestep."""
self.actions_count[action] += 1
step_size = self.get_step_size(action)
# do the learning
self.Q[action] += step_size * (cast("float", reward) - self.Q[action])
def policy(self, _obs: int) -> int:
"""Get the action for this observation."""
best_action = np.argmax(self.Q)
random_e = self.random_generator.random()
if random_e >= self.eps:
action = best_action
else:
options = np.arange(self.n_actions)
action = self.random_generator.choice(options, 1)[0]
return int(action)
def reset(self) -> None:
"""Reset the agent."""
self.Q = [0.0] * self.n_actions
self.actions_count = [0] * self.n_actions
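Because the update rule is just Q[a] += step_size * (reward - Q[a]), a two-step example makes the behaviour concrete (the printed values are exact):

```python
from black_it.schedulers.rl.agents.epsilon_greedy import MABEpsilonGreedy

agent = MABEpsilonGreedy(n_actions=2, alpha=0.5, eps=0.1, random_state=0)

agent.learn(state=0, action=1, reward=1.0, next_state=0)
print(agent.Q)  # [0.0, 0.5]  -> Q[1] moved halfway from 0.0 towards the reward 1.0
agent.learn(state=0, action=1, reward=1.0, next_state=0)
print(agent.Q)  # [0.0, 0.75] -> halfway again, from 0.5 towards 1.0
```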
__init__(self, n_actions, alpha, eps, initial_values=0.0, random_state=None)
special
Initialize the agent object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_actions | int | the number of actions | required |
| alpha | float | the learning rate; the special value -1 selects a sample-average step size of 1 / N(a) (see get_step_size) | required |
| eps | float | the epsilon parameter, i.e. the probability of choosing a random exploratory action | required |
| initial_values | float | the initial value for the Q-function | 0.0 |
| random_state | int \| None | the random state | None |
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def __init__(
self,
n_actions: int,
alpha: float,
eps: float,
initial_values: float = 0.0,
random_state: int | None = None,
) -> None:
"""Initialize the agent object.
Args:
n_actions: the number of actions
alpha: the learning rate
eps: the epsilon parameter
initial_values: the initial value for the Q-function
random_state: the random state
"""
super().__init__(random_state=random_state)
self.n_actions = n_actions
self.actions_count = [0] * self.n_actions
self.Q = [initial_values] * self.n_actions
self.alpha = alpha
self.eps = eps
self.initial_values = initial_values
get_step_size(self, action)
Get the step size: if alpha == -1, the sample average 1 / N(a) is used, where N(a) is the number of times action a has been selected; otherwise the constant learning rate alpha is used.
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def get_step_size(self, action: int) -> float:
"""Get the step size."""
return 1 / self.actions_count[action] if self.alpha == -1 else self.alpha
learn(self, state, action, reward, next_state)
Learn from an agent-environment interaction timestep.
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def learn(
self,
state: int, # noqa: ARG002
action: int,
reward: SupportsFloat,
next_state: int, # noqa: ARG002
) -> None:
"""Learn from an agent-environment interaction timestep."""
self.actions_count[action] += 1
step_size = self.get_step_size(action)
# do the learning
self.Q[action] += step_size * (cast("float", reward) - self.Q[action])
policy(self, _obs)
Get the action for this observation.
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def policy(self, _obs: int) -> int:
"""Get the action for this observation."""
best_action = np.argmax(self.Q)
random_e = self.random_generator.random()
if random_e >= self.eps:
action = best_action
else:
options = np.arange(self.n_actions)
action = self.random_generator.choice(options, 1)[0]
return int(action)
reset(self)
Reset the agent.
Source code in black_it/schedulers/rl/agents/epsilon_greedy.py
def reset(self) -> None:
"""Reset the agent."""
self.Q = [0.0] * self.n_actions
self.actions_count = [0] * self.n_actions