Loss functions
black_it.loss_functions.base.BaseLoss (ABC)
BaseLoss interface.
Source code in black_it/loss_functions/base.py
class BaseLoss(ABC):
    """BaseLoss interface."""

    def __init__(
        self,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ):
        """
        Initialize the loss function.

        Args:
            coordinate_weights: the weights of the loss coordinates.
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        self.coordinate_weights = coordinate_weights
        self.coordinate_filters = coordinate_filters

    def compute_loss(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Compute the loss between simulated and real data.

        Args:
            sim_data_ensemble: an ensemble of simulated data, of shape (ensemble_size, N, D)
            real_data: the real data, of shape (N, D)

        Returns:
            The loss value.
        """
        num_coords = real_data.shape[1]

        if self.coordinate_weights is None:
            weights = np.ones(num_coords) / num_coords
        else:
            nb_coordinate_weights = len(self.coordinate_weights)
            _assert(
                nb_coordinate_weights == num_coords,
                (
                    "the length of coordinate_weights should be equal "
                    f"to the number of coordinates, got {nb_coordinate_weights} and {num_coords}"
                ),
                exception_class=ValueError,
            )
            weights = self.coordinate_weights

        filters: List[Optional[Callable]]
        if self.coordinate_filters is None:
            # a list of identity functions
            filters = [None] * num_coords
        else:
            nb_coordinate_filters = len(self.coordinate_filters)
            _assert(
                nb_coordinate_filters == num_coords,
                (
                    "the length of coordinate_filters should be equal "
                    f"to the number of coordinates, got {nb_coordinate_filters} and {num_coords}"
                ),
                exception_class=ValueError,
            )
            filters = self.coordinate_filters

        loss = 0
        for i in range(num_coords):
            filter_ = filters[i]
            if filter_ is None:
                filtered_data = sim_data_ensemble[:, :, i]
            else:
                filtered_data = np.array(
                    [
                        filter_(sim_data_ensemble[j, :, i])
                        for j in range(sim_data_ensemble.shape[0])
                    ]
                )

            loss += self.compute_loss_1d(filtered_data, real_data[:, i]) * weights[i]

        return loss

    @abstractmethod
    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Return the loss between a specific coordinate of two time series.

        Concrete classes have to override this method in order to implement new
        loss functions.

        Args:
            sim_data_ensemble: an ensemble of simulated 1D series, shape (ensemble_size, N, 1)
            real_data: the real data, shape (N, 1)

        Returns:
            the computed loss over the specific coordinate
        """
__init__(self, coordinate_weights=None, coordinate_filters=None)
special
Initialize the loss function.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
coordinate_weights | Optional[numpy.ndarray[Any, numpy.dtype[+ScalarType]]] | the weights of the loss coordinates. | None |
coordinate_filters | Optional[List[Optional[Callable]]] | filters/transformations to be applied to each simulated series before the loss computation. | None |
Source code in black_it/loss_functions/base.py
def __init__(
    self,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
):
    """
    Initialize the loss function.

    Args:
        coordinate_weights: the weights of the loss coordinates.
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    self.coordinate_weights = coordinate_weights
    self.coordinate_filters = coordinate_filters
compute_loss(self, sim_data_ensemble, real_data)
Compute the loss between simulated and real data.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_data_ensemble | ndarray | an ensemble of simulated data, of shape (ensemble_size, N, D) | required |
real_data | ndarray | the real data, of shape (N, D) | required |

Returns:

Type | Description |
---|---|
float | The loss value. |
Source code in black_it/loss_functions/base.py
def compute_loss(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Compute the loss between simulated and real data.

    Args:
        sim_data_ensemble: an ensemble of simulated data, of shape (ensemble_size, N, D)
        real_data: the real data, of shape (N, D)

    Returns:
        The loss value.
    """
    num_coords = real_data.shape[1]

    if self.coordinate_weights is None:
        weights = np.ones(num_coords) / num_coords
    else:
        nb_coordinate_weights = len(self.coordinate_weights)
        _assert(
            nb_coordinate_weights == num_coords,
            (
                "the length of coordinate_weights should be equal "
                f"to the number of coordinates, got {nb_coordinate_weights} and {num_coords}"
            ),
            exception_class=ValueError,
        )
        weights = self.coordinate_weights

    filters: List[Optional[Callable]]
    if self.coordinate_filters is None:
        # a list of identity functions
        filters = [None] * num_coords
    else:
        nb_coordinate_filters = len(self.coordinate_filters)
        _assert(
            nb_coordinate_filters == num_coords,
            (
                "the length of coordinate_filters should be equal "
                f"to the number of coordinates, got {nb_coordinate_filters} and {num_coords}"
            ),
            exception_class=ValueError,
        )
        filters = self.coordinate_filters

    loss = 0
    for i in range(num_coords):
        filter_ = filters[i]
        if filter_ is None:
            filtered_data = sim_data_ensemble[:, :, i]
        else:
            filtered_data = np.array(
                [
                    filter_(sim_data_ensemble[j, :, i])
                    for j in range(sim_data_ensemble.shape[0])
                ]
            )

        loss += self.compute_loss_1d(filtered_data, real_data[:, i]) * weights[i]

    return loss
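The coordinate_filters mechanism can be exercised with any concrete loss. A minimal sketch, assuming FourierLoss as the concrete subclass (the filter choice and the random data are purely illustrative): each entry of the list is applied to the corresponding coordinate of every simulated series before compute_loss_1d is called, while None leaves a coordinate untouched.

import numpy as np

from black_it.loss_functions.fourier import FourierLoss

rng = np.random.default_rng(1)
real = rng.normal(size=(100, 2))    # (N, D)
sim = rng.normal(size=(3, 100, 2))  # (ensemble_size, N, D)

# integrate coordinate 0 before the comparison, leave coordinate 1 raw;
# note that the filters act on the simulated series only
loss = FourierLoss(coordinate_filters=[np.cumsum, None])
print(loss.compute_loss(sim, real))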
compute_loss_1d(self, sim_data_ensemble, real_data)
Return the loss between a specific coordinate of two time series.
Concrete classes have to override this method in order to implement new loss functions.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_data_ensemble | ndarray | an ensemble of simulated 1D series, shape (ensemble_size, N, 1) | required |
real_data | ndarray | the real data, shape (N, 1) | required |

Returns:

Type | Description |
---|---|
float | the computed loss over the specific coordinate |
Source code in black_it/loss_functions/base.py
@abstractmethod
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Return the loss between a specific coordinate of two time series.

    Concrete classes have to override this method in order to implement new
    loss functions.

    Args:
        sim_data_ensemble: an ensemble of simulated 1D series, shape (ensemble_size, N, 1)
        real_data: the real data, shape (N, 1)

    Returns:
        the computed loss over the specific coordinate
    """
black_it.loss_functions.fourier
This module contains the implementation of the Fast Fourier Transform loss.
FrequencyFilter
A filter that receives the signal in the frequency domain and returns its filtered version. Used by the FourierLoss constructor.
In this version, the filter supports a single parameter: no multi-band filtering is supported yet.
FourierLoss (BaseLoss)
Class for the Fourier loss.
Source code in black_it/loss_functions/fourier.py
class FourierLoss(BaseLoss):
    """Class for the Fourier loss."""

    def __init__(
        self,
        frequency_filter: FrequencyFilter = gaussian_low_pass_filter,
        f: float = 0.8,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ) -> None:
        """Loss computed using a distance in the Fourier space of the time series.

        This loss is equivalent to the Euclidean loss computed on the time
        series after a Fourier filter.

        The parameter f controls the fraction of frequencies that are kept in
        the Fourier series.

        Args:
            frequency_filter: the function used to filter the Fourier
                frequencies before the distance is computed.
            f: fraction of Fourier components to keep when computing the
                distance between the time series. This parameter will be passed
                to frequency_filter.
            coordinate_weights: relative weights of the losses computed over
                different time series coordinates.
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        _assert(
            0.0 < f <= 1.0,
            "'f' must be in the interval (0.0, 1.0]",
        )
        self.frequency_filter = frequency_filter
        self.f = f
        super().__init__(coordinate_weights, coordinate_filters)

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """Compute the Euclidean distance between the Fourier transforms of the two time series.

        Args:
            sim_data_ensemble: the first operand
            real_data: the second operand

        Returns:
            The computed loss over the specific coordinate.
        """
        f_real_data = np.fft.rfft(real_data, axis=0)
        N = f_real_data.shape[0]
        f_real_data = self.frequency_filter(f_real_data, self.f)

        # compute the mean FFT of the simulated ensemble
        f_sim_data = []
        for s in sim_data_ensemble:
            f_sim_data_ = np.fft.rfft(s, axis=0)
            f_sim_data_ = self.frequency_filter(f_sim_data_, self.f)
            f_sim_data.append(f_sim_data_)
        f_sim_data = np.array(f_sim_data).mean(0)

        loss_1d = np.sqrt(np.sum((abs(f_sim_data - f_real_data)) ** 2) / N)

        return loss_1d
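A short usage sketch (data and parameter values are illustrative): a small f makes the loss focus on the slow components of the series, while with the ideal filter and f=1 the filter is the identity and the comparison involves the full spectrum.

import numpy as np

from black_it.loss_functions.fourier import FourierLoss, ideal_low_pass_filter

rng = np.random.default_rng(42)
real = rng.normal(size=(200, 1))    # (N, D)
sim = rng.normal(size=(3, 200, 1))  # (ensemble_size, N, D)

smooth_loss = FourierLoss(f=0.2)  # Gaussian low-pass filter by default
full_loss = FourierLoss(frequency_filter=ideal_low_pass_filter, f=1.0)

print(smooth_loss.compute_loss(sim, real))
print(full_loss.compute_loss(sim, real))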
__init__(self, frequency_filter=gaussian_low_pass_filter, f=0.8, coordinate_weights=None, coordinate_filters=None)
special
Loss computed using a distance in the Fourier space of the time series.
This loss is equivalent to the Euclidean loss computed on the time series after a Fourier filter. The parameter f controls the fraction of frequencies that are kept in the Fourier series.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
frequency_filter | Callable[[numpy.ndarray[Any, numpy.dtype[numpy.complex128]], float], numpy.ndarray[Any, numpy.dtype[numpy.complex128]]] | the function used to filter the Fourier frequencies before the distance is computed. | gaussian_low_pass_filter |
f | float | fraction of Fourier components to keep when computing the distance between the time series. This parameter will be passed to frequency_filter. | 0.8 |
coordinate_weights | Optional[numpy.ndarray[Any, numpy.dtype[+ScalarType]]] | relative weights of the losses computed over different time series coordinates. | None |
coordinate_filters | Optional[List[Optional[Callable]]] | filters/transformations to be applied to each simulated series before the loss computation. | None |
Source code in black_it/loss_functions/fourier.py
def __init__(
    self,
    frequency_filter: FrequencyFilter = gaussian_low_pass_filter,
    f: float = 0.8,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
) -> None:
    """Loss computed using a distance in the Fourier space of the time series.

    This loss is equivalent to the Euclidean loss computed on the time
    series after a Fourier filter.

    The parameter f controls the fraction of frequencies that are kept in
    the Fourier series.

    Args:
        frequency_filter: the function used to filter the Fourier
            frequencies before the distance is computed.
        f: fraction of Fourier components to keep when computing the
            distance between the time series. This parameter will be passed
            to frequency_filter.
        coordinate_weights: relative weights of the losses computed over
            different time series coordinates.
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    _assert(
        0.0 < f <= 1.0,
        "'f' must be in the interval (0.0, 1.0]",
    )
    self.frequency_filter = frequency_filter
    self.f = f
    super().__init__(coordinate_weights, coordinate_filters)
compute_loss_1d(self, sim_data_ensemble, real_data)
Compute the Euclidean distance between the Fourier transforms of the two time series.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_data_ensemble | ndarray | the first operand | required |
real_data | ndarray | the second operand | required |

Returns:

Type | Description |
---|---|
float | The computed loss over the specific coordinate. |
Source code in black_it/loss_functions/fourier.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """Compute the Euclidean distance between the Fourier transforms of the two time series.

    Args:
        sim_data_ensemble: the first operand
        real_data: the second operand

    Returns:
        The computed loss over the specific coordinate.
    """
    f_real_data = np.fft.rfft(real_data, axis=0)
    N = f_real_data.shape[0]
    f_real_data = self.frequency_filter(f_real_data, self.f)

    # compute the mean FFT of the simulated ensemble
    f_sim_data = []
    for s in sim_data_ensemble:
        f_sim_data_ = np.fft.rfft(s, axis=0)
        f_sim_data_ = self.frequency_filter(f_sim_data_, self.f)
        f_sim_data.append(f_sim_data_)
    f_sim_data = np.array(f_sim_data).mean(0)

    loss_1d = np.sqrt(np.sum((abs(f_sim_data - f_real_data)) ** 2) / N)

    return loss_1d
gaussian_low_pass_filter(signal_frequencies, f)
Gaussian low-pass filter.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
signal_frequencies | ndarray | the input signal transformed in the frequency domain. | required |
f | float | the fraction of frequencies to keep; this determines the length scale of the Gaussian filter | required |

Returns:

Type | Description |
---|---|
ndarray | the filtered frequencies |
Source code in black_it/loss_functions/fourier.py
def gaussian_low_pass_filter(
    signal_frequencies: NDArray[np.complex128],
    f: float,
) -> NDArray[np.complex128]:
    """Gaussian low-pass filter.

    Args:
        signal_frequencies: the input signal transformed in the frequency
            domain.
        f: the fraction of frequencies to keep; this determines the length
            scale of the Gaussian filter

    Returns:
        the filtered frequencies
    """
    # length scale of the Gaussian filter, in number of frequency components
    sigma = np.round(f * signal_frequencies.shape[0])
    # gaussian low-pass filter
    mask = np.exp(-np.arange(signal_frequencies.shape[0]) ** 2 / (2 * sigma**2))
    filtered_frequencies = signal_frequencies * mask
    return filtered_frequencies
ideal_low_pass_filter(signal_frequencies, f)
Ideal low-pass filter.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
signal_frequencies | ndarray | the input signal transformed in the frequency domain. | required |
f | float | the fraction of frequencies to keep unchanged; for f=1 the filter is just the identity. | required |

Returns:

Type | Description |
---|---|
ndarray | the filtered frequencies |
Source code in black_it/loss_functions/fourier.py
def ideal_low_pass_filter(
    signal_frequencies: NDArray[np.complex128],
    f: float,
) -> NDArray[np.complex128]:
    """Ideal low-pass filter.

    Args:
        signal_frequencies: the input signal transformed in the frequency
            domain.
        f: the fraction of frequencies to keep unchanged; for f=1 the filter
            is just the identity.

    Returns:
        the filtered frequencies
    """
    # number of low-frequency components to keep
    n = int(np.round(f * signal_frequencies.shape[0]))
    # ideal low-pass filter
    mask = np.zeros(signal_frequencies.shape[0])
    mask[:n] = 1.0
    filtered_frequencies = signal_frequencies * mask
    return filtered_frequencies
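A quick sketch contrasting the two filters on a flat toy spectrum (the input is purely illustrative): the ideal filter keeps the first n components unchanged and zeroes the rest, while the Gaussian filter attenuates all components smoothly with length scale sigma.

import numpy as np

from black_it.loss_functions.fourier import (
    gaussian_low_pass_filter,
    ideal_low_pass_filter,
)

freqs = np.ones(10, dtype=np.complex128)  # flat toy spectrum

# hard cutoff: the first round(f * 10) = 5 components survive
print(ideal_low_pass_filter(freqs, f=0.5).real)

# smooth decay: mask is exp(-k**2 / (2 * sigma**2)) with sigma = round(f * 10) = 5
print(np.round(gaussian_low_pass_filter(freqs, f=0.5).real, 3))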
black_it.loss_functions.minkowski.MinkowskiLoss (BaseLoss)
Class for the Minkowski loss.
Source code in black_it/loss_functions/minkowski.py
class MinkowskiLoss(BaseLoss):
    """Class for the Minkowski loss."""

    def __init__(
        self,
        p: int = 2,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ) -> None:
        """
        Loss computed using a Minkowski distance.

        The [Minkowski distance](https://en.wikipedia.org/wiki/Minkowski_distance)
        is a generalization of both the Manhattan distance (p=1) and the Euclidean distance (p=2).

        This function computes the Minkowski distance between two series.

        Note: this class is a wrapper of scipy.spatial.distance.minkowski

        Args:
            p: the order of the norm used to compute the distance between real and simulated series
            coordinate_weights: the weights of the loss coordinates
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        self.p = p
        super().__init__(coordinate_weights, coordinate_filters)

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Call scipy.spatial.distance.minkowski() on its arguments.

        Args:
            sim_data_ensemble: the first operand
            real_data: the second operand

        Returns:
            The computed loss over the specific coordinate.
        """
        # average simulated time series
        sim_data_ensemble = sim_data_ensemble.mean(axis=0)
        loss_1d = minkowski(sim_data_ensemble, real_data, p=self.p)
        return loss_1d
__init__(self, p=2, coordinate_weights=None, coordinate_filters=None)
special
Loss computed using a Minkowski distance.
The Minkowski distance is a generalization of both the Manhattan distance (p=1) and the Euclidean distance (p=2).
This function computes the Minkowski distance between two series.
Note: this class is a wrapper of scipy.spatial.distance.minkowski
Parameters:

Name | Type | Description | Default |
---|---|---|---|
p | int | the order of the norm used to compute the distance between real and simulated series | 2 |
coordinate_weights | Optional[numpy.ndarray[Any, numpy.dtype[+ScalarType]]] | the weights of the loss coordinates | None |
coordinate_filters | Optional[List[Optional[Callable]]] | filters/transformations to be applied to each simulated series before the loss computation. | None |
Source code in black_it/loss_functions/minkowski.py
def __init__(
    self,
    p: int = 2,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
) -> None:
    """
    Loss computed using a Minkowski distance.

    The [Minkowski distance](https://en.wikipedia.org/wiki/Minkowski_distance)
    is a generalization of both the Manhattan distance (p=1) and the Euclidean distance (p=2).

    This function computes the Minkowski distance between two series.

    Note: this class is a wrapper of scipy.spatial.distance.minkowski

    Args:
        p: the order of the norm used to compute the distance between real and simulated series
        coordinate_weights: the weights of the loss coordinates
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    self.p = p
    super().__init__(coordinate_weights, coordinate_filters)
compute_loss_1d(self, sim_data_ensemble, real_data)
Call scipy.spatial.distance.minkowski() on its arguments.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_data_ensemble | ndarray | the first operand | required |
real_data | ndarray | the second operand | required |

Returns:

Type | Description |
---|---|
float | The computed loss over the specific coordinate. |
Source code in black_it/loss_functions/minkowski.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Call scipy.spatial.distance.minkowski() on its arguments.

    Args:
        sim_data_ensemble: the first operand
        real_data: the second operand

    Returns:
        The computed loss over the specific coordinate.
    """
    # average simulated time series
    sim_data_ensemble = sim_data_ensemble.mean(axis=0)
    loss_1d = minkowski(sim_data_ensemble, real_data, p=self.p)
    return loss_1d
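A minimal usage sketch (data are illustrative): p=1 yields the Manhattan distance and p=2 the Euclidean one, both computed between the real series and the ensemble-averaged simulated series.

import numpy as np

from black_it.loss_functions.minkowski import MinkowskiLoss

rng = np.random.default_rng(7)
real = rng.normal(size=(100, 1))    # (N, D)
sim = rng.normal(size=(4, 100, 1))  # (ensemble_size, N, D)

print(MinkowskiLoss(p=1).compute_loss(sim, real))  # Manhattan
print(MinkowskiLoss(p=2).compute_loss(sim, real))  # Euclidean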
black_it.loss_functions.msm.MethodOfMomentsLoss (BaseLoss)
Class for the 'method of moments' loss.
Source code in black_it/loss_functions/msm.py
class MethodOfMomentsLoss(BaseLoss):
    """Class for the 'method of moments' loss."""

    def __init__(
        self,
        covariance_mat: Union[str, NDArray[np.float64]] = "identity",
        coordinate_weights: Optional[NDArray[np.float64]] = None,
        moment_calculator: MomentCalculator = get_mom_ts_1d,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
        standardise_moments: bool = False,
    ):
        """
        Initialize the loss function based on the 'method of moments'.

        Returns the MSM objective function, i.e. the square difference between
        the moments of the two time series.

        By default the loss computes the moments using
        black_it.utils.time_series.get_mom_ts_1d(), which computes an
        18-dimensional vector of statistics.

        You can alter the behaviour by passing a custom function to
        moment_calculator. Please note that there is a constraint between the
        moment calculator and the size of the covariance matrix.

        Args:
            covariance_mat: covariance matrix between the moments.
                'identity' (the default) gives the identity matrix, 'inverse_variance' gives the diagonal
                matrix of the estimated inverse variances. Alternatively, one can specify a numerical symmetric matrix
                whose size must be equal to the number of elements that the moment calculator returns.
            coordinate_weights: importance of each coordinate. By default all
                coordinates are treated equally.
            moment_calculator: a function that takes a 1D time series and
                returns a series of moments. The default is
                black_it.utils.time_series.get_mom_ts_1d()
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
            standardise_moments: if True all moments are divided by the absolute value of the real moments,
                default is False.
        """
        MethodOfMomentsLoss._validate_covariance_and_calculator(
            moment_calculator, covariance_mat
        )

        super().__init__(coordinate_weights, coordinate_filters)
        self._covariance_mat = covariance_mat
        self._moment_calculator = moment_calculator
        self._standardise_moments = standardise_moments

    @staticmethod
    def _validate_covariance_and_calculator(
        moment_calculator: MomentCalculator,
        covariance_mat: Union[NDArray[np.float64], str],
    ) -> None:
        """
        Validate the covariance matrix.

        Args:
            moment_calculator: the moment calculator
            covariance_mat: the covariance matrix, either as a string or as a numpy array

        Returns:
            None

        Raises:
            ValueError: if the covariance matrix is not valid.
                If it is a string, it can be invalid if it is not one of the possible options.
                If it is a numpy array, it can be invalid if it is not symmetric or if moment_calculator
                is the default get_mom_ts_1d and the covariance matrix's shape
                is not 18. Other possible errors won't be caught by this
                function, and can only be detected at runtime.
        """
        if isinstance(covariance_mat, str):
            try:
                _CovarianceMatrixType(covariance_mat)
            except ValueError as exc:
                raise ValueError(
                    f"expected one of {list(map(lambda x: x.value, _CovarianceMatrixType))}, got {covariance_mat}"
                ) from exc
            return
        if isinstance(covariance_mat, np.ndarray):
            # a non-null covariance_mat was given
            if not is_symmetric(covariance_mat):
                raise ValueError(
                    "the provided covariance matrix is not valid as it is not a symmetric matrix"
                )
            if (moment_calculator is get_mom_ts_1d) and (covariance_mat.shape[0] != 18):
                raise ValueError(
                    "the provided covariance matrix is not valid as it has a wrong shape: "
                    f"expected 18, got {covariance_mat.shape[0]}"
                )
            return
        raise ValueError(
            "please specify a valid covariance matrix, either as a string or directly as a numpy array"
        )

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Compute the loss based on the 'method of moments'.

        Returns the MSM objective function, i.e. the square difference between the moments of the two time series.

        Args:
            sim_data_ensemble: the first operand
            real_data: the second operand

        Returns:
            the MSM loss over a specific coordinate.
        """
        # compute the moments for the simulated ensemble
        ensemble_sim_mom_1d = np.array(
            [self._moment_calculator(s) for s in sim_data_ensemble]
        )

        # compute moments of the real time series
        real_mom_1d = self._moment_calculator(real_data)

        # write moments in relative terms if needed
        if self._standardise_moments:
            ensemble_sim_mom_1d = ensemble_sim_mom_1d / (abs(real_mom_1d)[None, :])
            real_mom_1d = real_mom_1d / abs(real_mom_1d)

        # mean simulated moments
        sim_mom_1d = np.mean(ensemble_sim_mom_1d, axis=0)

        g = real_mom_1d - sim_mom_1d

        if self._covariance_mat == _CovarianceMatrixType.IDENTITY.value:
            loss_1d = g.dot(g)
            return loss_1d
        if self._covariance_mat == _CovarianceMatrixType.INVERSE_VARIANCE.value:
            W = np.diag(
                1.0 / np.mean((real_mom_1d[None, :] - ensemble_sim_mom_1d) ** 2, axis=0)
            )
        else:
            self._covariance_mat = cast(NDArray[np.float64], self._covariance_mat)
            W = self._covariance_mat
        try:
            loss_1d = g.dot(W).dot(g)
        except ValueError as e:
            covariance_size = W.shape[0]
            moments_size = g.shape[0]
            if covariance_size == moments_size:
                # this value error is not due to a mismatch between the
                # covariance matrix size and the number of moments. Let's raise
                # the original error.
                raise
            raise ValueError(
                f"The size of the covariance matrix ({covariance_size}) "
                f"and the number of moments ({moments_size}) should be identical"
            ) from e

        return loss_1d
__init__(self, covariance_mat='identity', coordinate_weights=None, moment_calculator=get_mom_ts_1d, coordinate_filters=None, standardise_moments=False)
special
Initialize the loss function based on the 'method of moments'.
Returns the MSM objective function, i.e. the square difference between the moments of the two time series. By default the loss computes the moments using black_it.utils.time_series.get_mom_ts_1d(), which computes an 18-dimensional vector of statistics.
You can alter the behaviour by passing a custom function to moment_calculator. Please note that there is a constraint between the moment calculator and the size of the covariance matrix.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
covariance_mat | Union[str, numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | covariance matrix between the moments. 'identity' (the default) gives the identity matrix, 'inverse_variance' gives the diagonal matrix of the estimated inverse variances. Alternatively, one can specify a numerical symmetric matrix whose size must be equal to the number of elements that the moment calculator returns. | 'identity' |
coordinate_weights | Optional[numpy.ndarray[Any, numpy.dtype[numpy.float64]]] | importance of each coordinate. By default all coordinates are treated equally. | None |
moment_calculator | Callable[[numpy.ndarray[Any, numpy.dtype[+ScalarType]]], numpy.ndarray[Any, numpy.dtype[+ScalarType]]] | a function that takes a 1D time series and returns a series of moments. The default is black_it.utils.time_series.get_mom_ts_1d() | get_mom_ts_1d |
coordinate_filters | Optional[List[Optional[Callable]]] | filters/transformations to be applied to each simulated series before the loss computation. | None |
standardise_moments | bool | if True all moments are divided by the absolute value of the real moments, default is False. | False |
Source code in black_it/loss_functions/msm.py
def __init__(
    self,
    covariance_mat: Union[str, NDArray[np.float64]] = "identity",
    coordinate_weights: Optional[NDArray[np.float64]] = None,
    moment_calculator: MomentCalculator = get_mom_ts_1d,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
    standardise_moments: bool = False,
):
    """
    Initialize the loss function based on the 'method of moments'.

    Returns the MSM objective function, i.e. the square difference between
    the moments of the two time series.

    By default the loss computes the moments using
    black_it.utils.time_series.get_mom_ts_1d(), which computes an
    18-dimensional vector of statistics.

    You can alter the behaviour by passing a custom function to
    moment_calculator. Please note that there is a constraint between the
    moment calculator and the size of the covariance matrix.

    Args:
        covariance_mat: covariance matrix between the moments.
            'identity' (the default) gives the identity matrix, 'inverse_variance' gives the diagonal
            matrix of the estimated inverse variances. Alternatively, one can specify a numerical symmetric matrix
            whose size must be equal to the number of elements that the moment calculator returns.
        coordinate_weights: importance of each coordinate. By default all
            coordinates are treated equally.
        moment_calculator: a function that takes a 1D time series and
            returns a series of moments. The default is
            black_it.utils.time_series.get_mom_ts_1d()
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
        standardise_moments: if True all moments are divided by the absolute value of the real moments,
            default is False.
    """
    MethodOfMomentsLoss._validate_covariance_and_calculator(
        moment_calculator, covariance_mat
    )

    super().__init__(coordinate_weights, coordinate_filters)
    self._covariance_mat = covariance_mat
    self._moment_calculator = moment_calculator
    self._standardise_moments = standardise_moments
compute_loss_1d(self, sim_data_ensemble, real_data)
Compute the loss based on the 'method of moments'.
Returns the MSM objective function, i.e. the square difference between the moments of the two time series.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_data_ensemble | ndarray | the first operand | required |
real_data | ndarray | the second operand | required |

Returns:

Type | Description |
---|---|
float | the MSM loss over a specific coordinate. |
Source code in black_it/loss_functions/msm.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Compute the loss based on the 'method of moments'.

    Returns the MSM objective function, i.e. the square difference between the moments of the two time series.

    Args:
        sim_data_ensemble: the first operand
        real_data: the second operand

    Returns:
        the MSM loss over a specific coordinate.
    """
    # compute the moments for the simulated ensemble
    ensemble_sim_mom_1d = np.array(
        [self._moment_calculator(s) for s in sim_data_ensemble]
    )

    # compute moments of the real time series
    real_mom_1d = self._moment_calculator(real_data)

    # write moments in relative terms if needed
    if self._standardise_moments:
        ensemble_sim_mom_1d = ensemble_sim_mom_1d / (abs(real_mom_1d)[None, :])
        real_mom_1d = real_mom_1d / abs(real_mom_1d)

    # mean simulated moments
    sim_mom_1d = np.mean(ensemble_sim_mom_1d, axis=0)

    g = real_mom_1d - sim_mom_1d

    if self._covariance_mat == _CovarianceMatrixType.IDENTITY.value:
        loss_1d = g.dot(g)
        return loss_1d
    if self._covariance_mat == _CovarianceMatrixType.INVERSE_VARIANCE.value:
        W = np.diag(
            1.0 / np.mean((real_mom_1d[None, :] - ensemble_sim_mom_1d) ** 2, axis=0)
        )
    else:
        self._covariance_mat = cast(NDArray[np.float64], self._covariance_mat)
        W = self._covariance_mat
    try:
        loss_1d = g.dot(W).dot(g)
    except ValueError as e:
        covariance_size = W.shape[0]
        moments_size = g.shape[0]
        if covariance_size == moments_size:
            # this value error is not due to a mismatch between the
            # covariance matrix size and the number of moments. Let's raise
            # the original error.
            raise
        raise ValueError(
            f"The size of the covariance matrix ({covariance_size}) "
            f"and the number of moments ({moments_size}) should be identical"
        ) from e

    return loss_1d
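A usage sketch of the covariance/moment-calculator constraint (the two-moment calculator below is hypothetical, not part of black-it): with the default get_mom_ts_1d the covariance matrix must be 18x18, while a custom calculator only has to match its own output size.

import numpy as np
from numpy.typing import NDArray

from black_it.loss_functions.msm import MethodOfMomentsLoss


def mean_and_var(series: NDArray) -> NDArray:
    # hypothetical calculator returning two moments per series
    return np.array([series.mean(), series.var()])


rng = np.random.default_rng(3)
real = rng.normal(size=(100, 1))    # (N, D)
sim = rng.normal(size=(4, 100, 1))  # (ensemble_size, N, D)

# a 2x2 symmetric matrix, matching the two moments returned above
loss = MethodOfMomentsLoss(
    covariance_mat=np.eye(2),
    moment_calculator=mean_and_var,
)
print(loss.compute_loss(sim, real))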
black_it.loss_functions.gsl_div.GslDivLoss (BaseLoss)
Class for the Gsl-div loss.
Examples:
>>> expected_loss = 0.39737637181336855
>>> np.random.seed(11)
>>> series1 = np.random.normal(0, 1, (100, 3))
>>> series2 = np.random.normal(0, 1, (100, 3))
>>> loss_func = GslDivLoss()
>>> loss = loss_func.compute_loss(series1[None, :, :], series2)
>>> assert np.isclose(expected_loss, loss)
Source code in black_it/loss_functions/gsl_div.py
class GslDivLoss(BaseLoss):
    """
    Class for the Gsl-div loss.

    Example:
        >>> expected_loss = 0.39737637181336855
        >>> np.random.seed(11)
        >>> series1 = np.random.normal(0, 1, (100, 3))
        >>> series2 = np.random.normal(0, 1, (100, 3))
        >>> loss_func = GslDivLoss()
        >>> loss = loss_func.compute_loss(series1[None, :, :], series2)
        >>> assert np.isclose(expected_loss, loss)
    """

    def __init__(
        self,
        nb_values: Optional[int] = None,
        nb_word_lengths: Optional[int] = None,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ) -> None:
        """
        Initialize the GSL-div loss object.

        Args:
            nb_values: number of values the digitised series can take
            nb_word_lengths: the number of word lengths to consider
            coordinate_weights: the weights of the loss coordinates
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        super().__init__(coordinate_weights, coordinate_filters)
        self.nb_values = nb_values
        self.nb_word_lengths = nb_word_lengths

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Return the GSL-div measure.

        From (Lamperti, 2017):

        > The information loss about the behaviour of the stochastic process
          due to the symbolization becomes smaller and smaller as b increases.
          On the other side, low values of b would likely wash away
          processes’ noise the modeller might not be interested in.

        Args:
            sim_data_ensemble: the ensemble of simulated data
            real_data: the real data

        Returns:
            the GSL loss
        """
        N = len(real_data)
        ensemble_size = sim_data_ensemble.shape[0]

        if self.nb_values is None:
            nb_values = int((N - 1) / 2.0)
        else:
            nb_values = self.nb_values

        if self.nb_word_lengths is None:
            nb_word_lengths = int((N - 1) / 2.0)
        else:
            nb_word_lengths = self.nb_word_lengths

        # discretize real time series
        obs_xd = self.discretize(
            real_data,
            nb_values,
            np.min(real_data),
            np.max(real_data),
        )

        gsl_loss = 0.0

        # average loss over the ensemble
        for sim_data in sim_data_ensemble:
            # discretize simulated series
            sim_xd = self.discretize(
                sim_data, nb_values, np.min(sim_data), np.max(sim_data)
            )
            loss = self.gsl_div_1d_1_sample(
                sim_xd, obs_xd, nb_word_lengths, nb_values, N
            )
            gsl_loss += loss

        return gsl_loss / ensemble_size

    def gsl_div_1d_1_sample(
        self,
        sim_xd: NDArray,
        obs_xd: NDArray,
        nb_word_lengths: int,
        nb_values: int,
        N: int,
    ) -> float:
        """Compute the GSL-div for a single realisation of the simulated data.

        Args:
            sim_xd: discretised simulated series
            obs_xd: discretised real series
            nb_word_lengths: the number of word lengths to consider
            nb_values: number of values the digitised series can take
            N: the length of real and simulated series

        Returns:
            the computed loss
        """
        # outcome measure
        gsl_div = 0.0

        # weight
        weight = 0.0

        # for each word length:
        for word_length in range(1, nb_word_lengths + 1):
            sim_xw = self.get_words(sim_xd, word_length)
            obs_xw = self.get_words(obs_xd, word_length)
            m_xw = np.concatenate((sim_xw, obs_xw))
            sim_xp = self.get_words_est_prob(sim_xw)
            m_xp = self.get_words_est_prob(m_xw)

            base = float(nb_values**word_length)
            sim_entr = self.get_sh_entr(sim_xp, base)
            m_entr = self.get_sh_entr(m_xp, base)

            # update weight
            weight = weight + 2 / (nb_word_lengths * (nb_word_lengths + 1))

            # correction
            corr = ((len(m_xp) - 1) - (len(sim_xp) - 1)) / (2 * N)

            # add to measure
            gsl_divl = 2 * m_entr - sim_entr + corr
            gsl_div = gsl_div + weight * gsl_divl

        # end of cycle, return
        return gsl_div

    @staticmethod
    def discretize(
        time_series: NDArray[np.float64],
        nb_values: int,
        start_index: Union[np.float64, float],
        stop_index: Union[np.float64, float],
    ) -> NDArray[np.int64]:
        """
        Discretize the time series into 'nb_values' finite states.

        >>> GslDivLoss.discretize(
        ...     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        ...     nb_values=3,
        ...     start_index=1,
        ...     stop_index=10
        ... )
        array([1, 1, 1, 2, 2, 2, 2, 3, 3, 3])

        Args:
            time_series: any univariate time series
            nb_values: int, number of values the digitised series can take.
                It must be greater than 0.
            start_index: the starting point
            stop_index: the stopping point

        Returns:
            the discretised time series
        """
        start_index = np.float64(start_index)
        stop_index = np.float64(stop_index)
        linspace = np.linspace(start_index - EPS, stop_index + EPS, nb_values + 1)

        return np.searchsorted(linspace, time_series, side="left")

    @staticmethod
    def get_words(time_series: NDArray[np.float64], length: int) -> NDArray:
        """
        Return an overlapping array of words (int32) of 'length' given a discretised vector.

        >>> GslDivLoss.get_words(np.asarray([1, 2, 2, 2]), 2)
        array([12, 22, 22])

        Args:
            time_series: any univariate discretised time series
            length: int, the length of the words to be returned. Must be such that
                (len(time_series) + 1 - length) is positive.

        Returns:
            the time series of overlapping words
        """
        tswlen = len(time_series) + 1 - length
        _assert(
            tswlen >= 0,
            "the chosen word length is too high",
            exception_class=ValueError,
        )

        tsw = np.zeros(shape=(tswlen,), dtype=np.int32)

        for i in range(length):
            k = 10 ** (length - i - 1)
            tsw = tsw + time_series[i : tswlen + i] * k

        return tsw

    @staticmethod
    def get_words_est_prob(time_series: NDArray[np.float64]) -> NDArray[np.float64]:
        """
        Return an array of estimated probabilities given an array of words (int32).

        Args:
            time_series: any univariate array of words

        Returns:
            estimate of probabilities
        """
        _, count = np.unique(time_series, return_counts=True)
        est_p = np.divide(count, np.sum(count))
        return est_p

    @staticmethod
    def get_sh_entr(probs: NDArray[np.float64], log_base: float) -> float:
        """
        Return the Shannon entropy given an array of probabilities.

        Args:
            probs: an array of probabilities describing the discrete probability distribution
            log_base: the entropy logarithm base.

        Returns:
            the entropy of the discrete probability distribution
        """
        log = np.log(probs) / np.log(log_base)
        return -np.sum(np.multiply(probs, log))
__init__(self, nb_values=None, nb_word_lengths=None, coordinate_weights=None, coordinate_filters=None)
special
Initialize the GSL-div loss object.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
nb_values | Optional[int] | number of values the digitised series can take | None |
nb_word_lengths | Optional[int] | the number of word lengths to consider | None |
coordinate_weights | Optional[numpy.ndarray[Any, numpy.dtype[+ScalarType]]] | the weights of the loss coordinates | None |
coordinate_filters | Optional[List[Optional[Callable]]] | filters/transformations to be applied to each simulated series before the loss computation. | None |
Source code in black_it/loss_functions/gsl_div.py
def __init__(
    self,
    nb_values: Optional[int] = None,
    nb_word_lengths: Optional[int] = None,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
) -> None:
    """
    Initialize the GSL-div loss object.

    Args:
        nb_values: number of values the digitised series can take
        nb_word_lengths: the number of word lengths to consider
        coordinate_weights: the weights of the loss coordinates
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    super().__init__(coordinate_weights, coordinate_filters)
    self.nb_values = nb_values
    self.nb_word_lengths = nb_word_lengths
compute_loss_1d(self, sim_data_ensemble, real_data)
Return the GSL-div measure.
From (Lamperti, 2017):

> The information loss about the behaviour of the stochastic process due to the symbolization becomes smaller and smaller as b increases. On the other side, low values of b would likely wash away processes’ noise the modeller might not be interested in.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_data_ensemble | ndarray | the ensemble of simulated data | required |
real_data | ndarray | the real data | required |

Returns:

Type | Description |
---|---|
float | the GSL loss |
Source code in black_it/loss_functions/gsl_div.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Return the GSL-div measure.

    From (Lamperti, 2017):

    > The information loss about the behaviour of the stochastic process
      due to the symbolization becomes smaller and smaller as b increases.
      On the other side, low values of b would likely wash away
      processes’ noise the modeller might not be interested in.

    Args:
        sim_data_ensemble: the ensemble of simulated data
        real_data: the real data

    Returns:
        the GSL loss
    """
    N = len(real_data)
    ensemble_size = sim_data_ensemble.shape[0]

    if self.nb_values is None:
        nb_values = int((N - 1) / 2.0)
    else:
        nb_values = self.nb_values

    if self.nb_word_lengths is None:
        nb_word_lengths = int((N - 1) / 2.0)
    else:
        nb_word_lengths = self.nb_word_lengths

    # discretize real time series
    obs_xd = self.discretize(
        real_data,
        nb_values,
        np.min(real_data),
        np.max(real_data),
    )

    gsl_loss = 0.0

    # average loss over the ensemble
    for sim_data in sim_data_ensemble:
        # discretize simulated series
        sim_xd = self.discretize(
            sim_data, nb_values, np.min(sim_data), np.max(sim_data)
        )
        loss = self.gsl_div_1d_1_sample(
            sim_xd, obs_xd, nb_word_lengths, nb_values, N
        )
        gsl_loss += loss

    return gsl_loss / ensemble_size
discretize(time_series, nb_values, start_index, stop_index)
staticmethod
Discretize the time series into 'nb_values' finite states.

>>> GslDivLoss.discretize(
...     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...     nb_values=3,
...     start_index=1,
...     stop_index=10
... )
array([1, 1, 1, 2, 2, 2, 2, 3, 3, 3])
Parameters:

Name | Type | Description | Default |
---|---|---|---|
time_series | ndarray | any univariate time series | required |
nb_values | int | number of values the digitised series can take. It must be greater than 0. | required |
start_index | Union[numpy.float64, float] | the starting point | required |
stop_index | Union[numpy.float64, float] | the stopping point | required |

Returns:

Type | Description |
---|---|
ndarray | the discretised time series |
Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def discretize(
    time_series: NDArray[np.float64],
    nb_values: int,
    start_index: Union[np.float64, float],
    stop_index: Union[np.float64, float],
) -> NDArray[np.int64]:
    """
    Discretize the time series into 'nb_values' finite states.

    >>> GslDivLoss.discretize(
    ...     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    ...     nb_values=3,
    ...     start_index=1,
    ...     stop_index=10
    ... )
    array([1, 1, 1, 2, 2, 2, 2, 3, 3, 3])

    Args:
        time_series: any univariate time series
        nb_values: int, number of values the digitised series can take.
            It must be greater than 0.
        start_index: the starting point
        stop_index: the stopping point

    Returns:
        the discretised time series
    """
    start_index = np.float64(start_index)
    stop_index = np.float64(stop_index)
    linspace = np.linspace(start_index - EPS, stop_index + EPS, nb_values + 1)

    return np.searchsorted(linspace, time_series, side="left")
get_sh_entr(probs, log_base)
staticmethod
Return the Shannon entropy given an array of probabilities.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
probs | ndarray | an array of probabilities describing the discrete probability distribution | required |
log_base | float | the entropy logarithm base. | required |

Returns:

Type | Description |
---|---|
float | the entropy of the discrete probability distribution |
Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def get_sh_entr(probs: NDArray[np.float64], log_base: float) -> float:
    """
    Return the Shannon entropy given an array of probabilities.

    Args:
        probs: an array of probabilities describing the discrete probability distribution
        log_base: the entropy logarithm base.

    Returns:
        the entropy of the discrete probability distribution
    """
    log = np.log(probs) / np.log(log_base)
    return -np.sum(np.multiply(probs, log))
get_words(time_series, length)
staticmethod
Return an overlapping array of words (int32) of 'length' given a discretised vector.
>>> GslDivLoss.get_words(np.asarray([1, 2, 2, 2]), 2)
array([12, 22, 22])
Parameters:

Name | Type | Description | Default |
---|---|---|---|
time_series | ndarray | any univariate discretised time series | required |
length | int | the length of the words to be returned. Must be such that (len(time_series) + 1 - length) is positive. | required |

Returns:

Type | Description |
---|---|
ndarray | the time series of overlapping words |
Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def get_words(time_series: NDArray[np.float64], length: int) -> NDArray:
    """
    Return an overlapping array of words (int32) of 'length' given a discretised vector.

    >>> GslDivLoss.get_words(np.asarray([1, 2, 2, 2]), 2)
    array([12, 22, 22])

    Args:
        time_series: any univariate discretised time series
        length: int, the length of the words to be returned. Must be such that
            (len(time_series) + 1 - length) is positive.

    Returns:
        the time series of overlapping words
    """
    tswlen = len(time_series) + 1 - length
    _assert(
        tswlen >= 0,
        "the chosen word length is too high",
        exception_class=ValueError,
    )

    tsw = np.zeros(shape=(tswlen,), dtype=np.int32)

    for i in range(length):
        k = 10 ** (length - i - 1)
        tsw = tsw + time_series[i : tswlen + i] * k

    return tsw
get_words_est_prob(time_series)
staticmethod
Return an array of estimated probabilities given an array of words (int32).
Parameters:

Name | Type | Description | Default |
---|---|---|---|
time_series | ndarray | any univariate array of words | required |

Returns:

Type | Description |
---|---|
ndarray | estimate of probabilities |
Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def get_words_est_prob(time_series: NDArray[np.float64]) -> NDArray[np.float64]:
    """
    Return an array of estimated probabilities given an array of words (int32).

    Args:
        time_series: any univariate array of words

    Returns:
        estimate of probabilities
    """
    _, count = np.unique(time_series, return_counts=True)
    est_p = np.divide(count, np.sum(count))
    return est_p
gsl_div_1d_1_sample(self, sim_xd, obs_xd, nb_word_lengths, nb_values, N)
Compute the GSL-div for a single realisation of the simulated data.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
sim_xd | ndarray | discretised simulated series | required |
obs_xd | ndarray | discretised real series | required |
nb_word_lengths | int | the number of word lengths to consider | required |
nb_values | int | number of values the digitised series can take | required |
N | int | the length of real and simulated series | required |

Returns:

Type | Description |
---|---|
float | the computed loss |
Source code in black_it/loss_functions/gsl_div.py
def gsl_div_1d_1_sample(
    self,
    sim_xd: NDArray,
    obs_xd: NDArray,
    nb_word_lengths: int,
    nb_values: int,
    N: int,
) -> float:
    """Compute the GSL-div for a single realisation of the simulated data.

    Args:
        sim_xd: discretised simulated series
        obs_xd: discretised real series
        nb_word_lengths: the number of word lengths to consider
        nb_values: number of values the digitised series can take
        N: the length of real and simulated series

    Returns:
        the computed loss
    """
    # outcome measure
    gsl_div = 0.0

    # weight
    weight = 0.0

    # for each word length:
    for word_length in range(1, nb_word_lengths + 1):
        sim_xw = self.get_words(sim_xd, word_length)
        obs_xw = self.get_words(obs_xd, word_length)
        m_xw = np.concatenate((sim_xw, obs_xw))
        sim_xp = self.get_words_est_prob(sim_xw)
        m_xp = self.get_words_est_prob(m_xw)

        base = float(nb_values**word_length)
        sim_entr = self.get_sh_entr(sim_xp, base)
        m_entr = self.get_sh_entr(m_xp, base)

        # update weight
        weight = weight + 2 / (nb_word_lengths * (nb_word_lengths + 1))

        # correction
        corr = ((len(m_xp) - 1) - (len(sim_xp) - 1)) / (2 * N)

        # add to measure
        gsl_divl = 2 * m_entr - sim_entr + corr
        gsl_div = gsl_div + weight * gsl_divl

    # end of cycle, return
    return gsl_div
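Note on the weighting scheme above: since weight grows by 2 / (L * (L + 1)) at every iteration, the weight applied at word length l is 2 * l / (L * (L + 1)), so longer words count linearly more and the L weights sum to one. A small numerical check (the value of L is illustrative):

L = 6  # nb_word_lengths
weights = [2 * l / (L * (L + 1)) for l in range(1, L + 1)]
print(weights)       # increasing weights for longer words
print(sum(weights))  # -> 1.0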