import logging
import numpy as np
import scipy as sp
import collections
import torch
import functools
from numpy.lib.stride_tricks import as_strided
from sklearn.utils.extmath import randomized_svd
from sklearn.utils import check_random_state
logging.basicConfig()
[docs]def rectify_spectrum(cov, epsilon=1e-6, logger=None):
"""Rectify the spectrum of a covariance matrix.
Parameters
----------
cov : ndarray
Covariance matrix
epsilon : float
Minimum eigenvalue for the rectified spectrum.
verbose : bool
Whethere to print when the spectrum needs to be rectified.
"""
eigvals = sp.linalg.eigvalsh(cov)
n_neg = np.sum(eigvals <= 0.)
if n_neg > 0:
cov += (-np.min(eigvals) + epsilon) * np.eye(cov.shape[0])
if logger is not None:
string = 'Non-PSD matrix, {} of {} eigenvalues were not positive.'
logger.info(string.format(n_neg, eigvals.size))
[docs]def toeplitzify(cov, T, N, symmetrize=True):
"""Make a matrix block-Toeplitz by averaging along the block diagonal.
Parameters
----------
cov : ndarray (T*N, T*N)
Covariance matrix to make block toeplitz.
T : int
Number of blocks.
N : int
Number of features per block.
symmetrize : bool
Whether to ensure that the whole matrix is symmetric.
Optional (default=True).
Returns
-------
cov_toep : ndarray (T*N, T*N)
Toeplitzified matrix.
"""
cov_toep = np.zeros((T * N, T * N))
for delta_t in range(T):
to_avg_lower = np.zeros((T - delta_t, N, N))
to_avg_upper = np.zeros((T - delta_t, N, N))
for i in range(T - delta_t):
to_avg_lower[i] = cov[(delta_t + i) * N:(delta_t + i + 1) * N, i * N:(i + 1) * N]
to_avg_upper[i] = cov[i * N:(i + 1) * N, (delta_t + i) * N:(delta_t + i + 1) * N]
avg_lower = np.mean(to_avg_lower, axis=0)
avg_upper = np.mean(to_avg_upper, axis=0)
if symmetrize:
avg_lower = 0.5 * (avg_lower + avg_upper.T)
avg_upper = avg_lower.T
for i in range(T - delta_t):
cov_toep[(delta_t + i) * N:(delta_t + i + 1) * N, i * N:(i + 1) * N] = avg_lower
cov_toep[i * N:(i + 1) * N, (delta_t + i) * N:(delta_t + i + 1) * N] = avg_upper
return cov_toep
[docs]def calc_chunked_cov(X, T, stride, chunks, cov_est=None, rng=None, stride_tricks=True):
"""Calculate an unormalized (by sample count) lagged covariance matrix
in chunks to save memory.
Parameters
----------
X : np.ndarray, shape (# time-steps, N)
The N-dimensional time series data from which the cross-covariance
matrices are computed.
T : int
The number of time lags.
stride : int
The number of time-points to skip between samples.
chunks : int
Number of chunks to break the data into when calculating the lagged cross
covariance. More chunks will mean less memory used
cov_est : ndarray
Current estimate of unnormalized cov_est to be added to.
Return
------
cov_est : ndarray
Current covariance estimate.
n_samples
How many samples were used.
"""
if cov_est is None:
cov_est = 0.
n_samples = 0
if X.shape[0] < T * chunks:
raise ValueError('Time series is too short to chunk for cov estimation.')
ends = np.linspace(0, X.shape[0], chunks + 1, dtype=int)[1:]
start = 0
for chunk in range(chunks):
X_with_lags = form_lag_matrix(X[start:ends[chunk]], T, stride=stride,
rng=rng, stride_tricks=stride_tricks)
start = ends[chunk] - T + 1
ni_samples = X_with_lags.shape[0]
cov_est += np.dot(X_with_lags.T, X_with_lags)
n_samples += ni_samples
return cov_est, n_samples
[docs]def calc_cross_cov_mats_from_data(X, T, mean=None, chunks=None, stride=1,
rng=None, regularization=None, reg_ops=None,
stride_tricks=True, logger=None, method='toeplitzify'):
"""Compute the N-by-N cross-covariance matrix, where N is the data dimensionality,
for each time lag up to T-1.
Parameters
----------
X : np.ndarray, shape (# time-steps, N)
The N-dimensional time series data from which the cross-covariance
matrices are computed.
T : int
The number of time lags.
chunks : int
Number of chunks to break the data into when calculating the lagged cross
covariance. More chunks will mean less memory used
stride : int or float
If stride is an `int`, it defines the stride between lagged samples used
to estimate the cross covariance matrix. Setting stride > 1 can speed up the
calculation, but may lead to a loss in accuracy. Setting stride to a `float`
greater than 0 and less than 1 will random subselect samples.
rng : NumPy random state
Only used if `stride` is a float.
regularization : string
Regularization method for computing the spatiotemporal covariance matrix.
reg_ops : dict
Paramters for regularization.
stride_tricks : bool
Whether to use numpy stride tricks in form_lag_matrix. True will use less
memory for large T.
logger : logger
Logger.
method : str
'ML' for EM-based maximum likelihood block toeplitz estimation, 'toeplitzify' for naive
block averaging.
Returns
-------
cross_cov_mats : np.ndarray, shape (T, N, N), float
Cross-covariance matrices. cross_cov_mats[dt] is the cross-covariance between
X(t) and X(t+dt), where X(t) is an N-dimensional vector.
"""
if reg_ops is None:
reg_ops = dict()
if chunks is not None and regularization is not None:
raise NotImplementedError
if isinstance(X, list) or X.ndim == 3:
for Xi in X:
if len(Xi) <= T:
raise ValueError('T must be shorter than the length of the shortest ' +
'timeseries. If you are using the DCA model, 2 * DCA.T must be ' +
'shorter than the shortest timeseries.')
if mean is None:
mean = np.concatenate(X).mean(axis=0, keepdims=True)
X = [Xi - mean for Xi in X]
N = X[0].shape[-1]
if chunks is None:
cov_est = np.zeros((N * T, N * T))
n_samples = 0
for Xi in X:
X_with_lags = form_lag_matrix(Xi, T, stride=stride, stride_tricks=stride_tricks,
rng=rng)
cov_est += np.dot(X_with_lags.T, X_with_lags)
n_samples += len(X_with_lags)
cov_est /= (n_samples - 1.)
else:
n_samples = 0
cov_est = np.zeros((N * T, N * T))
for Xi in X:
cov_est, ni_samples = calc_chunked_cov(Xi, T, stride, chunks, cov_est=cov_est,
stride_tricks=stride_tricks, rng=rng)
n_samples += ni_samples
cov_est /= (n_samples - 1.)
else:
if len(X) <= T:
raise ValueError('T must be shorter than the length of the shortest '
'timeseries. If you are using the DCA model, 2 * DCA.T must be '
'shorter than the shortest timeseries.')
if mean is None:
mean = X.mean(axis=0, keepdims=True)
X = X - mean
N = X.shape[-1]
if chunks is None:
X_with_lags = form_lag_matrix(X, T, stride=stride, stride_tricks=stride_tricks,
rng=rng)
cov_est = np.cov(X_with_lags, rowvar=False)
else:
cov_est, n_samples = calc_chunked_cov(X, T, stride, chunks,
stride_tricks=stride_tricks, rng=rng)
cov_est /= (n_samples - 1.)
if regularization is None:
if method == 'toeplitzify':
cov_est = toeplitzify(cov_est, T, N)
elif method == 'ML':
cov_est = block_toeplitz_covariance(X=None, S_X=cov_est, T=T)
else:
raise ValueError('`method` should be "toeplitzify" or "ML".')
elif regularization == 'kron':
num_folds = reg_ops.get('num_folds', 5)
r_vals = np.arange(1, min(2 * T, N**2 + 1))
sigma_vals = np.concatenate([np.linspace(1, 4 * T + 1, 10), [100. * T]])
alpha_vals = np.concatenate([[0.], np.logspace(-2, -1, 10)])
ll_vals, opt_idx = cv_toeplitz(X_with_lags, T, N, r_vals, sigma_vals, alpha_vals,
num_folds=num_folds)
ri, si, ai = opt_idx
cov = np.cov(X_with_lags, rowvar=False)
cov_est = toeplitz_reg_taper_shrink(cov, T, N, r_vals[ri], sigma_vals[si], alpha_vals[ai])
else:
raise ValueError
rectify_spectrum(cov_est, logger=logger)
cross_cov_mats = calc_cross_cov_mats_from_cov(cov_est, T, N)
return cross_cov_mats
[docs]def calc_cross_cov_mats_from_cov(cov, T, N):
"""Calculates T N-by-N cross-covariance matrices given
a N*T-by-N*T spatiotemporal covariance matrix by
averaging over off-diagonal cross-covariance blocks with
constant `|t1-t2|`.
Parameters
----------
N : int
Numbner of spatial dimensions.
T: int
Number of time-lags.
cov : np.ndarray, shape (N*T, N*T)
Spatiotemporal covariance matrix.
Returns
-------
cross_cov_mats : np.ndarray, shape (T, N, N)
Cross-covariance matrices.
"""
use_torch = isinstance(cov, torch.Tensor)
if use_torch:
cross_cov_mats = torch.zeros((T, N, N))
else:
cross_cov_mats = np.zeros((T, N, N))
for delta_t in range(T):
if use_torch:
to_avg_lower = torch.zeros((T - delta_t, N, N))
to_avg_upper = torch.zeros((T - delta_t, N, N))
else:
to_avg_lower = np.zeros((T - delta_t, N, N))
to_avg_upper = np.zeros((T - delta_t, N, N))
for i in range(T - delta_t):
to_avg_lower[i, :, :] = cov[(delta_t + i) * N:(delta_t + i + 1) * N, i * N:(i + 1) * N]
to_avg_upper[i, :, :] = cov[i * N:(i + 1) * N, (delta_t + i) * N:(delta_t + i + 1) * N]
avg_lower = to_avg_lower.mean(axis=0)
avg_upper = to_avg_upper.mean(axis=0)
if use_torch:
cross_cov_mats[delta_t, :, :] = 0.5 * (avg_lower + avg_upper.t())
else:
cross_cov_mats[delta_t, :, :] = 0.5 * (avg_lower + avg_upper.T)
return cross_cov_mats
[docs]def calc_cov_from_cross_cov_mats(cross_cov_mats):
"""Calculates the N*T-by-N*T spatiotemporal covariance matrix based on
T N-by-N cross-covariance matrices.
Parameters
----------
cross_cov_mats : np.ndarray, shape (T, N, N)
Cross-covariance matrices: cross_cov_mats[dt] is the
cross-covariance between X(t) and X(t+dt), where each
of X(t) and X(t+dt) is a N-dimensional vector.
Returns
-------
cov : np.ndarray, shape (N*T, N*T)
Big covariance matrix, stationary in time by construction.
"""
N = cross_cov_mats.shape[1]
T = len(cross_cov_mats)
use_torch = isinstance(cross_cov_mats, torch.Tensor)
cross_cov_mats_repeated = []
for i in range(T):
for j in range(T):
if i > j:
cross_cov_mats_repeated.append(cross_cov_mats[abs(i - j)])
else:
if use_torch:
cross_cov_mats_repeated.append(cross_cov_mats[abs(i - j)].t())
else:
cross_cov_mats_repeated.append(cross_cov_mats[abs(i - j)].T)
if use_torch:
cov_tensor = torch.reshape(torch.stack(cross_cov_mats_repeated), (T, T, N, N))
cov = torch.cat([torch.cat([cov_ii_jj for cov_ii_jj in cov_ii], dim=1)
for cov_ii in cov_tensor])
else:
cov_tensor = np.reshape(np.stack(cross_cov_mats_repeated), (T, T, N, N))
cov = np.concatenate([np.concatenate([cov_ii_jj for cov_ii_jj in cov_ii], axis=1)
for cov_ii in cov_tensor])
return cov
[docs]def calc_pi_from_data(X, T, proj=None, stride=1, rng=None):
"""Calculates the Gaussian Predictive Information between variables
{1,...,T_pi} and {T_pi+1,...,2*T_pi}..
Parameters
----------
X : ndarray or torch tensor (time, features) or (batches, time, features)
Data used to calculate the PI.
T : int
This T should be 2 * T_pi. This T sets the joint window length not the
past or future window length.
proj : ndarray or torch tensor
Projection matrix for data (optional). If `proj` is not given, the PI of
the dataset is given.
stride : int or float
If stride is an `int`, it defines the stride between lagged samples used
to estimate the cross covariance matrix. Setting stride > 1 can speed up the
calculation, but may lead to a loss in accuracy. Setting stride to a `float`
greater than 0 and less than 1 will random subselect samples.
rng : NumPy random state
Only used if `stride` is a float.
Returns
-------
PI : float
Mutual information in nats.
"""
if T % 2 != 0:
raise ValueError('T must be even (This T sets the joint window length,'
' not the past or future length')
ccms = calc_cross_cov_mats_from_data(X, T, stride=stride, rng=rng)
return calc_pi_from_cross_cov_mats(ccms, proj=proj)
[docs]def calc_pi_from_cov(cov_2_T_pi):
"""Calculates the Gaussian Predictive Information between variables
{1,...,T_pi} and {T_pi+1,...,2*T_pi} with covariance matrix cov_2_T_pi.
Parameters
----------
cov_2_T_pi : np.ndarray, shape (2*T_pi, 2*T_pi)
Covariance matrix.
Returns
-------
PI : float
Mutual information in nats.
"""
if cov_2_T_pi.shape[0] % 2 != 0:
raise ValueError('cov_2_T_pi must have even shape')
T_pi = cov_2_T_pi.shape[0] // 2
use_torch = isinstance(cov_2_T_pi, torch.Tensor)
cov_T_pi = cov_2_T_pi[:T_pi, :T_pi]
if use_torch:
logdet_T_pi = torch.slogdet(cov_T_pi)[1]
logdet_2T_pi = torch.slogdet(cov_2_T_pi)[1]
else:
logdet_T_pi = np.linalg.slogdet(cov_T_pi)[1]
logdet_2T_pi = np.linalg.slogdet(cov_2_T_pi)[1]
PI = logdet_T_pi - .5 * logdet_2T_pi
return PI
[docs]def project_cross_cov_mats(cross_cov_mats, proj):
"""Projects the cross covariance matrices.
Parameters
----------
cross_cov_mats : np.ndarray, shape (T, N, N)
Cross-covariance matrices: cross_cov_mats[dt] is the
cross-covariance between X(t) and X(t+dt), where each
of X(t) and X(t+dt) is a N-dimensional vector.
proj: np.ndarray, shape (N, d), optional
If provided, the N-dimensional data are projected onto a d-dimensional
basis given by the columns of proj. Then, the mutual information is
computed for this d-dimensional timeseries.
Returns
-------
cross_cov_mats_proj : ndarray, shape (T, d, d)
Projected cross covariances matrices.
"""
if isinstance(cross_cov_mats, torch.Tensor):
use_torch = True
elif isinstance(cross_cov_mats[0], torch.Tensor):
cross_cov_mats = torch.stack(cross_cov_mats)
use_torch = True
else:
use_torch = False
if use_torch and isinstance(proj, np.ndarray):
proj = torch.tensor(proj, device=cross_cov_mats.device, dtype=cross_cov_mats.dtype)
T = cross_cov_mats.shape[0] // 2
if use_torch:
cross_cov_mats_proj = torch.matmul(proj.t().unsqueeze(0),
torch.matmul(cross_cov_mats,
proj.unsqueeze(0)))
else:
cross_cov_mats_proj = []
for i in range(2 * T):
cross_cov = cross_cov_mats[i]
cross_cov_proj = np.dot(proj.T, np.dot(cross_cov, proj))
cross_cov_mats_proj.append(cross_cov_proj)
cross_cov_mats_proj = np.stack(cross_cov_mats_proj)
return cross_cov_mats_proj
[docs]def calc_pi_from_cross_cov_mats(cross_cov_mats, proj=None):
"""Calculates predictive information for a spatiotemporal Gaussian
process with T-1 N-by-N cross-covariance matrices.
Parameters
----------
cross_cov_mats : np.ndarray, shape (T, N, N)
Cross-covariance matrices: cross_cov_mats[dt] is the
cross-covariance between X(t) and X(t+dt), where each
of X(t) and X(t+dt) is a N-dimensional vector.
proj: np.ndarray, shape (N, d), optional
If provided, the N-dimensional data are projected onto a d-dimensional
basis given by the columns of proj. Then, the mutual information is
computed for this d-dimensional timeseries.
Returns
-------
PI : float
Mutual information in nats.
"""
if len(cross_cov_mats) % 2 != 0:
raise ValueError('number of cross covariance matrices provided must be even'
'(equal to joint window length)')
if proj is not None:
cross_cov_mats_proj = project_cross_cov_mats(cross_cov_mats, proj)
else:
cross_cov_mats_proj = cross_cov_mats
cov_2_T_pi = calc_cov_from_cross_cov_mats(cross_cov_mats_proj)
PI = calc_pi_from_cov(cov_2_T_pi)
return PI
[docs]def calc_block_toeplitz_logdets(cross_cov_mats, proj=None):
"""Calculates logdets which can be used to calculate predictive information or entropy
for a spatiotemporal Gaussian process with T N-by-N cross-covariance matrices using
the block-Toeplitz algorithm.
Based on:
Sowell, Fallaw. "A decomposition of block toeplitz matrices with applications
to vector time series." 1989a). Unpublished manuscript (1989).
Parameters
----------
cross_cov_mats : np.ndarray, shape (T, N, N)
Cross-covariance matrices: cross_cov_mats[dt] is the
cross-covariance between X(t) and X(t+dt), where each
of X(t) and X(t+dt) is a N-dimensional vector.
proj: np.ndarray, shape (N, d), optional
If provided, the N-dimensional data are projected onto a d-dimensional
basis given by the columns of proj. Then, the mutual information is
computed for this d-dimensional timeseries.
Returns
-------
lodgets : list
T logdets.
"""
use_torch = isinstance(cross_cov_mats, torch.Tensor)
if proj is not None:
ccms = project_cross_cov_mats(cross_cov_mats, proj)
else:
ccms = cross_cov_mats
T, d, d = ccms.shape
A = dict()
Ab = dict()
if use_torch:
v = ccms[0]
vb = [ccms[0]]
D = ccms[1]
for ii in range(1, T):
if ii > 1:
As = torch.stack([A[ii - 2, ii - jj - 1] for jj in range(1, ii)])
D = ccms[ii] - torch.matmul(As, ccms[1:ii]).sum(dim=0)
A[(ii - 1, ii - 1)] = torch.linalg.solve(vb[ii - 1].t(), D.t()).t()
Ab[(ii - 1, ii - 1)] = torch.linalg.solve(v.t(), D).t()
for kk in range(1, ii):
A[(ii - 1, kk - 1)] = (A[(ii - 2, kk - 1)]
- A[(ii - 1, ii - 1)].mm(Ab[(ii - 2, ii - kk - 1)]))
Ab[(ii - 1, kk - 1)] = (Ab[(ii - 2, kk - 1)]
- Ab[(ii - 1, ii - 1)].mm(A[(ii - 2, ii - kk - 1)]))
if ii < T - 1:
As = torch.stack([A[(ii - 1, jj - 1)] for jj in range(1, ii + 1)])
if ii == 1:
cs = ccms[[1]]
else:
cs = ccms[1: ii + 1]
v = ccms[0] - torch.matmul(As, torch.transpose(cs, 1, 2)).sum(dim=0)
Abs = torch.stack([Ab[(ii - 1, jj - 1)] for jj in range(1, ii + 1)])
if ii == 1:
cs = ccms[[1]]
else:
cs = ccms[1: ii + 1]
vb.append(ccms[0] - torch.matmul(Abs, cs).sum(dim=0))
logdets = [torch.slogdet(vb[ii])[1] for ii in range(T)]
else:
vb = np.zeros((T, d, d))
v = ccms[0]
vb[0] = ccms[0]
D = ccms[1]
for ii in range(1, T):
if ii > 1:
D = ccms[ii] - sum([A[ii - 2, ii - jj - 1].dot(ccms[jj])
for jj in range(1, ii)])
A[(ii - 1, ii - 1)] = np.linalg.solve(vb[ii - 1].T, D.T).T
Ab[(ii - 1, ii - 1)] = np.linalg.solve(v.T, D).T
for kk in range(1, ii):
if ii < T - 1:
A[(ii - 1, kk - 1)] = (A[(ii - 2, kk - 1)]
- A[(ii - 1, ii - 1)].dot(Ab[(ii - 2, ii - kk - 1)]))
Ab[(ii - 1, kk - 1)] = (Ab[(ii - 2, kk - 1)]
- Ab[(ii - 1, ii - 1)].dot(A[(ii - 2, ii - kk - 1)]))
if ii < T - 1:
v = ccms[0] - sum([A[(ii - 1, jj - 1)].dot(ccms[jj].T) for jj in range(1, ii + 1)])
vb[ii] = ccms[0] - sum([Ab[(ii - 1, jj - 1)].dot(ccms[jj]) for jj in range(1, ii + 1)])
logdets = [np.linalg.slogdet(vb[ii])[1] for ii in range(T)]
return logdets
[docs]def calc_pi_from_cross_cov_mats_block_toeplitz(cross_cov_mats, proj=None):
"""Calculates predictive information for a spatiotemporal Gaussian
process with T-1 N-by-N cross-covariance matrices using the block-Toeplitz
algorithm.
Based on:
Sowell, Fallaw. "A decomposition of block toeplitz matrices with applications
to vector time series." 1989a). Unpublished manuscript (1989).
Parameters
----------
cross_cov_mats : np.ndarray, shape (T, N, N)
Cross-covariance matrices: cross_cov_mats[dt] is the
cross-covariance between X(t) and X(t+dt), where each
of X(t) and X(t+dt) is a N-dimensional vector.
proj: np.ndarray, shape (N, d), optional
If provided, the N-dimensional data are projected onto a d-dimensional
basis given by the columns of proj. Then, the mutual information is
computed for this d-dimensional timeseries.
Returns
-------
PI : float
Mutual information in nats.
"""
T = cross_cov_mats.shape[0]
logdets = calc_block_toeplitz_logdets(cross_cov_mats, proj)
return sum(logdets[:T // 2]) - 0.5 * sum(logdets)
[docs]def one_step(S_X, R_X, R_Y, A, A_R_Y, Q):
"""Perform one EM step for ML block toeplitz covariance estimation.
Parameters
----------
S_X : ndarray
Sample covariance matrix.
R_X : ndarray
Observed covariance matrix.
R_Y : ndarray
Latent covariance matrix.
A : ndarray
Projection from latent to observed variables.
A_R_Y : ndarray
Precomputed A @ R_Y.
"""
R_X_inv_A_R_Y = np.linalg.solve(R_X, A_R_Y)
R_Y = (R_Y + A_R_Y.T.conj() @ ((np.linalg.solve(R_X, S_X) @ R_X_inv_A_R_Y) - R_X_inv_A_R_Y))
blocks = extract_diag_blocks(R_Y, Q)
R_Y = sp.linalg.block_diag(*blocks)
A_R_Y = A @ R_Y
R_X = (A_R_Y @ A.T.conj()).real
return R_X, R_Y, A_R_Y
[docs]def block_toeplitz_covariance(X, S_X, T, max_iter=100, tol=1e-6):
"""Estimate the ML block toeplitz covariance matrix using:
Fuhrmann, Daniel R., and T. A. Barton.
"Estimation of block-Toeplitz covariance matrices."
1990 Conference Record Twenty-Fourth Asilomar Conference on Signals, Systems and Computers.
Parameters
----------
X : ndarray (time, features) or (batches, time, features)
Data used to calculate the PI. Can be None if S_X is given.
S_X : ndarray
Sample covariance matrix. Can be None if X is given.
T : int
This T should be 2 * T_pi. This T sets the joint window length not the
past or future window length.
max_iter : int
Maximum number of EM iterations.
tol : int
LL tolerance for stopping EM iterations.
"""
N = T
if X is not None:
P = X.shape[1]
X_N = form_lag_matrix(X, N)
S_X = np.cov(X_N.T)
else:
P = S_X.shape[0] // N
rectify_spectrum(S_X)
Q = 4 * N
NP = N * P
QP = Q * P
I_NP = np.eye(NP)
I_P = np.eye(P)
W_Q = sp.linalg.dft(Q, scale='sqrtn')
I_NP_0 = np.concatenate([I_NP, np.zeros((NP, QP - NP))], axis=1)
W_Q_I_P = np.kron(W_Q, I_P)
A = I_NP_0 @ W_Q_I_P
R_Y = np.eye(QP)
A_R_Y = A @ R_Y
R_X = np.eye(NP)
if X is not None:
ll = sp.stats.multivariate_normal.logpdf(X_N, mean=np.zeros(NP), cov=R_X).mean()
else:
d = S_X.shape[0]
tr = np.trace(np.linalg.solve(R_X, S_X))
logdets = np.linalg.slogdet(R_X)[1] - np.linalg.slogdet(S_X)[1]
ll = -.5 * (tr + logdets - d)
for ii in range(max_iter):
R_X, R_Y, A_R_Y = one_step(S_X, R_X, R_Y, A, A_R_Y, Q)
if X is not None:
new_ll = sp.stats.multivariate_normal.logpdf(X_N, mean=np.zeros(NP), cov=R_X).mean()
else:
d = S_X.shape[0]
tr = np.trace(np.linalg.solve(R_X, S_X))
logdets = np.linalg.slogdet(R_X)[1] - np.linalg.slogdet(S_X)[1]
new_ll = -.5 * (tr + logdets - d)
if abs(new_ll - ll) / max([1., abs(new_ll), abs(ll)]) < tol:
break
return R_X
"""
====================================================================================================
====================================================================================================
=================================== ===============================
=================================== KronPCA-related methods ===============================
=================================== ===============================
====================================================================================================
====================================================================================================
"""
[docs]class memoized(object):
"""Decorator for memoization.
From: https://wiki.python.org/moin/PythonDecoratorLibrary.
Caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned
(not reevaluated).
"""
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args):
if not isinstance(args, collections.abc.Hashable):
# uncacheable. a list, for instance.
# better to not cache than blow up.
return self.func(*args)
if args in self.cache:
return self.cache[args]
else:
value = self.func(*args)
self.cache[args] = value
return value
def __repr__(self):
# Return the function's docstring.
return self.func.__doc__
def __get__(self, obj, objtype):
# Support instance methods.
return functools.partial(self.__call__, obj)
@memoized
def pv_permutation(T, N):
A = np.arange((T * N)**2, dtype=np.int).reshape((T * N, T * N))
A_perm = np.zeros((T**2, N**2), dtype=np.int)
for i in range(T):
for j in range(T):
row_idx = i * T + j
A_block = A[i * N:(i + 1) * N, j * N:(j + 1) * N]
A_perm[row_idx, :] = A_block.T.reshape((N**2,)) # equivalent to I_block.vectorize
perm = A_perm.ravel()
perm_inv = perm.argsort()
return perm, perm_inv
def pv_rearrange(C, T, N):
perm, _ = pv_permutation(T, N)
C_prime = C.ravel()[perm].reshape((T**2, N**2))
return C_prime
def pv_rearrange_inv(C, T, N):
_, perm_inv = pv_permutation(T, N)
C_prime = C.ravel()[perm_inv].reshape((T * N, T * N))
return C_prime
def build_P(T):
P = np.zeros((2 * T - 1, T**2))
idx = np.arange(T**2).reshape((T, T)).T + 1
for offset in range(-T + 1, T):
diag_idx = np.diagonal(idx, offset=offset)
P[offset + T - 1, diag_idx - 1] = 1. / np.sqrt(T - np.abs(offset))
return P
def toeplitz_reg(cov, T, N, r):
R_C = pv_rearrange(cov, T, N)
P = build_P(T)
to_svd = P.dot(R_C)
U, s, Vt = randomized_svd(to_svd, n_components=r + 1, n_iter=40, random_state=42)
trunc_svd = U[:, :-1].dot(np.diag(s[:-1] - s[-1])).dot(Vt[:-1, :])
cov_reg = pv_rearrange_inv(P.T.dot(trunc_svd), T, N)
return cov_reg
def non_toeplitz_reg(cov, T, N, r):
R_C = pv_rearrange(cov, T, N)
U, s, Vt = randomized_svd(R_C, n_components=r + 1, n_iter=40, random_state=42)
trunc_svd = U[:, :-1].dot(np.diag(s[:-1] - s[-1])).dot(Vt[:-1, :])
cov_reg = pv_rearrange_inv(trunc_svd, T, N)
return cov_reg
def toeplitz_reg_taper_shrink(cov, T, N, r, sigma, alpha):
cov_reg = toeplitz_reg(cov, T, N, r)
cov_reg_taper = taper_cov(cov_reg, T, N, sigma)
cov_reg_taper_shrink = (1. - alpha) * cov_reg_taper + alpha * np.eye(T * N)
return cov_reg_taper_shrink
def gaussian_log_likelihood(cov, sample_cov, num_samples):
to_trace = np.linalg.solve(cov, sample_cov)
log_det_cov = np.linalg.slogdet(cov)[1]
d = cov.shape[1]
log_likelihood = -0.5 * num_samples * (d * np.log(2. * np.pi) +
log_det_cov + np.trace(to_trace))
return log_likelihood
def taper_cov(cov, T, N, sigma):
t = np.arange(T).reshape((T, 1))
delta_t = t - t.T
temporal_kernel = np.exp(-(delta_t / sigma)**2)
full_kernel = np.kron(temporal_kernel, np.ones((N, N)))
result = full_kernel * cov
return result
def cv_toeplitz(X_with_lags, T, N, r_vals, sigma_vals, alpha_vals, num_folds=10, verbose=False):
fold_size = int(np.floor(len(X_with_lags) / num_folds))
P = build_P(T)
ll_vals = np.zeros((num_folds, len(r_vals), len(sigma_vals), len(alpha_vals)))
for cv_iter in range(num_folds):
if verbose:
print("fold =", cv_iter + 1)
X_train = np.concatenate((X_with_lags[:cv_iter * fold_size],
X_with_lags[(cv_iter + 1) * fold_size:]), axis=0)
X_test = X_with_lags[cv_iter * fold_size:(cv_iter + 1) * fold_size]
num_samples = len(X_test)
cov_train, cov_test = np.cov(X_train.T), np.cov(X_test.T)
cov_train, cov_test = toeplitzify(cov_train, T, N), toeplitzify(cov_test, T, N)
rectify_spectrum(cov_train)
rectify_spectrum(cov_test)
R_C = pv_rearrange(cov_train, T, N)
to_svd = P.dot(R_C)
U, s, Vt = randomized_svd(to_svd, n_components=np.max(r_vals), n_iter=40, random_state=42)
for r_idx in range(len(r_vals)):
r = r_vals[r_idx]
if verbose:
print("r =", r)
if r_idx == len(r_vals) - 1:
trunc_svd = to_svd
else:
trunc_svd = U[:, :r].dot(np.diag(s[:r] - s[r])).dot(Vt[:r, :])
cov_kron = pv_rearrange_inv(P.T.dot(trunc_svd), T, N)
for sigma_idx in range(len(sigma_vals)):
sigma = sigma_vals[sigma_idx]
cov_kron_taper = taper_cov(cov_kron, T, N, sigma)
for alpha_idx in range(len(alpha_vals)):
alpha = alpha_vals[alpha_idx]
cov_kron_taper_shrunk = ((1. - alpha) * cov_kron_taper + alpha * np.eye(T * N))
ll = gaussian_log_likelihood(cov_kron_taper_shrunk, cov_test, num_samples)
ll_vals[cv_iter, r_idx, sigma_idx, alpha_idx] = ll
opt_idx = np.unravel_index(ll_vals.mean(axis=0).argmax(), ll_vals.shape[1:])
return ll_vals, opt_idx