Module mogptk.mosm

Expand source code Browse git
import numpy as np
from .model import model
from .dataset import DataSet
from .sm import _estimate_from_sm
from .kernels import MultiOutputSpectralMixture, Noise
from .plot import plot_spectrum
import matplotlib as mpl
import matplotlib.pyplot as plt

class MOSM(model):
    """
    MOGP with Multi Output Spectral Mixture kernel, as proposed in [1].

    The model contain the dataset and the associated gpflow model, 
    when the mogptk.Model is instanciated the gpflow model is built 
    using random parameters.

    Args:
        dataset (mogptk.dataset.DataSet): DataSet object of data for all channels.
        Q (int, optional): Number of components.
        name (str, optional): Name of the model.
        likelihood (gpflow.likelihoods, optional): Likelihood to use from GPFlow, if None a default exact inference Gaussian likelihood is used.
        variational (bool, optional): If True, use variational inference to approximate function values as Gaussian. If False it will use Monte Carlo Markov Chain.
        sparse (bool, optional): If True, will use sparse GP regression.
        like_params (dict, optional): Parameters to GPflow likelihood.

    Atributes:
        dataset: Constains the mogptk.DataSet associated.
        model: GPflow model.

    Examples:
    >>> import numpy as np
    >>> t = np.linspace(0, 10, 100)
    >>> y1 = np.sin(0.5 * t)
    >>> y2 = 2 * np.sin(0.2 * t)
    >>> import mogptk
    >>> data_list = []
    >>> mogptk.data_list.append(mogptk.Data(t, y1))
    >>> mogptk.data_list.append(mogptk.Data(t, y2))
    >>> model = mogptk.MOSM(data_list, Q=2)
    >>> model.build()
    >>> model.train()
    >>> model.plot_prediction()

    [1] G. Parra and F. Tobar, "Spectral Mixture Kernels for Multi-Output Gaussian Processes", Advances in Neural Information Processing Systems 31, 2017
    """
    def __init__(self, dataset, Q=1, name="MOSM", likelihood=None, variational=False, sparse=False, like_params={}):
        model.__init__(self, name, dataset)
        self.Q = Q

        for q in range(Q):
            kernel = MultiOutputSpectralMixture(
                self.dataset.get_input_dims()[0],
                self.dataset.get_output_dims(),
            )
            if q == 0:
                kernel_set = kernel
            else:
                kernel_set += kernel
        kernel_set += Noise(self.dataset.get_input_dims()[0], self.dataset.get_output_dims())
        self._build(kernel_set, likelihood, variational, sparse, like_params)

    def init_parameters(self, method='BNSE', sm_method='BNSE', sm_opt='BFGS', sm_maxiter=3000, plot=False):
        """
        Initialize kernel parameters.

        The initialization can be done in two ways, the first by estimating the PSD via 
        BNSE (Tobar 2018) and then selecting the greater Q peaks in the estimated spectrum,
        the peaks position, magnitude and width initialize the mean, magnitude and variance
        of the kernel respectively.
        The second way is by fitting independent Gaussian process for each channel, each one
        with SM kernel, using the fitted parameters for initial values of the multioutput kernel.

        In all cases the noise is initialized with 1/30 of the variance 
        of each channel.

        Args:
            method (str, optional): Method of estimation, possible values are 'BNSE' and 'SM'.
            sm_method (str, optional): Method of estimating SM kernels. Only valid in 'SM' mode.
            sm_opt (str, optional): Optimization method for SM kernels. Only valid in 'SM' mode.
            sm_maxiter (str, optional): Maximum iteration for SM kernels. Only valid in 'SM' mode.
            plot (bool, optional): Show the PSD of the kernel after fitting SM kernels. Only valid in 'SM' mode.
        """

        n_channels = self.dataset.get_output_dims()

        if method == 'BNSE':
            amplitudes, means, variances = self.dataset.get_bnse_estimation(self.Q)
            if len(amplitudes) == 0:
                logger.warning('BNSE could not find peaks for MOSM')
                return

            magnitude = np.zeros((n_channels, self.Q))
            for q in range(self.Q):
                mean = np.empty((self.dataset.get_input_dims()[0], n_channels))
                variance = np.empty((self.dataset.get_input_dims()[0], n_channels))
                for channel in range(n_channels):
                    magnitude[channel, q] = amplitudes[channel][:,q].mean()
                    mean[:,channel] = means[channel][:,q] * 2 * np.pi
                    variance[:,channel] = variances[channel][:,q] * 2
            
            # normalize proportional to channels variances
            for channel, data in enumerate(self.dataset):
                magnitude[channel, :] = np.sqrt(magnitude[channel, :] / magnitude[channel, :].sum() * data.Y[data.mask].var()) * 2

            for q in range(self.Q):
                self.set_parameter(q, 'magnitude', magnitude[:, q])
                self.set_parameter(q, 'mean', mean)
                self.set_parameter(q, 'variance', variance)

        elif method == 'SM':
            params = _estimate_from_sm(self.dataset, self.Q, method=sm_method, optimizer=sm_opt, maxiter=sm_maxiter, plot=plot)

            magnitude = np.zeros((n_channels, self.Q))
            for q in range(self.Q):
                magnitude[:, q] = params[q]['weight'].mean(axis=0)
                self.set_parameter(q, 'mean', params[q]['mean'])
                self.set_parameter(q, 'variance', params[q]['scale'] * 2)

            # normalize proportional to channels variances
            for channel, data in enumerate(self.dataset):
                if magnitude[channel, :].sum()==0:
                    raise Exception("Sum of magnitudes equal to zero")

                magnitude[channel, :] = np.sqrt(magnitude[channel, :] / magnitude[channel, :].sum() * data.Y[data.mask].var()) * 2
            for q in range(self.Q):
                self.set_parameter(q, 'magnitude', magnitude[:, q])
        else:
            raise Exception("possible methods of estimation are either 'BNSE' or 'SM'")

        noise = np.empty((n_channels))
        for channel, data in enumerate(self.dataset):
            noise[channel] = (data.Y[data.mask]).var() / 30
        self.set_parameter(self.Q, 'noise', noise)

    def plot(self):
        names = self.dataset.get_names()
        nyquist = self.dataset.get_nyquist_estimation()

        params = self.get_parameters()
        means = np.array([params[q]['mean'] for q in range(self.Q)])
        weights = np.array([params[q]['magnitude'] for q in range(self.Q)])**2
        scales = np.array([params[q]['variance'] for q in range(self.Q)])
        plot_spectrum(means, scales, weights=weights, nyquist=nyquist, titles=names)

    def plot_psd(self, figsize=(20, 14), title=''):
        """
        Plot power spectral density and power cross spectral density.

        Note: Implemented only for 1 input dimension.
        """

        cross_params = self._get_cross_parameters()
        m = self.dataset.get_output_dims()

        fig, axes = plt.subplots(m, m, sharex=False, figsize=figsize, squeeze=False)
        for i in range(m):
            for j in range(i+1):
                self._plot_power_cross_spectral_density(
                    axes[i, j],
                    cross_params,
                    channels=(i, j))

        plt.tight_layout()
        return fig, axes

    def _plot_power_cross_spectral_density(self, ax, params, channels=(0, 0)):
        """
        Plot power cross spectral density given axis.

        Args:
            ax (matplotlib.axis): Axis to plot to.
            params(dict): Kernel parameters.
            channels (tuple of ints): Channels to use.
        """
        i = channels[0]
        j = channels[1]

        mean = params['mean'][i, j, 0, :]
        cov = params['covariance'][i, j, 0, :]
        delay = params['delay'][i, j, 0, :]
        magn = params['magnitude'][i, j, :]
        phase = params['phase'][i, j, :]

        
        w_high = (mean + 2* np.sqrt(cov)).max()

        w = np.linspace(-w_high, w_high, 1000)

        # power spectral density
        if i==j:
            psd_total = np.zeros(len(w))
            for q in range(self.Q):
                psd_q = np.exp(-0.5 * (w - mean[q])**2 / cov[q])
                psd_q += np.exp(-0.5 * (w + mean[q])**2 / cov[q])
                psd_q *= magn[q] * 0.5

                ax.plot(w, psd_q, '--r', lw=0.5)

                psd_total += psd_q
            ax.plot(w, psd_total, 'r', lw=2.1, alpha=0.7)
        # power cross spectral density
        else:
            psd_total = np.zeros(len(w)) + 0.j
            for q in range(self.Q):
                psd_q = np.exp(-0.5 * (w - mean[q])**2 / cov[q] + 1.j * (w * delay[q] + phase[q]))
                psd_q += np.exp(-0.5 * (w + mean[q])**2 / cov[q] + 1.j * (w * delay[q] + phase[q]))
                psd_q *= magn[q] * 0.5

                ax.plot(w, np.real(psd_q), '--b', lw=0.5)
                ax.plot(w, np.imag(psd_q), '--g', lw=0.5)
            
                psd_total += psd_q
            ax.plot(w, np.real(psd_total), 'b', lw=1.2, alpha=0.7)
            ax.plot(w, np.imag(psd_total), 'g', lw=1.2, alpha=0.7)
        ax.set_yticks([])
        return

    def info(self):
        for channel in range(self.dataset.get_output_dims()):
            for q in range(self.Q):
                mean = self.get_parameter(q, "mean")[:,channel]
                var = self.get_parameter(q, "variance")[:,channel]
                if np.linalg.norm(mean) < np.linalg.norm(var):
                    print("‣ MOSM approaches RBF kernel for q=%d in channel='%s'" % (q, self.dataset[channel].name))

    def _get_cross_parameters(self):
        """
        Obtain cross parameters from MOSM

        Returns:
            cross_params(dict): Dictionary with the cross parameters, 'covariance', 'mean',
            'magnitude', 'delay' and 'phase'. Each one a output_dim x output_dim x input_dim x Q
            array with the cross parameters, with the exception of 'magnitude' and 'phase' where 
            the cross parameters are a output_dim x output_dim x Q array.
            NOTE: this assumes the same input dimension for all channels.
        """
        m = self.dataset.get_output_dims()
        d = self.dataset.get_input_dims()[0]
        Q = self.Q

        cross_params = {}

        cross_params['covariance'] = np.zeros((m, m, d, Q))
        cross_params['mean'] = np.zeros((m, m, d, Q))
        cross_params['magnitude'] = np.zeros((m, m, Q))
        cross_params['delay'] = np.zeros((m, m, d, Q))
        cross_params['phase'] = np.zeros((m, m, Q))

        for q in range(Q):
            for i in range(m):
                for j in range(m):
                    var_i = self.get_parameter(q, 'variance')[:, i]
                    var_j = self.get_parameter(q, 'variance')[:, j]
                    mu_i = self.get_parameter(q, 'mean')[:, i]
                    mu_j = self.get_parameter(q, 'mean')[:, j]
                    w_i = self.get_parameter(q, 'magnitude')[i]
                    w_j = self.get_parameter(q, 'magnitude')[j]
                    sv = var_i + var_j

                    # cross covariance
                    cross_params['covariance'][i, j, :, q] = 2 * (var_i * var_j) / sv
                    # cross mean
                    cross_mean_num = var_i.dot(mu_j) + var_j.dot(mu_i)
                    cross_params['mean'][i, j, :, q] = cross_mean_num / sv
                    # cross magnitude
                    exp_term = -1/4 * ((mu_i - mu_j)**2 / sv).sum()
                    cross_params['magnitude'][i, j, q] = w_i * w_j * np.exp(exp_term)
            # cross phase
            phase_q = self.get_parameter(q, 'phase')
            cross_params['phase'][:, :, q] = np.subtract.outer(phase_q, phase_q)
            for n in range(d):
                # cross delay        
                delay_n_q = self.get_parameter(q, 'delay')[n, :]
                cross_params['delay'][:, :, n, q] = np.subtract.outer(delay_n_q, delay_n_q)

        return cross_params

Classes

class MOSM (dataset, Q=1, name='MOSM', likelihood=None, variational=False, sparse=False, like_params={})

MOGP with Multi Output Spectral Mixture kernel, as proposed in [1].

The model contain the dataset and the associated gpflow model, when the mogptk.Model is instanciated the gpflow model is built using random parameters.

Args

dataset : DataSet
DataSet object of data for all channels.
Q : int, optional
Number of components.
name : str, optional
Name of the model.
likelihood : gpflow.likelihoods, optional
Likelihood to use from GPFlow, if None a default exact inference Gaussian likelihood is used.
variational : bool, optional
If True, use variational inference to approximate function values as Gaussian. If False it will use Monte Carlo Markov Chain.
sparse : bool, optional
If True, will use sparse GP regression.
like_params : dict, optional
Parameters to GPflow likelihood.

Atributes

dataset
Constains the mogptk.DataSet associated.
model
GPflow model.

Examples:

>>> import numpy as np
>>> t = np.linspace(0, 10, 100)
>>> y1 = np.sin(0.5 * t)
>>> y2 = 2 * np.sin(0.2 * t)
>>> import mogptk
>>> data_list = []
>>> mogptk.data_list.append(mogptk.Data(t, y1))
>>> mogptk.data_list.append(mogptk.Data(t, y2))
>>> model = mogptk.MOSM(data_list, Q=2)
>>> model.build()
>>> model.train()
>>> model.plot_prediction()

[1] G. Parra and F. Tobar, "Spectral Mixture Kernels for Multi-Output Gaussian Processes", Advances in Neural Information Processing Systems 31, 2017

Base class for Multi-Output Gaussian process models. See subclasses for instantiation.

Args

name : str
Name of the model.
dataset : DataSet, Data
DataSet with Data objects for all the channels.

When a (list or dict of) Data object is passed, it will automatically be converted to a DataSet.

Expand source code Browse git
class MOSM(model):
    """
    MOGP with Multi Output Spectral Mixture kernel, as proposed in [1].

    The model contain the dataset and the associated gpflow model, 
    when the mogptk.Model is instanciated the gpflow model is built 
    using random parameters.

    Args:
        dataset (mogptk.dataset.DataSet): DataSet object of data for all channels.
        Q (int, optional): Number of components.
        name (str, optional): Name of the model.
        likelihood (gpflow.likelihoods, optional): Likelihood to use from GPFlow, if None a default exact inference Gaussian likelihood is used.
        variational (bool, optional): If True, use variational inference to approximate function values as Gaussian. If False it will use Monte Carlo Markov Chain.
        sparse (bool, optional): If True, will use sparse GP regression.
        like_params (dict, optional): Parameters to GPflow likelihood.

    Atributes:
        dataset: Constains the mogptk.DataSet associated.
        model: GPflow model.

    Examples:
    >>> import numpy as np
    >>> t = np.linspace(0, 10, 100)
    >>> y1 = np.sin(0.5 * t)
    >>> y2 = 2 * np.sin(0.2 * t)
    >>> import mogptk
    >>> data_list = []
    >>> mogptk.data_list.append(mogptk.Data(t, y1))
    >>> mogptk.data_list.append(mogptk.Data(t, y2))
    >>> model = mogptk.MOSM(data_list, Q=2)
    >>> model.build()
    >>> model.train()
    >>> model.plot_prediction()

    [1] G. Parra and F. Tobar, "Spectral Mixture Kernels for Multi-Output Gaussian Processes", Advances in Neural Information Processing Systems 31, 2017
    """
    def __init__(self, dataset, Q=1, name="MOSM", likelihood=None, variational=False, sparse=False, like_params={}):
        model.__init__(self, name, dataset)
        self.Q = Q

        for q in range(Q):
            kernel = MultiOutputSpectralMixture(
                self.dataset.get_input_dims()[0],
                self.dataset.get_output_dims(),
            )
            if q == 0:
                kernel_set = kernel
            else:
                kernel_set += kernel
        kernel_set += Noise(self.dataset.get_input_dims()[0], self.dataset.get_output_dims())
        self._build(kernel_set, likelihood, variational, sparse, like_params)

    def init_parameters(self, method='BNSE', sm_method='BNSE', sm_opt='BFGS', sm_maxiter=3000, plot=False):
        """
        Initialize kernel parameters.

        The initialization can be done in two ways, the first by estimating the PSD via 
        BNSE (Tobar 2018) and then selecting the greater Q peaks in the estimated spectrum,
        the peaks position, magnitude and width initialize the mean, magnitude and variance
        of the kernel respectively.
        The second way is by fitting independent Gaussian process for each channel, each one
        with SM kernel, using the fitted parameters for initial values of the multioutput kernel.

        In all cases the noise is initialized with 1/30 of the variance 
        of each channel.

        Args:
            method (str, optional): Method of estimation, possible values are 'BNSE' and 'SM'.
            sm_method (str, optional): Method of estimating SM kernels. Only valid in 'SM' mode.
            sm_opt (str, optional): Optimization method for SM kernels. Only valid in 'SM' mode.
            sm_maxiter (str, optional): Maximum iteration for SM kernels. Only valid in 'SM' mode.
            plot (bool, optional): Show the PSD of the kernel after fitting SM kernels. Only valid in 'SM' mode.
        """

        n_channels = self.dataset.get_output_dims()

        if method == 'BNSE':
            amplitudes, means, variances = self.dataset.get_bnse_estimation(self.Q)
            if len(amplitudes) == 0:
                logger.warning('BNSE could not find peaks for MOSM')
                return

            magnitude = np.zeros((n_channels, self.Q))
            for q in range(self.Q):
                mean = np.empty((self.dataset.get_input_dims()[0], n_channels))
                variance = np.empty((self.dataset.get_input_dims()[0], n_channels))
                for channel in range(n_channels):
                    magnitude[channel, q] = amplitudes[channel][:,q].mean()
                    mean[:,channel] = means[channel][:,q] * 2 * np.pi
                    variance[:,channel] = variances[channel][:,q] * 2
            
            # normalize proportional to channels variances
            for channel, data in enumerate(self.dataset):
                magnitude[channel, :] = np.sqrt(magnitude[channel, :] / magnitude[channel, :].sum() * data.Y[data.mask].var()) * 2

            for q in range(self.Q):
                self.set_parameter(q, 'magnitude', magnitude[:, q])
                self.set_parameter(q, 'mean', mean)
                self.set_parameter(q, 'variance', variance)

        elif method == 'SM':
            params = _estimate_from_sm(self.dataset, self.Q, method=sm_method, optimizer=sm_opt, maxiter=sm_maxiter, plot=plot)

            magnitude = np.zeros((n_channels, self.Q))
            for q in range(self.Q):
                magnitude[:, q] = params[q]['weight'].mean(axis=0)
                self.set_parameter(q, 'mean', params[q]['mean'])
                self.set_parameter(q, 'variance', params[q]['scale'] * 2)

            # normalize proportional to channels variances
            for channel, data in enumerate(self.dataset):
                if magnitude[channel, :].sum()==0:
                    raise Exception("Sum of magnitudes equal to zero")

                magnitude[channel, :] = np.sqrt(magnitude[channel, :] / magnitude[channel, :].sum() * data.Y[data.mask].var()) * 2
            for q in range(self.Q):
                self.set_parameter(q, 'magnitude', magnitude[:, q])
        else:
            raise Exception("possible methods of estimation are either 'BNSE' or 'SM'")

        noise = np.empty((n_channels))
        for channel, data in enumerate(self.dataset):
            noise[channel] = (data.Y[data.mask]).var() / 30
        self.set_parameter(self.Q, 'noise', noise)

    def plot(self):
        names = self.dataset.get_names()
        nyquist = self.dataset.get_nyquist_estimation()

        params = self.get_parameters()
        means = np.array([params[q]['mean'] for q in range(self.Q)])
        weights = np.array([params[q]['magnitude'] for q in range(self.Q)])**2
        scales = np.array([params[q]['variance'] for q in range(self.Q)])
        plot_spectrum(means, scales, weights=weights, nyquist=nyquist, titles=names)

    def plot_psd(self, figsize=(20, 14), title=''):
        """
        Plot power spectral density and power cross spectral density.

        Note: Implemented only for 1 input dimension.
        """

        cross_params = self._get_cross_parameters()
        m = self.dataset.get_output_dims()

        fig, axes = plt.subplots(m, m, sharex=False, figsize=figsize, squeeze=False)
        for i in range(m):
            for j in range(i+1):
                self._plot_power_cross_spectral_density(
                    axes[i, j],
                    cross_params,
                    channels=(i, j))

        plt.tight_layout()
        return fig, axes

    def _plot_power_cross_spectral_density(self, ax, params, channels=(0, 0)):
        """
        Plot power cross spectral density given axis.

        Args:
            ax (matplotlib.axis): Axis to plot to.
            params(dict): Kernel parameters.
            channels (tuple of ints): Channels to use.
        """
        i = channels[0]
        j = channels[1]

        mean = params['mean'][i, j, 0, :]
        cov = params['covariance'][i, j, 0, :]
        delay = params['delay'][i, j, 0, :]
        magn = params['magnitude'][i, j, :]
        phase = params['phase'][i, j, :]

        
        w_high = (mean + 2* np.sqrt(cov)).max()

        w = np.linspace(-w_high, w_high, 1000)

        # power spectral density
        if i==j:
            psd_total = np.zeros(len(w))
            for q in range(self.Q):
                psd_q = np.exp(-0.5 * (w - mean[q])**2 / cov[q])
                psd_q += np.exp(-0.5 * (w + mean[q])**2 / cov[q])
                psd_q *= magn[q] * 0.5

                ax.plot(w, psd_q, '--r', lw=0.5)

                psd_total += psd_q
            ax.plot(w, psd_total, 'r', lw=2.1, alpha=0.7)
        # power cross spectral density
        else:
            psd_total = np.zeros(len(w)) + 0.j
            for q in range(self.Q):
                psd_q = np.exp(-0.5 * (w - mean[q])**2 / cov[q] + 1.j * (w * delay[q] + phase[q]))
                psd_q += np.exp(-0.5 * (w + mean[q])**2 / cov[q] + 1.j * (w * delay[q] + phase[q]))
                psd_q *= magn[q] * 0.5

                ax.plot(w, np.real(psd_q), '--b', lw=0.5)
                ax.plot(w, np.imag(psd_q), '--g', lw=0.5)
            
                psd_total += psd_q
            ax.plot(w, np.real(psd_total), 'b', lw=1.2, alpha=0.7)
            ax.plot(w, np.imag(psd_total), 'g', lw=1.2, alpha=0.7)
        ax.set_yticks([])
        return

    def info(self):
        for channel in range(self.dataset.get_output_dims()):
            for q in range(self.Q):
                mean = self.get_parameter(q, "mean")[:,channel]
                var = self.get_parameter(q, "variance")[:,channel]
                if np.linalg.norm(mean) < np.linalg.norm(var):
                    print("‣ MOSM approaches RBF kernel for q=%d in channel='%s'" % (q, self.dataset[channel].name))

    def _get_cross_parameters(self):
        """
        Obtain cross parameters from MOSM

        Returns:
            cross_params(dict): Dictionary with the cross parameters, 'covariance', 'mean',
            'magnitude', 'delay' and 'phase'. Each one a output_dim x output_dim x input_dim x Q
            array with the cross parameters, with the exception of 'magnitude' and 'phase' where 
            the cross parameters are a output_dim x output_dim x Q array.
            NOTE: this assumes the same input dimension for all channels.
        """
        m = self.dataset.get_output_dims()
        d = self.dataset.get_input_dims()[0]
        Q = self.Q

        cross_params = {}

        cross_params['covariance'] = np.zeros((m, m, d, Q))
        cross_params['mean'] = np.zeros((m, m, d, Q))
        cross_params['magnitude'] = np.zeros((m, m, Q))
        cross_params['delay'] = np.zeros((m, m, d, Q))
        cross_params['phase'] = np.zeros((m, m, Q))

        for q in range(Q):
            for i in range(m):
                for j in range(m):
                    var_i = self.get_parameter(q, 'variance')[:, i]
                    var_j = self.get_parameter(q, 'variance')[:, j]
                    mu_i = self.get_parameter(q, 'mean')[:, i]
                    mu_j = self.get_parameter(q, 'mean')[:, j]
                    w_i = self.get_parameter(q, 'magnitude')[i]
                    w_j = self.get_parameter(q, 'magnitude')[j]
                    sv = var_i + var_j

                    # cross covariance
                    cross_params['covariance'][i, j, :, q] = 2 * (var_i * var_j) / sv
                    # cross mean
                    cross_mean_num = var_i.dot(mu_j) + var_j.dot(mu_i)
                    cross_params['mean'][i, j, :, q] = cross_mean_num / sv
                    # cross magnitude
                    exp_term = -1/4 * ((mu_i - mu_j)**2 / sv).sum()
                    cross_params['magnitude'][i, j, q] = w_i * w_j * np.exp(exp_term)
            # cross phase
            phase_q = self.get_parameter(q, 'phase')
            cross_params['phase'][:, :, q] = np.subtract.outer(phase_q, phase_q)
            for n in range(d):
                # cross delay        
                delay_n_q = self.get_parameter(q, 'delay')[n, :]
                cross_params['delay'][:, :, n, q] = np.subtract.outer(delay_n_q, delay_n_q)

        return cross_params

Ancestors

Methods

def info(self)
Expand source code Browse git
def info(self):
    for channel in range(self.dataset.get_output_dims()):
        for q in range(self.Q):
            mean = self.get_parameter(q, "mean")[:,channel]
            var = self.get_parameter(q, "variance")[:,channel]
            if np.linalg.norm(mean) < np.linalg.norm(var):
                print("‣ MOSM approaches RBF kernel for q=%d in channel='%s'" % (q, self.dataset[channel].name))
def init_parameters(self, method='BNSE', sm_method='BNSE', sm_opt='BFGS', sm_maxiter=3000, plot=False)

Initialize kernel parameters.

The initialization can be done in two ways, the first by estimating the PSD via BNSE (Tobar 2018) and then selecting the greater Q peaks in the estimated spectrum, the peaks position, magnitude and width initialize the mean, magnitude and variance of the kernel respectively. The second way is by fitting independent Gaussian process for each channel, each one with SM kernel, using the fitted parameters for initial values of the multioutput kernel.

In all cases the noise is initialized with 1/30 of the variance of each channel.

Args

method : str, optional
Method of estimation, possible values are 'BNSE' and 'SM'.
sm_method : str, optional
Method of estimating SM kernels. Only valid in 'SM' mode.
sm_opt : str, optional
Optimization method for SM kernels. Only valid in 'SM' mode.
sm_maxiter : str, optional
Maximum iteration for SM kernels. Only valid in 'SM' mode.
plot : bool, optional
Show the PSD of the kernel after fitting SM kernels. Only valid in 'SM' mode.
Expand source code Browse git
def init_parameters(self, method='BNSE', sm_method='BNSE', sm_opt='BFGS', sm_maxiter=3000, plot=False):
    """
    Initialize kernel parameters.

    The initialization can be done in two ways, the first by estimating the PSD via 
    BNSE (Tobar 2018) and then selecting the greater Q peaks in the estimated spectrum,
    the peaks position, magnitude and width initialize the mean, magnitude and variance
    of the kernel respectively.
    The second way is by fitting independent Gaussian process for each channel, each one
    with SM kernel, using the fitted parameters for initial values of the multioutput kernel.

    In all cases the noise is initialized with 1/30 of the variance 
    of each channel.

    Args:
        method (str, optional): Method of estimation, possible values are 'BNSE' and 'SM'.
        sm_method (str, optional): Method of estimating SM kernels. Only valid in 'SM' mode.
        sm_opt (str, optional): Optimization method for SM kernels. Only valid in 'SM' mode.
        sm_maxiter (str, optional): Maximum iteration for SM kernels. Only valid in 'SM' mode.
        plot (bool, optional): Show the PSD of the kernel after fitting SM kernels. Only valid in 'SM' mode.
    """

    n_channels = self.dataset.get_output_dims()

    if method == 'BNSE':
        amplitudes, means, variances = self.dataset.get_bnse_estimation(self.Q)
        if len(amplitudes) == 0:
            logger.warning('BNSE could not find peaks for MOSM')
            return

        magnitude = np.zeros((n_channels, self.Q))
        for q in range(self.Q):
            mean = np.empty((self.dataset.get_input_dims()[0], n_channels))
            variance = np.empty((self.dataset.get_input_dims()[0], n_channels))
            for channel in range(n_channels):
                magnitude[channel, q] = amplitudes[channel][:,q].mean()
                mean[:,channel] = means[channel][:,q] * 2 * np.pi
                variance[:,channel] = variances[channel][:,q] * 2
        
        # normalize proportional to channels variances
        for channel, data in enumerate(self.dataset):
            magnitude[channel, :] = np.sqrt(magnitude[channel, :] / magnitude[channel, :].sum() * data.Y[data.mask].var()) * 2

        for q in range(self.Q):
            self.set_parameter(q, 'magnitude', magnitude[:, q])
            self.set_parameter(q, 'mean', mean)
            self.set_parameter(q, 'variance', variance)

    elif method == 'SM':
        params = _estimate_from_sm(self.dataset, self.Q, method=sm_method, optimizer=sm_opt, maxiter=sm_maxiter, plot=plot)

        magnitude = np.zeros((n_channels, self.Q))
        for q in range(self.Q):
            magnitude[:, q] = params[q]['weight'].mean(axis=0)
            self.set_parameter(q, 'mean', params[q]['mean'])
            self.set_parameter(q, 'variance', params[q]['scale'] * 2)

        # normalize proportional to channels variances
        for channel, data in enumerate(self.dataset):
            if magnitude[channel, :].sum()==0:
                raise Exception("Sum of magnitudes equal to zero")

            magnitude[channel, :] = np.sqrt(magnitude[channel, :] / magnitude[channel, :].sum() * data.Y[data.mask].var()) * 2
        for q in range(self.Q):
            self.set_parameter(q, 'magnitude', magnitude[:, q])
    else:
        raise Exception("possible methods of estimation are either 'BNSE' or 'SM'")

    noise = np.empty((n_channels))
    for channel, data in enumerate(self.dataset):
        noise[channel] = (data.Y[data.mask]).var() / 30
    self.set_parameter(self.Q, 'noise', noise)
def plot(self)
Expand source code Browse git
def plot(self):
    names = self.dataset.get_names()
    nyquist = self.dataset.get_nyquist_estimation()

    params = self.get_parameters()
    means = np.array([params[q]['mean'] for q in range(self.Q)])
    weights = np.array([params[q]['magnitude'] for q in range(self.Q)])**2
    scales = np.array([params[q]['variance'] for q in range(self.Q)])
    plot_spectrum(means, scales, weights=weights, nyquist=nyquist, titles=names)
def plot_psd(self, figsize=(20, 14), title='')

Plot power spectral density and power cross spectral density.

Note: Implemented only for 1 input dimension.

Expand source code Browse git
def plot_psd(self, figsize=(20, 14), title=''):
    """
    Plot power spectral density and power cross spectral density.

    Note: Implemented only for 1 input dimension.
    """

    cross_params = self._get_cross_parameters()
    m = self.dataset.get_output_dims()

    fig, axes = plt.subplots(m, m, sharex=False, figsize=figsize, squeeze=False)
    for i in range(m):
        for j in range(i+1):
            self._plot_power_cross_spectral_density(
                axes[i, j],
                cross_params,
                channels=(i, j))

    plt.tight_layout()
    return fig, axes

Inherited members