Module mogptk.gpr.model

import sys
import torch
import numpy as np
from scipy.stats import qmc, gaussian_kde
from IPython.display import display, HTML
from . import Parameter, Mean, Kernel, MultiOutputKernel, Likelihood, MultiOutputLikelihood, GaussianLikelihood, config, plot_gram

def _init_grid(N, X):
    n = np.power(N,1.0/X.shape[1])
    if not n.is_integer():
        raise ValueError("number of inducing points must equal N = n^%d" % X.shape[1])
    n = int(n)

    Z = torch.empty((N,X.shape[1]), device=config.device, dtype=config.dtype)
    grid = torch.meshgrid([torch.linspace(torch.min(X[:,i]), torch.max(X[:,i]), n) for i in range(X.shape[1])])
    for i in range(X.shape[1]):
        Z[:,i] = grid[i].flatten()
    return Z

def _init_random(N, X):
    sampler = qmc.Halton(d=X.shape[1])
    samples = torch.tensor(sampler.random(n=N), device=config.device, dtype=config.dtype)
    Z = torch.empty((N,X.shape[1]), device=config.device, dtype=config.dtype)
    for i in range(X.shape[1]):
        Z[:,i] = torch.min(X[:,i]) + (torch.max(X[:,i])-torch.min(X[:,i]))*samples[:,i]
    return Z

def _init_density(N, X):
    kernel = gaussian_kde(X.T.detach().cpu().numpy(), bw_method='scott')
    Z = torch.tensor(kernel.resample(N).T, device=config.device, dtype=config.dtype)
    return Z

def init_inducing_points(Z, X, method='grid', output_dims=None):
    """
    Initialize locations for inducing points.

    Args:
        Z (int,list): Number of inducing points. A list of ints of shape (output_dims,) will initialize the specified number of inducing points per output dimension.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        method (str): Method for initialization, can be `grid`, `random`, or `density`.
        output_dims (int): Number of output dimensions. If not None, the first input dimension of the input data must contain the channel IDs.

    Returns:
        torch.tensor: Inducing point locations of shape (inducing_points,input_dims). In case of multiple output dimensions, the first input dimension will be the channel ID.
    """
    _init = _init_grid
    if method == 'random':
        _init = _init_random
    elif method == 'density':
        _init = _init_density

    if output_dims is not None:
        if isinstance(Z, int) or all(isinstance(z, int) for z in Z) and len(Z) == output_dims:
            if isinstance(Z, int):
                Z = [Z] * output_dims
            M = Z
            Z = torch.zeros((sum(M),X.shape[1]))
            for j in range(len(M)):
                m0 = sum(M[:j])
                m = M[j]
                Z[m0:m0+m,0] = j
                Z[m0:m0+m,1:] = _init(m, X[X[:,0] == j,1:])
    elif isinstance(Z, int):
        M = Z
        Z = _init(M, X)
    return Z

class CholeskyException(Exception):
    def __init__(self, message, K, model):
        self.message = message
        self.K = K
        self.model = model

    def __str__(self):
        return self.message

class Model:
    """
    Base model class.

    Attributes:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        mean (mogptk.gpr.mean.Mean): Mean.
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None, name=None):
        if not issubclass(type(kernel), Kernel):
            raise ValueError("kernel must derive from mogptk.gpr.Kernel")
        X, y = self._check_input(X, y)
        if mean is not None:
            if not issubclass(type(mean), Mean):
                raise ValueError("mean must derive from mogptk.gpr.Mean")
            mu = mean(X).reshape(-1,1)
            if mu.shape != y.shape:
                raise ValueError("mean and y data must match shapes: %s != %s" % (mu.shape, y.shape))

        if issubclass(type(likelihood), MultiOutputLikelihood) and likelihood.output_dims != kernel.output_dims:
            raise ValueError("kernel and likelihood must have matching output dimensions")
        likelihood.validate_y(y, X=X)

        # limit to number of significant digits
        if config.dtype == torch.float32:
            jitter = max(jitter, 1e-6)
        elif config.dtype == torch.float64:
            jitter = max(jitter, 1e-15)

        self._params = []
        self.kernel = kernel
        self.X = X
        self.y = y
        self.likelihood = likelihood
        self.jitter = jitter
        self.mean = mean
        self.name = name
        self.input_dims = X.shape[1]

    def __setattr__(self, name, val):
        if hasattr(self, name) and isinstance(getattr(self, name), Parameter):
            raise AttributeError("parameter is read-only, use Parameter.assign()")
        elif isinstance(val, (Parameter, Kernel, Mean, Likelihood)):
            self._register_parameters(val)
        super().__setattr__(name, val)        

    def _check_input(self, X, y=None):
        if not isinstance(X, torch.Tensor):
            X = torch.tensor(X, device=config.device, dtype=config.dtype)
        else:
            X = X.to(config.device, config.dtype)
        if X.ndim == 0:
            X = X.reshape(1,1)
        elif X.ndim == 1:
            X = X.reshape(-1,1)
        elif X.ndim != 2:
            raise ValueError("X must have dimensions (data_points,input_dims) with input_dims optional")
        if X.shape[0] == 0 or X.shape[1] == 0:
            raise ValueError("X must not be empty")

        if y is not None:
            if not isinstance(y, torch.Tensor):
                y = torch.tensor(y, device=config.device, dtype=config.dtype)
            else:
                y = y.to(config.device, config.dtype)
            if y.ndim == 0:
                y = y.reshape(1,1)
            elif y.ndim == 1:
                y = y.reshape(-1,1)
            elif y.ndim != 2 or y.shape[1] != 1:
                raise ValueError("y must have one dimension (data_points,)")
            if X.shape[0] != y.shape[0]:
                raise ValueError("number of data points for X and y must match")
            return X, y
        else:
            # X is for prediction
            if X.shape[1] != self.input_dims:
                raise ValueError("X must have %s input dimensions" % self.input_dims)
            return X

    def _index_channel(self, value, X):
        if self.kernel.output_dims is not None and 0 < value.ndim and value.shape[0] == self.kernel.output_dims:
            return torch.index_select(value, dim=0, index=X[:,0].long())
        return value

    def _register_parameters(self, obj, name=None):
        if isinstance(obj, Parameter):
            if obj.name is not None:
                if name is None:
                    name = obj.name
                else:
                    name += "." + obj.name
            elif name is None:
                name = ""
            obj.name = name
            self._params.append(obj)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                self._register_parameters(v, (name if name is not None else "")+"["+str(i)+"]")
        elif issubclass(type(obj), (Kernel, Mean, Likelihood)):
            for v in obj.__dict__.values():
                self._register_parameters(v, (name+"." if name is not None else "")+obj.name)

    def zero_grad(self):
        for p in self._params:
            p = p.unconstrained
            if p.grad is not None:
                if p.grad.grad_fn is not None:
                    p.grad.detach_()
                else:
                    p.grad.requires_grad_(False)
                p.grad.zero_()

    def parameters(self):
        """
        Yield trainable parameters of model.

        Returns:
            Parameter generator
        """
        for p in self._params:
            if p.train:
                yield p.unconstrained

    def get_parameters(self):
        """
        Return all parameters of model.

        Returns:
            list: List of Parameters.
        """
        return self._params
    
    def print_parameters(self, file=None):
        """
        Print parameters and their values.
        """
        def param_range(lower, upper, train=True, pegged=False):
            if lower is not None:
                if lower.size == 1:
                    lower = lower.item()
                elif (lower.max()-lower.min())/lower.mean() < 1e-6:
                    lower = lower.mean().item()
                else:
                    lower = lower.tolist()
            if upper is not None:
                if upper.size == 1:
                    upper = upper.item()
                elif (upper.max()-upper.min())/upper.mean() < 1e-6:
                    upper = upper.mean().item()
                else:
                    upper = upper.tolist()

            if pegged:
                return "pegged"
            elif not train:
                return "fixed"
            if lower is None and upper is None:
                return "(-∞, ∞)"
            elif lower is None:
                return "(-∞, %s]" % upper
            elif upper is None:
                return "[%s, ∞)" % lower
            return "[%s, %s]" % (lower, upper)

        if file is None:
            try:
                get_ipython  # fails if we're not in a notebook
                table = '<table><tr><th style="text-align:left">Name</th><th>Range</th><th>Value</th></tr>'
                for p in self._params:
                    table += '<tr><td style="text-align:left">%s</td><td>%s</td><td>%s</td></tr>' % (p.name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy())
                table += '</table>'
                display(HTML(table))
                return
            except Exception as e:
                pass

        vals = [["Name", "Range", "Value"]]
        for p in self._params:
            vals.append([p.name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy().tolist()])

        nameWidth = max([len(val[0]) for val in vals])
        rangeWidth = max([len(val[1]) for val in vals])
        for val in vals:
            #print("%-*s  %-*s  %s" % (nameWidth, val[0], rangeWidth, val[1], val[2]), file=file)
            print("%-*s  %s" % (nameWidth, val[0], val[2]), file=file)

    def _cholesky(self, K, add_jitter=False):
        if add_jitter:
            K += (self.jitter * K.diagonal().mean()).repeat(K.shape[0]).diagflat()
        try:
            return torch.linalg.cholesky(K)
        except RuntimeError as e:
            print("ERROR:", e.args[0], file=sys.__stdout__)
            if K.isnan().any():
                print("ERROR: kernel matrix has NaNs!", file=sys.__stdout__)
            if K.isinf().any():
                print("ERROR: kernel matrix has infinities!", file=sys.__stdout__)
            self.print_parameters()
            plot_gram(K)
            raise CholeskyException(e.args[0], K, self)

    def log_marginal_likelihood(self):
        """
        Return the log marginal likelihood given by

        $$ \\log p(y) $$

        Returns:
            torch.tensor: Log marginal likelihood.
        """
        raise NotImplementedError()

    def log_prior(self):
        """
        Return the log prior given by

        $$ \\log p(\\theta) $$

        Returns:
            torch.tensor: Log prior.
        """
        return sum([p.log_prior() for p in self._params])

    def loss(self):
        """
        Model loss for training.

        Returns:
            torch.tensor: Loss.
        """
        self.zero_grad()
        loss = -self.log_marginal_likelihood() - self.log_prior()
        loss.backward()
        return loss

    def K(self, X1, X2=None):
        """
        Evaluate kernel at `X1` and `X2` and return the NumPy representation.

        Args:
            X1 (torch.tensor): Input of shape (data_points0,input_dims).
            X2 (torch.tensor): Input of shape (data_points1,input_dims).

        Returns:
            numpy.ndarray: Kernel matrix of shape (data_points0,data_points1).
        """
        with torch.no_grad():
            return self.kernel(X1, X2).cpu().numpy()

    def sample(self, Z, n=None, predict_y=True, prior=False):
        """
        Sample from model.

        Args:
            Z (torch.tensor): Input of shape (data_points,input_dims).
            n (int): Number of samples.
            predict_y (boolean): Predict the data values \\(y\\) instead of the function values \\(f\\).
            prior (boolean): Sample from prior instead of posterior.

        Returns:
            numpy.ndarray: Samples of shape (data_points,samples) or (data_points,) if `n` is not given.
        """
        with torch.no_grad():
            S = n
            if n is None:
                S = 1

            # TODO: predict_y and non-Gaussian likelihoods
            if prior:
                mu, var = self.mean(Z), self.kernel(Z)
            else:
                mu, var = self.predict(Z, full=True, tensor=True, predict_y=predict_y)  # MxD, MxMxD
            eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
            var += self.jitter * var.diagonal().mean() * eye  # MxM

            u = torch.normal(
                    torch.zeros(Z.shape[0], S, device=config.device, dtype=config.dtype),
                    torch.tensor(1.0, device=config.device, dtype=config.dtype))  # MxS
            L = torch.linalg.cholesky(var)  # MxM
            samples = mu + L.mm(u)  # MxS

            if n is None:
                samples = samples.squeeze()
            return samples.cpu().numpy()

class Exact(Model):
    """
    Regular Gaussian process regression with a Gaussian likelihood which allows for exact inference.

    $$ y \\sim \\mathcal{N}(0, K + \\sigma^2I) $$

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
        data_variance (torch.tensor): Assign different and fixed variances per data point. These are added to the kernel's diagonal while still training an additional Gaussian variance.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.
    """
    def __init__(self, kernel, X, y, variance=1.0, data_variance=None, jitter=1e-8, mean=None, name="Exact"):
        if data_variance is not None:
            data_variance = Parameter.to_tensor(data_variance)
            if data_variance.ndim != 1 or X.ndim == 2 and data_variance.shape[0] != X.shape[0]:
                raise ValueError("data variance must have shape (data_points,)")
            data_variance = data_variance.diagflat()
        self.data_variance = data_variance

        variance = Parameter.to_tensor(variance)
        channels = 1
        if kernel.output_dims is not None:
            channels = kernel.output_dims
        if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != channels:
            raise ValueError("variance must be float or have shape (channels,)")

        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean, name)

        self.eye = torch.eye(self.X.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)

    def log_marginal_likelihood(self):
        Kff = self.kernel.K(self.X)
        Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye  # NxN
        if self.data_variance is not None:
            Kff += self.data_variance
        L = self._cholesky(Kff, add_jitter=True)  # NxN

        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken inside the log: L is the square root
        p -= 0.5*y.T.mm(torch.cholesky_solve(y,L)).squeeze()
        return p

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD
            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
            else:
                y = self.y  # Nx1

            Kff = self.kernel.K(self.X)
            Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye  # NxN
            if self.data_variance is not None:
                Kff += self.data_variance
            Kfs = self.kernel.K(self.X,Xs)  # NxM

            Lff = self._cholesky(Kff, add_jitter=True)  # NxN
            v = torch.linalg.solve_triangular(Lff,Kfs,upper=False)  # NxM

            mu = Kfs.T.mm(torch.cholesky_solve(y,Lff))  # Mx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Mx1

            if full:
                Kss = self.kernel.K(Xs)  # MxM
                var = Kss - v.T.mm(v)  # MxM
                if predict_y:
                    eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                    var += self._index_channel(self.likelihood.scale().square(), Xs) * eye
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - v.T.square().sum(dim=1)  # M
                if predict_y:
                    var += self._index_channel(self.likelihood.scale().square(), Xs)
                var = var.reshape(-1,1)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

class Snelson(Model):
    """
    A sparse Gaussian process regression based on Snelson and Ghahramani [1] with a Gaussian likelihood and inducing points.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] E. Snelson, Z. Ghahramani, "Sparse Gaussian Processes using Pseudo-inputs", 2005
    """
    def __init__(self, kernel, X, y, Z=10, Z_init='grid', variance=1.0, jitter=1e-8, mean=None,
                 name="Snelson"):
        variance = Parameter.to_tensor(variance).squeeze()
        if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != kernel.output_dims:
            raise ValueError("variance must be float or have shape (channels,)")

        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean, name)

        Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
        Z = self._check_input(Z)
        
        self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.Z = Parameter(Z, name="induction_points")
        if kernel.output_dims is not None:
            self.Z.num_parameters -= self.Z().shape[0]

    def log_marginal_likelihood(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kff_diag = self.kernel.K_diag(self.X)  # N
        Kuf = self.kernel.K(self.Z(),self.X)  # MxN
        Kuu = self.kernel.K(self.Z())  # MxM

        Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Luu = Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
        g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)  # N;  diag(Kff-Qff) + sigma^2.I
        G = torch.diagflat(1.0/g)  # N
        L = self._cholesky(v.mm(G).mm(v.T) + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)

        c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False)  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.G.y

        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken inside the log: L is the square root
        p -= 0.5*g.log().sum()
        p -= 0.5*y.T.mm(G).mm(y).squeeze()
        p += 0.5*c.T.mm(c).squeeze()
        return p

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD
            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
            else:
                y = self.y  # Nx1

            Kff_diag = self.kernel.K_diag(self.X)  # N
            Kuf = self.kernel.K(self.Z(),self.X)  # MxN
            Kuu = self.kernel.K(self.Z())  # MxM
            Kus = self.kernel.K(self.Z(),Xs)  # MxS

            Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
            v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
            g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)
            G = torch.diagflat(1.0/g)  # N
            L = self._cholesky(v.mm(G).mm(v.T) + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)

            a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # MxS;  Kuu^(-1/2).Kus
            b = torch.linalg.solve_triangular(L,a,upper=False)  # MxS;  L^(-1).Kuu^(-1/2).Kus
            c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False)  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.G.y

            mu = b.T.mm(c)  # Mx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Mx1

            if full:
                Kss = self.kernel(Xs)  # MxM
                var = Kss - a.T.mm(a) + b.T.mm(b)  # MxM
                if predict_y:
                    eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                    var += self._index_channel(self.likelihood.scale().square(), Xs) * eye
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
                if predict_y:
                    var += self._index_channel(self.likelihood.scale().square(), Xs)
                var = var.reshape(-1,1)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

class OpperArchambeau(Model):
    """
    A Gaussian process regression based on Opper and Archambeau [1] with a non-Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] M. Opper, C. Archambeau, "The Variational Gaussian Approximation Revisited", 2009
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0),
                 jitter=1e-8, mean=None, name="OpperArchambeau"):
        super().__init__(kernel, X, y, likelihood, jitter, mean, name)

        n = self.X.shape[0]
        self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
        self.q_nu = Parameter(torch.zeros(n,1), name="q_nu")
        self.q_lambda = Parameter(torch.ones(n,1), name="q_lambda", lower=config.positive_minimum)
        self.likelihood = likelihood

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        q_nu = self.q_nu()
        q_lambda = self.q_lambda()

        Kff = self.kernel(self.X)  # NxN
        L = self._cholesky(q_lambda*q_lambda.T*Kff + self.eye)
        invL = torch.linalg.solve_triangular(L,self.eye,upper=False)  # NxN

        qf_mu = Kff.mm(q_nu)
        qf_var_diag = 1.0/q_lambda.square() - (invL.T.mm(invL)/q_lambda/q_lambda.T).diagonal().reshape(-1,1)

        kl = -q_nu.shape[0]
        kl += q_nu.T.mm(qf_mu).squeeze()  # Mahalanobis
        kl += L.diagonal().square().log().sum()  # determinant TODO: is this correct?
        #kl += invL.diagonal().square().sum()  # trace
        kl += invL.square().sum()  # trace

        if self.mean is not None:
            qf_mu = qf_mu - self.mean(self.X).reshape(-1,1)  # Sx1
        var_exp = self.likelihood.variational_expectation(y, qf_mu, qf_var_diag, X=self.X)

        #eye = torch.eye(q_lambda.shape[0], device=config.device, dtype=config.dtype)
        #qf_var = (1.0/q_lambda.square())*eye - invL.T.mm(invL)/q_lambda/q_lambda.T
        #kl = -q_nu.shape[0]
        #kl += q_nu.T.mm(qf_mu).squeeze()  # Mahalanobis
        #kl -= qf_var.det().log()  # determinant
        #kl += invL.diagonal().square().sum()  # trace

        #kl = -q_nu.shape[0]
        #kl += q_nu.T.mm(q_nu).squeeze()  # Mahalanobis
        #kl -= qf_var.det().log()  # determinant
        #kl += qf_var_diag.sum()  # trace
        return var_exp - 0.5*kl

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD

            Kff = self.kernel(self.X)
            Kfs = self.kernel(self.X,Xs)  # NxS

            L = self._cholesky(Kff + (1.0/self.q_lambda().square()).diagflat())  # NxN
            a = torch.linalg.solve_triangular(L,Kfs,upper=False)  # NxS;  Kuu^(-1/2).Kus

            mu = Kfs.T.mm(self.q_nu())  # Sx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Sx1

            if full:
                Kss = self.kernel(Xs)  # SxS
                var = Kss - a.T.mm(a)  # SxS
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - a.T.square().sum(dim=1)  # M
                var = var.reshape(-1,1)

            if predict_y:
                mu, var = self.likelihood.predict(mu, var, full=full, X=Xs)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

class Titsias(Model):
    """
    A sparse Gaussian process regression based on Titsias [1] with a Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        variance (float): Gaussian likelihood initial variance.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] M. Titsias, "Variational Learning of Inducing Variables in Sparse Gaussian Processes", 2009
    """
    # See: http://krasserm.github.io/2020/12/12/gaussian-processes-sparse/
    def __init__(self, kernel, X, y, Z, Z_init='grid', variance=1.0, jitter=1e-8,
                 mean=None, name="Titsias"):
        # TODO: variance per channel
        variance = Parameter.to_tensor(variance)
        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean, name)

        Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
        Z = self._check_input(Z)

        self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.Z = Parameter(Z, name="induction_points")
        if kernel.output_dims is not None:
            self.Z.num_parameters -= self.Z().shape[0]

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kff_diag = self.kernel.K_diag(self.X)  # N
        Kuf = self.kernel(self.Z(),self.X)  # MxN
        Kuu = self.kernel(self.Z())  # MxM

        Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
        Q = v.mm(v.T)  # MxM;  Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)
        L = self._cholesky(Q/self.likelihood.scale().square() + self.eye)  # MxM;  (Q/sigma^2 + I)^(1/2)

        c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square()  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.y

        # p = log N(0, Kfu.Kuu^(-1).Kuf + I/sigma^2) - 1/(2.sigma^2).Trace(Kff - Kfu.Kuu^(-1).Kuf)
        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken inside the log: L is the square root
        p -= self.X.shape[0]*self.likelihood.scale().log()
        p -= 0.5*y.T.mm(y).squeeze()/self.likelihood.scale().square()
        p += 0.5*c.T.mm(c).squeeze()
        p -= 0.5*(Kff_diag.sum() - Q.trace())/self.likelihood.scale().square() # trace
        return p

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD
            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
            else:
                y = self.y  # Nx1

            Kus = self.kernel(self.Z(),Xs)  # MxS
            Kuf = self.kernel(self.Z(),self.X)  # MxN
            Kuu = self.kernel(self.Z())  # MxM

            Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
            v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
            L = self._cholesky(v.mm(v.T)/self.likelihood.scale().square() + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)/sigma^2 + I)^(1/2)

            a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # MxS;  Kuu^(-1/2).Kus
            b = torch.linalg.solve_triangular(L,a,upper=False)  # MxS;  L^(-1).Kuu^(-1/2).Kus
            c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square()  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.y

            # mu = sigma^(-2).Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kuf.y
            mu = b.T.mm(c)  # Mx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Mx1

            # var = Kss - Qsf.(Qff + sigma^2 I)^(-1).Qfs
            # below is the equivalent but more stable version by using the matrix inversion lemma
            # var = Kss - Ksu.Kuu^(-1).Kus + Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kus
            if full:
                Kss = self.kernel(Xs)  # MxM
                var = Kss - a.T.mm(a) + b.T.mm(b)  # MxM
                if predict_y:
                    eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                    var += self.likelihood.scale().square() * eye
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
                if predict_y:
                    var += self.likelihood.scale().square()
                var = var.reshape(-1,1)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

class SparseHensman(Model):
    """
    A sparse Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
    """
    # This version replaces mu_q by L.mu_q and sigma_q by L.sigma_q.L^T, where LL^T = Kuu
    # So that p(u) ~ N(0,1) and q(u) ~ N(L.mu_q, L.sigma_q.L^T)
    def __init__(self, kernel, X, y, Z=None, Z_init='grid',
                 likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None,
                 name="SparseHensman"):
        super().__init__(kernel, X, y, likelihood, jitter, mean, name)

        n = self.X.shape[0]
        self.is_sparse = Z is not None
        if self.is_sparse:
            Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
            Z = self._check_input(Z)
            n = Z.shape[0]

        self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.q_mu = Parameter(torch.zeros(n,1), name="q_mu")
        self.q_sqrt = Parameter(torch.eye(n), name="q_sqrt")
        self.q_sqrt.num_parameters = int((n*n+n)/2)
        if self.is_sparse:
            self.Z = Parameter(Z, name="induction_points")
            if kernel.output_dims is not None:
                self.Z.num_parameters -= self.Z().shape[0]
        else:
            self.Z = Parameter(self.X, train=False)  # don't use inducing points

    def kl_gaussian(self, q_mu, q_sqrt):
        S_diag = q_sqrt.diagonal().square() # NxN
        kl = -q_mu.shape[0]
        kl += q_mu.T.mm(q_mu).squeeze()  # Mahalanobis
        kl -= S_diag.log().sum()  # determinant of q_var
        kl += S_diag.sum()  # same as Trace(p_var^(-1).q_var)
        return 0.5*kl

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        if self.is_sparse:
            qf_mu, qf_var_diag = self._predict(self.X, full=False)
        else:
            Kff = self.kernel(self.X)
            Lff = self._cholesky(Kff, add_jitter=True)  # NxN

            qf_mu = Lff.mm(self.q_mu())
            if self.mean is not None:
                qf_mu -= self.mean(self.X).reshape(-1,1)  # Sx1

            qf_sqrt = Lff.mm(self.q_sqrt().tril())
            qf_var_diag = qf_sqrt.mm(qf_sqrt.T).diagonal().reshape(-1,1)

        var_exp = self.likelihood.variational_expectation(y, qf_mu, qf_var_diag, X=self.X)
        kl = self.kl_gaussian(self.q_mu(), self.q_sqrt())
        return var_exp - kl

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def _predict(self, Xs, full=False):
        Kuu = self.kernel(self.Z())
        Kus = self.kernel(self.Z(),Xs)  # NxS

        Luu = self._cholesky(Kuu, add_jitter=True)  # NxN
        a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # NxS;  Kuu^(-1/2).Kus
        b = self.q_sqrt().tril().T.mm(torch.linalg.solve_triangular(Luu,Kus,upper=False))

        mu = Kus.T.mm(torch.linalg.solve_triangular(Luu.T,self.q_mu(),upper=True))  # Sx1
        if full:
            Kss = self.kernel(Xs)  # SxS
            var = Kss - a.T.mm(a) + b.T.mm(b)  # SxS
        else:
            Kss_diag = self.kernel.K_diag(Xs)  # M
            var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
            var = var.reshape(-1,1)
        return mu, var

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD

            mu, var = self._predict(Xs, full=full)
            if predict_y:
                mu, var = self.likelihood.predict(mu, var, full=full, X=Xs)
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Sx1

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

class Hensman(SparseHensman):
    """
    A Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood. This is equivalent to the `SparseHensman` model if we set the inducing points equal to the data points and disable training of the inducing points.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8,
                 mean=None, name="Hensman"):
        super().__init__(kernel, X, y, None, likelihood=likelihood, jitter=jitter, mean=mean, name=name)

Functions

def init_inducing_points(Z, X, method='grid', output_dims=None)

Initialize locations for inducing points.

Args

Z : int,list
Number of inducing points. A list of ints of shape (output_dims,) will initialize the specified number of inducing points per output dimension.
X : torch.tensor
Input data of shape (data_points,input_dims).
method : str
Method for initialization, can be grid, random, or density.
output_dims : int
Number of output dimensions. If not None, the first input dimension of the input data must contain the channel IDs.

Returns

torch.tensor
Inducing point locations of shape (inducing_points,input_dims). In case of multiple output dimensions, the first input dimension will be the channel ID.
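
A minimal usage sketch (not part of the module source; the shapes, the 4x4 grid size, and the default CPU configuration are illustrative assumptions):

import torch
from mogptk.gpr.model import init_inducing_points

# single output: X has shape (data_points,input_dims)
X = torch.rand(100, 2)
Z = init_inducing_points(16, X, method='grid')    # 16 = 4^2 points on a 4x4 grid
print(Z.shape)                                    # torch.Size([16, 2])

# multiple outputs: the first column of X holds the channel ID
Xmo = torch.stack([(torch.arange(100) % 2).double(), torch.linspace(0, 1, 100).double()], dim=1)
Zmo = init_inducing_points([4, 4], Xmo, method='random', output_dims=2)
print(Zmo.shape)                                  # torch.Size([8, 2]); column 0 is the channel ID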
def init_inducing_points(Z, X, method='grid', output_dims=None):
    """
    Initialize locations for inducing points.

    Args:
        Z (int,list): Number of inducing points. A list of ints of shape (output_dims,) will initialize the specified number of inducing points per output dimension.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        method (str): Method for initialization, can be `grid`, `random`, or `density`.
        output_dims (int): Number of output dimensions. If not None, the first input dimension of the input data must contain the channel IDs.

    Returns:
        torch.tensor: Inducing point locations of shape (inducing_points,input_dims). In case of multiple output dimensions, the first input dimension will be the channel ID.
    """
    _init = _init_grid
    if method == 'random':
        _init = _init_random
    elif method == 'density':
        _init = _init_density

    if output_dims is not None:
        if isinstance(Z, int) or all(isinstance(z, int) for z in Z) and len(Z) == output_dims:
            if isinstance(Z, int):
                Z = [Z] * output_dims
            M = Z
            Z = torch.zeros((sum(M),X.shape[1]))
            for j in range(len(M)):
                m0 = sum(M[:j])
                m = M[j]
                Z[m0:m0+m,0] = j
                Z[m0:m0+m,1:] = _init(m, X[X[:,0] == j,1:])
    elif isinstance(Z, int):
        M = Z
        Z = _init(M, X)
    return Z

Classes

class CholeskyException (message, K, model)

Exception raised when the Cholesky decomposition of a kernel matrix fails. It carries the error message, the offending matrix K, and the model, so the failure can be inspected.
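
A short sketch of catching the exception to inspect the failing matrix (illustrative only; model stands for any model instance from this module):

import torch
from mogptk.gpr.model import CholeskyException

try:
    model.loss()                       # raises when the kernel matrix is not positive definite
except CholeskyException as e:
    print(e)                           # the original error message
    print(torch.linalg.cond(e.K))      # condition number of the offending matrix
    e.model.print_parameters()         # parameter values that produced it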

class CholeskyException(Exception):
    def __init__(self, message, K, model):
        self.message = message
        self.K = K
        self.model = model

    def __str__(self):
        return self.message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class Exact (kernel, X, y, variance=1.0, data_variance=None, jitter=1e-08, mean=None, name='Exact')

Regular Gaussian process regression with a Gaussian likelihood which allows for exact inference.

$$ y \sim \mathcal{N}(0, K + \sigma^2 I) $$

Args

kernel : mogptk.gpr.kernel.Kernel
Kernel.
X : torch.tensor
Input data of shape (data_points,input_dims).
y : torch.tensor
Output data of shape (data_points,).
variance : float,torch.tensor
Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
data_variance : torch.tensor
Assign different and fixed variances per data point. These are added to the kernel's diagonal while still training an additional Gaussian variance.
jitter : float
Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean : Mean
Mean.
name : str
Name of the model.
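
A hedged end-to-end sketch (the SquaredExponentialKernel name and its re-export under mogptk.gpr are assumptions about the wider package, not defined in this module):

import torch
from mogptk import gpr

X = torch.linspace(0.0, 6.0, 50).reshape(-1, 1)
y = torch.sin(X) + 0.1*torch.randn_like(X)

model = gpr.Exact(gpr.SquaredExponentialKernel(), X, y, variance=0.05)
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
for _ in range(200):
    model.loss()        # zeroes gradients, evaluates the negative log marginal likelihood, and back-propagates
    optimizer.step()

Xs = torch.linspace(0.0, 6.0, 100).reshape(-1, 1)
mu, var = model.predict(Xs)    # NumPy arrays of shape (100,1)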
class Exact(Model):
    """
    Regular Gaussian process regression with a Gaussian likelihood which allows for exact inference.

    $$ y \\sim \\mathcal{N}(0, K + \\sigma^2I) $$

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
        data_variance (torch.tensor): Assign different and fixed variances per data point. These are added to the kernel's diagonal while still training an additional Gaussian variance.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.
    """
    def __init__(self, kernel, X, y, variance=1.0, data_variance=None, jitter=1e-8, mean=None, name="Exact"):
        if data_variance is not None:
            data_variance = Parameter.to_tensor(data_variance)
            if data_variance.ndim != 1 or X.ndim == 2 and data_variance.shape[0] != X.shape[0]:
                raise ValueError("data variance must have shape (data_points,)")
            data_variance = data_variance.diagflat()
        self.data_variance = data_variance

        variance = Parameter.to_tensor(variance)
        channels = 1
        if kernel.output_dims is not None:
            channels = kernel.output_dims
        if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != channels:
            raise ValueError("variance must be float or have shape (channels,)")

        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean, name)

        self.eye = torch.eye(self.X.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)

    def log_marginal_likelihood(self):
        Kff = self.kernel.K(self.X)
        Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye  # NxN
        if self.data_variance is not None:
            Kff += self.data_variance
        L = self._cholesky(Kff, add_jitter=True)  # NxN

        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken inside the log: L is the square root
        p -= 0.5*y.T.mm(torch.cholesky_solve(y,L)).squeeze()
        return p

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD
            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
            else:
                y = self.y  # Nx1

            Kff = self.kernel.K(self.X)
            Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye  # NxN
            if self.data_variance is not None:
                Kff += self.data_variance
            Kfs = self.kernel.K(self.X,Xs)  # NxM

            Lff = self._cholesky(Kff, add_jitter=True)  # NxN
            v = torch.linalg.solve_triangular(Lff,Kfs,upper=False)  # NxM

            mu = Kfs.T.mm(torch.cholesky_solve(y,Lff))  # Mx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Mx1

            if full:
                Kss = self.kernel.K(Xs)  # MxM
                var = Kss - v.T.mm(v)  # MxM
                if predict_y:
                    eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                    var += self._index_channel(self.likelihood.scale().square(), Xs) * eye
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - v.T.square().sum(dim=1)  # M
                if predict_y:
                    var += self._index_channel(self.likelihood.scale().square(), Xs)
                var = var.reshape(-1,1)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

Ancestors

  • mogptk.gpr.model.Model
Methods

def predict(self, Xs, full=False, tensor=False, predict_y=True)
def predict(self, Xs, full=False, tensor=False, predict_y=True):
    with torch.no_grad():
        Xs = self._check_input(Xs)  # MxD
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kff = self.kernel.K(self.X)
        Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye  # NxN
        if self.data_variance is not None:
            Kff += self.data_variance
        Kfs = self.kernel.K(self.X,Xs)  # NxM

        Lff = self._cholesky(Kff, add_jitter=True)  # NxN
        v = torch.linalg.solve_triangular(Lff,Kfs,upper=False)  # NxM

        mu = Kfs.T.mm(torch.cholesky_solve(y,Lff))  # Mx1
        if self.mean is not None:
            mu += self.mean(Xs).reshape(-1,1)  # Mx1

        if full:
            Kss = self.kernel.K(Xs)  # MxM
            var = Kss - v.T.mm(v)  # MxM
            if predict_y:
                eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                var += self._index_channel(self.likelihood.scale().square(), Xs) * eye
        else:
            Kss_diag = self.kernel.K_diag(Xs)  # M
            var = Kss_diag - v.T.square().sum(dim=1)  # M
            if predict_y:
                var += self._index_channel(self.likelihood.scale().square(), Xs)
            var = var.reshape(-1,1)

        if tensor:
            return mu, var
        else:
            return mu.cpu().numpy(), var.cpu().numpy()

Inherited members

class Hensman (kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-08, mean=None, name='Hensman')

A Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood. This is equivalent to the SparseHensman model if we set the inducing points equal to the data points and disable training of the inducing points.

Args

kernel : Kernel
Kernel.
X : torch.tensor
Input data of shape (data_points,input_dims).
y : torch.tensor
Output data of shape (data_points,).
likelihood : Likelihood
Likelihood.
jitter : float
Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean : Mean
Mean.
name : str
Name of the model.

[1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
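
A brief construction sketch (the SquaredExponentialKernel and BernoulliLikelihood names are assumptions about the wider mogptk.gpr package; any Likelihood from mogptk.gpr.likelihood can be substituted):

import torch
from mogptk import gpr

X = torch.linspace(0.0, 6.0, 80).reshape(-1, 1)
y = (torch.sin(X) > 0.0).double()    # binary labels in {0, 1}

model = gpr.Hensman(gpr.SquaredExponentialKernel(), X, y, likelihood=gpr.BernoulliLikelihood())
optimizer = torch.optim.Adam(model.parameters(), lr=0.05)
for _ in range(500):
    model.loss()
    optimizer.step()

mu, var = model.predict(X)           # predictive mean and variance, mapped through the likelihood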

class Hensman(SparseHensman):
    """
    A Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood. This is equivalent to the `SparseHensman` model if we set the inducing points equal to the data points and disable training of the inducing points.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8,
                 mean=None, name="Hensman"):
        super().__init__(kernel, X, y, None, likelihood=likelihood, jitter=jitter, mean=mean, name=name)

Ancestors

  • mogptk.gpr.model.SparseHensman
  • mogptk.gpr.model.Model
Inherited members

class Model (kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-08, mean=None, name=None)

Base model class.

Attributes

kernel : Kernel
Kernel.
likelihood : Likelihood
Likelihood.
mean : Mean
Mean.
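
A sketch of the parameter and training API shared by all subclasses (model stands for any instantiated subclass such as Exact; the optimizer choice is illustrative):

import torch

for p in model.get_parameters():     # every registered Parameter, trainable or not
    print(p.name, p.numpy())
model.print_parameters()             # tabular overview (rendered as HTML in notebooks)

# loss() zeroes the gradients, computes -log p(y) - log p(theta), and back-propagates,
# so an optimizer only needs to take steps:
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
for _ in range(100):
    model.loss()
    optimizer.step()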
class Model:
    """
    Base model class.

    Attributes:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        mean (mogptk.gpr.mean.Mean): Mean.
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None, name=None):
        if not issubclass(type(kernel), Kernel):
            raise ValueError("kernel must derive from mogptk.gpr.Kernel")
        X, y = self._check_input(X, y)
        if mean is not None:
            if not issubclass(type(mean), Mean):
                raise ValueError("mean must derive from mogptk.gpr.Mean")
            mu = mean(X).reshape(-1,1)
            if mu.shape != y.shape:
                raise ValueError("mean and y data must match shapes: %s != %s" % (mu.shape, y.shape))

        if issubclass(type(likelihood), MultiOutputLikelihood) and likelihood.output_dims != kernel.output_dims:
            raise ValueError("kernel and likelihood must have matching output dimensions")
        likelihood.validate_y(y, X=X)

        # limit to number of significant digits
        if config.dtype == torch.float32:
            jitter = max(jitter, 1e-6)
        elif config.dtype == torch.float64:
            jitter = max(jitter, 1e-15)

        self._params = []
        self.kernel = kernel
        self.X = X
        self.y = y
        self.likelihood = likelihood
        self.jitter = jitter
        self.mean = mean
        self.name = name
        self.input_dims = X.shape[1]

    def __setattr__(self, name, val):
        if hasattr(self, name) and isinstance(getattr(self, name), Parameter):
            raise AttributeError("parameter is read-only, use Parameter.assign()")
        elif isinstance(val, (Parameter, Kernel, Mean, Likelihood)):
            self._register_parameters(val)
        super().__setattr__(name, val)        

    def _check_input(self, X, y=None):
        if not isinstance(X, torch.Tensor):
            X = torch.tensor(X, device=config.device, dtype=config.dtype)
        else:
            X = X.to(config.device, config.dtype)
        if X.ndim == 0:
            X = X.reshape(1,1)
        elif X.ndim == 1:
            X = X.reshape(-1,1)
        elif X.ndim != 2:
            raise ValueError("X must have dimensions (data_points,input_dims) with input_dims optional")
        if X.shape[0] == 0 or X.shape[1] == 0:
            raise ValueError("X must not be empty")

        if y is not None:
            if not isinstance(y, torch.Tensor):
                y = torch.tensor(y, device=config.device, dtype=config.dtype)
            else:
                y = y.to(config.device, config.dtype)
            if y.ndim == 0:
                y = y.reshape(1,1)
            elif y.ndim == 1:
                y = y.reshape(-1,1)
            elif y.ndim != 2 or y.shape[1] != 1:
                raise ValueError("y must have one dimension (data_points,)")
            if X.shape[0] != y.shape[0]:
                raise ValueError("number of data points for X and y must match")
            return X, y
        else:
            # X is for prediction
            if X.shape[1] != self.input_dims:
                raise ValueError("X must have %s input dimensions" % self.input_dims)
            return X

    def _index_channel(self, value, X):
        if self.kernel.output_dims is not None and 0 < value.ndim and value.shape[0] == self.kernel.output_dims:
            return torch.index_select(value, dim=0, index=X[:,0].long())
        return value

    def _register_parameters(self, obj, name=None):
        if isinstance(obj, Parameter):
            if obj.name is not None:
                if name is None:
                    name = obj.name
                else:
                    name += "." + obj.name
            elif name is None:
                name = ""
            obj.name = name
            self._params.append(obj)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                self._register_parameters(v, (name if name is not None else "")+"["+str(i)+"]")
        elif issubclass(type(obj), (Kernel, Mean, Likelihood)):
            for v in obj.__dict__.values():
                self._register_parameters(v, (name+"." if name is not None else "")+obj.name)

    def zero_grad(self):
        for p in self._params:
            p = p.unconstrained
            if p.grad is not None:
                if p.grad.grad_fn is not None:
                    p.grad.detach_()
                else:
                    p.grad.requires_grad_(False)
                p.grad.zero_()

    def parameters(self):
        """
        Yield trainable parameters of model.

        Returns:
            Parameter generator
        """
        for p in self._params:
            if p.train:
                yield p.unconstrained

    def get_parameters(self):
        """
        Return all parameters of model.

        Returns:
            list: List of Parameters.
        """
        return self._params
    
    def print_parameters(self, file=None):
        """
        Print parameters and their values.
        """
        def param_range(lower, upper, train=True, pegged=False):
            if lower is not None:
                if lower.size == 1:
                    lower = lower.item()
                elif (lower.max()-lower.min())/lower.mean() < 1e-6:
                    lower = lower.mean().item()
                else:
                    lower = lower.tolist()
            if upper is not None:
                if upper.size == 1:
                    upper = upper.item()
                elif (upper.max()-upper.min())/upper.mean() < 1e-6:
                    upper = upper.mean().item()
                else:
                    upper = upper.tolist()

            if pegged:
                return "pegged"
            elif not train:
                return "fixed"
            if lower is None and upper is None:
                return "(-∞, ∞)"
            elif lower is None:
                return "(-∞, %s]" % upper
            elif upper is None:
                return "[%s, ∞)" % lower
            return "[%s, %s]" % (lower, upper)

        if file is None:
            try:
                get_ipython  # fails if we're not in a notebook
                table = '<table><tr><th style="text-align:left">Name</th><th>Range</th><th>Value</th></tr>'
                for p in self._params:
                    table += '<tr><td style="text-align:left">%s</td><td>%s</td><td>%s</td></tr>' % (p.name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy())
                table += '</table>'
                display(HTML(table))
                return
            except Exception as e:
                pass

        vals = [["Name", "Range", "Value"]]
        for p in self._params:
            vals.append([p.name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy().tolist()])

        nameWidth = max([len(val[0]) for val in vals])
        rangeWidth = max([len(val[1]) for val in vals])
        for val in vals:
            print("%-*s  %-*s  %s" % (nameWidth, val[0], rangeWidth, val[1], val[2]), file=file)

    def _cholesky(self, K, add_jitter=False):
        if add_jitter:
            K += (self.jitter * K.diagonal().mean()).repeat(K.shape[0]).diagflat()
        try:
            return torch.linalg.cholesky(K)
        except RuntimeError as e:
            print("ERROR:", e.args[0], file=sys.__stdout__)
            if K.isnan().any():
                print("ERROR: kernel matrix has NaNs!", file=sys.__stdout__)
            if K.isinf().any():
                print("ERROR: kernel matrix has infinities!", file=sys.__stdout__)
            self.print_parameters()
            plot_gram(K)
            raise CholeskyException(e.args[0], K, self)

    def log_marginal_likelihood(self):
        """
        Return the log marginal likelihood given by

        $$ \\log p(y) $$

        Returns:
            torch.tensor: Log marginal likelihood.
        """
        raise NotImplementedError()

    def log_prior(self):
        """
        Return the log prior given by

        $$ \\log p(\\theta) $$

        Returns:
            torch.tensor: Log prior.
        """
        return sum([p.log_prior() for p in self._params])

    def loss(self):
        """
        Model loss for training.

        Returns:
            torch.tensor: Loss.
        """
        self.zero_grad()
        loss = -self.log_marginal_likelihood() - self.log_prior()
        loss.backward()
        return loss

    def K(self, X1, X2=None):
        """
        Evaluate kernel at `X1` and `X2` and return the NumPy representation.

        Args:
            X1 (torch.tensor): Input of shape (data_points0,input_dims).
            X2 (torch.tensor): Input of shape (data_points1,input_dims).

        Returns:
            numpy.ndarray: Kernel matrix of shape (data_points0,data_points1).
        """
        with torch.no_grad():
            return self.kernel(X1, X2).cpu().numpy()

    def sample(self, Z, n=None, predict_y=True, prior=False):
        """
        Sample from model.

        Args:
            Z (torch.tensor): Input of shape (data_points,input_dims).
            n (int): Number of samples.
            predict_y (boolean): Predict the data values \\(y\\) instead of the function values \\(f\\).
            prior (boolean): Sample from prior instead of posterior.

        Returns:
            numpy.ndarray: Samples of shape (data_points,samples) or (data_points,) if `n` is not given.
        """
        with torch.no_grad():
            S = n
            if n is None:
                S = 1

            # TODO: predict_y and non-Gaussian likelihoods
            if prior:
                mu, var = self.mean(Z), self.kernel(Z)
            else:
                mu, var = self.predict(Z, full=True, tensor=True, predict_y=predict_y)  # MxD, MxMxD
            eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
            var += self.jitter * var.diagonal().mean() * eye  # MxM

            u = torch.normal(
                    torch.zeros(Z.shape[0], S, device=config.device, dtype=config.dtype),
                    torch.tensor(1.0, device=config.device, dtype=config.dtype))  # MxS
            L = torch.linalg.cholesky(var)  # MxM
            samples = mu + L.mm(u)  # MxS

            if n is None:
                samples = samples.squeeze()
            return samples.cpu().numpy()

Methods

def K(self, X1, X2=None)

Evaluate kernel at X1 and X2 and return the NumPy representation.

Args

X1 : torch.tensor
Input of shape (data_points0,input_dims).
X2 : torch.tensor
Input of shape (data_points1,input_dims).

Returns

numpy.ndarray
Kernel matrix of shape (data_points0,data_points1).
def K(self, X1, X2=None):
    """
    Evaluate kernel at `X1` and `X2` and return the NumPy representation.

    Args:
        X1 (torch.tensor): Input of shape (data_points0,input_dims).
        X2 (torch.tensor): Input of shape (data_points1,input_dims).

    Returns:
        numpy.ndarray: Kernel matrix of shape (data_points0,data_points1).
    """
    with torch.no_grad():
        return self.kernel(X1, X2).cpu().numpy()
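
A short usage sketch (hypothetical: `model` stands for any fitted instance of the model classes documented below, constructed as in the end-to-end sketch further down). Reusing the stored training inputs avoids dtype and channel-column concerns for multi-output kernels:

K = model.K(model.X)              # Gram matrix over the training inputs, numpy.ndarray
Ksub = model.K(model.X, model.X[:5])  # cross-covariances against the first five training points
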
def get_parameters(self)

Return all parameters of model.

Returns

list
List of Parameters.
def get_parameters(self):
    """
    Return all parameters of model.

    Returns:
        list: List of Parameters.
    """
    return self._params
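
A brief sketch of inspecting the returned Parameter objects; the attributes used here are the ones read by parameters() and print_parameters() below, and `model` is again assumed to be constructed as in the later sketches:

for p in model.get_parameters():
    print(p.name, p.train, p.numpy())  # display name, trainable flag, current (constrained) value
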
def log_marginal_likelihood(self)

Return the log marginal likelihood given by

\log p(y)

Returns

torch.tensor
Log marginal likelihood.
def log_marginal_likelihood(self):
    """
    Return the log marginal likelihood given by

    $$ \\log p(y) $$

    Returns:
        torch.tensor: Log marginal likelihood.
    """
    raise NotImplementedError()
def log_prior(self)

Return the log prior given by

\log p(\theta)

Returns

torch.tensor
Log prior.
def log_prior(self):
    """
    Return the log prior given by

    $$ \\log p(\\theta) $$

    Returns:
        torch.tensor: Log prior.
    """
    return sum([p.log_prior() for p in self._params])
def loss(self)

Model loss for training.

Returns

torch.tensor
Loss.
def loss(self):
    """
    Model loss for training.

    Returns:
        torch.tensor: Loss.
    """
    self.zero_grad()
    loss = -self.log_marginal_likelihood() - self.log_prior()
    loss.backward()
    return loss
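
Since loss() already zeroes the gradients and calls backward(), a training loop only needs to step an optimizer over parameters(). A minimal sketch, assuming `model` is a constructed instance of one of the classes below:

import torch

optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
for step in range(500):
    loss = model.loss()  # zeroes gradients, computes -log p(y) - log p(theta), and backpropagates
    optimizer.step()     # no separate zero_grad()/backward() calls are needed
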
def parameters(self)

Yield trainable parameters of model.

Returns

Parameter generator

def parameters(self):
    """
    Yield trainable parameters of model.

    Returns:
        Parameter generator
    """
    for p in self._params:
        if p.train:
            yield p.unconstrained
def print_parameters(self, file=None)

Print parameters and their values.

def print_parameters(self, file=None):
    """
    Print parameters and their values.
    """
    def param_range(lower, upper, train=True, pegged=False):
        if lower is not None:
            if lower.size == 1:
                lower = lower.item()
            elif (lower.max()-lower.min())/lower.mean() < 1e-6:
                lower = lower.mean().item()
            else:
                lower = lower.tolist()
        if upper is not None:
            if upper.size == 1:
                upper = upper.item()
            elif (upper.max()-upper.min())/upper.mean() < 1e-6:
                upper = upper.mean().item()
            else:
                upper = upper.tolist()

        if pegged:
            return "pegged"
        elif not train:
            return "fixed"
        if lower is None and upper is None:
            return "(-∞, ∞)"
        elif lower is None:
            return "(-∞, %s]" % upper
        elif upper is None:
            return "[%s, ∞)" % lower
        return "[%s, %s]" % (lower, upper)

    if file is None:
        try:
            get_ipython  # fails if we're not in a notebook
            table = '<table><tr><th style="text-align:left">Name</th><th>Range</th><th>Value</th></tr>'
            for p in self._params:
                table += '<tr><td style="text-align:left">%s</td><td>%s</td><td>%s</td></tr>' % (p.name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy())
            table += '</table>'
            display(HTML(table))
            return
        except Exception as e:
            pass

    vals = [["Name", "Range", "Value"]]
    for p in self._params:
        vals.append([p.name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy().tolist()])

    nameWidth = max([len(val[0]) for val in vals])
    rangeWidth = max([len(val[1]) for val in vals])
    for val in vals:
        print("%-*s  %-*s  %s" % (nameWidth, val[0], rangeWidth, val[1], val[2]), file=file)
def sample(self, Z, n=None, predict_y=True, prior=False)

Sample from model.

Args

Z : torch.tensor
Input of shape (data_points,input_dims).
n : int
Number of samples.
predict_y : boolean
Predict the data values y instead of the function values f.
prior : boolean
Sample from prior instead of posterior.

Returns

numpy.ndarray
Samples of shape (data_points,samples) or (data_points,) if n is not given.
def sample(self, Z, n=None, predict_y=True, prior=False):
    """
    Sample from model.

    Args:
        Z (torch.tensor): Input of shape (data_points,input_dims).
        n (int): Number of samples.
        predict_y (boolean): Predict the data values \\(y\\) instead of the function values \\(f\\).
        prior (boolean): Sample from prior instead of posterior.

    Returns:
        numpy.ndarray: Samples of shape (data_points,samples) or (data_points,) if `n` is not given.
    """
    with torch.no_grad():
        S = n
        if n is None:
            S = 1

        # TODO: predict_y and non-Gaussian likelihoods
        if prior:
            mu, var = self.mean(Z), self.kernel(Z)
        else:
            mu, var = self.predict(Z, full=True, tensor=True, predict_y=predict_y)  # MxD, MxMxD
        eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
        var += self.jitter * var.diagonal().mean() * eye  # MxM

        u = torch.normal(
                torch.zeros(Z.shape[0], S, device=config.device, dtype=config.dtype),
                torch.tensor(1.0, device=config.device, dtype=config.dtype))  # MxS
        L = torch.linalg.cholesky(var)  # MxM
        samples = mu + L.mm(u)  # MxS

        if n is None:
            samples = samples.squeeze()
        return samples.cpu().numpy()
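
The core of sample() is the standard Cholesky reparameterization: if var = L.Lᵀ, then mu + L.u with u ~ N(0, I) is distributed as N(mu, var). A self-contained sketch of just that step in plain torch, using a toy mean and covariance rather than model output:

import torch

mu = torch.zeros(4, 1)                # toy mean, (M,1)
A = torch.randn(4, 4)
var = A @ A.T + 1e-6 * torch.eye(4)   # toy positive-definite covariance, (M,M)

L = torch.linalg.cholesky(var)        # var = L @ L.T
u = torch.randn(4, 3)                 # three standard-normal draws, (M,S)
samples = mu + L @ u                  # each column is one draw from N(mu, var)
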
def zero_grad(self)
def zero_grad(self):
    for p in self._params:
        p = p.unconstrained
        if p.grad is not None:
            if p.grad.grad_fn is not None:
                p.grad.detach_()
            else:
                p.grad.requires_grad_(False)
            p.grad.zero_()
class OpperArchambeau (kernel, X, y, likelihood=<mogptk.gpr.likelihood.GaussianLikelihood object>, jitter=1e-08, mean=None, name='OpperArchambeau')

A Gaussian process regression based on Opper and Archambeau [1] with a non-Gaussian likelihood.

Args

kernel : Kernel
Kernel.
X : torch.tensor
Input data of shape (data_points,input_dims).
y : torch.tensor
Output data of shape (data_points,).
likelihood : Likelihood
Likelihood.
jitter : float
Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean : Mean
Mean.
name : str
Name of the model.

[1] M. Opper, C. Archambeau, "The Variational Gaussian Approximation Revisited", 2009

class OpperArchambeau(Model):
    """
    A Gaussian process regression based on Opper and Archambeau [1] with a non-Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] M. Opper, C. Archambeau, "The Variational Gaussian Approximation Revisited", 2009
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0),
                 jitter=1e-8, mean=None, name="OpperArchambeau"):
        super().__init__(kernel, X, y, likelihood, jitter, mean, name)

        n = self.X.shape[0]
        self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
        self.q_nu = Parameter(torch.zeros(n,1), name="q_nu")
        self.q_lambda = Parameter(torch.ones(n,1), name="q_lambda", lower=config.positive_minimum)
        self.likelihood = likelihood

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        q_nu = self.q_nu()
        q_lambda = self.q_lambda()

        Kff = self.kernel(self.X)  # NxN
        L = self._cholesky(q_lambda*q_lambda.T*Kff + self.eye)
        invL = torch.linalg.solve_triangular(L,self.eye,upper=False)  # NxN

        qf_mu = Kff.mm(q_nu)
        qf_var_diag = 1.0/q_lambda.square() - (invL.T.mm(invL)/q_lambda/q_lambda.T).diagonal().reshape(-1,1)

        kl = -q_nu.shape[0]
        kl += q_nu.T.mm(qf_mu).squeeze()  # Mahalanobis
        kl += L.diagonal().square().log().sum()  # determinant TODO: is this correct?
        #kl += invL.diagonal().square().sum()  # trace
        kl += invL.square().sum()  # trace

        if self.mean is not None:
            qf_mu = qf_mu - self.mean(self.X).reshape(-1,1)  # Sx1
        var_exp = self.likelihood.variational_expectation(y, qf_mu, qf_var_diag, X=self.X)

        #eye = torch.eye(q_lambda.shape[0], device=config.device, dtype=config.dtype)
        #qf_var = (1.0/q_lambda.square())*eye - invL.T.mm(invL)/q_lambda/q_lambda.T
        #kl = -q_nu.shape[0]
        #kl += q_nu.T.mm(qf_mu).squeeze()  # Mahalanobis
        #kl -= qf_var.det().log()  # determinant
        #kl += invL.diagonal().square().sum()  # trace

        #kl = -q_nu.shape[0]
        #kl += q_nu.T.mm(q_nu).squeeze()  # Mahalanobis
        #kl -= qf_var.det().log()  # determinant
        #kl += qf_var_diag.sum()  # trace
        return var_exp - 0.5*kl

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD

            Kff = self.kernel(self.X)
            Kfs = self.kernel(self.X,Xs)  # NxS

            L = self._cholesky(Kff + (1.0/self.q_lambda().square()).diagflat())  # NxN
            a = torch.linalg.solve_triangular(L,Kfs,upper=False)  # NxS;  Kuu^(-1/2).Kus

            mu = Kfs.T.mm(self.q_nu())  # Sx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Sx1

            if full:
                Kss = self.kernel(Xs)  # SxS
                var = Kss - a.T.mm(a)  # SxS
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - a.T.square().sum(dim=1)  # M
                var = var.reshape(-1,1)

            if predict_y:
                mu, var = self.likelihood.predict(mu, var, full=full, X=Xs)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

Methods

def elbo(self)
def elbo(self):
    if self.mean is not None:
        y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
    else:
        y = self.y  # Nx1

    q_nu = self.q_nu()
    q_lambda = self.q_lambda()

    Kff = self.kernel(self.X)  # NxN
    L = self._cholesky(q_lambda*q_lambda.T*Kff + self.eye)
    invL = torch.linalg.solve_triangular(L,self.eye,upper=False)  # NxN

    qf_mu = Kff.mm(q_nu)
    qf_var_diag = 1.0/q_lambda.square() - (invL.T.mm(invL)/q_lambda/q_lambda.T).diagonal().reshape(-1,1)

    kl = -q_nu.shape[0]
    kl += q_nu.T.mm(qf_mu).squeeze()  # Mahalanobis
    kl += L.diagonal().square().log().sum()  # determinant TODO: is this correct?
    #kl += invL.diagonal().square().sum()  # trace
    kl += invL.square().sum()  # trace

    if self.mean is not None:
        qf_mu = qf_mu - self.mean(self.X).reshape(-1,1)  # Sx1
    var_exp = self.likelihood.variational_expectation(y, qf_mu, qf_var_diag, X=self.X)

    #eye = torch.eye(q_lambda.shape[0], device=config.device, dtype=config.dtype)
    #qf_var = (1.0/q_lambda.square())*eye - invL.T.mm(invL)/q_lambda/q_lambda.T
    #kl = -q_nu.shape[0]
    #kl += q_nu.T.mm(qf_mu).squeeze()  # Mahalanobis
    #kl -= qf_var.det().log()  # determinant
    #kl += invL.diagonal().square().sum()  # trace

    #kl = -q_nu.shape[0]
    #kl += q_nu.T.mm(q_nu).squeeze()  # Mahalanobis
    #kl -= qf_var.det().log()  # determinant
    #kl += qf_var_diag.sum()  # trace
    return var_exp - 0.5*kl
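
For reference, the (q_nu, q_lambda) parameterization above appears to follow Opper & Archambeau [1]: with Λ = diag(λ)^2, the variational posterior is

q(f) = N(K.ν, (K^(-1) + Λ)^(-1))

so that qf_mu = K.ν, and qf_var_diag is the diagonal of (K^(-1) + Λ)^(-1), obtained above through the Cholesky factor of diag(λ).K.diag(λ) + I.
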
def predict(self, Xs, full=False, tensor=False, predict_y=True)
def predict(self, Xs, full=False, tensor=False, predict_y=True):
    with torch.no_grad():
        Xs = self._check_input(Xs)  # MxD

        Kff = self.kernel(self.X)
        Kfs = self.kernel(self.X,Xs)  # NxS

        L = self._cholesky(Kff + (1.0/self.q_lambda().square()).diagflat())  # NxN
        a = torch.linalg.solve_triangular(L,Kfs,upper=False)  # NxS;  Kuu^(-1/2).Kus

        mu = Kfs.T.mm(self.q_nu())  # Sx1
        if self.mean is not None:
            mu += self.mean(Xs).reshape(-1,1)  # Sx1

        if full:
            Kss = self.kernel(Xs)  # SxS
            var = Kss - a.T.mm(a)  # SxS
        else:
            Kss_diag = self.kernel.K_diag(Xs)  # M
            var = Kss_diag - a.T.square().sum(dim=1)  # M
            var = var.reshape(-1,1)

        if predict_y:
            mu, var = self.likelihood.predict(mu, var, full=full, X=Xs)

        if tensor:
            return mu, var
        else:
            return mu.cpu().numpy(), var.cpu().numpy()

class Snelson (kernel, X, y, Z=10, Z_init='grid', variance=1.0, jitter=1e-08, mean=None, name='Snelson')

A sparse Gaussian process regression based on Snelson and Ghahramani [1] with a Gaussian likelihood and inducing points.

Args

kernel : Kernel
Kernel.
X : torch.tensor
Input data of shape (data_points,input_dims).
y : torch.tensor
Output data of shape (data_points,).
Z : int,torch.tensor
Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
Z_init : str
Method for initialization of inducing points, can be grid, random, or density.
variance : float,torch.tensor
Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
jitter : float
Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean : Mean
Mean.
name : str
Name of the model.

[1] E. Snelson, Z. Ghahramani, "Sparse Gaussian Processes using Pseudo-inputs", 2005

class Snelson(Model):
    """
    A sparse Gaussian process regression based on Snelson and Ghahramani [1] with a Gaussian likelihood and inducing points.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] E. Snelson, Z. Ghahramani, "Sparse Gaussian Processes using Pseudo-inputs", 2005
    """
    def __init__(self, kernel, X, y, Z=10, Z_init='grid', variance=1.0, jitter=1e-8, mean=None,
                 name="Snelson"):
        variance = Parameter.to_tensor(variance).squeeze()
        if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != kernel.output_dims:
            raise ValueError("variance must be float or have shape (channels,)")

        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean, name)

        Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
        Z = self._check_input(Z)
        
        self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.Z = Parameter(Z, name="induction_points")
        if kernel.output_dims is not None:
            self.Z.num_parameters -= self.Z().shape[0]

    def log_marginal_likelihood(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kff_diag = self.kernel.K_diag(self.X)  # N
        Kuf = self.kernel.K(self.Z(),self.X)  # MxN
        Kuu = self.kernel.K(self.Z())  # MxM

        Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Luu = Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
        g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)  # N;  diag(Kff-Qff) + sigma^2.I
        G = torch.diagflat(1.0/g)  # N
        L = self._cholesky(v.mm(G).mm(v.T) + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)

        c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False)  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.G.y

        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
        p -= 0.5*g.log().sum()
        p -= 0.5*y.T.mm(G).mm(y).squeeze()
        p += 0.5*c.T.mm(c).squeeze()
        return p

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD
            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
            else:
                y = self.y  # Nx1

            Kff_diag = self.kernel.K_diag(self.X)  # N
            Kuf = self.kernel.K(self.Z(),self.X)  # MxN
            Kuu = self.kernel.K(self.Z())  # MxM
            Kus = self.kernel.K(self.Z(),Xs)  # MxS

            Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
            v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
            g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)
            G = torch.diagflat(1.0/g)  # N
            L = self._cholesky(v.mm(G).mm(v.T) + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)

            a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # MxS;  Kuu^(-1/2).Kus
            b = torch.linalg.solve_triangular(L,a,upper=False)
            c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False)  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.G.y

            mu = b.T.mm(c)  # Mx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Mx1

            if full:
                Kss = self.kernel(Xs)  # MxM
                var = Kss - a.T.mm(a) + b.T.mm(b)  # MxM
                if predict_y:
                    eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                    var += self._index_channel(self.likelihood.scale().square(), Xs) * eye
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
                if predict_y:
                    var += self._index_channel(self.likelihood.scale().square(), Xs)
                var = var.reshape(-1,1)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()
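
For reference, the quantity returned by log_marginal_likelihood above is the FITC approximation of Snelson & Ghahramani [1]: with Qff = Kfu.Kuu^(-1).Kuf,

log p(y) ≈ log N(y | 0, Qff + diag(Kff - Qff) + sigma^2.I)

In the code, g is the diagonal correction diag(Kff - Qff) + sigma^2, and the Woodbury and determinant identities reduce the computation to the MxM systems built from Luu and L.
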

Methods

def predict(self, Xs, full=False, tensor=False, predict_y=True)
def predict(self, Xs, full=False, tensor=False, predict_y=True):
    with torch.no_grad():
        Xs = self._check_input(Xs)  # MxD
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kff_diag = self.kernel.K_diag(self.X)  # N
        Kuf = self.kernel.K(self.Z(),self.X)  # MxN
        Kuu = self.kernel.K(self.Z())  # MxM
        Kus = self.kernel.K(self.Z(),Xs)  # MxS

        Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
        g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)
        G = torch.diagflat(1.0/g)  # N
        L = self._cholesky(v.mm(G).mm(v.T) + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)

        a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # MxS;  Kuu^(-1/2).Kus
        b = torch.linalg.solve_triangular(L,a,upper=False)
        c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False)  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.G.y

        mu = b.T.mm(c)  # Mx1
        if self.mean is not None:
            mu += self.mean(Xs).reshape(-1,1)  # Mx1

        if full:
            Kss = self.kernel(Xs)  # MxM
            var = Kss - a.T.mm(a) + b.T.mm(b)  # MxM
            if predict_y:
                eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                var += self._index_channel(self.likelihood.scale().square(), Xs) * eye
        else:
            Kss_diag = self.kernel.K_diag(Xs)  # M
            var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
            if predict_y:
                var += self._index_channel(self.likelihood.scale().square(), Xs)
            var = var.reshape(-1,1)

        if tensor:
            return mu, var
        else:
            return mu.cpu().numpy(), var.cpu().numpy()
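
A minimal end-to-end sketch. The kernel class name below is hypothetical (substitute any mogptk.gpr.Kernel subclass from your installation), and the model classes are assumed to be re-exported at the mogptk.gpr level:

import torch
from mogptk import gpr

X = torch.linspace(0.0, 10.0, 200).reshape(-1, 1)   # (data_points, input_dims)
y = torch.sin(X[:, 0]) + 0.1 * torch.randn(200)     # (data_points,)

kernel = gpr.SquaredExponentialKernel(input_dims=1)  # hypothetical kernel choice
model = gpr.Snelson(kernel, X, y, Z=10, Z_init='grid', variance=1.0)

optimizer = torch.optim.Adam(model.parameters(), lr=0.05)
for _ in range(300):
    model.loss()        # negative FITC bound minus log prior; zeroes gradients and backpropagates
    optimizer.step()

mu, var = model.predict(X)  # numpy arrays of shape (data_points, 1)
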

class SparseHensman (kernel, X, y, Z=None, Z_init='grid', likelihood=<mogptk.gpr.likelihood.GaussianLikelihood object>, jitter=1e-08, mean=None, name='SparseHensman')

A sparse Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood.

Args

kernel : Kernel
Kernel.
X : torch.tensor
Input data of shape (data_points,input_dims).
y : torch.tensor
Output data of shape (data_points,).
Z : int,torch.tensor
Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
Z_init : str
Method for initialization of inducing points, can be grid, random, or density.
likelihood : Likelihood
Likelihood.
jitter : float
Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean : Mean
Mean.
name : str
Name of the model.

[1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015

class SparseHensman(Model):
    """
    A sparse Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
    """
    # This version replaces mu_q by L.mu_q and sigma_q by L.sigma_q.L^T, where LL^T = Kuu
    # So that p(u) ~ N(0,1) and q(u) ~ N(L.mu_q, L.sigma_q.L^T)
    def __init__(self, kernel, X, y, Z=None, Z_init='grid',
                 likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None,
                 name="SparseHensman"):
        super().__init__(kernel, X, y, likelihood, jitter, mean, name)

        n = self.X.shape[0]
        self.is_sparse = Z is not None
        if self.is_sparse:
            Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
            Z = self._check_input(Z)
            n = Z.shape[0]

        self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.q_mu = Parameter(torch.zeros(n,1), name="q_mu")
        self.q_sqrt = Parameter(torch.eye(n), name="q_sqrt")
        self.q_sqrt.num_parameters = int((n*n+n)/2)
        if self.is_sparse:
            self.Z = Parameter(Z, name="induction_points")
            if kernel.output_dims is not None:
                self.Z.num_parameters -= self.Z().shape[0]
        else:
            self.Z = Parameter(self.X, train=False)  # don't use inducing points

    def kl_gaussian(self, q_mu, q_sqrt):
        S_diag = q_sqrt.diagonal().square() # NxN
        kl = -q_mu.shape[0]
        kl += q_mu.T.mm(q_mu).squeeze()  # Mahalanobis
        kl -= S_diag.log().sum()  # determinant of q_var
        kl += S_diag.sum()  # same as Trace(p_var^(-1).q_var)
        return 0.5*kl

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        if self.is_sparse:
            qf_mu, qf_var_diag = self._predict(self.X, full=False)
        else:
            Kff = self.kernel(self.X)
            Lff = self._cholesky(Kff, add_jitter=True)  # NxN

            qf_mu = Lff.mm(self.q_mu())
            if self.mean is not None:
                qf_mu -= self.mean(self.X).reshape(-1,1)  # Sx1

            qf_sqrt = Lff.mm(self.q_sqrt().tril())
            qf_var_diag = qf_sqrt.mm(qf_sqrt.T).diagonal().reshape(-1,1)

        var_exp = self.likelihood.variational_expectation(y, qf_mu, qf_var_diag, X=self.X)
        kl = self.kl_gaussian(self.q_mu(), self.q_sqrt())
        return var_exp - kl

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def _predict(self, Xs, full=False):
        Kuu = self.kernel(self.Z())
        Kus = self.kernel(self.Z(),Xs)  # NxS

        Luu = self._cholesky(Kuu, add_jitter=True)  # NxN
        a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # NxS;  Kuu^(-1/2).Kus
        b = self.q_sqrt().tril().T.mm(torch.linalg.solve_triangular(Luu,Kus,upper=False))

        mu = Kus.T.mm(torch.linalg.solve_triangular(Luu.T,self.q_mu(),upper=True))  # Sx1
        if full:
            Kss = self.kernel(Xs)  # SxS
            var = Kss - a.T.mm(a) + b.T.mm(b)  # SxS
        else:
            Kss_diag = self.kernel.K_diag(Xs)  # M
            var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
            var = var.reshape(-1,1)
        return mu, var

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD

            mu, var = self._predict(Xs, full=full)
            if predict_y:
                mu, var = self.likelihood.predict(mu, var, full=full, X=Xs)
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Sx1

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

Methods

def elbo(self)
def elbo(self):
    if self.mean is not None:
        y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
    else:
        y = self.y  # Nx1

    if self.is_sparse:
        qf_mu, qf_var_diag = self._predict(self.X, full=False)
    else:
        Kff = self.kernel(self.X)
        Lff = self._cholesky(Kff, add_jitter=True)  # NxN

        qf_mu = Lff.mm(self.q_mu())
        if self.mean is not None:
            qf_mu -= self.mean(self.X).reshape(-1,1)  # Sx1

        qf_sqrt = Lff.mm(self.q_sqrt().tril())
        qf_var_diag = qf_sqrt.mm(qf_sqrt.T).diagonal().reshape(-1,1)

    var_exp = self.likelihood.variational_expectation(y, qf_mu, qf_var_diag, X=self.X)
    kl = self.kl_gaussian(self.q_mu(), self.q_sqrt())
    return var_exp - kl
def kl_gaussian(self, q_mu, q_sqrt)
def kl_gaussian(self, q_mu, q_sqrt):
    S_diag = q_sqrt.diagonal().square() # NxN
    kl = -q_mu.shape[0]
    kl += q_mu.T.mm(q_mu).squeeze()  # Mahalanobis
    kl -= S_diag.log().sum()  # determinant of q_var
    kl += S_diag.sum()  # same as Trace(p_var^(-1).q_var)
    return 0.5*kl
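
For reference, for q(u) = N(m, S) and the whitened prior p(u) = N(0, I), the KL divergence has the closed form

KL(q ‖ p) = 0.5.(mᵀ.m + Trace(S) - log det S - n)

where n is the number of (inducing) points and S = tril(q_sqrt).tril(q_sqrt)ᵀ in the parameterization used here.
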
def predict(self, Xs, full=False, tensor=False, predict_y=True)
def predict(self, Xs, full=False, tensor=False, predict_y=True):
    with torch.no_grad():
        Xs = self._check_input(Xs)  # MxD

        mu, var = self._predict(Xs, full=full)
        if predict_y:
            mu, var = self.likelihood.predict(mu, var, full=full, X=Xs)
        if self.mean is not None:
            mu += self.mean(Xs).reshape(-1,1)  # Sx1

        if tensor:
            return mu, var
        else:
            return mu.cpu().numpy(), var.cpu().numpy()
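
Passing Z=None (the default) keeps one variational parameter per training point, i.e. the non-sparse variational GP, while an integer or a tensor of locations enables the sparse inducing-point version. A brief sketch, reusing the kernel, X, and y placeholders from the earlier Snelson example:

from mogptk import gpr

model_full = gpr.SparseHensman(kernel, X, y)                            # non-sparse: q_mu/q_sqrt sized by the data
model_sparse = gpr.SparseHensman(kernel, X, y, Z=20, Z_init='random')   # 20 inducing points
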

class Titsias (kernel, X, y, Z, Z_init='grid', variance=1.0, jitter=1e-08, mean=None, name='Titsias')

A sparse Gaussian process regression based on Titsias [1] with a Gaussian likelihood.

Args

kernel : Kernel
Kernel.
X : torch.tensor
Input data of shape (data_points,input_dims).
y : torch.tensor
Output data of shape (data_points,).
Z : int,torch.tensor
Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
Z_init : str
Method for initialization of inducing points, can be grid, random, or density.
variance : float
Gaussian likelihood initial variance.
jitter : float
Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean : Mean
Mean.
name : str
Name of the model.

[1] M. Titsias, "Variational Learning of Inducing Variables in Sparse Gaussian Processes", 2009

class Titsias(Model):
    """
    A sparse Gaussian process regression based on Titsias [1] with a Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        variance (float): Gaussian likelihood initial variance.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.
        name (str): Name of the model.

    [1] M. Titsias, "Variational Learning of Inducing Variables in Sparse Gaussian Processes", 2009
    """
    # See: http://krasserm.github.io/2020/12/12/gaussian-processes-sparse/
    def __init__(self, kernel, X, y, Z, Z_init='grid', variance=1.0, jitter=1e-8,
                 mean=None, name="Titsias"):
        # TODO: variance per channel
        variance = Parameter.to_tensor(variance)
        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean, name)

        Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
        Z = self._check_input(Z)

        self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.Z = Parameter(Z, name="induction_points")
        if kernel.output_dims is not None:
            self.Z.num_parameters -= self.Z().shape[0]

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kff_diag = self.kernel.K_diag(self.X)  # N
        Kuf = self.kernel(self.Z(),self.X)  # MxN
        Kuu = self.kernel(self.Z())  # MxM

        Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
        Q = v.mm(v.T)  # MxM;  Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)
        L = self._cholesky(Q/self.likelihood.scale().square() + self.eye)  # MxM;  (Q/sigma^2 + I)^(1/2)

        c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square()  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.y

        # p = log N(y | 0, Kfu.Kuu^(-1).Kuf + sigma^2.I) - 1/(2.sigma^2).Trace(Kff - Kfu.Kuu^(-1).Kuf)
        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
        p -= self.X.shape[0]*self.likelihood.scale().log()
        p -= 0.5*y.T.mm(y).squeeze()/self.likelihood.scale().square()
        p += 0.5*c.T.mm(c).squeeze()
        p -= 0.5*(Kff_diag.sum() - Q.trace())/self.likelihood.scale().square() # trace
        return p

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def predict(self, Xs, full=False, tensor=False, predict_y=True):
        with torch.no_grad():
            Xs = self._check_input(Xs)  # MxD
            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
            else:
                y = self.y  # Nx1

            Kus = self.kernel(self.Z(),Xs)  # MxS
            Kuf = self.kernel(self.Z(),self.X)  # MxN
            Kuu = self.kernel(self.Z())  # MxM

            Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
            v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
            L = self._cholesky(v.mm(v.T)/self.likelihood.scale().square() + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)/sigma^2 + I)^(1/2)

            a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # MxS;  Kuu^(-1/2).Kus
            b = torch.linalg.solve_triangular(L,a,upper=False)  # MxS;  L^(-1).Kuu^(-1/2).Kus
            c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square()  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.y

            # mu = sigma^(-2).Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kuf.y
            mu = b.T.mm(c)  # Mx1
            if self.mean is not None:
                mu += self.mean(Xs).reshape(-1,1)  # Mx1

            # var = Kss - Qsf.(Qff + sigma^2 I)^(-1).Qfs
            # below is the equivalent but more stable version by using the matrix inversion lemma
            # var = Kss - Ksu.Kuu^(-1).Kus + Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kus
            if full:
                Kss = self.kernel(Xs)  # MxM
                var = Kss - a.T.mm(a) + b.T.mm(b)  # MxM
                if predict_y:
                    eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                    var += self.likelihood.scale().square() * eye
            else:
                Kss_diag = self.kernel.K_diag(Xs)  # M
                var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
                if predict_y:
                    var += self.likelihood.scale().square()
                var = var.reshape(-1,1)

            if tensor:
                return mu, var
            else:
                return mu.cpu().numpy(), var.cpu().numpy()

Methods

def elbo(self)
def elbo(self):
    if self.mean is not None:
        y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
    else:
        y = self.y  # Nx1

    Kff_diag = self.kernel.K_diag(self.X)  # N
    Kuf = self.kernel(self.Z(),self.X)  # MxN
    Kuu = self.kernel(self.Z())  # MxM

    Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
    v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
    Q = v.mm(v.T)  # MxM;  Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)
    L = self._cholesky(Q/self.likelihood.scale().square() + self.eye)  # MxM;  (Q/sigma^2 + I)^(1/2)

    c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square()  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.y

    # p = log N(y | 0, Kfu.Kuu^(-1).Kuf + sigma^2.I) - 1/(2.sigma^2).Trace(Kff - Kfu.Kuu^(-1).Kuf)
    p = -self.log_marginal_likelihood_constant
    p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
    p -= self.X.shape[0]*self.likelihood.scale().log()
    p -= 0.5*y.T.mm(y).squeeze()/self.likelihood.scale().square()
    p += 0.5*c.T.mm(c).squeeze()
    p -= 0.5*(Kff_diag.sum() - Q.trace())/self.likelihood.scale().square() # trace
    return p
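
For reference, the collapsed bound evaluated above is

ELBO = log N(y | 0, Qff + sigma^2.I) - 1/(2.sigma^2).Trace(Kff - Qff),  with Qff = Kfu.Kuu^(-1).Kuf

as noted in the code comment; the Cholesky factors Luu and L apply the matrix inversion and determinant lemmas so that only MxM systems need to be solved.
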
def predict(self, Xs, full=False, tensor=False, predict_y=True)
def predict(self, Xs, full=False, tensor=False, predict_y=True):
    with torch.no_grad():
        Xs = self._check_input(Xs)  # MxD
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1)  # Nx1
        else:
            y = self.y  # Nx1

        Kus = self.kernel(self.Z(),Xs)  # MxS
        Kuf = self.kernel(self.Z(),self.X)  # MxN
        Kuu = self.kernel(self.Z())  # MxM

        Luu = self._cholesky(Kuu, add_jitter=True)  # MxM;  Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False)  # MxN;  Kuu^(-1/2).Kuf
        L = self._cholesky(v.mm(v.T)/self.likelihood.scale().square() + self.eye)  # MxM;  (Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)/sigma^2 + I)^(1/2)

        a = torch.linalg.solve_triangular(Luu,Kus,upper=False)  # MxS;  Kuu^(-1/2).Kus
        b = torch.linalg.solve_triangular(L,a,upper=False)  # MxS;  L^(-1).Kuu^(-1/2).Kus
        c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square()  # Mx1;  L^(-1).Kuu^(-1/2).Kuf.y

        # mu = sigma^(-2).Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kuf.y
        mu = b.T.mm(c)  # Mx1
        if self.mean is not None:
            mu += self.mean(Xs).reshape(-1,1)  # Mx1

        # var = Kss - Qsf.(Qff + sigma^2 I)^(-1).Qfs
        # below is the equivalent but more stable version by using the matrix inversion lemma
        # var = Kss - Ksu.Kuu^(-1).Kus + Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kus
        if full:
            Kss = self.kernel(Xs)  # MxM
            var = Kss - a.T.mm(a) + b.T.mm(b)  # MxM
            if predict_y:
                eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
                var += self.likelihood.scale().square() * eye
        else:
            Kss_diag = self.kernel.K_diag(Xs)  # M
            var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1)  # M
            if predict_y:
                var += self.likelihood.scale().square()
            var = var.reshape(-1,1)

        if tensor:
            return mu, var
        else:
            return mu.cpu().numpy(), var.cpu().numpy()
