
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================


""" Module implementing the shared functionality across all the variable types  """


from inferpy.util import tf_run_wrapper
from inferpy.util import static_multishape
import inferpy as inf
import tensorflow as tf
import edward as ed
import numpy as np
import sys
import collections





class RandomVariable(object):
    """Base class for random variables."""

    __declared_vars = {}

    def __init__(self, base_object=None, observed=False):
        """ Constructor for the RandomVariable class

        Args:
            base_object: encapsulated Edward object (optional).
            observed (bool): specifies if the random variable is observed (True)
                or latent (False).

        """
        self.base_object = base_object
        self.__bind = None
        self.observed = observed
        self.copied_from = None

        if base_object is not None:
            if inf.ProbModel.is_active() and not self.is_generic_variable():
                inf.ProbModel.get_active_model().varlist.append(self)

            if inf.replicate.in_replicate():
                for r in inf.replicate.get_active_replicate():
                    r.varlist.append(self)

        RandomVariable.__declared_vars.update({id(self): self})

    @property
    def dist(self):
        """ Underlying Edward object """
        if self.is_generic_variable():
            return None
        return self._base_object

    @property
    def base_object(self):
        """ Underlying TensorFlow object """
        return self._base_object

    @property
    def dim(self):
        """ Dimensionality of variable """
        return self.base_object.shape.as_list()[-(1 + len(self.event_shape))]

    @property
    def batches(self):
        """ Number of batches of the variable """
        dist_shape = self.base_object.shape.as_list()
        if len(dist_shape) - len(self.event_shape) <= 1:
            return 1
        return dist_shape[0]

    @property
    def shape(self):
        """ Shape of the variable, i.e. (batches, dim) """
        return self.base_object.shape.as_list()

    @property
    def event_shape(self):
        """ event_shape """
        if self.is_generic_variable():
            return []
        return self.base_object.event_shape.as_list()

    @property
    def name(self):
        """ Name of the variable """
        return self.base_object.name[0:-1]

    @property
    def observed(self):
        """ Boolean property that determines if a variable is observed or not """
        return self.__observed

    @observed.setter
    def observed(self, observed):
        """ Modifies the boolean property that determines if a variable is observed or not """
        self.__observed = observed

    @property
    def bind(self):
        """ Random variable bound to this one (if any) """
        return self.__bind

    @bind.setter
    def bind(self, bind):
        """ Sets the random variable bound to this one """
        if not isinstance(bind, RandomVariable):
            raise ValueError("object to bind is not RandomVariable")
        self.__bind = bind

    @dist.setter
    def dist(self, dist):
        """ Sets the underlying Edward object """
        if not isinstance(dist, ed.models.RandomVariable):
            raise ValueError("Type of input distribution is not correct")
        self._base_object = dist

    @base_object.setter
    def base_object(self, tensor):
        """ Sets the underlying TensorFlow object """
        if not isinstance(tensor, tf.Tensor) and \
                not isinstance(tensor, ed.RandomVariable) and tensor is not None:
            raise ValueError("Type of input object is not correct")
        self._base_object = tensor

    @tf_run_wrapper
    def sample(self, size=1):
        """ Method for obtaining samples

        Args:
            size: scalar or matrix of integers indicating the shape of the matrix of samples.

        Return:
            Matrix of samples. Each element in the output matrix has the same shape as the variable.

        """
        if self.is_generic_variable():
            return self.base_object

        s = self.dist.sample(size)

        if self.dim > 1 and self.batches == 1:
            final_shape = (size, self.dim) if self.event_shape == [] \
                else (size, self.dim, self.event_shape[0])
            s = tf.reshape(s, shape=final_shape)

        return s

    @tf_run_wrapper
    def prob(self, v):
        """ Method for computing the probability of a sample v (or a set of samples) """
        return self.dist.prob(tf.cast(v, tf.float32))

    @tf_run_wrapper
    def log_prob(self, v):
        """ Method for computing the log probability of a sample v (or a set of samples) """
        return self.dist.log_prob(tf.cast(v, tf.float32))

    @tf_run_wrapper
    def prod_prob(self, v):
        """ Method for computing the joint probability of a sample v (or a set of samples) """
        return tf.reduce_prod(self.dist.prob(tf.cast(v, tf.float32)))

    @tf_run_wrapper
    def sum_log_prob(self, v):
        """ Method for computing the sum of the log probability of a sample v (or a set of samples) """
        return tf.reduce_sum(self.dist.log_prob(tf.cast(v, tf.float32)))

    @tf_run_wrapper
    def mean(self, name='mean'):
        """ Method for obtaining the mean of this random variable """
        if not self.is_generic_variable():
            return self.dist.mean(name)
        return None

    @tf_run_wrapper
    def variance(self, name='variance'):
        """ Method for obtaining the variance of this random variable """
        if not self.is_generic_variable():
            return self.dist.variance(name)
        return None

    @tf_run_wrapper
    def stddev(self, name='stddev'):
        """ Method for obtaining the standard deviation of this random variable """
        if not self.is_generic_variable():
            return self.dist.stddev(name)
        return None

    def is_generic_variable(self):
        """ Determines if this is a generic variable, i.e., an Edward
        variable is not encapsulated. """
        return not isinstance(self._base_object, ed.RandomVariable)

    def get_replicate_list(self):
        """ Returns a list with all the replicate constructs that this
        variable belongs to. """
        return [r for r in inf.replicate.get_all_replicate() if self in r.varlist]

    def get_local_hidden(self):
        """ Returns a list with all the local hidden variables w.r.t. this
        one. Local hidden variables are those latent variables which are
        in the same replicate construct. """

        var_rep = [r.varlist for r in self.get_replicate_list()]
        var_rep_id = [inf.models.RandomVariable.get_key_from_var(vr) for vr in var_rep]

        if len(var_rep) == 0:
            intersect_id = []
        elif len(var_rep) == 1:
            intersect_id = var_rep_id[0]
        else:
            for i in range(0, len(var_rep_id)):
                if i == 0:
                    intersect_id = var_rep_id[0]
                else:
                    intersect_id = np.intersect1d(intersect_id, var_rep_id[i])

        intersect_id = [x for x in intersect_id
                        if inf.models.RandomVariable.get_key_from_var(self) != x]
        intersect = inf.models.RandomVariable.get_var_with_key(intersect_id)
        intersect = [x for x in intersect if not x.observed]

        return intersect

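    # Illustrative sketch (kept as comments so importing this module has no
    # side effects): how get_local_hidden() relates to replicate constructs.
    # It assumes the inf.ProbModel and inf.replicate context managers and
    # inf.models.Normal from the InferPy modelling API.
    #
    #   with inf.ProbModel() as m:
    #       theta = inf.models.Normal(loc=0., scale=1.)          # global latent
    #       with inf.replicate(size=100):
    #           z = inf.models.Normal(loc=theta, scale=1.)       # local latent
    #           x = inf.models.Normal(loc=z, scale=1., observed=True)
    #
    #   # z is latent and shares x's replicate construct, while theta does not,
    #   # so x.get_local_hidden() would return [z].
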
    def __repr__(self):
        """ Representation for this variable """
        return "<inferpy RandomVariable '%s' shape=%s dtype=%s>" % (
            self.name, self.shape, self.base_object.dtype.name)

    @staticmethod
    @static_multishape
    def get_var_with_key(key):
        """ Returns the declared random variable registered under the given key. """
        return RandomVariable.__declared_vars.get(key)

    @staticmethod
    @static_multishape
    def get_key_from_var(var):
        """ Returns the key under which the given declared variable was registered. """
        from six import iteritems
        for (key, value) in iteritems(RandomVariable.__declared_vars):
            if value == var:
                return key
        return None

    def copy(self, swap_dict=None, observed=False):
        """ Build a new random variable with the same values.
        The underlying tensors or Edward objects are copied as well.

        Args:
            swap_dict: random variables, variables, tensors, or operations to swap with.
            observed: determines if the new variable is observed or not.

        """
        new_var = getattr(sys.modules[self.__class__.__module__], type(self).__name__)()
        new_var.dist = ed.copy(self.dist, swap_dict)
        new_var.copied_from = self
        new_var.observed = observed

        return new_var

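# Illustrative sketch (kept as comments so importing this module stays
# side-effect free): typical use of the RandomVariable wrapper through a
# concrete distribution. It assumes inf.models.Normal from the InferPy
# modelling API; the parameter names (loc, scale, dim) follow that API.
#
#   import inferpy as inf
#
#   x = inf.models.Normal(loc=0., scale=1., dim=3)   # wraps an Edward Normal
#   x.shape          # e.g. [1, 3], i.e. (batches, dim)
#   x.dim            # 3
#   x.batches        # 1
#   x.sample(5)      # 5 x 3 matrix of samples (evaluated in a TF session by
#                    # the tf_run_wrapper decorator)
#   x.log_prob([0., 0., 0.])
#   x.is_generic_variable()   # False: an Edward object is encapsulated
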
# List of Python operators that we allow to override.

BINARY_OPERATORS = {
    # Binary.
    "__add__", "__radd__",
    "__sub__", "__rsub__",
    "__mul__", "__rmul__",
    "__div__", "__rdiv__",
    "__truediv__", "__rtruediv__",
    "__floordiv__", "__rfloordiv__",
    "__mod__", "__rmod__",
    "__lt__", "__le__", "__gt__", "__ge__",
    "__and__", "__rand__",
    "__or__", "__ror__",
    "__xor__", "__rxor__",
    # "__getitem__",
    "__pow__", "__rpow__",
    "__matmul__", "__rmatmul__"
}

UNARY_OPERATORS = {
    # Unary.
    # "__invert__",
    "__neg__",
    "__abs__"
}


def __add_operator(cls, name, unary=False):
    import inferpy.models.deterministic

    if not unary:
        def operator(self, other):
            res = inferpy.models.Deterministic()
            if isinstance(other, RandomVariable):
                res.base_object = getattr(self.base_object, name)(other.base_object)
            else:
                res.base_object = getattr(self.base_object, name)(other)
            return res
    else:
        def operator(self):
            res = inferpy.models.Deterministic()
            res.base_object = getattr(self.base_object, name)()
            return res

    operator.__doc__ = "documentation for " + name
    operator.__name__ = name
    setattr(cls, operator.__name__, operator)


for x in BINARY_OPERATORS:
    __add_operator(RandomVariable, x)

for x in UNARY_OPERATORS:
    __add_operator(RandomVariable, x, unary=True)


def __add_equal_operator():
    import inferpy.models.deterministic

    name = "equal"
    cls = RandomVariable

    def operator(self, other):
        res = inferpy.models.Deterministic()
        op1 = self.base_object
        op2 = other.base_object if isinstance(other, RandomVariable) else other
        res.base_object = tf.equal(op1, op2)
        return res

    operator.__doc__ = "documentation for " + name
    operator.__name__ = name
    setattr(cls, operator.__name__, operator)


__add_equal_operator()


def __add_getitem_operator_old():
    import inferpy.models.deterministic

    name = "__getitem__"
    cls = RandomVariable

    def operator(self, index):
        res = inferpy.models.Deterministic()

        if isinstance(index, inf.models.RandomVariable):
            if index.batches == 1:
                if self.batches > 1:
                    res_dist = tf.reshape(
                        tf.gather(self.base_object, index.base_object, axis=1),
                        (self.batches, 1))
                else:
                    res_dist = tf.reshape(
                        tf.gather(self.base_object, index.base_object, axis=0),
                        (1, 1))
            else:
                res_dist = tf.reshape(
                    tf.stack([self[index[n, 0]][n, 0].base_object
                              for n in range(0, index.batches)]),
                    (index.batches, 1))
        else:
            if np.ndim(index) < 1:
                index = [index]

            if len(np.shape(index)) < 2:
                In = range(0, self.batches) if len(index) < 2 else [index[0]]
                Id = [index[-1]]

                if self.batches > 1:
                    res_dist = tf.gather(tf.gather(self.base_object, In, axis=0), Id, axis=1)
                else:
                    res_dist = tf.reshape(tf.gather(self.base_object, Id, axis=0), (1, 1))
            else:
                res_dist = tf.reshape(
                    tf.stack([self[0, index[n][0]].base_object
                              for n in range(0, np.shape(index)[0])]),
                    (self.batches, 1))

        res.base_object = res_dist
        return res

    operator.__doc__ = "documentation for " + name
    operator.__name__ = name
    setattr(cls, operator.__name__, operator)


def __add_getitem_operator():
    import inferpy.models.deterministic

    name = "__getitem__"
    cls = RandomVariable

    def operator(self, index):
        res = inferpy.models.Deterministic()

        res_tf = self.dist

        if not isinstance(index, collections.Iterable):
            index = [index]

        axis = 0
        ndim = len(self.shape)

        for i in index:
            if isinstance(i, inf.models.RandomVariable):
                res_tf = tf.gather(res_tf, i.base_object, axis=axis)
            elif isinstance(i, tf.Tensor):
                res_tf = tf.gather(res_tf, i, axis=axis)
            else:
                I = tuple([slice(None, None, None) for x in range(0, axis)] + [i])
                res_tf = res_tf[I]

            ndim_new = len(res_tf._shape_as_list())
            if ndim != ndim_new:
                ndim = ndim_new
            else:
                axis = axis + 1

        res.base_object = tf.reshape(res_tf, inf.fix_shape(res_tf.shape))

        return res

    operator.__doc__ = "documentation for " + name
    operator.__name__ = name
    setattr(cls, operator.__name__, operator)


__add_getitem_operator()


"""
        if np.ndim(index) < 1:
            index = [index]
        elif np.ndim(index) > 1:
            raise ValueError("wrong index dimensions " + str(index))

        res_dist = self.base_object

        for i in index:
            res_dist = tf.gather(res_dist,
                                 i if not isinstance(i, inf.models.RandomVariable) else i.dist)

        res.base_object = res_dist

        return res
"""
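
# Illustrative sketch (kept as comments): the overridden operators and
# __getitem__ return inf.models.Deterministic variables that wrap the
# resulting tensor. The variables below are hypothetical and assume
# inf.models.Normal from the InferPy modelling API.
#
#   a = inf.models.Normal(loc=0., scale=1.)
#   b = inf.models.Normal(loc=0., scale=1., dim=3)
#
#   c = a + 2 * b        # Deterministic variable wrapping the tensor a + 2*b
#   d = a.equal(0.)      # Deterministic variable wrapping tf.equal
#   e = b[0]             # Deterministic variable selecting one component of b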