#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on 12/14/2021
Arrangement models class
Author: dzhi, jdiedrichsen
"""
import numpy as np
import torch as pt
from torch import exp,log
from HierarchBayesParcel.model import Model
import pandas as pd
import warnings
[docs]
class ArrangementModel(Model):
""" Abstract arrangement model class
"""
def __init__(self, K, P):
self.K = K # Number of states
self.P = P # Number of nodes
self.tmp_list = []
[docs]
def map_to_full(self,Uhat):
""" Placeholder
Args:
Uhat (ndarray): tensor of estimated arrangement
Returns:
Uhat (ndarray): tensor of estimated arrangements
"""
return Uhat
[docs]
def map_to_arrange(self, emloglik):
""" Maps emission log likelihoods to the internal size of the representation: Empty
Args:
emloglik (list): List of emission logliklihoods
Returns:
emloglik_comb (ndarray): ndarray of emission logliklihoods
"""
return emloglik
[docs]
class ArrangeIndependent(ArrangementModel):
""" Independent arrangement model
"""
def __init__(self, K=3, P=100, spatial_specific=True,
remove_redundancy=False):
"""Constructor for the independent arrangement model
Args:
K (int): Number of different parcels
P (int): Number of voxels / vertices
spatial_specific (bool): Use a spatially specific
model (default True)
remove_redundancy (bool): Code with K or K-1
probabilities parameters?
"""
super().__init__(K, P)
self.spatial_specific = spatial_specific
if spatial_specific:
pi = pt.ones(K, P) / K
else:
pi = pt.ones(K, 1) / K
self.logpi = pt.log(pi)
# Remove redundancy in parametrization
self.rem_red = remove_redundancy
if self.rem_red:
self.logpi = self.logpi - self.logpi[-1, :]
self.set_param_list(['logpi'])
self.tmp_list = ['estep_Uhat']
[docs]
def random_params(self):
""" Sets prior parameters to random starting values
"""
self.logpi = pt.distributions.normal.Normal(0, 1).sample(self.logpi.shape)
# This was a hack before, but leads to a very strong group prior
# self.logpi = pt.distributions.uniform.Uniform(1,100).sample(self.logpi.shape)
self.logpi = self.logpi - self.logpi.mean(dim=0)
[docs]
def Estep(self, emloglik, gather_ss=True):
""" Estep for the spatial arrangement model
Args:
emloglik: (pt.tensor) emission log likelihood
log p(Y|u,theta_E) a numsubj x K x P matrix
gather_ss: (bool) Gather Sufficient statistics
for M-step (default = True)
Returns:
Uhat (pt.tensor): posterior p(U|Y) a
numsubj x K x P matrix
ll_A (pt.tensor): Expected log-liklihood of
the arrangement model
"""
if type(emloglik) is np.ndarray:
emloglik = pt.tensor(emloglik, dtype=pt.get_default_dtype())
Uhat = pt.softmax(emloglik + self.logpi, dim=1)
if gather_ss:
self.estep_Uhat = Uhat
# The log likelihood for arrangement model
# p(U|theta_A) is sum_i sum_K Uhat_(K)*log pi_i(K)
pi = pt.softmax(self.logpi,dim=0)
lpi = pt.nan_to_num(pt.log(pi),neginf=0) # Prevent underflow
ll_A = pt.sum(Uhat * lpi)
if pt.isnan(ll_A):
raise (NameError('likelihood is nan'))
return Uhat, ll_A
[docs]
def Mstep(self):
""" M-step for the spatial arrangement model. Update
the pi for arrangement model uses the epos_Uhat
statistic that is put away from the last e-step.
"""
# Averarging over subjects
pi = pt.mean(self.estep_Uhat, dim=0)
if not self.spatial_specific:
pi = pi.mean(dim=1).reshape(-1, 1)
self.logpi = log(pi)
if self.rem_red:
self.logpi = self.logpi - self.logpi[-1, :]
self.logpi = pt.nan_to_num(self.logpi)
[docs]
def sample(self, num_subj=10):
""" Samples a number of subjects from the prior.
In this i.i.d arrangement model we assume each node has
no relation with other nodes, which means it equals to
sample from the prior pi.
Args:
num_subj (int): the number of subjects to sample
Returns:
U (pt.tensor): the sampled data for subjects
"""
U = pt.zeros(num_subj, self.P)
pi = pt.softmax(self.logpi, dim=0)
for i in range(num_subj):
if self.spatial_specific:
U = sample_multinomial(pi, shape=(num_subj,self.K,self.P),
compress=True)
else:
pi=pi.expand(self.K, self.P)
U = sample_multinomial(pi, shape=(num_subj,self.K,self.P),
compress=True)
return U
[docs]
def marginal_prob(self):
""" Returns marginal probabilty for every node under the model
Returns:
pi (pt.tensor): marginal probability under the model
"""
return pt.softmax(self.logpi, dim=0)
[docs]
class ArrangeIndependentSymmetric(ArrangeIndependent):
""" Independent arrangement model with symmetry constraint.
It has two sizes:
P and K (number of nodes / parcels for arrangement model)
P_full and K_full (number of location / parcels for data)
"""
def __init__(self, K,
indx_full,
indx_reduced,
same_parcels=False,
spatial_specific=True,
remove_redundancy=False):
""" Constructor for the independent arrangement model
Args:
K (int): Number of different parcels
indx_full (ndarray/tensor): 2 x P array of data-indices
for each node (L/R)
indx_reduced (ndarray): P_full - vector of node-indices
for each data location
same_parcels (bool): are the mean functional profiles of parcels the same
or different across hemispheres?
spatial_specific (bool): Use a spatially specific model
(default True)
remove_redundancy (bool): Code with K probabilities
with K or K-1 parameters?
"""
if type(indx_full) is np.ndarray:
indx_full = pt.tensor(
indx_full, dtype=pt.get_default_dtype()).long()
if type(indx_reduced) is np.ndarray:
indx_reduced = pt.tensor(
indx_reduced, dtype=pt.get_default_dtype()).long()
self.indx_full = indx_full
self.indx_reduced = indx_reduced
self.P_full = indx_reduced.shape[0]
self.P = indx_full.shape[1]
self.K_full = K
if not same_parcels:
self.K = int(K / 2)
self.same_parcels = same_parcels
super().__init__(self.K, self.P, spatial_specific, remove_redundancy)
[docs]
def map_to_full(self,Uhat):
""" Remapping evidence from an arrangement space to a
emission space (here it doesn't do anything)
Args:
Uhat (ndarray): tensor of estimated arrangement ((num_subj x K x P) or (K x P))
Returns:
Uhat (ndarray): tensor of estimated arrangements ((num_subj x K_full x P_full) or (K_full x P_full))
"""
if Uhat.ndim == 3:
if self.same_parcels:
Umap = Uhat[:, :, self.indx_reduced]
else:
Umap = pt.zeros((Uhat.shape[0], self.K_full, self.P_full))
Umap[:, :self.K, self.indx_full[0]] = Uhat
Umap[:, self.K:, self.indx_full[1]] = Uhat
elif Uhat.ndim == 2:
if self.same_parcels:
Umap = Uhat[:, self.indx_reduced]
else:
Umap = pt.zeros((self.K_full, self.P_full))
Umap[:self.K, self.indx_full[0]] = Uhat
Umap[self.K:, self.indx_full[1]] = Uhat
return Umap
[docs]
def map_to_arrange(self, emloglik):
""" Maps emission log likelihoods to the internal size of the
representation
Args:
emloglik (list): List of emission logliklihoods (K_full x P_full)
Returns:
emloglik_comb (ndarray): ndarray of emission logliklihoods ()
"""
if self.same_parcels:
emloglik_comb = emloglik[:, :, self.indx_full[0]
] + emloglik[:, :, self.indx_full[1]]
else:
emloglik_comb = emloglik[:, :self.K, self.indx_full[0]
] + emloglik[:, self.K:, self.indx_full[1]]
return emloglik_comb
[docs]
def Estep(self, emloglik, gather_ss=True):
""" Estep for the spatial arrangement model
Args:
emloglik (pt.tensor):
emission log likelihood log p(Y|u,theta_E) a numsubj x K x P matrix
gather_ss (bool):
Gather Sufficient statistics for M-step (default = True)
Returns:
Uhat (pt.tensor):
posterior p(U|Y) a numsubj x K x P matrix
ll_A (pt.tensor):
Expected log-liklihood of the arrangement model
"""
emloglik = self.map_to_arrange(emloglik)
Uhat, ll_A = super().Estep(emloglik, gather_ss)
Uhat = self.map_to_full(Uhat)
Uhat = Uhat / Uhat.sum(dim=1, keepdim=True)
return Uhat, ll_A
[docs]
def sample(self, num_subj=10):
""" Samples a number of subjects from the prior.
In this i.i.d arrangement model we assume each node has
no relation with other nodes, which means it equals to
sample from the prior pi.
Args:
num_subj (int): the number of subjects to sample
Returns:
U (pt.tensor): the sampled data for subjects
"""
U = super.sample(num_subj)
U = self.map_to_full(U)
return U
[docs]
def marginal_prob(self):
""" Returns marginal probability for every node under the model
Returns:
pi (pt.tensor): marginal probability under the model
"""
P = self.map_to_full(pt.softmax(self.logpi, dim=0))
P = P/P.sum(dim=0)
return P
[docs]
class ArrangeIndependentSeparateHem(ArrangeIndependentSymmetric):
"""Independent arrangement model without symmetry constraint,
but like a symmetric model, it keeps the parcels (and emission models)
for the left and right hemishere separate. P is the same to the full data,
K (parcels for arrangement model) K_full (parcels for data)
Args:
K (int):
Number of different parcels
indx_hem (ndarray/tensor):
1 x P array of indices for the hemisphere
-1 - Left; 0 - Midline; 1 - Right
spatially_specific (bool):
Use a spatially specific model (default True)
remove_redundancy (bool):
Code with K probabilities with K or K-1 parameters?
"""
def __init__(self, K,
indx_hem,
spatial_specific=True,
remove_redundancy=False):
if type(indx_hem) is np.ndarray:
indx_hem = pt.tensor(
indx_hem, dtype=pt.get_default_dtype()).long()
self.indx_hem = indx_hem
self.P = indx_hem.shape[1]
self.K = int(K / 2)
super().__init__(K, indx_full=indx_hem,
indx_reduced=indx_hem.T,
spatial_specific=spatial_specific,
remove_redundancy=remove_redundancy)
[docs]
def map_to_full(self, Uhat):
""" remapping evidence from an
arrangement space to emission space
Args:
Uhat (ndarray): tensor of estimated arrangement
Returns:
Umap (ndarray): tensor of estimated arrangements
"""
left = (self.indx_hem == -1).squeeze()
right = (self.indx_hem == 1).squeeze()
midline = (self.indx_hem == 0).squeeze()
if Uhat.ndim == 3:
Umap = pt.zeros((Uhat.shape[0], self.K_full, self.P))
# left hemisphere
Umap[:, :self.K, left] = Uhat[:, :, left]
# right hemisphere
Umap[:, self.K:, right] = Uhat[:, :, right]
# Map midline to both
Umap[:, :self.K, midline] = Uhat[:, :,midline] / 2
Umap[:, self.K:, midline] = Uhat[:, :,midline] / 2
elif Uhat.ndim == 2:
Umap = pt.zeros((self.K_full, self.P))
Umap[:self.K, left] = Uhat[:, left]
Umap[self.K:, right] = Uhat[:, right]
# Map midline to both
Umap[:self.K, midline] = Uhat[:, midline] / 2
Umap[self.K:, midline] = Uhat[:, midline] / 2
return Umap
[docs]
def map_to_arrange(self, emloglik):
""" Maps emission log likelihoods to the internal size of the
representation
Args:
emloglik (list): List of emission logliklihoods
Returns:
emloglik_comb (ndarray): ndarray of emission logliklihoods
"""
emloglik_comb = pt.zeros((emloglik.shape[0], self.K, self.P_full))
# left hemisphere
emloglik_comb[:, :, self.indx_hem[0, :] == -1] \
= emloglik[:, :self.K, self.indx_hem[0, :] == -1]
# right hemisphere
emloglik_comb[:, :, self.indx_hem[0, :] == 1] \
= emloglik[:, self.K:, self.indx_hem[0, :] == 1]
# midline from left
emloglik_comb[:, :, self.indx_hem[0, :] == 0] \
= emloglik[:, :self.K, self.indx_hem[0, :] == 0]
# midline from right
emloglik_comb[:, :, self.indx_hem[0, :] == 0] \
+= emloglik[:, self.K:, self.indx_hem[0, :] == 0]
return emloglik_comb
class PottsModel(ArrangementModel):
""" Potts models (Markov random field on multinomial variable) with K
possible states. Potential function is determined by linkages
parameterization is joint between all linkages, although it could
be split into different parameter functions
"""
def __init__(self, W, K=3, remove_redundancy=True):
self.W = W
self.K = K # Number of states
self.P = W.shape[0]
self.theta_w = 1 # Weight of the neighborhood relation - inverse temperature param
self.rem_red = remove_redundancy
pi = pt.ones((K, self.P)) / K
self.logpi = log(pi)
if remove_redundancy:
self.logpi = self.logpi - self.logpi[-1, :]
# Inference parameters for persistence CD alogrithm via sampling
self.epos_U = None
self.eneg_U = None
self.fit_theta_w = True # Update smoothing parameter in Mstep
self.update_order = None
self.nparams = 10
self.set_param_list(['logpi', 'theta_w'])
def random_smooth_pi(self, Dist, theta_mu=1,centroids=None):
""" Defines pi (prior over parcels) using a Ising model with K centroids
Needs the Distance matrix to define the prior probability
"""
if centroids is None:
centroids = np.random.choice(self.P, (self.K,))
d2 = Dist[centroids, :]**2
pi = exp(-d2 / (2 * theta_mu))
pi = pi / pi.sum(dim=0)
self.logpi = log(pi)
if self.rem_red:
self.logpi = self.logpi - self.logpi[-1, :]
def potential(self,y):
""" Returns the potential functions for the log-linear form of the model
Args:
y (ndarray): 2d array (NxP) of network states
Returns:
phi (ndarray): 2d array (NxP) of potential functions
"""
if y.ndim == 1:
y = y.reshape((-1, 1))
# Potential on states
N = y.shape[0] # Number of observations
phi = pt.zeros((self.numparam, N))
for i in range(N):
S = pt.eq(y[i, :], y[i, :].reshape((-1, 1)))
phi[0, i] = pt.sum(S * self.W)
return (phi)
def loglike(self,U):
""" Returns the energy term of the network up to a constant
the loglikelihood of the state
Args:
U (ndarray): 2d array (NxP) of network states
Returns:
ll (ndarray)): 1d array (N,) of likelihoods
"""
N, P = U.shape
la = pt.empty((N,))
lp = pt.empty((N,))
for n in range(N):
phi = np.equal(U[n, :], U[n, :].reshape((-1, 1)))
la[n] = pt.sum(self.theta_w * self.W * phi)
lp[n] = pt.sum(self.logpi(U[n,:],range(self.P)))
return(la + lp)
def cond_prob(self, U, node, bias):
"""Returns the conditional probabity vector for node x, given U
Args:
U (ndarray): Current state of the network
node (int): Number of node to get the conditional prov for
bias (pt.tensor): (1,P) Log-Bias term for the node
Returns:
p (pt.tensor): (K,) vector of conditional probabilities
for the node
"""
x = np.arange(self.K)
# Find all the neighbors for node x (precompute!)
ind = np.where(self.W[node,:] > 0)
nb_x = U[ind] # Neighbors to node x
same = np.equal(x,nb_x.reshape(-1, 1))
loglik = self.theta_w * pt.sum(same, dim=0) + bias
return(pt.softmax(loglik, dim=0))
def calculate_neighbours(self):
"""Calculate Neighbourhood
"""
self.neighbours = np.empty((self.P,), dtype=object)
for p in range(self.P):
# Find all the neighbors for node x (precompute!)
self.neighbours[p] = pt.where(self.W[p, :] != 0)[0]
def sample_gibbs(self, U0=None, num_chains=None, bias=None,
iter=5, return_hist=False, track=None):
""" Samples a number of gibbs-chains simulatenously
using the same bias term
Args:
U0 (nd-array): Initial starting point (num_chains x P).
Default None - and will be initialized by the bias term alone
num_chains (int): If U0 not provided, number of chains to initialize
bias (nd-array): Bias term (in log-probability (K,P)).
Defaults to None. Assumed to be the same for all the chains
iter (int): Number of iterations. Defaults to 5.
return_hist (bool): Return the history as a second return argument?
Returns:
U (nd-array): A (num_chains,P) array of integers
Uhist (nd-array): Full sampling path - (iter,num_chains,P)
array of integers (optional, only if return_all = True)
Notes:
This probably can be made more efficient by doing some of the
sampling un bulk?
"""
# Check for initialization of chains
if U0 is None:
prob = pt.softmax(bias, dim=0)
U0 = sample_multinomial(prob, shape=(num_chains, self.K, self.P),
compress=True)
else:
num_chains = U0.shape[0]
if return_hist:
if track is None:
# Initilize array of full history of sample
Uhist = pt.zeros((iter, num_chains, self.P), dtype=np.int16)
else:
Uhist = pt.zeros((iter, self.K))
if not hasattr(self, 'neighbours'):
self.calculate_neighbours()
# Start the chains
U = U0
u = pt.arange(self.K).reshape(self.K, 1)
for i in range(iter):
if return_hist:
if track is None:
Uhist[i, :, :] = U
else:
for k in range(self.K):
Uhist[i, k] = pt.mean(U[:, track] == k)
# Now loop over noes for all chains at the same tim
for p in np.arange(self.P-1,-1,-1):
nb_u = U[:,self.neighbours[p]] # Neighbors to node x
nb_u = nb_u.reshape(num_chains,1,-1)
same = pt.eq(nb_u,u)
loglik = self.theta_w * pt.sum(same,dim=2) + bias[:,p].reshape(1,self.K)
prob = pt.softmax(loglik,dim=1)
U[:,p]=sample_multinomial(prob,kdim=1,compress=True)
return (U,Uhist) if return_hist else U
def sample(self, num_subj=10, burnin=20):
""" Samples new subjects from prior: wrapper for sample_gibbs
Args:
num_subj (int): Number of subjects. Defaults to 10.
burnin (int): Number of . Defaults to 20.
Returns:
U (pt.tensor): Labels for all subjects (numsubj x P) array
"""
U = self.sample_gibbs(bias = self.logpi, iter = burnin,
num_chains = num_subj)
return U
def Estep(self, emloglik):
""" Positive phase of getting p(U|Y) for the spatial arrangement model
Gets the expectations.
Args:
emloglik (pt.tensor):
emission log likelihood log p(Y|u,theta_E),
a numsubj x K x P matrix
Returns:
Uhat (pt.tensor):
posterior p(U|Y) a numsubj x K x P matrix
ll_A (pt.tensor):
Unnormalized log-likelihood of the arrangement model for
each subject. Note that this does not contain the partition function
"""
numsubj, K, P = emloglik.shape
bias = emloglik + self.logpi
self.epos_U = pt.empty((numsubj, self.epos_numchains, P))
for s in range(numsubj):
self.epos_U[s,:,:] = self.sample_gibbs(num_chains=self.epos_numchains,
bias=bias[s], iter=self.epos_iter)
# Get Uhat from the sampled examples
self.epos_Uhat = pt.empty((numsubj, self.K, self.P))
for k in range(self.K):
self.epos_Uhat[:, k, :] = pt.sum(
self.epos_U == k, dim=1) / self.epos_numchains
# Get the sufficient statistics for the potential functions
self.epos_phihat = pt.zeros((numsubj,))
for s in range(numsubj):
phi = pt.zeros((self.P, self.P))
for n in range(self.epos_numchains):
phi = phi + \
np.equal(self.epos_U[s, n, :],
self.epos_U[s, n, :].reshape((-1, 1)))
self.epos_phihat[s] = pt.sum(self.W * phi) / self.epos_numchains
# The log likelihood for arrangement model p(U|theta_A) is not trackable-
# So we can only return the unormalized potential functions
if P > 2:
ll_Ae = self.theta_w * self.epos_phihat
ll_Ap = pt.sum(self.epos_Uhat * self.logpi, dim=(1, 2))
if self.rem_red:
Z = exp(self.logpi).sum(dim=0) # Local partition function
ll_Ap = ll_Ap - pt.sum(log(Z))
ll_A = ll_Ae + ll_Ap
else:
# Calculate Z in the case of P=2
pp=exp(self.logpi[:,0] + self.logpi[:,1].reshape((-1, 1))
+ np.eye(self.K) * self.theta_w)
Z = pt.sum(pp) # full partition function
ll_A = self.theta_w * self.epos_phihat \
+ pt.sum(self.epos_Uhat*self.logpi,dim=(1,2)) - log(Z)
return self.epos_Uhat, ll_A
def eneg_sample(self, num_chains=None, iter=5):
""" Negative phase of the learning: uses persistent contrastive divergence
with sampling from the spatial arrangement model (not clampled to data)
Uses persistence across negative smapling steps
Args:
num_chains (int, optional): Number of chains to use. Defaults to None.
iter (int, optional): Number of iterations. Defaults to 5.
Returns:
eneg_Uhat (pt.tensor): Labels for all subjects (numsubj x P) array
"""
if self.eneg_U is None:
self.eneg_U = self.sample_gibbs(num_chains=num_chains,
bias=self.logpi, iter=iter)
# For tracking history: ,return_hist=True,track=0
else:
if (num_chains != self.eneg_U.shape[0]):
raise NameError('num_chains needs to stay constant')
self.eneg_U = self.sample_gibbs(self.eneg_U,
bias=self.logpi, iter=iter)
# Get Uhat from the sampled examples
self.eneg_Uhat = pt.empty((self.K, self.P))
for k in range(self.K):
self.eneg_Uhat[k, :] = pt.sum(self.eneg_U == k, dim=0) / num_chains
# Get the sufficient statistics for the potential functions
phi = pt.zeros((self.P, self.P))
for n in range(num_chains):
phi = phi + \
np.equal(self.eneg_U[n, :], self.eneg_U[n, :].reshape((-1, 1)))
self.eneg_phihat = pt.sum(self.W * phi) / num_chains
return self.eneg_Uhat
def Mstep(self, stepsize=0.1):
""" Gradient update for SML or CD algorithm
Args:
stepsize (float): Stepsize for the update of the parameters
"""
# Update logpi
if self.rem_red:
# The - pi can be dropped here as we follow the
# difference between pos and neg anyway
gradpos_logpi = self.epos_Uhat[:,:-1,:].mean(dim=0)
gradneg_logpi = self.eneg_Uhat[:-1,:]
self.logpi[:-1,:] = self.logpi[:-1,:] \
+ stepsize * (gradpos_logpi - gradneg_logpi)
else:
gradpos_logpi = self.epos_Uhat.mean(dim=0)
gradneg_logpi = self.eneg_Uhat
self.logpi = self.logpi + stepsize * (gradpos_logpi - gradneg_logpi)
if self.fit_theta_w:
grad_theta_w = self.epos_phihat.mean() - self.eneg_phihat
self.theta_w = self.theta_w + stepsize* grad_theta_w
def marginal_prob(self):
""" Returns marginal probabilty for every node under the model
Returns:
pi (pt.tensor): marginal probability under the model
"""
# TODO: need find a smarter way to computer marginals
# U = self.sample(num_subj=1, burnin=20)
# N, P = U.shape
# la = pt.empty((self.K, P))
# lp = pt.empty((N,))
# for n in range(N):
# phi = pt.eq(U[n, :], U[n, :].reshape((-1, 1)))
# local = pt.sum(self.theta_w * self.W * phi, dim=0, keepdim=True)
# local += self.logpi
# return (la + lp)
return pt.softmax(self.logpi, dim=0)
class mpRBM(ArrangementModel):
""" multinomial (categorial) restricted Boltzman machine
for learning of brain parcellations for probabilistic input
Uses Contrastive-Divergence k for learning
Notes:
Outer nodes (U):
The outer (most peripheral nodes) are
categorical with K possible categories.
There are three different representations:
a) N x nv: integers between 0 and K-1 (u)
b) N x K x nv : indicator variables or probabilities (U)
c) N x (K * nv): Vectorized version of b- with all nodes
of category 1 first, etc,
If not otherwise noted, we will use presentation b).
Hidden nodes (h):
In this version we will use binary hidden nodes -
so to get the same capacity as a mmRBM, one would need to
set the number of hidden nodes to nh
"""
def __init__(self, K, P, nh):
""" Constructor for the mpRBM class
Args:
K (int): Number of parcels
P (int): Number of brain voxels
nh (int): Number of hidden nodes
"""
super().__init__(K, P)
self.K = K
self.P = P
self.nh = nh
self.W = pt.randn(nh, P * K)
self.bh = pt.randn(nh)
self.bu = pt.randn(K, P)
self.eneg_U = None
self.Etype = 'prob'
self.alpha = 0.01
self.epos_iter = 5
self.set_param_list(['W', 'bh', 'bu'])
self.tmp_list = ['epos_U', 'epos_Eh', 'eneg_U', 'eneg_Eh']
def sample_h(self, U):
""" Sample hidden nodes given an activation state
of the outer nodes
Args:
U (pt.tensor): Indicator or probability tensor
of outer layer, shape (N, K, P)
Returns:
p_h (pt.tensor): probability of the hidden nodes,
shape (N, nh)
sample_h (pt.tensor): 0/1 values of discretely
sampled hidde nodes, shape (N, nh)
"""
wv = pt.mm(U.reshape(U.shape[0], -1), self.W.t())
activation = wv + self.bh
p_h = pt.sigmoid(activation)
sample_h = pt.bernoulli(p_h)
return p_h, sample_h
def sample_U(self, h):
""" Returns a sampled U as a unpacked indicator variable
Args:
h (pt.tensor): Hidden states
Returns:
p_u (pt.tensor): Probability of each node [N,K,nv] array
sample_U (pt.tensor): One-hot encoding of random sample [N,K,nv] array
"""
N = h.shape[0]
wh = pt.mm(h, self.W).reshape(N,self.K,self.P)
p_u = pt.softmax(wh + self.bu,1)
sample = sample_multinomial(p_u,kdim=1)
return p_u, sample
def sample(self, num_subj, iter=10):
"""Draw new subjects from the model
Args:
num_subj (int): Number of subjects
iter (int): Number of iterations until burn in
Returns:
u (pt.tensor): Sampled subjects
"""
p = pt.ones(self.K)
u = pt.multinomial(p, num_subj * self.P, replacement=True)
u = u.reshape(num_subj, self.P)
U = expand_mn(u, self.K)
for i in range(iter):
_, h = self.sample_h(U)
_, U = self.sample_U(h)
u = compress_mn(U)
return u
def Estep(self, emloglik, gather_ss=True, iter=None):
""" Positive Estep for the multinomial boltzman model
Uses mean field approximation to posterior to U and hidden parameters.
Args:
emloglik (pt.tensor):
emission log likelihood log p(Y|u,theta_E) a numsubj x K x P matrix
gather_ss (bool):
Gather Sufficient statistics for M-step (default = True)
Returns:
Uhat (pt.tensor):
posterior p(U|Y) a numsubj x K x P matrix
ll_A (pt.tensor):
Nan - returned for consistency
"""
if type(emloglik) is np.ndarray:
emloglik = pt.tensor(emloglik, dtype=pt.get_default_dtype())
if iter is None:
iter = self.epos_iter
N = emloglik.shape[0]
Uhat = pt.softmax(emloglik + self.bu, dim=1) # Start with hidden = 0
for i in range(iter):
wv = pt.mm(Uhat.reshape(N, -1), self.W.t())
Eh = pt.sigmoid(wv + self.bh)
wh = pt.mm(Eh, self.W).reshape(N, self.K, self.P)
Uhat = pt.softmax(wh + self.bu + emloglik, 1)
if gather_ss:
if self.Etype=='vis':
# This is incorrect, but a understandable and information error
self.epos_U = pt.softmax(emloglik,dim=1)
elif self.Etype=='prob':
# This is correct and isthe olny version that should be used.
self.epos_U = Uhat
self.epos_Eh = Eh
return Uhat, pt.nan
def marginal_prob(self):
""" Returns marginal probabilty for every node under the model
Returns:
pi (pt.tensor): marginal probability under the model
"""
# If not given, then initialize:
if self.eneg_U is None:
U, Eh = self.Eneg()
pi = pt.mean(self.eneg_U, dim=0)
return pi
def Mstep(self):
""" Performs gradient step on the parameters given
the learning rate self.alpha
"""
N = self.epos_Eh.shape[0]
M = self.eneg_Eh.shape[0]
epos_U=self.epos_U.reshape(N,-1)
eneg_U=self.eneg_U.reshape(M,-1)
self.W += self.alpha * (pt.mm(self.epos_Eh.t(),epos_U)
- N / M * pt.mm(self.eneg_Eh.t(),eneg_U))
self.bu += self.alpha * (pt.sum(self.epos_U,0)
- N / M * pt.sum(self.eneg_U,0))
self.bh += self.alpha * (pt.sum(self.epos_Eh,0)
- N / M * pt.sum(self.eneg_Eh, 0))
class mpRBM_pCD(mpRBM):
""" Multinomial (categorial) restricted Boltzman machine
for learning of brain parcellations for probabilistic input
Uses persistent Contrastive-Divergence k for learning
"""
def __init__(self, K, P, nh, eneg_iter=3, eneg_numchains=77):
""" Constructor for the mpRBM_pCD class
Args:
K (int): Number of parcels
P (int): Number of brain voxels
nh (int): Number of hidden units
eneg_iter (int): Number of iterations for negative phase
eneg_numchains (int): Number of chains for energy minimization
"""
super().__init__(K, P,nh)
self.eneg_iter = eneg_iter
self.eneg_numchains = eneg_numchains
def Eneg(self, U=None):
""" Negative phase of the persistent contrastive divergence algorithm
Args:
U (pt.tensor): Initial values for the chains (default = None)
Returns:
EU (pt.tensor): Sampled values for the chains
Eh (pt.tensor): Sampled values for the hidden units
"""
if (self.eneg_U is None):
U = pt.empty(self.eneg_numchains, self.K, self.P).uniform_(0, 1)
else:
U = self.eneg_U
for i in range(self.eneg_iter):
Eh, h = self.sample_h(U)
EU, U = self.sample_U(h)
self.eneg_Eh = Eh
self.eneg_U = EU
return self.eneg_U, self.eneg_Eh
class mpRBM_CDk(mpRBM):
""" Multinomial (categorial) restricted Boltzman machine
for learning of brain parcellations for probabilistic input
Uses persistent Contrastive-Divergence k for learning
"""
def __init__(self, K, P, nh, eneg_iter=1):
""" Constructor for the mpRBM_CDk class
Args:
K (int): Number of parcels
P (int): Number of brain voxels
nh (int): Number of hidden units
eneg_iter (int): Number of iterations for negative phase
"""
super().__init__(K, P,nh)
self.eneg_iter = eneg_iter
def Eneg(self,U):
""" Negative phase of the persistent contrastive divergence algorithm
Args:
U (pt.tensor): Initial values for the chains (default = None)
Returns:
EU (pt.tensor): Sampled values for the chains
Eh (pt.tensor): Sampled values for the hidden units
"""
for i in range(self.eneg_iter):
Eh, h = self.sample_h(U)
EU, U = self.sample_U(h)
self.eneg_Eh = Eh
self.eneg_U = EU
return self.eneg_U, self.eneg_Eh
class cmpRBM(mpRBM):
""" Convolutional multinomial (categorial) restricted Boltzman machine
for learning of brain parcellations for probabilistic input
Uses variational stochastic maximum likelihood for learning
"""
def __init__(self, K, P, nh=None, Wc=None, theta=None,
eneg_iter=10, epos_iter=10, eneg_numchains=77,
momentum=False, wd=0):
""" Constructor for the cmpRBM class
Args:
K (int): number of classes
P (int): number of brain locations
nh (int): number of hidden multinomial nodes
Wc (tensor): 2d/3d-tensor for connecticity weights
theta (tensor): 1d vector of parameters
eneg_iter (int): HOw many iterations for each negative step. Defaults to 3.
eneg_numchains (int): How many chains. Defaults to 77.
"""
self.K = K
self.P = P
self.Wc = Wc
self.bu = pt.randn(K,P)
self.momentum = momentum
self.wd = wd
if Wc is None:
if nh is None:
raise NameError('Provide Connectivty kernel (Wc)'
' matrix or number of hidden nodes (nh)')
self.nh = nh
self.W = pt.randn(nh,P) * 0.1
self.theta = None
self.set_param_list(['bu', 'W'])
else:
if Wc.ndim == 2:
self.Wc = Wc.view(Wc.shape[0], Wc.shape[1], 1)
self.nh = Wc.shape[0]
if theta is None:
# self.theta = pt.abs(pt.randn((self.Wc.shape[2],)))
self.theta = pt.distributions.uniform.Uniform(0, 3).sample((self.Wc.shape[2],))
else:
self.theta = pt.tensor(theta, dtype=pt.get_default_dtype())
if self.theta.ndim ==0:
self.theta = self.theta.view(1)
self.W = (self.Wc * self.theta).sum(dim=2)
self.set_param_list(['bu', 'theta'])
self.gibbs_U = None # samples from the hidden layer for negative phase
self.alpha = 0.01
if self.momentum:
self.MOMENTUM_COEF = 0.6
self.velocity_W = 0
self.velocity_bu = 0
self.epos_iter = epos_iter
self.eneg_iter = eneg_iter
self.eneg_numchains = eneg_numchains
self.use_tempered_transition = False
self.fit_bu = True
self.fit_W = True
self.tmp_list = ['epos_Uhat', 'epos_Hhat', 'eneg_U', 'eneg_H']
def sample_h(self, U):
""" Sample hidden nodes given an activation state of the outer nodes
Args:
U (NxKxP tensor): Indicator or probability tensor of outer layer
Returns:
p_h: (N x nh tensor): probability of the hidden nodes
sample_h (N x nh tensor): 0/1 values of discretely sampled hidde nodes
"""
wv = pt.matmul(U, self.W.t())
# activation = wv + self.b
# p_h = pt.sigmoid(activation)
# sample_h = pt.bernoulli(p_h)
p_h = pt.softmax(wv, 1)
sample_h = sample_multinomial(p_h, kdim=1)
return p_h, sample_h
def sample_U(self, h, emloglik=None):
""" Returns a sampled U as a unpacked indicator variable
Args:
h (tensor): Hidden states (NxKxnh)
Returns:
p_u (tensor): Probability of each node [N,K,P] array
sample_U (tensor): One-hot encoding of random sample [N,K,P] array
"""
N = h.shape[0]
act = pt.matmul(h, self.W) + self.bu
if emloglik is not None:
act += emloglik
p_u = pt.softmax(act ,1)
sample = sample_multinomial(p_u, kdim=1)
return p_u, sample
def marginal_prob(self):
""" Returns marginal probabilty for every node under the model
Returns:
pi (pt.tensor): marginal probability under the model
"""
if self.gibbs_U is None:
return pt.softmax(self.bu, 0)
else:
pi = pt.mean(self.gibbs_U, dim=0)
return pi
def unnormalized_prob(self, U, H):
""" Calculate the unnormalized probability of the model given U and H
Args:
U (pt.Tensor): The indicator tensor of the outer layer
H (pt.Tensor): The indicator tensor of the hidden layer
Returns:
unnormalized_prob (pt.Tensor): The unnormalized probability
of the model
"""
bias = (U * self.bu).sum((1, 2))
connection = (H @ self.W * U).sum((1, 2))
return pt.exp(bias + connection).mean()
def tempered_transition(self, U, betas, emission_model):
""" Sample from the model using tempered transitions.
Args:
U (pt.Tensor): The initial value of U
betas (list[float]): The temperature coefficient for
each intermediate distribution, where betas[-1] = 1
emission_model (EmissionModel): The emission model to
use for sampling
Returns:
U (pt.Tensor): The sampled value of U
H (pt.Tensor): The sampled value of H
References:
https://www.cs.cmu.edu/~rsalakhu/papers/trans.pdf
"""
# save the current parameters
true_W = self.W.detach().clone()
true_bu = self.bu.detach().clone()
accept = 0.0
_, H = self.sample_U(U)
U_p = U
while (accept < np.random.uniform(0.0, 1.0)):
accept = 1.0
for beta in reversed(betas):
accept /= self.unnormalized_prob(U, H)
self.W = true_W * beta
self.bu = true_bu * beta
accept *= self.unnormalized_prob(U, H)
Y = emission_model.sample(compress_mn(U_p))
emloglik = emission_model.Estep(Y)
_, H = self.sample_h(U_p)
U_p, U = self.sample_U(H, emloglik)
# inverse transition T tilde (Gibbs sample in reverse)
for beta in betas[1:]:
Y = emission_model.sample(compress_mn(U_p))
emloglik = emission_model.Estep(Y)
U_p, U = self.sample_U(H, emloglik)
_, H = self.sample_h(U_p)
accept /= self.unnormalized_prob(U, H)
self.W = true_W * beta
self.bu = true_bu * beta
accept *= self.unnormalized_prob(U, H)
accept = min(1, accept)
# reload the true parameters
self.W = true_W
self.bu = true_bu
return U, H
def Estep(self, emloglik,gather_ss=True,iter=None):
""" Positive Estep for the multinomial boltzman model
Uses mean field approximation to posterior to U and hidden parameters.
Args:
emloglik (pt.tensor):
emission log likelihood log p(Y|u,theta_E) a numsubj x K x P matrix
gather_ss (bool):
Gather Sufficient statistics for M-step (default = True)
Returns:
Uhat (pt.tensor):
posterior p(U|Y) a numsubj x K x P matrix
ll_A (pt.tensor):
Nan - returned for consistency
"""
if type(emloglik) is np.ndarray:
emloglik = pt.tensor(emloglik, dtype=pt.get_default_dtype())
if iter is None:
iter = self.epos_iter
N = emloglik.shape[0]
Uhat = pt.softmax(emloglik + self.bu, dim=1) # Start with hidden = 0
for i in range(iter):
wv = pt.matmul(Uhat,self.W.t())
Hhat = pt.softmax(wv,1)
# Hsamples = sample_multinomial_pt(Hhat, kdim=1)
# wh = pt.matmul(Hsamples, self.W)
wh = pt.matmul(Hhat, self.W)
Uhat = pt.softmax(wh + self.bu + emloglik, 1)
if gather_ss:
self.epos_Uhat = Uhat
self.epos_Hhat = Hhat
# The unnormalized log likelihood
ll_A = pt.sum(Uhat * self.bu) \
+ pt.sum(self.W * pt.matmul(pt.transpose(Uhat, 1, 2), Hhat))
if pt.isnan(ll_A):
raise ValueError('likelihood is nan')
return Uhat, ll_A
def Eneg(self, iter=None, use_chains=None, emission_model=None):
""" Negative phase of the E-step for the multinomial boltzman model
Args:
iter (int): Number of iterations to run the negative phase
use_chains (list): Which chains to use for the negative phase
emission_model (object): Emission model to use for the negative phase
Returns:
eneg_U (pt.tensor): Sampled U from the negative phase
eneg_H (pt.tensor): Sampled H from the negative phase
"""
# If no iterations specified - use standard
if iter is None:
iter = self.eneg_iter
# If no markov chain are initialized, start them off
if self.gibbs_U is None:
p = pt.softmax(self.bu,0)
# Old sample
self.gibbs_U = sample_multinomial(p, shape=(self.eneg_numchains,self.K,self.P),
kdim=0, compress=False)
# sample using pytorch
# self.gibbs_U = sample_multinomial_pt(p, num_subj=self.eneg_numchains, kdim=0)
# Grab the current chains
if use_chains is None:
use_chains = pt.arange(self.eneg_numchains)
U = self.gibbs_U[use_chains]
# sampling using tempered transitions
if self.use_tempered_transition:
U0 = U.detach().clone()
U, H = self.tempered_transition(U0, np.linspace(0.9, 1.0, iter),
emission_model)
else:
# standard gibbs sampling
for i in range(iter):
Y = emission_model.sample(compress_mn(U))
emloglik = emission_model.Estep(Y)
ph, H = self.sample_h(U)
pu, U = self.sample_U(H, emloglik)
# TODO: For the last update of the hidden units, it is common
# to use the probability instead of sampling a multinomial value
# to avoid unnecessary sampling noise.
# Refererce: 'A Practical Guide to Training Restricted Boltzmann Machines'
# https://www.cs.toronto.edu/~hinton/absps/guideTR.pdf
self.eneg_H = ph
self.eneg_U = pu
# Persistent: Keep the new gibbs samples around
self.gibbs_U[use_chains]=U
return self.eneg_U,self.eneg_H
def Mstep(self):
""" Performs gradient step on the parameters
given the learning rate self.alpha
"""
N = self.epos_Hhat.shape[0]
M = self.eneg_H.shape[0]
# Update the connectivity
if self.fit_W:
gradW = pt.matmul(pt.transpose(self.epos_Hhat,1,2),
self.epos_Uhat).sum(dim=0)/N
gradW -= pt.matmul(pt.transpose(self.eneg_H,1,2),
self.eneg_U).sum(dim=0)/M
# If we are dealing with component Wc:
if self.Wc is not None:
if self.momentum:
self.velocity_W = self.MOMENTUM_COEF * self.velocity_W \
+ self.alpha * gradW
gradW = self.velocity_W
gradW = gradW.view(gradW.shape[0],gradW.shape[1],1)
weights = pt.sum(self.Wc,dim=(0,1))
gradT = pt.sum(gradW*self.Wc,dim=(0,1))/weights
if self.momentum:
self.theta += gradT - self.alpha * self.wd * self.theta
else:
self.theta += self.alpha * gradT - self.alpha * self.wd * self.theta
self.W = (self.Wc * self.theta).sum(dim=2)
else:
if self.momentum:
self.velocity_W = self.MOMENTUM_COEF * self.velocity_W \
+ self.alpha * gradW
self.W += self.velocity_W - self.alpha * self.wd * self.velocity_W
else:
self.W += self.alpha * gradW - self.alpha * self.wd * gradW
# Update the bias term
if self.fit_bu:
gradBU = 1 / N * pt.sum(self.epos_Uhat,0)
gradBU -= 1 / M * pt.sum(self.eneg_U,0)
if self.momentum:
self.velocity_bu = self.MOMENTUM_COEF * self.velocity_bu \
+ self.alpha * gradBU
# self.velocity_bu = self.velocity_bu - self.alpha * self.wd * self.bu
self.bu += self.velocity_bu
else:
self.bu += self.alpha * gradBU
# self.bu += self.alpha * gradBU - self.alpha * self.wd * self.bu
class wcmDBM(mpRBM):
""" wcmDBM: weighted convolutional multinomial Deep Boltzman Machine
for learning of brain parcellations for probabilistic
input Uses variational stochastic maximum likelihood for learning
"""
def __init__(self, K, P, nh=None, Wc=None, theta=None, eneg_iter=10,
epos_iter=10, eneg_numchains=77, momentum=True):
""" Constructor for the wcmDBM class
Args:
K (int): number of classes
P (int): number of brain locations
nh (int): number of hidden multinomial nodes
Wc (tensor): 2d/3d-tensor for connecticity weights
theta (tensor): 1d vector of parameters
eneg_iter (int): HOw many iterations for each negative step.
Defaults to 3.
eneg_numchains (int): How many chains. Defaults to 77.
"""
self.K = K
self.P = P
self.Wc = Wc
self.bu = pt.randn(K, P)
self.momentum = momentum
if Wc is None:
if nh is None:
raise NameError('Provide Connectivty kernel (Wc)'
' matrix or number of hidden nodes (nh)')
self.nh = nh
self.W = pt.randn(nh, P) * 0.1
self.theta = None
self.set_param_list(['bu', 'W'])
else:
assert Wc.ndim == 2, 'Currently only support Wc is a 2d tensor'
self.Wc = Wc.to_sparse_csr() if not Wc.is_sparse else Wc
self.Wc_ind = self.Wc.to_sparse_coo().indices().to('cpu')
self.nh = Wc.shape[0]
if theta is None:
theta = pt.abs(pt.randn((1,))).item()
self.theta = pt.tensor(theta, dtype=pt.get_default_dtype())
else:
self.theta = pt.tensor(theta, dtype=pt.get_default_dtype())
assert self.theta.ndim == 0, 'theta must be a scalar'
if self.Wc.layout == pt.sparse_coo:
self.W = self.Wc * self.theta
elif self.Wc.layout == pt.sparse_csr:
self.W = pt.clone(self.Wc)
self.W.values().mul_(self.theta)
self.Wc_value_coo = self.Wc.to_sparse_coo().values().to('cpu')
self.Wc = 1 # Remove Wc
self.set_param_list(['bu', 'theta'])
self.gibbs_U = None # samples from the hidden layer for negative phase
self.alpha = 0.1
if self.momentum:
self.MOMENTUM_COEF = 0.9
self.velocity_W = 0
self.velocity_bu = 0
self.epos_iter = epos_iter
self.eneg_iter = eneg_iter
self.eneg_numchains = eneg_numchains
self.fit_bu = True
self.fit_W = True
self.use_tempered_transition = False
self.pretrain = False
# Assembly dump list
# TODO: keep the gibbs chain in the model for pCD?
self.tmp_list = ['epos_Hhat', 'eneg_H']
if Wc is not None:
self.tmp_list += ['W', 'Wc_ind', 'Wc_value_coo']
if self.momentum:
self.tmp_list += ['velocity_W', 'velocity_bu']
def random_params(self):
""" Sets prior parameters to random starting values
"""
self.bu = pt.randn(self.K, self.P)
if self.Wc is not None:
# theta = pt.abs(pt.randn((1,))).item()
self.theta = pt.distributions.uniform.Uniform(0, 5).sample()
else:
self.W = pt.randn(self.nh, self.P) * 0.1
def sample_h(self, U):
"""Sample hidden nodes given an activation state of the outer nodes
Args:
U (NxKxP tensor): Indicator or probability tensor of outer layer
Returns:
p_h: (N x nh tensor): probability of the hidden nodes
sample_h (N x nh tensor): 0/1 values of discretely sampled hidde nodes
"""
wv = pt.matmul(U,self.W.t())
# activation = wv + self.b
# p_h = pt.sigmoid(activation)
# sample_h = pt.bernoulli(p_h)
p_h = pt.softmax(wv,1)
sample_h = sample_multinomial(p_h, kdim=1)
return p_h, sample_h
def sample_U(self, h, emloglik = None):
""" Returns a sampled U as a unpacked indicator variable
Args:
h tensor: Hidden states (NxKxnh)
Returns:
p_u: Probability of each node [N,K,P] array
sample_U: One-hot encoding of random sample [N,K,P] array
"""
N = h.shape[0]
act = pt.matmul(h, self.W) + self.bu
if emloglik is not None and not self.pretrain:
act += emloglik
p_u = pt.softmax(act ,1)
sample = sample_multinomial(p_u,kdim=1)
return p_u, sample
def marginal_prob(self):
""" Returns marginal probabilty for every node under the model
Returns:
pi (pt.tensor): marginal probability under the model
"""
if self.gibbs_U is None:
return pt.softmax(self.bu,0)
else:
pi = pt.mean(self.gibbs_U,dim=0)
return pi
def free_energy(self, emloglik):
""" Calculate the free energy of the current model states
TODO: finish the free energy function
Args:
emloglik (tensor): log likelihood of the emission model
Returns:
free_energy (tensor): free energy of the current model states
"""
if self.epos_Uhat is None:
Uhat = pt.softmax(emloglik + self.bu, dim=1)
else:
Uhat = self.epos_Uhat
v_sample = sample_multinomial(Uhat, kdim=1)
wv = pt.matmul(v_sample, self.W.t())
vWh = pt.matmul(self.gibbs_U, self.W.t())
vWh = vWh + self.bu
def unnormalized_prob(self, U, H):
""" Calculate the unnormalized probability of the model given U and H
Args:
U (pt.Tensor): The indicator tensor of the outer layer
H (pt.Tensor): The indicator tensor of the hidden layer
Returns:
unnormalized_prob (pt.Tensor): The unnormalized probability
of the model
"""
bias = (U * self.bu).sum((1, 2))
connection = (H @ self.W * U).sum((1, 2))
return pt.exp(bias + connection).mean()
def tempered_transition(self, U, betas, emission_model):
""" Sample from the model using tempered transitions.
Args:
U (pt.Tensor): The initial value of U
betas (list[float]): The temperature coefficient for
each intermediate distribution, where betas[-1] = 1
emission_model (EmissionModel): The emission model to
use for sampling
Returns:
U (pt.Tensor): The sampled value of U
H (pt.Tensor): The sampled value of H
References:
https://www.cs.cmu.edu/~rsalakhu/papers/trans.pdf
"""
# save the current parameters
true_W = self.W.detach().clone()
true_bu = self.bu.detach().clone()
accept = 0.0
_, H = self.sample_U(U)
U_p = U
while (accept < np.random.uniform(0.0, 1.0)):
accept = 1.0
for beta in reversed(betas):
accept /= self.unnormalized_prob(U, H)
self.W = true_W * beta
self.bu = true_bu * beta
accept *= self.unnormalized_prob(U, H)
Y = emission_model.sample(compress_mn(U_p))
emloglik = emission_model.Estep(Y)
_, H = self.sample_h(U_p)
U_p, U = self.sample_U(H, emloglik)
# inverse transition T tilde (Gibbs sample in reverse)
for beta in betas[1:]:
Y = emission_model.sample(compress_mn(U_p))
emloglik = emission_model.Estep(Y)
U_p, U = self.sample_U(H, emloglik)
_, H = self.sample_h(U_p)
accept /= self.unnormalized_prob(U, H)
self.W = true_W * beta
self.bu = true_bu * beta
accept *= self.unnormalized_prob(U, H)
accept = min(1, accept)
# reload the true parameters
self.W = true_W
self.bu = true_bu
return U, H
def Estep(self, emloglik, gather_ss=True, iter=None):
""" Positive Estep for the multinomial boltzman model using
mean field approximation to posterior to U and hidden parameters.
Args:
emloglik (pt.tensor):
emission log likelihood log p(Y|u,theta_E) a numsubj x K x P matrix
gather_ss (bool):
Gather Sufficient statistics for M-step (default = True)
iter (int): the number of iterations to run the mean field
Returns:
Uhat (pt.tensor):
posterior p(U|Y) a numsubj x K x P matrix
ll_A (pt.tensor):
Nan - returned for consistency
"""
if type(emloglik) is np.ndarray:
emloglik=pt.tensor(emloglik,dtype=pt.get_default_dtype())
if iter is None:
iter = self.epos_iter
N=emloglik.shape[0]
Uhat = pt.softmax(emloglik + self.bu,dim=1) # Start with hidden = 0
if self.pretrain:
# single pass for just the top RBM
wv = pt.matmul(Uhat, self.W.t())
Hhat = pt.softmax(wv, 1)
else:
# mean-field approximation for the entire network
try:
# First try if cuda can handle this large tensor
for i in range(iter):
wv = pt.matmul(Uhat, self.W.t())
# wv = pt.stack([pt.sparse.mm(self.W, Uhat[j].transpose(0, 1))
# for j in range(N)]).transpose(1,2)
Hhat = pt.softmax(wv, 1)
wh = pt.matmul(Hhat, self.W)
Uhat = pt.softmax(wh + self.bu + emloglik, 1)
except BaseException as e:
# If the given tensor too large, we convert to cpu computation
print(e)
print('We have to Fall back to CPU computation.')
# First convert these large tensor to cpu
Uhat = Uhat.to('cpu')
self.W = self.W.to('cpu')
# mean-field approximation for the entire network
for i in range(iter):
wv = pt.matmul(Uhat, self.W.t())
Hhat = pt.softmax(wv, 1)
wh = pt.matmul(Hhat, self.W)
Uhat = pt.softmax(wh + self.bu.to('cpu')
+ emloglik.to('cpu'), 1)
# Move back to cuda after cpu compuation
Uhat = Uhat.to('cuda')
self.W = self.W.to('cuda')
else:
pass
if gather_ss:
self.epos_Uhat = Uhat
self.epos_Hhat = Hhat
return Uhat, pt.nan
def Eneg(self, iter=None, use_chains=None, emission_model=None):
""" Negative Estep for the multinomial boltzman model using
Args:
iter (int): the number of iterations to run the mean field
use_chains (list[int]): the chains to use for the negative step
emission_model (EmissionModel): the emission model to use for
Returns:
eneg_U (pt.tensor): the sampled U values
eneg_H (pt.tensor): the sampled H values
"""
# If no iterations specified - use standard
if iter is None:
iter = self.eneg_iter
# If no markov chain are initialized, start them off
if self.gibbs_U is None:
p = pt.softmax(self.bu,0)
self.gibbs_U = sample_multinomial(p,
shape=(self.eneg_numchains,
self.K, self.P),
kdim=0,
compress=False)
# Grab the current chains
if use_chains is None:
use_chains = pt.arange(self.eneg_numchains)
U = self.gibbs_U[use_chains]
if self.pretrain:
# gibbs sample for only the top RBM
for i in range(iter):
_, H = self.sample_h(U)
_, U = self.sample_U(H)
else:
# sampling using tempered transitions
if self.use_tempered_transition:
U0 = U.detach().clone()
U, H = self.tempered_transition(U0, np.linspace(0.9, 1.0, iter),
emission_model)
else:
# standard gibbs sampling
for i in range(iter):
Y = emission_model.sample(compress_mn(U))
emloglik = emission_model.Estep(Y)
ph, H = self.sample_h(U)
pu, U = self.sample_U(H, emloglik)
self.eneg_H = ph
self.eneg_U = pu
# Persistent: Keep the new gibbs samples around
self.gibbs_U[use_chains] = U
return self.eneg_U, self.eneg_H
def Mstep(self):
""" Performs gradient step on the parameters
given the learning rate self.alpha
"""
N = self.epos_Hhat.shape[0]
M = self.eneg_H.shape[0]
# Update the connectivity
if self.fit_W:
try:
gradW = pt.matmul(pt.transpose(self.epos_Hhat,1,2),
self.epos_Uhat).sum(dim=0)/N
gradW -= pt.matmul(pt.transpose(self.eneg_H,1,2),
self.eneg_U).sum(dim=0)/M
except BaseException as e:
print('No enough cuda memory for calculating the gradW,'
' we have to back to cpu computation.')
gradW = pt.matmul(pt.transpose(self.epos_Hhat, 1, 2).to('cpu'),
self.epos_Uhat.to('cpu')).sum(dim=0) / N
gradW -= pt.matmul(pt.transpose(self.eneg_H, 1, 2).to('cpu'),
self.eneg_U.to('cpu')).sum(dim=0) / M
# gradW = pt.zeros(self.W.shape, device='cpu')
# for i in range(N):
# gradW += pt.matmul(pt.transpose(self.epos_Hhat, 1, 2)[i].to('cpu'),
# self.epos_Uhat[i].to('cpu'))
# gradW = gradW / N
#
# for j in range(M):
# gradW -= pt.matmul(pt.transpose(self.eneg_H,1,2)[j].to('cpu'),
# self.eneg_U[j].to('cpu'))/M
else:
pass
# Convert gradW to sparse COO on cuda
# values = gradW[self.Wc_ind[0], self.Wc_ind[1]]
# gradW = pt.sparse_coo_tensor(self.Wc_ind, values,
# size=gradW.shape).coalesce()
# If we are dealing with component Wc:
if self.Wc is not None:
if self.momentum:
self.velocity_W = self.MOMENTUM_COEF * self.velocity_W \
+ self.alpha * gradW
gradW = self.velocity_W
# weights = pt.sparse.sum(self.Wc)
# gradT = pt.sparse.sum(gradW * self.Wc) / weights
gradT = gradW[self.Wc_ind[0], self.Wc_ind[1]].sum() \
/ self.Wc_value_coo.sum()
if self.momentum:
self.theta += gradT.to('cuda')
else:
self.theta += self.alpha * gradT
# Update W only to its value, without creating new sparse tensor to save memory
self.W.values().mul_(0).add_(self.Wc_value_coo.to('cuda')*self.theta)
# self.W = pt.sparse_csr_tensor(self.W.crow_indices(),
# self.W.col_indices(),
# self.Wc_value_coo.to('cuda') * self.theta,
# size=self.W.shape)
else:
if self.momentum:
self.velocity_W = self.MOMENTUM_COEF * self.velocity_W \
+ self.alpha * gradW
self.W += self.velocity_W
else:
self.W += self.alpha * gradW
# Update the bias term
if self.fit_bu:
gradBU = 1 / N * pt.sum(self.epos_Uhat,0)
gradBU -= 1 / M * pt.sum(self.eneg_U,0)
if self.momentum:
self.velocity_bu = self.MOMENTUM_COEF * self.velocity_bu \
+ self.alpha * gradBU
self.bu += self.velocity_bu
else:
self.bu += self.alpha * gradBU
def sample(self, num_subj, Wc, iter=10):
""" Samples from the model
Args:
num_subj (int): Number of subjects to sample
iter (int): Number of iterations to run the sampler
Returns:
samples (dict): Dictionary of samples
"""
Uhat = pt.randn((num_subj, self.K, self.P))
Uhat = pt.softmax(Uhat, dim=1)
W = Wc * self.theta
for i in range(iter):
wv = pt.matmul(Uhat, W.t())
Hhat = pt.softmax(wv, 1)
wh = pt.matmul(Hhat, W)
Uhat = pt.softmax(wh + self.bu, 1)
return Uhat
####################################################################
## Belows are the helper functions for spatial arrangement models ##
####################################################################
[docs]
def sample_multinomial_pt(p, num_subj=1, kdim=0, compress=False):
""" Samples from a multinomial distribution using pytorch built in multinomial distribution sampler
Args:
p (pt.tensor): Tensor of probabilities
num_subj (int): Number of subjects to sample
kdim (int): Dimension of K
compress (bool): Whether to compress the output or not
Returns:
samples (pt.tensor): Samples from the multinomial distribution
"""
K = p.shape[kdim]
if p.dim() == 2:
# p is K x P matrix
sample = pt.multinomial(p.t(), num_subj, replacement=True).t()
elif p.dim() == 3:
# p is num_subjects x K x P matrix
sample = pt.stack([pt.multinomial(this_p.t(), 1, replacement=True).reshape(-1)
for this_p in p])
else:
raise ValueError("p must be 2 or 3 dimensional")
if compress:
return sample
else:
# No compress, return indicator coding
return expand_mn(sample, K)
[docs]
def sample_multinomial(p,shape=None,kdim=0,compress=False):
""" Samples from a multinomial distribution. fast smpling from matrix probability without looping
Args:
p (tensor): Tensor of probilities, which sums to 1
on the dimension kdim
shape (tuple): Shape of the output data (in uncompressed form)
Smaller p will be broadcasted to target shape
kdim (int): Number of dimension of p that indicates the different
categories (default 0)
compress (bool): Return as int (True) or indicator (false)
Returns:
samples (tensor): Samples either in indicator coding (compress = False)
or as ints (compress = False)
"""
if shape is None:
shape = p.shape
out_kdim = len(shape) - p.dim() + kdim
K = p.shape[kdim]
shapeR = list(shape)
shapeR[out_kdim] = 1 # Set the probability dimension to 1
r = pt.empty(shapeR).uniform_(0, 1)
cdf_v = p.cumsum(dim=kdim)
sample = (r < cdf_v).to(pt.get_default_dtype())
if compress:
return sample.argmax(dim=out_kdim)
else:
for k in np.arange(K - 1, 0, -1):
a = sample.select(out_kdim, k) # Get view of slice
a -= sample.select(out_kdim, k - 1)
return sample
[docs]
def expand_mn(u,K):
""" Expands a N x P multinomial vector to an N x K x P tensor of indictor variables
Args:
u (2d-tensor): N x nv matrix of samples from [int]
K (int): Number of categories
Returns:
U (3d-tensor): N x K x nv matrix of indicator variables
[default float]
"""
N = u.shape[0]
P = u.shape[1]
U = pt.zeros(N, K, P)
U[np.arange(N).reshape((N, 1)), u, np.arange(P)] = 1
return U
[docs]
def expand_mn_1d(U, K):
""" Expands a P long multinomial vector to an K x P tensor of indictor variables
Args:
U (1d-tensor): P vector of samples from [int]
K (int): Number of categories
Returns:
U (2d-tensor): K x P matrix of indicator variables
[default float]
"""
if type(U) is np.ndarray:
U = pt.tensor(U, dtype=pt.long)
P = U.shape[0]
U_return = pt.zeros(K, P)
U_return[U, pt.arange(P)] = 1
return U_return
[docs]
def compress_mn(U):
""" Compresses a N x K x P tensor of indictor variables to a N x P multinomial tensor
Args:
U (3d-tensor): N x K x P matrix of indicator variables
Returns:
u (2d-tensor): N x P matrix of category labels [int]
"""
u = U.argmax(1)
return u
[docs]
def build_arrangement_model(U, prior_type='prob', atlas=None, sym_type='asym',
model_type='independent', Wc=None, theta=None,
epos_iter=5, num_chain=20):
""" Builds an arrangment model based on a set of probability
Args:
U (tensor or ndarray):
A K x P matrix of group probability
prior_type (str):
the type of prior, either 'prob' or 'logpi' (default: 'prob')
if 'prob', the input is the marginal probability K x P matrix,
which the columns sum to 1. If 'logpi', the input is the group
prior in log-space
atlas (object):
the atlas object for the arrangement model for symmetric models
sym_type (str):
the symmetry type of the arrangement model (default: 'asym')
model_type (str):
the arrangement model type (default: 'independent')
Returns:
ar_model (object): the arrangement model object
"""
# convert tdata to tensor
if type(U) is np.ndarray:
U = pt.tensor(U, dtype=pt.get_default_dtype())
K,P = U.shape
# Check if the marginal has nan or zero values
if prior_type == 'prob':
# if it has nan values - give flat distribution
if pt.isnan(U).any().item():
nan_voxl = pt.sum(pt.any(pt.isnan(U), dim=0)).item()
warnings.warn(f'The marginal probability has {nan_voxl} voxels '
f'NaN value - replacing with flat distribution')
U = U.nan_to_num(1/K)
# if it has zero values - add small value to avoid -inf
if pt.eq(U, 0).any().item():
epsilon = 1e-8
zero_voxl = pt.sum(pt.any(U == 0, dim=0)).item()
warnings.warn(f'The marginal probability has {zero_voxl} voxels'
f' zero values - adding small value to avoid -inf')
while pt.any(pt.isinf(U.log())).item():
U += epsilon
# Convert to log space for building arrangement model
U = pt.log(U)
elif prior_type == 'logpi':
pass
else:
raise ValueError(f'Unknown prior_type: {prior_type}, it must be '
f'either prob or logpi!')
# Build arrangement model by given model and symmetry type
if model_type == 'independent':
if sym_type == 'sym':
ar_model = ArrangeIndependentSymmetric(K,
atlas.indx_full,
atlas.indx_reduced,
same_parcels=False,
spatial_specific=True,
remove_redundancy=False)
# Warn if the input is not symmetric
if not pt.allclose(U[:ar_model.K, ar_model.indx_full[0]], U[ar_model.K:, ar_model.indx_full[1]]):
warnings.warn('The input probability is not symmetric, '
'but the model is symmetric! Check you are importing the correct probabilities')
U = U.unsqueeze(0)
U = ar_model.map_to_arrange(U)
U = U.squeeze(0)
non_vermal = ar_model.indx_full[0] != ar_model.indx_full[1]
# U.sum(dim=0)[non_vermal]
U[:,non_vermal] = U[:,non_vermal] / 2
# assert(pt.allclose(U.sum(dim=0), pt.ones(U.shape[1])))
ar_model.name = 'indp_ym'
elif sym_type == 'asym':
ar_model = ArrangeIndependent(K, P,
spatial_specific=True,
remove_redundancy=False)
ar_model.name = 'indp_asym'
# Attach the logpi to the model
ar_model.logpi = U
elif model_type == 'cRBM_Wc':
if Wc is None:
raise ValueError('Wc must be provided')
ar_model = wcmDBM(K, atlas.P, Wc=Wc, theta=theta, eneg_iter=5,
epos_iter=epos_iter, eneg_numchains=num_chain)
ar_model.bu = U
ar_model.name = 'cRBM_Wc'
ar_model.momentum = False
ar_model.fit_W = False
ar_model.fit_bu = False
else:
raise NameError(f'Unknown model_type:{model_type} - Currently only '
f'support independent arrangement model.')
return ar_model