Module mogptk.kernels.kernel
import torch
import copy

from . import Parameter, config


class Kernel:
    def __init__(self, input_dims=None, active_dims=None, name=None):
        if name is None:
            name = self.__class__.__name__
            if name.endswith('Kernel') and name != 'Kernel':
                name = name[:-6]
        self.input_dims = input_dims
        self.active_dims = active_dims
        self.name = name

    def __call__(self, X1, X2=None):
        return self.K(X1, X2)

    def __setattr__(self, name, val):
        if name == 'trainable':
            # toggle the trainable flag of all parameters attached to this kernel
            from .util import _find_parameters
            for p in _find_parameters(self):
                p.trainable = val
            return
        if hasattr(self, name) and isinstance(getattr(self, name), Parameter):
            raise AttributeError("parameter is read-only, use Parameter.assign()")
        if isinstance(val, Parameter) and val.name is None:
            val.name = name
        super(Kernel, self).__setattr__(name, val)

    def _check_input(self, X1, X2=None):
        if len(X1.shape) != 2:
            raise ValueError("X should have two dimensions (data_points,input_dims)")
        if X1.shape[0] == 0 or X1.shape[1] == 0:
            raise ValueError("X must not be empty")
        if X2 is not None:
            if len(X2.shape) != 2:
                raise ValueError("X should have two dimensions (data_points,input_dims)")
            if X2.shape[0] == 0:
                raise ValueError("X must not be empty")
            if X1.shape[1] != X2.shape[1]:
                raise ValueError("input_dims for X1 and X2 must match")
        if self.active_dims is not None:
            X1 = torch.index_select(X1, dim=1, index=self.active_dims)
            if X2 is not None:
                X2 = torch.index_select(X2, dim=1, index=self.active_dims)
        return X1, X2

    def _check_kernels(self, kernels, length=None):
        if isinstance(kernels, tuple):
            if len(kernels) == 1 and isinstance(kernels[0], list):
                kernels = kernels[0]
            else:
                kernels = list(kernels)
        elif not isinstance(kernels, list):
            kernels = [kernels]
        if len(kernels) == 0:
            raise ValueError("must pass at least one kernel")
        elif length is not None and len(kernels) != length:
            if len(kernels) != 1:
                raise ValueError("must pass %d kernels" % length)
            # replicate a single kernel to the required length
            for i in range(length - len(kernels)):
                kernels.append(copy.deepcopy(kernels[0]))
        for i, kernel in enumerate(kernels):
            if not issubclass(type(kernel), Kernel):
                raise ValueError("must only pass Kernel objects")
        return kernels

    @property
    def active_dims(self):
        return self._active_dims

    @active_dims.setter
    def active_dims(self, active_dims):
        if active_dims is not None:
            if not isinstance(active_dims, list):
                active_dims = [active_dims]
            if not all(isinstance(item, int) for item in active_dims):
                raise ValueError("active dimensions must be a list of integers")
            active_dims = torch.tensor(active_dims, device=config.device, dtype=torch.long)
            if self.input_dims is not None and self.input_dims != active_dims.shape[0]:
                raise ValueError("input dimensions must match the number of active dimensions")
        self._active_dims = active_dims

    def K(self, X1, X2=None):
        raise NotImplementedError()

    def distance(self, X1, X2=None):
        # X1 is NxD and X2 is MxD, the result is NxMxD
        if X2 is None:
            X2 = X1
        return X1.unsqueeze(1) - X2

    def squared_distance(self, X1, X2=None):
        # X1 is NxD and X2 is MxD, the result is NxMxD
        if X2 is None:
            X2 = X1
        #return (X1.unsqueeze(1) - X2)**2  # slower than cdist for large X
        return torch.cdist(X2.T.unsqueeze(2), X1.T.unsqueeze(2)).T**2


class AddKernel(Kernel):
    def __init__(self, *kernels, name="Add"):
        super(AddKernel, self).__init__(name=name)
        self.kernels = self._check_kernels(kernels)

    def __getitem__(self, key):
        return self.kernels[key]

    def K(self, X1, X2=None):
        return torch.stack([kernel(X1, X2) for kernel in self.kernels], dim=2).sum(dim=2)


class MulKernel(Kernel):
    def __init__(self, *kernels, name="Mul"):
        super(MulKernel, self).__init__(name=name)
        self.kernels = self._check_kernels(kernels)

    def __getitem__(self, key):
        return self.kernels[key]

    def K(self, X1, X2=None):
        return torch.stack([kernel(X1, X2) for kernel in self.kernels], dim=2).prod(dim=2)


class MixtureKernel(AddKernel):
    def __init__(self, kernel, Q, name="Mixture"):
        Kernel.__init__(self, name=name)
        self.kernels = self._check_kernels(kernel, Q)
class AutomaticRelevanceDeterminationKernel(MulKernel):
    def __init__(self, kernel, input_dims, name="ARD"):
        Kernel.__init__(self, name=name)
        self.kernels = self._check_kernels(kernel, input_dims)
        for i, kernel in enumerate(self.kernels):
            kernel.active_dims = i  # restrict each copy to its own input dimension
class MultiOutputKernel(Kernel):
    """
    Base class for multi-output kernels. It assumes that the first column of X
    contains integer channel IDs and calculates the final kernel matrix
    accordingly. Concretely, it calls the Ksub method, which derived classes
    must implement and which should return the kernel matrix between channels
    i and j given inputs X1 and X2. This class automatically splits the input
    vectors per channel and recombines the sub kernel matrices into the final
    kernel matrix.

    Be aware that when implementing Ksub, i==j holds for the diagonal blocks.
    X2 is None only when calculating the Gram matrix (i.e. X1==X2), in which
    case i==j; the X2==None case is thus a subset of the i==j case, and if X2
    is None then i always equals j.
    """

    def __init__(self, output_dims, input_dims=None, active_dims=None, name=None):
        super(MultiOutputKernel, self).__init__(input_dims, active_dims, name)
        noise = torch.ones(output_dims)
        self.output_dims = output_dims
        self.noise = Parameter(noise, lower=config.positive_minimum)

    def K(self, X1, X2=None):
        # X has shape (data_points,1+input_dims) where the first column is the channel ID
        X1, X2 = self._check_input(X1, X2)
        # extract the channel masks, the per-channel data, and the indices belonging to each channel
        I1 = X1[:,0].long()
        m1 = [I1 == i for i in range(self.output_dims)]
        x1 = [X1[m1[i],1:] for i in range(self.output_dims)]
        r1 = [torch.nonzero(m1[i], as_tuple=False) for i in range(self.output_dims)]  # as_tuple avoids a warning
        if X2 is None:
            r2 = [r1[i].reshape(1,-1) for i in range(self.output_dims)]
            res = torch.empty(X1.shape[0], X1.shape[0], device=config.device, dtype=config.dtype)  # N1 x N1
            # calculate the lower triangle of the kernel matrix, the upper triangle is its transpose
            for i in range(self.output_dims):
                for j in range(i+1):
                    # calculate the sub kernel matrix and place it in the main kernel matrix
                    if i == j:
                        res[r1[i],r2[i]] = self.Ksub(i, i, x1[i])
                    else:
                        k = self.Ksub(i, j, x1[i], x1[j])
                        res[r1[i],r2[j]] = k
                        res[r1[j],r2[i]] = k.T
            # add noise per channel
            res += torch.index_select(self.noise(), dim=0, index=I1).diagflat()
        else:
            # extract the channel masks, the per-channel data, and the indices belonging to each channel
            I2 = X2[:,0].long()
            m2 = [I2 == j for j in range(self.output_dims)]
            x2 = [X2[m2[j],1:] for j in range(self.output_dims)]
            r2 = [torch.nonzero(m2[j], as_tuple=False).reshape(1,-1) for j in range(self.output_dims)]  # as_tuple avoids a warning
            res = torch.empty(X1.shape[0], X2.shape[0], device=config.device, dtype=config.dtype)  # N1 x N2
            for i in range(self.output_dims):
                for j in range(self.output_dims):
                    # calculate the sub kernel matrix and place it in the main kernel matrix
                    res[r1[i],r2[j]] = self.Ksub(i, j, x1[i], x2[j])
        return res

    def Ksub(self, i, j, X1, X2=None):
        raise NotImplementedError()
Classes
class AddKernel (*kernels, name='Add')
The sum of multiple kernels.
Ancestors
- Kernel
Subclasses
- MixtureKernel
Methods
def K(self, X1, X2=None)
Returns the elementwise sum of the constituent kernels' matrices for X1 and X2.
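Composing kernels is elementwise arithmetic on their matrices: AddKernel stacks the per-kernel results along a new dimension and sums them, MulKernel multiplies them. A minimal sketch follows; ConstantKernel is a hypothetical helper defined inline for the example and is not part of this module.

import torch
from mogptk.kernels.kernel import Kernel, AddKernel, MulKernel

class ConstantKernel(Kernel):
    # hypothetical kernel for illustration: k(x, x') = c for all inputs
    def __init__(self, c=1.0):
        super(ConstantKernel, self).__init__()
        self.c = c

    def K(self, X1, X2=None):
        X1, X2 = self._check_input(X1, X2)
        if X2 is None:
            X2 = X1
        return torch.full((X1.shape[0], X2.shape[0]), self.c)

X = torch.randn(5, 1)
assert torch.allclose(AddKernel(ConstantKernel(1.0), ConstantKernel(2.0))(X), torch.full((5, 5), 3.0))
assert torch.allclose(MulKernel(ConstantKernel(2.0), ConstantKernel(3.0))(X), torch.full((5, 5), 6.0))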
class AutomaticRelevanceDeterminationKernel (kernel, input_dims, name='ARD')
Applies a deep copy of the given kernel to each input dimension separately and multiplies the results, so that every dimension gets its own kernel parameters.
Ancestors
- MulKernel
- Kernel
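Each copy of the base kernel is restricted to a single column of X through active_dims, so every input dimension is modeled by an independently parameterized copy. A short sketch, reusing the hypothetical ConstantKernel from the AddKernel example above:

from mogptk.kernels.kernel import AutomaticRelevanceDeterminationKernel

ard = AutomaticRelevanceDeterminationKernel(ConstantKernel(2.0), input_dims=3)
print([k.active_dims for k in ard.kernels])   # e.g. [tensor([0]), tensor([1]), tensor([2])]
assert torch.allclose(ard(torch.randn(4, 3)), torch.full((4, 4), 8.0))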
class Kernel (input_dims=None, active_dims=None, name=None)
Base class for all kernels. Calling a kernel evaluates K(X1, X2); assigning to trainable toggles all of its parameters; attributes holding a Parameter are read-only and must be updated with Parameter.assign().
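Two behaviors of __setattr__ are worth calling out: assigning to trainable walks every Parameter found on the kernel and toggles its trainable flag, and an attribute that already holds a Parameter cannot be rebound. A small sketch, assuming Parameter and config can be imported from the parent package (as the module's own `from . import Parameter, config` suggests) and that Parameter wraps a tensor and is callable, as used in MultiOutputKernel.K:

import torch
from mogptk.kernels import Parameter, config
from mogptk.kernels.kernel import Kernel

class ScaledKernel(Kernel):
    # hypothetical kernel with a single magnitude parameter
    def __init__(self):
        super(ScaledKernel, self).__init__()
        self.magnitude = Parameter(torch.ones(1), lower=config.positive_minimum)

    def K(self, X1, X2=None):
        X1, X2 = self._check_input(X1, X2)
        if X2 is None:
            X2 = X1
        return self.magnitude() * torch.ones(X1.shape[0], X2.shape[0])

k = ScaledKernel()
k.trainable = False    # recursively freezes every Parameter on the kernel
# k.magnitude = 2.0    # would raise AttributeError; use k.magnitude.assign(...) instead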
Subclasses
- AddKernel
- MulKernel
- MultiOutputKernel
- LinearKernel
- PeriodicKernel
- PhiKernel
- PolynomialKernel
- RationalQuadraticKernel
- SpectralKernel
- SquaredExponentialKernel
Instance variables
var active_dims
The active input dimensions as a torch.long tensor (or None), used by _check_input to select columns of X.
Methods
def K(self, X1, X2=None)
Computes the kernel matrix between X1 (NxD) and X2 (MxD). If X2 is None, the Gram matrix of X1 is computed. Must be implemented by subclasses.
def distance(self, X1, X2=None)
Returns the pairwise differences per input dimension: for X1 of shape NxD and X2 of shape MxD the result is NxMxD. If X2 is None, X1 is used for both arguments.
def squared_distance(self, X1, X2=None)
Returns the pairwise squared differences per input dimension, shape NxMxD; implemented with torch.cdist, which is faster than broadcasting for large X.
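Both helpers return the per-dimension (squared) differences that stationary kernels are built from. A minimal sketch of a custom stationary kernel on top of squared_distance, with a fixed lengthscale to keep it short (an illustration, not the library's SquaredExponentialKernel):

import torch
from mogptk.kernels.kernel import Kernel

class MySEKernel(Kernel):
    # k(x, x') = exp(-0.5 * ||x - x'||^2 / lengthscale^2)
    def __init__(self, lengthscale=1.0):
        super(MySEKernel, self).__init__()
        self.lengthscale = lengthscale

    def K(self, X1, X2=None):
        X1, X2 = self._check_input(X1, X2)
        tau2 = self.squared_distance(X1, X2).sum(dim=2)   # NxMxD -> NxM
        return torch.exp(-0.5 * tau2 / self.lengthscale**2)

X = torch.randn(4, 2)
K = MySEKernel(lengthscale=0.5)(X)   # __call__ dispatches to K; Gram matrix since X2 is None
assert K.shape == (4, 4) and torch.allclose(K.diagonal(), torch.ones(4), atol=1e-5)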
class MixtureKernel (kernel, Q, name='Mixture')
The sum of Q deep copies of the given kernel.
Ancestors
- AddKernel
- Kernel
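Because _check_kernels deep-copies a single kernel up to the requested length, the Q components are independent copies with their own parameters rather than shared references. A short sketch, again reusing the hypothetical ConstantKernel from the AddKernel example:

from mogptk.kernels.kernel import MixtureKernel

mix = MixtureKernel(ConstantKernel(0.5), Q=3)
assert len(mix.kernels) == 3 and mix[0] is not mix[1]   # three independent deep copies
assert torch.allclose(mix(torch.zeros(2, 1)), torch.full((2, 2), 1.5))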
class MulKernel (*kernels, name='Mul')
The product of multiple kernels.
Ancestors
- Kernel
Subclasses
- AutomaticRelevanceDeterminationKernel
Methods
def K(self, X1, X2=None)
Returns the elementwise product of the constituent kernels' matrices for X1 and X2.
class MultiOutputKernel (output_dims, input_dims=None, active_dims=None, name=None)
Base class for multi-output kernels. It assumes that the first column of X contains integer channel IDs and assembles the final kernel matrix from the Ksub method, which derived classes must implement and which returns the kernel matrix between channels i and j. The class splits the inputs per channel and recombines the sub kernel matrices into the final kernel matrix. Note that for implementations of Ksub, i==j holds for the diagonal blocks, and X2 is None only when computing the Gram matrix (X1==X2), in which case i always equals j.
Ancestors
- Kernel
Subclasses
- CrossSpectralKernel
- GaussianConvolutionProcessKernel
- IndependentMultiOutputKernel
- LinearModelOfCoregionalizationKernel
- MultiOutputSpectralKernel
Methods
def K(self, X1, X2=None)
Computes the full multi-output kernel matrix. X has shape (data_points, 1+input_dims) with the channel ID in the first column; the matrix is assembled from Ksub blocks, and per-channel noise is added to the diagonal of the Gram matrix.
def Ksub(self, i, j, X1, X2=None)
Returns the kernel matrix between channel i (inputs X1) and channel j (inputs X2). X2 is None only when computing a diagonal block of the Gram matrix, in which case i == j. Must be implemented by subclasses.
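A derived class only has to supply Ksub; K assembles the blocks per channel pair and adds the per-channel noise to the diagonal of the Gram matrix. A minimal sketch of a two-channel kernel with uncorrelated channels (purely illustrative; not the library's IndependentMultiOutputKernel), with inputs created on config.device since K allocates its result there:

import torch
from mogptk.kernels import config
from mogptk.kernels.kernel import MultiOutputKernel

class ToyMOK(MultiOutputKernel):
    # hypothetical: a unit squared exponential per channel, zero cross-covariance
    def Ksub(self, i, j, X1, X2=None):
        if i != j:
            return torch.zeros(X1.shape[0], X2.shape[0], device=config.device, dtype=config.dtype)
        tau2 = self.squared_distance(X1, X2).sum(dim=2)
        return torch.exp(-0.5 * tau2)

# stack the inputs with the channel ID in the first column
X = torch.tensor([[0, 0.0], [0, 0.5], [0, 1.0],   # three points on channel 0
                  [1, 0.2], [1, 0.8]],            # two points on channel 1
                 device=config.device, dtype=config.dtype)
K = ToyMOK(output_dims=2)(X)
assert K.shape == (5, 5)   # block structure plus per-channel noise on the diagonal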