Source code for dca.dca

import logging, time
import numpy as np
import scipy.stats
from scipy.optimize import minimize
from scipy.signal.windows import hann

import torch
import torch.fft
import torch.nn.functional as F

from .base import SingleProjectionComponentsAnalysis, ortho_reg_fn, init_coef, ObjectiveWrapper
from .cov_util import (calc_cross_cov_mats_from_data, calc_pi_from_cross_cov_mats,
                       calc_pi_from_cross_cov_mats_block_toeplitz)

__all__ = ['DynamicalComponentsAnalysis',
           'DynamicalComponentsAnalysisFFT',
           'build_loss']


logging.basicConfig()


[docs]def build_loss(cross_cov_mats, d, ortho_lambda=1., block_toeplitz=False): """Constructs a loss function which gives the (negative) predictive information in the projection of multidimensional timeseries data X onto a d-dimensional basis, where predictive information is computed using a stationary Gaussian process approximation. Parameters ---------- X : np.ndarray, shape (# time-steps, N) The multidimensional time series data from which the mutual information is computed. d: int Number of basis vectors onto which the data X are projected. ortho_lambda : float Regularization hyperparameter. Returns ------- loss : function Loss function which accepts a (flattened) N-by-d matrix, whose columns are basis vectors, and outputs the negative predictive information corresponding to that projection (plus regularization term). """ N = cross_cov_mats.shape[1] if block_toeplitz: def loss(V_flat): V = V_flat.reshape(N, d) reg_val = ortho_reg_fn(ortho_lambda, V) return -calc_pi_from_cross_cov_mats_block_toeplitz(cross_cov_mats, V) + reg_val else: def loss(V_flat): V = V_flat.reshape(N, d) reg_val = ortho_reg_fn(ortho_lambda, V) return -calc_pi_from_cross_cov_mats(cross_cov_mats, V) + reg_val return loss
[docs]class DynamicalComponentsAnalysis(SingleProjectionComponentsAnalysis): """Dynamical Components Analysis. Runs DCA on multidimensional timeseries data X to discover a projection onto a d-dimensional subspace of an N-dimensional space which maximizes the complexity, as defined by the Gaussian Predictive Information (PI) of the d-dimensional dynamics over windows of length T. Parameters ---------- d : int Number of basis vectors onto which the data X are projected. T : int Size of time windows across which to compute mutual information. Total window length will be `2 * T`. When fitting a model, the length of the shortest timeseries must be greater than `2 * T` and for good performance should be much greater than `2 * T`. init : str Options: "random_ortho", "random", or "PCA" Method for initializing the projection matrix. n_init : int Number of random restarts. Default is 1. stride : int Number of samples to skip when estimating cross covariance matrices. Settings stride > 1 will speedup covariance estimation but may reduce the quality of the covariance estimate for small datasets. chunk_cov_estimate : None or int If `None`, cov is estimated from entire time series. If an `int`, cov is estimated by chunking up time series and averaging covariances from chucks. This can use less memory and be faster for long timeseries. Requires that the length of the shortest timeseries in the batch is longer than `2 * T * chunk_cov_estimate`. tol : float Tolerance for stopping optimization. Default is 1e-6. ortho_lambda : float Coefficient on term that keeps V close to orthonormal. verbose : bool Verbosity during optimization. use_scipy : bool Whether to use SciPy or Pytorch L-BFGS-B. Default is True. Pytorch is not well tested. block_toeplitz : bool If True, uses the block-Toeplitz logdet algorithm which is typically faster and less memory intensive on cpu for `T >~ 10` and `d >~ 40`. method : str 'toeplitzify' for naive averaging to compute the covariance, which is faster but less accurate for very small datasets. 'ML' for maximum likelihood block toeplitz covariance estimation which can be very slow for large datasets. device : str What device to run the computation on in Pytorch. dtype : pytorch.dtype What dtype to use for computation. rng_or_seed : None, int, or NumPy RandomState Random number generator or seed. Attributes ---------- T : int Default T used for PI. T_fit : int T used for last cross covariance estimation. d : int Default d used for fitting the projection. d_fit : int d used for last projection fit. cross covs : torch tensor Cross covariance matrices from the last covariance estimation. coef_ : ndarray (N, d) Projection matrix from fit. """ def __init__(self, d=None, T=None, init="random_ortho", n_init=1, stride=1, chunk_cov_estimate=None, tol=1e-6, ortho_lambda=10., verbose=False, block_toeplitz=None, method='toeplitzify', device="cpu", dtype=torch.float64, rng_or_seed=None): super(DynamicalComponentsAnalysis, self).__init__(d=d, T=T, init=init, n_init=n_init, stride=stride, chunk_cov_estimate=chunk_cov_estimate, tol=tol, verbose=verbose, device=device, dtype=dtype, rng_or_seed=rng_or_seed) self.ortho_lambda = ortho_lambda if block_toeplitz is None: try: if d > 40 and T > 10: self.block_toeplitz = True else: self.block_toeplitz = False except TypeError: self.block_toeplitz = False else: self.block_toeplitz = block_toeplitz self.cross_covs = None self.method = method
[docs] def estimate_data_statistics(self, X, T=None, regularization=None, reg_ops=None): """Estimate the cross covariance matrix from data. Parameters ---------- X : ndarray or list of ndarrays Data to estimate the cross covariance matrix. T : int T for PI calculation (optional). regularization : str Whether to regularize cross covariance estimation. reg_ops : dict Options for cross covariance regularization. """ if T is None: T = self.T else: self.T = T start = time.time() self._logger.info('Starting cross covariance estimate.') if isinstance(X, list) or X.ndim == 3: self.mean_ = np.concatenate(X).mean(axis=0, keepdims=True) else: self.mean_ = X.mean(axis=0, keepdims=True) cross_covs = calc_cross_cov_mats_from_data(X, 2 * self.T, mean=self.mean_, chunks=self.chunk_cov_estimate, stride=self.stride, rng=self.rng, regularization=regularization, reg_ops=reg_ops, logger=self._logger, method=self.method) self.cross_covs = torch.tensor(cross_covs, device=self.device, dtype=self.dtype) delta_time = round((time.time() - start) / 60., 1) self._logger.info('Cross covariance estimate took {:0.1f} minutes.'.format(delta_time)) return self
def _fit_projection(self, d=None, T=None, record_V=False): """Fit the projection matrix. Parameters ---------- d : int Dimensionality of the projection (optional.) T : int T for PI calculation (optional). Default is `self.T`. If `T` is set here it must be less than or equal to `self.T` or self.estimate_cross_covariance() must be called with a larger `T`. record_V : bool If True, saves a copy of V at each optimization step. Default is False. """ if d is None: d = self.d if d < 1: raise ValueError self.d_fit = d if T is None: T = self.T if T < 1: raise ValueError if (2 * T) > self.cross_covs.shape[0]: raise ValueError('T must less than or equal to the value when ' + '`estimate_data_statistics()` was called.') self.T_fit = T if self.cross_covs is None: raise ValueError('Call `estimate_cross_covariance()` first.') c = self.cross_covs[:2 * T] N = c.shape[1] V_init = init_coef(N, d, self.rng, self.init) if not isinstance(c, torch.Tensor): c = torch.tensor(c, device=self.device, dtype=self.dtype) def f_params(v_flat, requires_grad=True): v_flat_torch = torch.tensor(v_flat, requires_grad=requires_grad, device=self.device, dtype=self.dtype) v_torch = v_flat_torch.reshape(N, d) loss = build_loss(c, d, self.ortho_lambda, self.block_toeplitz)(v_torch) return loss, v_flat_torch objective = ObjectiveWrapper(f_params) def null_callback(*args, **kwargs): pass if self.verbose or record_V: if record_V: self.V_seq = [V_init] def callback(v_flat, objective): if record_V: self.V_seq.append(v_flat.reshape(N, d)) if self.verbose: loss, v_flat_torch = objective.core_computations(v_flat, requires_grad=False) v_torch = v_flat_torch.reshape(N, d) loss = build_loss(c, d, self.ortho_lambda, self.block_toeplitz)(v_torch) reg_val = ortho_reg_fn(self.ortho_lambda, v_torch) loss = loss.detach().cpu().numpy() reg_val = reg_val.detach().cpu().numpy() PI = -(loss - reg_val) string = "Loss {}, PI: {} nats, reg: {}" self._logger.info(string.format(str(np.round(loss, 4)), str(np.round(PI, 4)), str(np.round(reg_val, 4)))) callback(V_init, objective) else: callback = null_callback opt = minimize(objective.func, V_init.ravel(), method='L-BFGS-B', jac=objective.grad, options={'disp': self.verbose, 'ftol': self.tol}, callback=lambda x: callback(x, objective)) v = opt.x.reshape(N, d) # Orthonormalize the basis prior to returning it V_opt = scipy.linalg.orth(v) final_pi = calc_pi_from_cross_cov_mats(c, V_opt).detach().cpu().numpy() return V_opt, final_pi
[docs] def score(self, X=None): """Calculate the PI of data for the DCA projection. Parameters ---------- X : ndarray or list Optional. If X is none, calculate PI from the training data. If X is given, calcuate the PI of X for the learned projections. """ T = self.T_fit if T is None: T = self.T if X is None: cross_covs = self.cross_covs.cpu().numpy() else: cross_covs = calc_cross_cov_mats_from_data(X, 2 * self.T) cross_covs = cross_covs[:2 * T] if self.block_toeplitz: return calc_pi_from_cross_cov_mats_block_toeplitz(cross_covs, self.coef_) else: return calc_pi_from_cross_cov_mats(cross_covs, self.coef_)
def make_cepts2(X, T_pi): """Calculate the squared real cepstral coefficents.""" Y = F.unfold(X, kernel_size=[T_pi, 1], stride=T_pi) Y = torch.transpose(Y, 1, 2) # Compute the power spectral density window = torch.Tensor(hann(Y.shape[-1])[np.newaxis, np.newaxis]).type(Y.dtype) Yf = torch.fft.rfft(Y * window, dim=-1) spect = abs(Yf)**2 spect = spect.mean(dim=1) spect = torch.cat([torch.flip(spect[:, 1:], dims=(1,)), spect], dim=1) # Log of the DFT of the autocorrelation logspect = torch.log(spect) - np.log(float(Y.shape[-1])) # Compute squared cepstral coefs (b_k^2) cepts = torch.fft.rfft(logspect, dim=1) / float(Y.shape[-1]) cepts = abs(cepts) return cepts**2 def pi_fft(X, proj, T_pi): """ This is well-tested when X has shape (# time steps, 1). Otherwise, behavior has not been considered. """ if not isinstance(X, torch.Tensor): X = torch.Tensor(X) if not isinstance(proj, torch.Tensor): proj = torch.Tensor(proj) X = torch.mm(X, proj) Xp_tensor = X.t() Xp_tensor = torch.unsqueeze(Xp_tensor, -1) Xp_tensor = torch.unsqueeze(Xp_tensor, 1) bs2 = make_cepts2(Xp_tensor, T_pi) ks = torch.arange(bs2.shape[-1], dtype=bs2.dtype) return .5 * (torch.unsqueeze(ks, 0) * bs2).sum(dim=1).sum()
[docs]class DynamicalComponentsAnalysisFFT(object): """Dynamical Components Analysis using FFT for PI calculation. Currently only well-defined for `d=1`. Runs DCA on multidimensional timeseries data X to discover a projection onto a d-dimensional subspace which maximizes the dynamical complexity. Parameters ---------- d : int Number of basis vectors onto which the data X are projected. T : int Size of time windows across which to compute mutual information. init : string Options: "random", "PCA" Method for initializing the projection matrix. """ def __init__(self, d=None, T=None, init="random_ortho", n_init=1, tol=1e-6, ortho_lambda=10., verbose=False, device="cpu", dtype=torch.float64, rng_or_seed=None): self.d = d if d > 1: raise ValueError('DCAFFT is only defined for d=1.') self.T = T self.init = init self.n_init = n_init self.tol = tol self.ortho_lambda = ortho_lambda self.verbose = verbose self.device = device self.dtype = dtype self.cross_covs = None if rng_or_seed is None: self.rng = np.random elif isinstance(rng_or_seed, np.random.RandomState): self.rng = rng_or_seed else: self.rng = np.random.RandomState(rng_or_seed) def fit(self, X, d=None, T=None, n_init=None): self.mean_ = X.mean(axis=0, keepdims=True) X = X - self.mean_ if n_init is None: n_init = self.n_init pis = [] coefs = [] for ii in range(n_init): coef, pi = self._fit_projection(X, d=d) pis.append(pi) coefs.append(coef) idx = np.argmin(pis) self.coef_ = coefs[idx] def _fit_projection(self, X, d=None): if d is None: d = self.d if d > 1: raise ValueError('DCAFFT is only defined for d=1.') N = X.shape[1] if type(self.init) == str: if self.init == "random": V_init = self.rng.normal(0, 1, (N, d)) elif self.init == "random_ortho": V_init = scipy.stats.ortho_group.rvs(N, random_state=self.rng)[:, :d] elif self.init == "uniform": V_init = np.ones((N, d)) / np.sqrt(N) V_init = V_init + self.rng.normal(0, 1e-3, V_init.shape) else: raise ValueError else: raise ValueError V_init /= np.linalg.norm(V_init, axis=0, keepdims=True) v = torch.tensor(V_init, requires_grad=True, device=self.device, dtype=self.dtype) Xt = X if not isinstance(Xt, torch.Tensor): Xt = torch.tensor(Xt, device=self.device, dtype=self.dtype) if self.verbose: def callback(v_flat): v_flat_torch = torch.tensor(v_flat, requires_grad=True, device=self.device, dtype=self.dtype) v_torch = v_flat_torch.reshape(N, d) pi = pi_fft(Xt, v_torch, self.T) reg_val = ortho_reg_fn(self.ortho_lambda, v_torch) pi = pi.detach().cpu().numpy() reg_val = reg_val.detach().cpu().numpy() print("PI: {} nats, reg: {}".format(str(np.round(pi, 4)), str(np.round(reg_val, 4)))) callback(V_init) else: callback = None def f_df(v_flat): v_flat_torch = torch.tensor(v_flat, requires_grad=True, device=self.device, dtype=self.dtype) v_torch = v_flat_torch.reshape(N, d) pi = pi_fft(Xt, v_torch, self.T) reg_val = ortho_reg_fn(self.ortho_lambda, v_torch) loss = -pi + reg_val loss.backward() grad = v_flat_torch.grad return (loss.detach().cpu().numpy().astype(float), grad.detach().cpu().numpy().astype(float)) opt = minimize(f_df, V_init.ravel(), method='L-BFGS-B', jac=True, options={'disp': self.verbose, 'ftol': self.tol}, callback=callback) v = opt.x.reshape(N, d) # Orthonormalize the basis prior to returning it V_opt = scipy.linalg.orth(v) v_flat_torch = torch.tensor(V_opt.ravel(), requires_grad=True, device=self.device, dtype=self.dtype) v_torch = v_flat_torch.reshape(N, d) final_pi = pi_fft(Xt, v_torch, self.T).detach().cpu().numpy() return V_opt, final_pi def transform(self, X): X = X - self.mean_ return X.dot(self.coef_) def fit_transform(self, X, d=None, T=None, regularization=None, reg_ops=None): self.fit(X, d=d, T=T) return self.transform(X)
[docs] def score(self, X): """Calculate the PI of data for the DCA projection. Parameters ---------- X : ndarray or list """ return pi_fft(X, self.coef_, self.T)