Loss functions

black_it.loss_functions.base.BaseLoss (ABC)

BaseLoss interface.

Source code in black_it/loss_functions/base.py
class BaseLoss(ABC):
    """BaseLoss interface."""

    def __init__(
        self,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ):
        """
        Initialize the loss function.

        Args:
            coordinate_weights: the weights of the loss coordinates.
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        self.coordinate_weights = coordinate_weights
        self.coordinate_filters = coordinate_filters

    def compute_loss(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Compute the loss between simulated and real data.

        Args:
            sim_data_ensemble: an ensemble of simulated data, of shape (ensemble_size, N, D)
            real_data: the real data, of shape (N, D)

        Returns:
            The loss value.
        """
        num_coords = real_data.shape[1]

        if self.coordinate_weights is None:
            weights = np.ones(num_coords) / num_coords
        else:
            nb_coordinate_weights = len(self.coordinate_weights)
            _assert(
                nb_coordinate_weights == num_coords,
                (
                    "the length of coordinate_weights should be equal "
                    f"to the number of coordinates, got {nb_coordinate_weights} and {num_coords}"
                ),
                exception_class=ValueError,
            )
            weights = self.coordinate_weights

        filters: List[Optional[Callable]]
        if self.coordinate_filters is None:
            # None entries are interpreted as the identity function
            filters = [None] * num_coords
        else:
            nb_coordinate_filters = len(self.coordinate_filters)
            _assert(
                nb_coordinate_filters == num_coords,
                (
                    "the length of coordinate_filters should be equal "
                    f"to the number of coordinates, got {nb_coordinate_filters} and {num_coords}"
                ),
                exception_class=ValueError,
            )
            filters = self.coordinate_filters

        loss = 0
        for i in range(num_coords):
            filter_ = filters[i]
            if filter_ is None:
                filtered_data = sim_data_ensemble[:, :, i]
            else:
                filtered_data = np.array(
                    [
                        filter_(sim_data_ensemble[j, :, i])
                        for j in range(sim_data_ensemble.shape[0])
                    ]
                )

            loss += self.compute_loss_1d(filtered_data, real_data[:, i]) * weights[i]

        return loss

    @abstractmethod
    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Return the loss between a specific coordinate of two time series.

        Concrete classes have to override this method in order to implement new
        loss functions.

        Args:
            sim_data_ensemble: an ensemble of simulated 1D series, of shape (ensemble_size, N)
            real_data: the real data, of shape (N,)

        Returns:
            the computed loss over the specific coordinate
        """

__init__(self, coordinate_weights=None, coordinate_filters=None) special

Initialize the loss function.

Parameters:

- coordinate_weights (Optional[NDArray], default None): the weights of the loss coordinates.
- coordinate_filters (Optional[List[Optional[Callable]]], default None): filters/transformations to be applied to each simulated series before the loss computation.
Source code in black_it/loss_functions/base.py
def __init__(
    self,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
):
    """
    Initialize the loss function.

    Args:
        coordinate_weights: the weights of the loss coordinates.
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    self.coordinate_weights = coordinate_weights
    self.coordinate_filters = coordinate_filters

compute_loss(self, sim_data_ensemble, real_data)

Compute the loss between simulated and real data.

Parameters:

- sim_data_ensemble (NDArray, required): an ensemble of simulated data, of shape (ensemble_size, N, D).
- real_data (NDArray, required): the real data, of shape (N, D).

Returns:

- float: The loss value.

Source code in black_it/loss_functions/base.py
def compute_loss(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Compute the loss between simulated and real data.

    Args:
        sim_data_ensemble: an ensemble of simulated data, of shape (ensemble_size, N, D)
        real_data: the real data, of shape (N, D)

    Returns:
        The loss value.
    """
    num_coords = real_data.shape[1]

    if self.coordinate_weights is None:
        weights = np.ones(num_coords) / num_coords
    else:
        nb_coordinate_weights = len(self.coordinate_weights)
        _assert(
            nb_coordinate_weights == num_coords,
            (
                "the length of coordinate_weights should be equal "
                f"to the number of coordinates, got {nb_coordinate_weights} and {num_coords}"
            ),
            exception_class=ValueError,
        )
        weights = self.coordinate_weights

    filters: List[Optional[Callable]]
    if self.coordinate_filters is None:
        # None entries are interpreted as the identity function
        filters = [None] * num_coords
    else:
        nb_coordinate_filters = len(self.coordinate_filters)
        _assert(
            nb_coordinate_filters == num_coords,
            (
                "the length of coordinate_filters should be equal "
                f"to the number of coordinates, got {nb_coordinate_filters} and {num_coords}"
            ),
            exception_class=ValueError,
        )
        filters = self.coordinate_filters

    loss = 0
    for i in range(num_coords):
        filter_ = filters[i]
        if filter_ is None:
            filtered_data = sim_data_ensemble[:, :, i]
        else:
            filtered_data = np.array(
                [
                    filter_(sim_data_ensemble[j, :, i])
                    for j in range(sim_data_ensemble.shape[0])
                ]
            )

        loss += self.compute_loss_1d(filtered_data, real_data[:, i]) * weights[i]

    return loss

compute_loss_1d(self, sim_data_ensemble, real_data)

Return the loss between a specific coordinate of two time series.

Concrete classes have to override this method in order to implement new loss functions.

Parameters:

- sim_data_ensemble (NDArray, required): an ensemble of simulated 1D series, of shape (ensemble_size, N).
- real_data (NDArray, required): the real data, of shape (N,).

Returns:

- float: the computed loss over the specific coordinate.

Source code in black_it/loss_functions/base.py
@abstractmethod
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Return the loss between a specific coordinate of two time series.

    Concrete classes have to override this method in order to implement new
    loss functions.

    Args:
        sim_data_ensemble: an ensemble of simulated 1D series, of shape (ensemble_size, N)
        real_data: the real data, of shape (N,)

    Returns:
        the computed loss over the specific coordinate
    """

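To implement a new loss it is enough to subclass BaseLoss and override compute_loss_1d; the base class then takes care of coordinate weighting and filtering. A minimal sketch, assuming black_it is installed; the class name MeanAbsoluteLoss is a hypothetical example, not part of the library:

import numpy as np
from numpy.typing import NDArray

from black_it.loss_functions.base import BaseLoss


class MeanAbsoluteLoss(BaseLoss):
    """Toy loss: mean absolute deviation between the ensemble mean and the real series."""

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        # average over the ensemble axis first, then over time
        mean_sim = sim_data_ensemble.mean(axis=0)
        return float(np.mean(np.abs(mean_sim - real_data)))


# usage: an ensemble of 5 simulated series, 100 time steps, 2 coordinates
rng = np.random.default_rng(0)
loss = MeanAbsoluteLoss().compute_loss(
    rng.normal(size=(5, 100, 2)), rng.normal(size=(100, 2))
)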
black_it.loss_functions.fourier

This module contains the implementation of the Fast Fourier Transform loss.

FrequencyFilter

A filter that receives a signal in the frequency domain and returns a filtered version of it. Used by the FourierLoss constructor.

In this version the filter accepts a single scalar parameter; multi-band filtering is not supported yet.
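
Any callable with this signature can be used in place of the built-in filters. A minimal sketch, assuming black_it is installed; exponential_low_pass_filter is a hypothetical name, not part of the library:

import numpy as np
from numpy.typing import NDArray


def exponential_low_pass_filter(
    signal_frequencies: NDArray[np.complex128], f: float
) -> NDArray[np.complex128]:
    # attenuate high frequencies with an exponential decay whose scale grows with f
    scale = f * signal_frequencies.shape[0]
    mask = np.exp(-np.arange(signal_frequencies.shape[0]) / scale)
    return signal_frequencies * mask

Such a callable can then be passed to the FourierLoss constructor via the frequency_filter argument.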

FourierLoss (BaseLoss)

Class for the Fourier loss.

Source code in black_it/loss_functions/fourier.py
class FourierLoss(BaseLoss):
    """Class for the Fourier loss."""

    def __init__(
        self,
        frequency_filter: FrequencyFilter = gaussian_low_pass_filter,
        f: float = 0.8,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ) -> None:
        """Loss computed using a distance in the Fourier space of the time series.

        This loss is equivalent to the Euclidean loss computed on the time
        series after a Fourier-filter.
        The parameter f controls the fraction of frequencies that are kept in
        the Fourier series.

        Args:
            frequency_filter: the function used to filter the fourier
                frequencies before the distance is computed.
            f: fraction of fourier components to keep when computing the
                distance between the time series. This parameter will be passed
                to frequency_filter.
            coordinate_weights: relative weights of the losses computed over
                different time series coordinates.
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        _assert(
            0.0 < f <= 1.0,
            "'f' must be in the interval (0.0, 1.0]",
        )
        self.frequency_filter = frequency_filter
        self.f = f
        super().__init__(coordinate_weights, coordinate_filters)

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """Compute Euclidean distance between the Fourier transform of the two time series.

        Args:
            sim_data_ensemble: the first operand
            real_data: the second operand

        Returns:
            The computed loss over the specific coordinate.
        """
        f_real_data = np.fft.rfft(real_data, axis=0)
        N = f_real_data.shape[0]
        f_real_data = self.frequency_filter(f_real_data, self.f)
        # compute the mean FFT of the simulated ensemble
        f_sim_data = []
        for s in sim_data_ensemble:
            f_sim_data_ = np.fft.rfft(s, axis=0)
            f_sim_data_ = self.frequency_filter(f_sim_data_, self.f)
            f_sim_data.append(f_sim_data_)

        f_sim_data = np.array(f_sim_data).mean(0)

        loss_1d = np.sqrt(np.sum((abs(f_sim_data - f_real_data)) ** 2) / N)

        return loss_1d

__init__(self, frequency_filter=gaussian_low_pass_filter, f=0.8, coordinate_weights=None, coordinate_filters=None) special

Loss computed using a distance in the Fourier space of the time series.

This loss is equivalent to the Euclidean loss computed on the time series after a Fourier-filter. The parameter f controls the fraction of frequencies that are kept in the Fourier series.

Parameters:

- frequency_filter (FrequencyFilter, default gaussian_low_pass_filter): the function used to filter the Fourier frequencies before the distance is computed.
- f (float, default 0.8): fraction of Fourier components to keep when computing the distance between the time series. This parameter will be passed to frequency_filter.
- coordinate_weights (Optional[NDArray], default None): relative weights of the losses computed over different time series coordinates.
- coordinate_filters (Optional[List[Optional[Callable]]], default None): filters/transformations to be applied to each simulated series before the loss computation.
Source code in black_it/loss_functions/fourier.py
def __init__(
    self,
    frequency_filter: FrequencyFilter = gaussian_low_pass_filter,
    f: float = 0.8,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
) -> None:
    """Loss computed using a distance in the Fourier space of the time series.

    This loss is equivalent to the Euclidean loss computed on the time
    series after a Fourier-filter.
    The parameter f controls the fraction of frequencies that are kept in
    the Fourier series.

    Args:
        frequency_filter: the function used to filter the fourier
            frequencies before the distance is computed.
        f: fraction of fourier components to keep when computing the
            distance between the time series. This parameter will be passed
            to frequency_filter.
        coordinate_weights: relative weights of the losses computed over
            different time series coordinates.
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    _assert(
        0.0 < f <= 1.0,
        "'f' must be in the interval (0.0, 1.0]",
    )
    self.frequency_filter = frequency_filter
    self.f = f
    super().__init__(coordinate_weights, coordinate_filters)

compute_loss_1d(self, sim_data_ensemble, real_data)

Compute Euclidean distance between the Fourier transform of the two time series.

Parameters:

- sim_data_ensemble (NDArray, required): the ensemble of simulated series (the first operand).
- real_data (NDArray, required): the real series (the second operand).

Returns:

- float: the computed loss over the specific coordinate.

Source code in black_it/loss_functions/fourier.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """Compute Euclidean distance between the Fourier transform of the two time series.

    Args:
        sim_data_ensemble: the first operand
        real_data: the second operand

    Returns:
        The computed loss over the specific coordinate.
    """
    f_real_data = np.fft.rfft(real_data, axis=0)
    N = f_real_data.shape[0]
    f_real_data = self.frequency_filter(f_real_data, self.f)
    # compute the mean FFT of the simulated ensemble
    f_sim_data = []
    for s in sim_data_ensemble:
        f_sim_data_ = np.fft.rfft(s, axis=0)
        f_sim_data_ = self.frequency_filter(f_sim_data_, self.f)
        f_sim_data.append(f_sim_data_)

    f_sim_data = np.array(f_sim_data).mean(0)

    loss_1d = np.sqrt(np.sum((abs(f_sim_data - f_real_data)) ** 2) / N)

    return loss_1d
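
A short usage sketch of FourierLoss, assuming black_it is installed (the data is random and for illustration only; ideal_low_pass_filter is documented below):

import numpy as np
from black_it.loss_functions.fourier import FourierLoss, ideal_low_pass_filter

rng = np.random.default_rng(42)
real = rng.normal(size=(200, 1))    # shape (N, D)
sim = rng.normal(size=(3, 200, 1))  # shape (ensemble_size, N, D)

# keep only the lowest 30% of the Fourier components, with a sharp cutoff
loss_func = FourierLoss(frequency_filter=ideal_low_pass_filter, f=0.3)
loss = loss_func.compute_loss(sim, real)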

gaussian_low_pass_filter(signal_frequencies, f)

Gaussian low-pass filter.

Parameters:

- signal_frequencies (NDArray[np.complex128], required): the input signal transformed in the frequency domain.
- f (float, required): the fraction of frequencies to keep; this determines the length scale of the Gaussian filter.

Returns:

- NDArray[np.complex128]: the filtered frequencies.

Source code in black_it/loss_functions/fourier.py
def gaussian_low_pass_filter(
    signal_frequencies: NDArray[np.complex128],
    f: float,
) -> NDArray[np.complex128]:
    """Gaussian low-pass filter.

    Args:
        signal_frequencies: the input signal transformed in the frequency
            domain.
        f: the fraction of frequencies to keep, this will determine the length
            scale of the Gaussian filter

    Returns:
        the filtered frequencies
    """
    # number of low-frequency component to keep
    sigma = np.round(f * signal_frequencies.shape[0])

    # gaussian low-pass filter
    mask = np.exp(-np.arange(signal_frequencies.shape[0]) ** 2 / (2 * sigma**2))
    filtered_frequencies = signal_frequencies * mask

    return filtered_frequencies

ideal_low_pass_filter(signal_frequencies, f)

Ideal low-pass filter.

Parameters:

- signal_frequencies (NDArray[np.complex128], required): the input signal transformed in the frequency domain.
- f (float, required): the fraction of frequencies to keep unchanged; for f=1 the filter is just the identity.

Returns:

- NDArray[np.complex128]: the filtered frequencies.

Source code in black_it/loss_functions/fourier.py
def ideal_low_pass_filter(
    signal_frequencies: NDArray[np.complex128],
    f: float,
) -> NDArray[np.complex128]:
    """Ideal low-pass filter.

    Args:
        signal_frequencies: the input signal transformed in the frequency
            domain.
        f: the fraction of frequencies to keep unchanged, for f=1 the filter is
            just the identity.

    Returns:
        the filtered frequencies
    """
    # number of low-frequency component to keep
    n = int(np.round(f * signal_frequencies.shape[0]))

    # ideal low-pass filter
    mask = np.zeros(signal_frequencies.shape[0])
    mask[:n] = 1.0
    filtered_frequencies = signal_frequencies * mask

    return filtered_frequencies
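
The difference between the two built-in filters is easiest to see on a flat spectrum; a small illustrative sketch, assuming black_it is installed:

import numpy as np
from black_it.loss_functions.fourier import (
    gaussian_low_pass_filter,
    ideal_low_pass_filter,
)

spectrum = np.ones(10, dtype=np.complex128)

# the ideal filter zeroes every component above the cutoff index ...
print(ideal_low_pass_filter(spectrum, f=0.5).real)
# -> [1. 1. 1. 1. 1. 0. 0. 0. 0. 0.]

# ... while the Gaussian filter attenuates the components smoothly
print(np.round(gaussian_low_pass_filter(spectrum, f=0.5).real, 2))
# -> [1.   0.98 0.92 0.84 0.73 0.61 0.49 0.38 0.28 0.2 ]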

black_it.loss_functions.minkowski.MinkowskiLoss (BaseLoss)

Class for the Minkowski loss.

Source code in black_it/loss_functions/minkowski.py
class MinkowskiLoss(BaseLoss):
    """Class for the Minkowski loss."""

    def __init__(
        self,
        p: int = 2,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ) -> None:
        """
        Loss computed using a Minkowski distance.

        The [Minkowski distance](https://en.wikipedia.org/wiki/Minkowski_distance)
        is a generalization of both the Manhattan distance (p=1) and the Euclidean distance (p=2).

        This loss computes the Minkowski distance between two series.

        Note: this class is a wrapper of scipy.spatial.distance.minkowski.

        Args:
            p: the order of the norm used to compute the distance between real and simulated series
            coordinate_weights: relative weights of the losses computed over different time series coordinates
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        self.p = p
        super().__init__(coordinate_weights, coordinate_filters)

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Call scipy.spatial.distance.minkowski() on its arguments.

        Args:
            sim_data_ensemble: the first operand
            real_data: the second operand

        Returns:
            The computed loss over the specific coordinate.
        """
        # average simulated time series
        sim_data_ensemble = sim_data_ensemble.mean(axis=0)

        loss_1d = minkowski(sim_data_ensemble, real_data, p=self.p)

        return loss_1d

__init__(self, p=2, coordinate_weights=None, coordinate_filters=None) special

Loss computed using a Minkowski distance.

The Minkowski distance is a generalization of both the Manhattan distance (p=1) and the Euclidean distance (p=2).

This loss computes the Minkowski distance between two series.

Note: this class is a wrapper of scipy.spatial.distance.minkowski

Parameters:

- p (int, default 2): the order of the norm used to compute the distance between real and simulated series.
- coordinate_weights (Optional[NDArray], default None): relative weights of the losses computed over different time series coordinates.
- coordinate_filters (Optional[List[Optional[Callable]]], default None): filters/transformations to be applied to each simulated series before the loss computation.
Source code in black_it/loss_functions/minkowski.py
def __init__(
    self,
    p: int = 2,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
) -> None:
    """
    Loss computed using a Minkowski distance.

    The [Minkowski distance](https://en.wikipedia.org/wiki/Minkowski_distance)
    is a generalization of both the Manhattan distance (p=1) and the Euclidean distance (p=2).

    This loss computes the Minkowski distance between two series.

    Note: this class is a wrapper of scipy.spatial.distance.minkowski.

    Args:
        p: the order of the norm used to compute the distance between real and simulated series
        coordinate_weights: relative weights of the losses computed over different time series coordinates
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    self.p = p
    super().__init__(coordinate_weights, coordinate_filters)

compute_loss_1d(self, sim_data_ensemble, real_data)

Call scipy.spatial.distance.minkowski() on its arguments.

Parameters:

- sim_data_ensemble (NDArray, required): the ensemble of simulated series (the first operand).
- real_data (NDArray, required): the real series (the second operand).

Returns:

- float: the computed loss over the specific coordinate.

Source code in black_it/loss_functions/minkowski.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Call scipy.spatial.distance.minkowski() on its arguments.

    Args:
        sim_data_ensemble: the first operand
        real_data: the second operand

    Returns:
        The computed loss over the specific coordinate.
    """
    # average simulated time series
    sim_data_ensemble = sim_data_ensemble.mean(axis=0)

    loss_1d = minkowski(sim_data_ensemble, real_data, p=self.p)

    return loss_1d
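
A usage sketch comparing the Manhattan (p=1) and Euclidean (p=2) variants on the same random data, assuming black_it is installed:

import numpy as np
from black_it.loss_functions.minkowski import MinkowskiLoss

rng = np.random.default_rng(0)
real = rng.normal(size=(100, 2))
sim = rng.normal(size=(10, 100, 2))

manhattan_loss = MinkowskiLoss(p=1).compute_loss(sim, real)
euclidean_loss = MinkowskiLoss(p=2).compute_loss(sim, real)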

black_it.loss_functions.msm.MethodOfMomentsLoss (BaseLoss)

Class for the 'method of moments' loss.

Source code in black_it/loss_functions/msm.py
class MethodOfMomentsLoss(BaseLoss):
    """Class for the 'method of moments' loss."""

    def __init__(
        self,
        covariance_mat: Union[str, NDArray[np.float64]] = "identity",
        coordinate_weights: Optional[NDArray[np.float64]] = None,
        moment_calculator: MomentCalculator = get_mom_ts_1d,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
        standardise_moments: bool = False,
    ):
        """
        Initialize the loss function based on the 'method of moments'.

        Returns the MSM objective function, i.e. the square difference between
        the moments of the two time series.
        By default the loss computes the moments using
        black_it.utils.time_series.get_mom_ts_1d(), which computes an
        18-dimensional vector of statistics.

        You can alter the behaviour passing a custom function to
        moment_calculator. Please note that there is a constraint between the
        moment calculator and the size of the covariance matrix.

        Args:
            covariance_mat: covariance matrix between the moments.
                'identity' (the default) gives the identity matrix, 'inverse_variance' gives the diagonal
                matrix of the estimated inverse variances. Alternatively, one can specify a numerical symmetric matrix
                whose size must be equal to the number of elements that the moment calculator returns.
            coordinate_weights: importance of each coordinate. By default all
                coordinates are treated equally.
            moment_calculator: a function that takes a 1D time series and
                returns a series of moments. The default is
                black_it.utils.time_series.get_mom_ts_1d()
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
            standardise_moments: if True all moments are divided by the absolute value of the real moments,
                default is False.
        """
        MethodOfMomentsLoss._validate_covariance_and_calculator(
            moment_calculator, covariance_mat
        )

        super().__init__(coordinate_weights, coordinate_filters)
        self._covariance_mat = covariance_mat
        self._moment_calculator = moment_calculator
        self._standardise_moments = standardise_moments

    @staticmethod
    def _validate_covariance_and_calculator(
        moment_calculator: MomentCalculator,
        covariance_mat: Union[NDArray[np.float64], str],
    ) -> None:
        """
        Validate the covariance matrix.

        Args:
            moment_calculator: the moment calculator
            covariance_mat: the covariance matrix, either as a string or as a numpy array

        Returns:
            None

        Raises:
            ValueError: if the covariance matrix is not valid.
                If it is a string, it can be invalid if it is not one of the possible options.
                If it is a numpy array, it can be invalid if it is not symmetric, or if moment_calculator
                is the default get_mom_ts_1d and the covariance matrix's size
                is not 18. Other possible errors won't be caught by this
                function, and can only be detected at runtime.
        """
        if isinstance(covariance_mat, str):

            try:
                _CovarianceMatrixType(covariance_mat)
            except ValueError as exc:
                raise ValueError(
                    f"expected one of {list(map(lambda x: x.value, _CovarianceMatrixType))}, got {covariance_mat})"
                ) from exc
            return

        if isinstance(covariance_mat, np.ndarray):
            # a non null covariance_mat was given
            if not is_symmetric(covariance_mat):
                raise ValueError(
                    "the provided covariance matrix is not valid as it is not a symmetric matrix"
                )
            if (moment_calculator is get_mom_ts_1d) and (covariance_mat.shape[0] != 18):
                raise ValueError(
                    "the provided covariance matrix is not valid as it has a wrong shape: "
                    f"expected 18, got {covariance_mat.shape[0]}"
                )
            return

        raise ValueError(
            "please specify a valid covariance matrix, either as a string or directly as a numpy array"
        )

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Compute the loss based on the 'method of moments'.

        Returns the MSM objective function, i.e. the square difference between the moments of the two time series.

        Args:
            sim_data_ensemble: the first operand
            real_data: the second operand

        Returns:
            the MSM loss over a specific coordinate.
        """
        # compute the moments for the simulated ensemble
        ensemble_sim_mom_1d = np.array(
            [self._moment_calculator(s) for s in sim_data_ensemble]
        )

        # compute moments of the real time series
        real_mom_1d = self._moment_calculator(real_data)

        # write the moments in relative terms, if needed
        if self._standardise_moments:
            ensemble_sim_mom_1d = ensemble_sim_mom_1d / (abs(real_mom_1d)[None, :])
            real_mom_1d = real_mom_1d / abs(real_mom_1d)

        # mean simulated moments
        sim_mom_1d = np.mean(ensemble_sim_mom_1d, axis=0)

        g = real_mom_1d - sim_mom_1d

        if self._covariance_mat == _CovarianceMatrixType.IDENTITY.value:
            loss_1d = g.dot(g)
            return loss_1d
        if self._covariance_mat == _CovarianceMatrixType.INVERSE_VARIANCE.value:
            W = np.diag(
                1.0 / np.mean((real_mom_1d[None, :] - ensemble_sim_mom_1d) ** 2, axis=0)
            )
        else:
            self._covariance_mat = cast(NDArray[np.float64], self._covariance_mat)
            W = self._covariance_mat
        try:
            loss_1d = g.dot(W).dot(g)
        except ValueError as e:
            covariance_size = W.shape[0]
            moments_size = g.shape[0]

            if covariance_size == moments_size:
                # this value error is not due to a mismatch between the
                # covariance matrix size and the number moments. Let's raise the
                # original error.
                raise

            raise ValueError(
                f"The size of the covariance matrix ({covariance_size}) "
                f"and the number of moments ({moments_size}) should be identical"
            ) from e

        return loss_1d

__init__(self, covariance_mat='identity', coordinate_weights=None, moment_calculator=get_mom_ts_1d, coordinate_filters=None, standardise_moments=False) special

Initialize the loss function based on the 'method of moments'.

Returns the MSM objective function, i.e. the square difference between the moments of the two time series. By default the loss computes the moments using black_it.utils.time_series.get_mom_ts_1d(), which computes an 18-dimensional vector of statistics.

You can alter the behaviour passing a custom function to moment_calculator. Please note that there is a constraint between the moment calculator and the size of the covariance matrix.

Parameters:

- covariance_mat (Union[str, NDArray[np.float64]], default 'identity'): covariance matrix between the moments. 'identity' (the default) gives the identity matrix, 'inverse_variance' gives the diagonal matrix of the estimated inverse variances. Alternatively, one can specify a numerical symmetric matrix whose size must be equal to the number of elements that the moment calculator returns.
- coordinate_weights (Optional[NDArray[np.float64]], default None): importance of each coordinate. By default all coordinates are treated equally.
- moment_calculator (MomentCalculator, default get_mom_ts_1d): a function that takes a 1D time series and returns a series of moments. The default is black_it.utils.time_series.get_mom_ts_1d().
- coordinate_filters (Optional[List[Optional[Callable]]], default None): filters/transformations to be applied to each simulated series before the loss computation.
- standardise_moments (bool, default False): if True all moments are divided by the absolute value of the real moments.
Source code in black_it/loss_functions/msm.py
def __init__(
    self,
    covariance_mat: Union[str, NDArray[np.float64]] = "identity",
    coordinate_weights: Optional[NDArray[np.float64]] = None,
    moment_calculator: MomentCalculator = get_mom_ts_1d,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
    standardise_moments: bool = False,
):
    """
    Initialize the loss function based on the 'method of moments'.

    Returns the MSM objective function, i.e. the square difference between
    the moments of the two time series.
    By default the loss computes the moments using
    black_it.utils.time_series.get_mom_ts_1d(), which computes an
    18-dimensional vector of statistics.

    You can alter the behaviour passing a custom function to
    moment_calculator. Please note that there is a constraint between the
    moment calculator and the size of the covariance matrix.

    Args:
        covariance_mat: covariance matrix between the moments.
            'identity' (the default) gives the identity matrix, 'inverse_variance' gives the diagonal
            matrix of the estimated inverse variances. Alternatively, one can specify a numerical symmetric matrix
            whose size must be equal to the number of elements that the moment calculator returns.
        coordinate_weights: importance of each coordinate. By default all
            coordinates are treated equally.
        moment_calculator: a function that takes a 1D time series and
            returns a series of moments. The default is
            black_it.utils.time_series.get_mom_ts_1d()
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
        standardise_moments: if True all moments are divided by the absolute value of the real moments,
            default is False.
    """
    MethodOfMomentsLoss._validate_covariance_and_calculator(
        moment_calculator, covariance_mat
    )

    super().__init__(coordinate_weights, coordinate_filters)
    self._covariance_mat = covariance_mat
    self._moment_calculator = moment_calculator
    self._standardise_moments = standardise_moments

compute_loss_1d(self, sim_data_ensemble, real_data)

Compute the loss based on the 'method of moments'.

Returns the MSM objective function, i.e. the square difference between the moments of the two time series.

Parameters:

- sim_data_ensemble (NDArray, required): the ensemble of simulated series (the first operand).
- real_data (NDArray, required): the real series (the second operand).

Returns:

- float: the MSM loss over a specific coordinate.

Source code in black_it/loss_functions/msm.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Compute the loss based on the 'method of moments'.

    Returns the MSM objective function, i.e. the square difference between the moments of the two time series.

    Args:
        sim_data_ensemble: the first operand
        real_data: the second operand

    Returns:
        the MSM loss over a specific coordinate.
    """
    # compute the moments for the simulated ensemble
    ensemble_sim_mom_1d = np.array(
        [self._moment_calculator(s) for s in sim_data_ensemble]
    )

    # compute moments of the real time series
    real_mom_1d = self._moment_calculator(real_data)

    # write the moments in relative terms, if needed
    if self._standardise_moments:
        ensemble_sim_mom_1d = ensemble_sim_mom_1d / (abs(real_mom_1d)[None, :])
        real_mom_1d = real_mom_1d / abs(real_mom_1d)

    # mean simulated moments
    sim_mom_1d = np.mean(ensemble_sim_mom_1d, axis=0)

    g = real_mom_1d - sim_mom_1d

    if self._covariance_mat == _CovarianceMatrixType.IDENTITY.value:
        loss_1d = g.dot(g)
        return loss_1d
    if self._covariance_mat == _CovarianceMatrixType.INVERSE_VARIANCE.value:
        W = np.diag(
            1.0 / np.mean((real_mom_1d[None, :] - ensemble_sim_mom_1d) ** 2, axis=0)
        )
    else:
        self._covariance_mat = cast(NDArray[np.float64], self._covariance_mat)
        W = self._covariance_mat
    try:
        loss_1d = g.dot(W).dot(g)
    except ValueError as e:
        covariance_size = W.shape[0]
        moments_size = g.shape[0]

        if covariance_size == moments_size:
            # this value error is not due to a mismatch between the
            # covariance matrix size and the number moments. Let's raise the
            # original error.
            raise

        raise ValueError(
            f"The size of the covariance matrix ({covariance_size}) "
            f"and the number of moments ({moments_size}) should be identical"
        ) from e

    return loss_1d
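
A sketch of MethodOfMomentsLoss with a custom moment calculator, assuming black_it is installed; first_two_moments is a hypothetical helper, not part of the library. Note how the covariance matrix size matches the number of moments returned (two, in this case):

import numpy as np
from numpy.typing import NDArray

from black_it.loss_functions.msm import MethodOfMomentsLoss


def first_two_moments(time_series: NDArray[np.float64]) -> NDArray[np.float64]:
    # hypothetical moment calculator returning a 2-dimensional vector of statistics
    return np.array([time_series.mean(), time_series.std()])


loss_func = MethodOfMomentsLoss(
    covariance_mat=np.eye(2),  # symmetric, 2x2 to match the two moments
    moment_calculator=first_two_moments,
)

rng = np.random.default_rng(0)
loss = loss_func.compute_loss(rng.normal(size=(5, 100, 1)), rng.normal(size=(100, 1)))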

black_it.loss_functions.gsl_div.GslDivLoss (BaseLoss)

Class for the Gsl-div loss.

Examples:

>>> expected_loss = 0.39737637181336855
>>> np.random.seed(11)
>>> series1 = np.random.normal(0, 1, (100, 3))
>>> series2 = np.random.normal(0, 1, (100, 3))
>>> loss_func = GslDivLoss()
>>> loss = loss_func.compute_loss(series1[None, :, :], series2)
>>> assert np.isclose(expected_loss, loss)
Source code in black_it/loss_functions/gsl_div.py
class GslDivLoss(BaseLoss):
    """
    Class for the Gsl-div loss.

    Example:
        >>> expected_loss = 0.39737637181336855
        >>> np.random.seed(11)
        >>> series1 = np.random.normal(0, 1, (100, 3))
        >>> series2 = np.random.normal(0, 1, (100, 3))
        >>> loss_func = GslDivLoss()
        >>> loss = loss_func.compute_loss(series1[None, :, :], series2)
        >>> assert np.isclose(expected_loss, loss)
    """

    def __init__(
        self,
        nb_values: Optional[int] = None,
        nb_word_lengths: Optional[int] = None,
        coordinate_weights: Optional[NDArray] = None,
        coordinate_filters: Optional[List[Optional[Callable]]] = None,
    ) -> None:
        """
        Initialize the GSL-div loss object.

        Args:
            nb_values: number of values the digitised series can take
            nb_word_lengths: the number of word lengths to consider
            coordinate_weights: the weights of the loss coordinates
            coordinate_filters: filters/transformations to be applied to each simulated series before
                the loss computation.
        """
        super().__init__(coordinate_weights, coordinate_filters)
        self.nb_values = nb_values
        self.nb_word_lengths = nb_word_lengths

    def compute_loss_1d(
        self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
    ) -> float:
        """
        Return the GSL-div measure.

        From (Lamperti, 2017):

        > The information loss about the behaviour of the stochastic process
          due to the symbolization becomes smaller and smaller as b increases.
          On the other side, low values of b would likely wash away
          processes’ noise the modeller might not be interested in.

        Args:
            sim_data_ensemble: the ensemble of simulated data
            real_data: the real data

        Returns:
            the GSL loss
        """
        N = len(real_data)
        ensemble_size = sim_data_ensemble.shape[0]

        if self.nb_values is None:
            nb_values = int((N - 1) / 2.0)
        else:
            nb_values = self.nb_values

        if self.nb_word_lengths is None:
            nb_word_lengths = int((N - 1) / 2.0)
        else:
            nb_word_lengths = self.nb_word_lengths

        # discretize real time series
        obs_xd = self.discretize(
            real_data,
            nb_values,
            np.min(real_data),
            np.max(real_data),
        )

        gsl_loss = 0.0

        # average loss over the ensemble
        for sim_data in sim_data_ensemble:

            # discretize simulated series
            sim_xd = self.discretize(
                sim_data, nb_values, np.min(sim_data), np.max(sim_data)
            )

            loss = self.gsl_div_1d_1_sample(
                sim_xd, obs_xd, nb_word_lengths, nb_values, N
            )

            gsl_loss += loss

        return gsl_loss / ensemble_size

    def gsl_div_1d_1_sample(
        self,
        sim_xd: NDArray,
        obs_xd: NDArray,
        nb_word_lengths: int,
        nb_values: int,
        N: int,
    ) -> float:
        """Compute the GSL-div for a single realisation of the simulated data.

        Args:
            sim_xd: discretised simulated series
            obs_xd: discretised real series
            nb_word_lengths: the number of word lengths to consider
            nb_values: number of values the digitised series can take
            N: the length of real and simulated series

        Returns:
            the computed loss
        """
        # outcome measure
        gsl_div = 0.0

        # weight
        weight = 0.0

        # for any word len:
        for word_length in range(1, nb_word_lengths + 1):
            sim_xw = self.get_words(sim_xd, word_length)
            obs_xw = self.get_words(obs_xd, word_length)
            m_xw = np.concatenate((sim_xw, obs_xw))
            sim_xp = self.get_words_est_prob(sim_xw)
            m_xp = self.get_words_est_prob(m_xw)
            base = float(nb_values**word_length)
            sim_entr = self.get_sh_entr(sim_xp, base)
            m_entr = self.get_sh_entr(m_xp, base)

            # update weight
            weight = weight + 2 / (nb_word_lengths * (nb_word_lengths + 1))

            # correction
            corr = ((len(m_xp) - 1) - (len(sim_xp) - 1)) / (2 * N)

            # add to measure
            gsl_divl = 2 * m_entr - sim_entr + corr
            gsl_div = gsl_div + weight * gsl_divl

        # end of cycle, return
        return gsl_div

    @staticmethod
    def discretize(
        time_series: NDArray[np.float64],
        nb_values: int,
        start_index: Union[np.float64, float],
        stop_index: Union[np.float64, float],
    ) -> NDArray[np.int64]:
        """
        Discretize the TS in 'nb_values' finite states.

        >>> GslDivLoss.discretize(
        ...     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        ...     nb_values=3,
        ...     start_index=1,
        ...     stop_index=10
        ... )
        array([1, 1, 1, 2, 2, 2, 2, 3, 3, 3])

        Args:
            time_series: any univariate time series
            nb_values: int, number of values the digitised series can take.
                It must be greater than 0.
            start_index: the starting point
            stop_index: the stopping point

        Returns:
            the discretised time series
        """
        start_index = np.float64(start_index)
        stop_index = np.float64(stop_index)
        linspace = np.linspace(start_index - EPS, stop_index + EPS, nb_values + 1)

        return np.searchsorted(linspace, time_series, side="left")

    @staticmethod
    def get_words(time_series: NDArray[np.float64], length: int) -> NDArray:
        """
        Return an overlapping array of words (int32) of 'length' given a discretised vector.

        >>> GslDivLoss.get_words(np.asarray([1, 2, 2, 2]), 2)
        array([12, 22, 22])

        Args:
            time_series: any univariate discretised time series
            length: int, len of words to be returned. Must be such that
                (len(time_series) + 1 - length) is positive.

        Returns:
            the time series of overlapping words

        """
        tswlen = len(time_series) + 1 - length
        _assert(
            tswlen >= 0,
            "the chosen word length is too high",
            exception_class=ValueError,
        )
        tsw = np.zeros(shape=(tswlen,), dtype=np.int32)

        for i in range(length):
            k = 10 ** (length - i - 1)
            tsw = tsw + time_series[i : tswlen + i] * k

        return tsw

    @staticmethod
    def get_words_est_prob(time_series: NDArray[np.float64]) -> NDArray[np.float64]:
        """
        Return an array of estimated probabilities given an array of words (int32).

        Args:
            time_series: any univariate array of words

        Returns:
            estimate of probabilities
        """
        _, count = np.unique(time_series, return_counts=True)
        est_p = np.divide(count, np.sum(count))
        return est_p

    @staticmethod
    def get_sh_entr(probs: NDArray[np.float64], log_base: float) -> float:
        """
        Return the Shannon entropy given an array of probabilities.

        Args:
            probs: an array of probabilities describing the discrete probability distribution
            log_base: the entropy logarithm base.

        Returns:
            the entropy of the discrete probability distribution
        """
        log = np.log(probs) / np.log(log_base)
        return -np.sum(np.multiply(probs, log))

__init__(self, nb_values=None, nb_word_lengths=None, coordinate_weights=None, coordinate_filters=None) special

Initialize the GSL-div loss object.

Parameters:

- nb_values (Optional[int], default None): number of values the digitised series can take.
- nb_word_lengths (Optional[int], default None): the number of word lengths to consider.
- coordinate_weights (Optional[NDArray], default None): the weights of the loss coordinates.
- coordinate_filters (Optional[List[Optional[Callable]]], default None): filters/transformations to be applied to each simulated series before the loss computation.
Source code in black_it/loss_functions/gsl_div.py
def __init__(
    self,
    nb_values: Optional[int] = None,
    nb_word_lengths: Optional[int] = None,
    coordinate_weights: Optional[NDArray] = None,
    coordinate_filters: Optional[List[Optional[Callable]]] = None,
) -> None:
    """
    Initialize the GSL-div loss object.

    Args:
        nb_values: number of values the digitised series can take
        nb_word_lengths: the number of word lengths to consider
        coordinate_weights: the weights of the loss coordinates
        coordinate_filters: filters/transformations to be applied to each simulated series before
            the loss computation.
    """
    super().__init__(coordinate_weights, coordinate_filters)
    self.nb_values = nb_values
    self.nb_word_lengths = nb_word_lengths

compute_loss_1d(self, sim_data_ensemble, real_data)

Return the GSL-div measure.

From (Lamperti, 2017):

The information loss about the behaviour of the stochastic process due to the symbolization becomes smaller and smaller as b increases. On the other side, low values of b would likely wash away processes’ noise the modeller might not be interested in.

Parameters:

- sim_data_ensemble (NDArray, required): the ensemble of simulated data.
- real_data (NDArray, required): the real data.

Returns:

- float: the GSL loss.

Source code in black_it/loss_functions/gsl_div.py
def compute_loss_1d(
    self, sim_data_ensemble: NDArray[np.float64], real_data: NDArray[np.float64]
) -> float:
    """
    Return the GSL-div measure.

    From (Lamperti, 2017):

    > The information loss about the behaviour of the stochastic process
      due to the symbolization becomes smaller and smaller as b increases.
      On the other side, low values of b would likely wash away
      processes’ noise the modeller might not be interested in.

    Args:
        sim_data_ensemble: the ensemble of simulated data
        real_data: the real data

    Returns:
        the GSL loss
    """
    N = len(real_data)
    ensemble_size = sim_data_ensemble.shape[0]

    if self.nb_values is None:
        nb_values = int((N - 1) / 2.0)
    else:
        nb_values = self.nb_values

    if self.nb_word_lengths is None:
        nb_word_lengths = int((N - 1) / 2.0)
    else:
        nb_word_lengths = self.nb_word_lengths

    # discretize real time series
    obs_xd = self.discretize(
        real_data,
        nb_values,
        np.min(real_data),
        np.max(real_data),
    )

    gsl_loss = 0.0

    # average loss over the ensemble
    for sim_data in sim_data_ensemble:

        # discretize simulated series
        sim_xd = self.discretize(
            sim_data, nb_values, np.min(sim_data), np.max(sim_data)
        )

        loss = self.gsl_div_1d_1_sample(
            sim_xd, obs_xd, nb_word_lengths, nb_values, N
        )

        gsl_loss += loss

    return gsl_loss / ensemble_size

discretize(time_series, nb_values, start_index, stop_index) staticmethod

Discretize the TS in 'nb_values' finite states.

>>> GslDivLoss.discretize(
...     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...     nb_values=3,
...     start_index=1,
...     stop_index=10
... )
array([1, 1, 1, 2, 2, 2, 2, 3, 3, 3])

Parameters:

- time_series (NDArray, required): any univariate time series.
- nb_values (int, required): number of values the digitised series can take. It must be greater than 0.
- start_index (Union[np.float64, float], required): the starting point.
- stop_index (Union[np.float64, float], required): the stopping point.

Returns:

- NDArray[np.int64]: the discretised time series.

Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def discretize(
    time_series: NDArray[np.float64],
    nb_values: int,
    start_index: Union[np.float64, float],
    stop_index: Union[np.float64, float],
) -> NDArray[np.int64]:
    """
    Discretize the TS in 'nb_values' finite states.

    >>> GslDivLoss.discretize(
    ...     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    ...     nb_values=3,
    ...     start_index=1,
    ...     stop_index=10
    ... )
    array([1, 1, 1, 2, 2, 2, 2, 3, 3, 3])

    Args:
        time_series: any univariate time series
        nb_values: int, number of values the digitised series can take.
            It must be greater than 0.
        start_index: the starting point
        stop_index: the stopping point

    Returns:
        the discretised time series
    """
    start_index = np.float64(start_index)
    stop_index = np.float64(stop_index)
    linspace = np.linspace(start_index - EPS, stop_index + EPS, nb_values + 1)

    return np.searchsorted(linspace, time_series, side="left")

get_sh_entr(probs, log_base) staticmethod

Return the Shannon entropy given an array of probabilities.

Parameters:

- probs (NDArray, required): an array of probabilities describing the discrete probability distribution.
- log_base (float, required): the entropy logarithm base.

Returns:

- float: the entropy of the discrete probability distribution.

Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def get_sh_entr(probs: NDArray[np.float64], log_base: float) -> float:
    """
    Return the Shannon entropy given an array of probabilities.

    Args:
        probs: an array of probabilities describing the discrete probability distribution
        log_base: the entropy logarithm base.

    Returns:
        the entropy of the discrete probability distribution
    """
    log = np.log(probs) / np.log(log_base)
    return -np.sum(np.multiply(probs, log))

get_words(time_series, length) staticmethod

Return an overlapping array of words (int32) of 'length' given a discretised vector.

>>> GslDivLoss.get_words(np.asarray([1, 2, 2, 2]), 2)
array([12, 22, 22])

Parameters:

- time_series (NDArray, required): any univariate discretised time series.
- length (int, required): length of the words to be returned. Must be such that (len(time_series) + 1 - length) is positive.

Returns:

- NDArray: the time series of overlapping words.

Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def get_words(time_series: NDArray[np.float64], length: int) -> NDArray:
    """
    Return an overlapping array of words (int32) of 'length' given a discretised vector.

    >>> GslDivLoss.get_words(np.asarray([1, 2, 2, 2]), 2)
    array([12, 22, 22])

    Args:
        time_series: any univariate discretised time series
        length: int, len of words to be returned. Must be such that
            (len(time_series) + 1 - length) is positive.

    Returns:
        the time series of overlapping words

    """
    tswlen = len(time_series) + 1 - length
    _assert(
        tswlen >= 0,
        "the chosen word length is too high",
        exception_class=ValueError,
    )
    tsw = np.zeros(shape=(tswlen,), dtype=np.int32)

    for i in range(length):
        k = 10 ** (length - i - 1)
        tsw = tsw + time_series[i : tswlen + i] * k

    return tsw

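Since get_words packs the symbols as base-10 digits (k = 10 ** (length - i - 1)), distinct words are guaranteed to stay distinct only while every symbol is a single digit, i.e. for nb_values <= 9; a small illustration of the collision that can occur otherwise:

import numpy as np
from black_it.loss_functions.gsl_div import GslDivLoss

# with a two-digit symbol, different symbol sequences can map to the same word
print(GslDivLoss.get_words(np.asarray([1, 12]), 2))  # -> [22]
print(GslDivLoss.get_words(np.asarray([2, 2]), 2))   # -> [22]
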
get_words_est_prob(time_series) staticmethod

Return an array of estimated probabilities given an array of words (int32).

Parameters:

- time_series (NDArray, required): any univariate array of words.

Returns:

- NDArray[np.float64]: the estimated probabilities.

Source code in black_it/loss_functions/gsl_div.py
@staticmethod
def get_words_est_prob(time_series: NDArray[np.float64]) -> NDArray[np.float64]:
    """
    Return an array of estimated probabilities given an array of words (int32).

    Args:
        time_series: any univariate array of words

    Returns:
        estimate of probabilities
    """
    _, count = np.unique(time_series, return_counts=True)
    est_p = np.divide(count, np.sum(count))
    return est_p

gsl_div_1d_1_sample(self, sim_xd, obs_xd, nb_word_lengths, nb_values, N)

Compute the GSL-div for a single realisation of the simulated data.

Parameters:

- sim_xd (NDArray, required): discretised simulated series.
- obs_xd (NDArray, required): discretised real series.
- nb_word_lengths (int, required): the number of word lengths to consider.
- nb_values (int, required): number of values the digitised series can take.
- N (int, required): the length of the real and simulated series.

Returns:

- float: the computed loss.

Source code in black_it/loss_functions/gsl_div.py
def gsl_div_1d_1_sample(
    self,
    sim_xd: NDArray,
    obs_xd: NDArray,
    nb_word_lengths: int,
    nb_values: int,
    N: int,
) -> float:
    """Compute the GSL-div for a single realisation of the simulated data.

    Args:
        sim_xd: discretised simulated series
        obs_xd: discretised real series
        nb_word_lengths: the number of word lengths to consider
        nb_values: number of values the digitised series can take
        N: the length of real and simulated series

    Returns:
        the computed loss
    """
    # outcome measure
    gsl_div = 0.0

    # weight
    weight = 0.0

    # for any word len:
    for word_length in range(1, nb_word_lengths + 1):
        sim_xw = self.get_words(sim_xd, word_length)
        obs_xw = self.get_words(obs_xd, word_length)
        m_xw = np.concatenate((sim_xw, obs_xw))
        sim_xp = self.get_words_est_prob(sim_xw)
        m_xp = self.get_words_est_prob(m_xw)
        base = float(nb_values**word_length)
        sim_entr = self.get_sh_entr(sim_xp, base)
        m_entr = self.get_sh_entr(m_xp, base)

        # update weight
        weight = weight + 2 / (nb_word_lengths * (nb_word_lengths + 1))

        # correction
        corr = ((len(m_xp) - 1) - (len(sim_xp) - 1)) / (2 * N)

        # add to measure
        gsl_divl = 2 * m_entr - sim_entr + corr
        gsl_div = gsl_div + weight * gsl_divl

    # end of cycle, return
    return gsl_div
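
The static helpers above can also be chained by hand, which makes the steps of the measure explicit. A step-by-step sketch on random data, assuming black_it is installed (compute_loss_1d performs the same steps, averaged over the ensemble):

import numpy as np
from black_it.loss_functions.gsl_div import GslDivLoss

rng = np.random.default_rng(0)
real = rng.normal(size=100)
sim = rng.normal(size=100)

nb_values = 5

# 1. symbolise both series over their own ranges
obs_xd = GslDivLoss.discretize(real, nb_values, real.min(), real.max())
sim_xd = GslDivLoss.discretize(sim, nb_values, sim.min(), sim.max())

# 2. compute the GSL-div of this single realisation
loss = GslDivLoss().gsl_div_1d_1_sample(
    sim_xd, obs_xd, nb_word_lengths=3, nb_values=nb_values, N=100
)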