Source code for dca.analysis

import scipy
import numpy as np
from sklearn.linear_model import LinearRegression as LR
from sklearn.decomposition import PCA
from scipy.stats import special_ortho_group as sog

from .base import init_coef
from .cov_util import calc_pi_from_cross_cov_mats, form_lag_matrix
from .data_util import CrossValidate
from .methods_comparison import SlowFeatureAnalysis as SFA
from .dca import DynamicalComponentsAnalysis as DCA, DynamicalComponentsAnalysisFFT as DCAFFT


import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)


[docs]def linear_decode_r2(X_train, Y_train, X_test, Y_test, decoding_window=1, offset=0): """Train a linear model on the training set and test on the test set. This will work with batched training data and/or batched test data. X_train : ndarray (time, channels) or (batches, time, channels) Feature training data for regression. Y_train : ndarray (time, channels) or (batches, time, channels) Target training data for regression. X_test : ndarray (time, channels) or (batches, time, channels) Feature test data for regression. Y_test : ndarray (time, channels) or (batches, time, channels) Target test data for regression. decoding_window : int Number of time samples of X to use for predicting Y (should be odd). Centered around offset value. offset : int Temporal offset for prediction (0 is same-time prediction). """ if isinstance(X_train, np.ndarray) and X_train.ndim == 2: X_train = [X_train] if isinstance(Y_train, np.ndarray) and Y_train.ndim == 2: Y_train = [Y_train] if isinstance(X_test, np.ndarray) and X_test.ndim == 2: X_test = [X_test] if isinstance(Y_test, np.ndarray) and Y_test.ndim == 2: Y_test = [Y_test] X_train_lags = [form_lag_matrix(Xi, decoding_window) for Xi in X_train] X_test_lags = [form_lag_matrix(Xi, decoding_window) for Xi in X_test] Y_train = [Yi[decoding_window // 2:] for Yi in Y_train] Y_train = [Yi[:len(Xi)] for Yi, Xi in zip(Y_train, X_train_lags)] if offset >= 0: Y_train = [Yi[offset:] for Yi in Y_train] else: Y_train = [Yi[:Yi.shape[0] + offset] for Yi in Y_train] Y_test = [Yi[decoding_window // 2:] for Yi in Y_test] Y_test = [Yi[:len(Xi)] for Yi, Xi in zip(Y_test, X_test_lags)] if offset >= 0: Y_test = [Yi[offset:] for Yi in Y_test] else: Y_test = [Yi[:Yi.shape[0] + offset] for Yi in Y_test] if offset >= 0: X_train_lags = [Xi[:Xi.shape[0] - offset] for Xi in X_train_lags] X_test_lags = [Xi[:Xi.shape[0] - offset] for Xi in X_test_lags] else: X_train_lags = [Xi[-offset:] for Xi in X_train_lags] X_test_lags = [Xi[-offset:] for Xi in X_test_lags] if len(X_train_lags) == 1: X_train_lags = X_train_lags[0] else: X_train_lags = np.concatenate(X_train_lags) if len(Y_train) == 1: Y_train = Y_train[0] else: Y_train = np.concatenate(Y_train) if len(X_test_lags) == 1: X_test_lags = X_test_lags[0] else: X_test_lags = np.concatenate(X_test_lags) if len(Y_test) == 1: Y_test = Y_test[0] else: Y_test = np.concatenate(Y_test) model = LR().fit(X_train_lags, Y_train) r2 = model.score(X_test_lags, Y_test) return r2
def run_analysis(X, Y, T_pi_vals, dim_vals, offset_vals, num_cv_folds, decoding_window, n_init=1, verbose=False): results_size = (num_cv_folds, len(dim_vals), len(offset_vals), len(T_pi_vals) + 2) results = np.zeros(results_size) min_std = 1e-6 good_cols = (X.std(axis=0) > min_std) X = X[:, good_cols] # loop over CV folds cv = CrossValidate(X, Y, num_cv_folds, stack=False) for X_train, X_test, Y_train, Y_test, fold_idx in cv: if verbose: print("fold", fold_idx + 1, "of", num_cv_folds) # mean-center X and Y X_mean = np.concatenate(X_train).mean(axis=0, keepdims=True) X_train_ctd = [Xi - X_mean for Xi in X_train] X_test_ctd = X_test - X_mean Y_mean = np.concatenate(Y_train).mean(axis=0, keepdims=True) Y_train_ctd = [Yi - Y_mean for Yi in Y_train] Y_test_ctd = Y_test - Y_mean # compute cross-cov mats for DCA dca_model = DCA(T=np.max(T_pi_vals)) dca_model.estimate_data_statistics(X_train_ctd) # do PCA/SFA pca_model = PCA(svd_solver='full').fit(np.concatenate(X_train_ctd)) sfa_model = SFA(1).fit(X_train_ctd) # make DCA object # loop over dimensionalities for dim_idx in range(len(dim_vals)): dim = dim_vals[dim_idx] if verbose: print("dim", dim_idx + 1, "of", len(dim_vals)) # compute PCA/SFA R2 vals X_train_pca = [np.dot(Xi, pca_model.components_[:dim].T) for Xi in X_train_ctd] X_test_pca = np.dot(X_test_ctd, pca_model.components_[:dim].T) X_train_sfa = [np.dot(Xi, sfa_model.all_coef_[:, :dim]) for Xi in X_train_ctd] X_test_sfa = np.dot(X_test_ctd, sfa_model.all_coef_[:, :dim]) for offset_idx in range(len(offset_vals)): offset = offset_vals[offset_idx] r2_pca = linear_decode_r2(X_train_pca, Y_train_ctd, X_test_pca, Y_test_ctd, decoding_window=decoding_window, offset=offset) r2_sfa = linear_decode_r2(X_train_sfa, Y_train_ctd, X_test_sfa, Y_test_ctd, decoding_window=decoding_window, offset=offset) results[fold_idx, dim_idx, offset_idx, 0] = r2_pca results[fold_idx, dim_idx, offset_idx, 1] = r2_sfa # loop over T_pi vals for T_pi_idx in range(len(T_pi_vals)): T_pi = T_pi_vals[T_pi_idx] dca_model.fit_projection(d=dim, T=T_pi, n_init=n_init) V_dca = dca_model.coef_ # compute DCA R2 over offsets X_train_dca = [np.dot(Xi, V_dca) for Xi in X_train_ctd] X_test_dca = np.dot(X_test_ctd, V_dca) for offset_idx in range(len(offset_vals)): offset = offset_vals[offset_idx] r2_dca = linear_decode_r2(X_train_dca, Y_train_ctd, X_test_dca, Y_test_ctd, decoding_window=decoding_window, offset=offset) results[fold_idx, dim_idx, offset_idx, T_pi_idx + 2] = r2_dca return results
[docs]def random_complement(proj, size=1, random_state=None): """Computes a random vector in the orthogonal complement to proj. Parameters ---------- proj : ndarray (full dim, low dim) Projection matrix. random_state : NumPy random state (optional) Random state for rng. Returns ------- comp_vec : ndarray (full dim, 1) Random vector in the complement space. """ dim, pdim = proj.shape if pdim >= dim: raise ValueError # Create complement space proj_full = np.concatenate([proj, np.zeros((dim, dim - pdim))], axis=1) proj_full_comp = np.concatenate([np.zeros((dim, pdim)), np.linalg.svd(proj_full)[0][:, pdim:]], axis=1) # Sample from random vectors rots = sog.rvs(dim - pdim, size=size, random_state=random_state) if size == 1: rots = rots[np.newaxis] comp_vec = np.zeros((dim, size)) for ii in range(size): comp_vec[:, ii] = proj_full_comp[:, pdim:].dot(rots[ii])[:, -1] return comp_vec
def run_dim_analysis_dca(X, Y, T_pi, dim_vals, offset, num_cv_folds, decoding_window, n_init=1, verbose=False, n_null=1000, seed=20190710): rng = np.random.RandomState(seed) results = np.zeros((num_cv_folds, len(dim_vals))) null_results = np.zeros((num_cv_folds, len(dim_vals) - 2, n_null)) min_std = 1e-6 good_cols = (X.std(axis=0) > min_std) X = X[:, good_cols] # loop over CV folds cv = CrossValidate(X, Y, num_cv_folds, stack=False) for X_train, X_test, Y_train, Y_test, fold_idx in cv: X_test = X_test Y_test = Y_test if verbose: print("fold", fold_idx + 1, "of", num_cv_folds) # mean-center X and Y X_mean = np.concatenate(X_train).mean(axis=0, keepdims=True) X_train_ctd = [Xi - X_mean for Xi in X_train] X_test_ctd = X_test - X_mean Y_mean = np.concatenate(Y_train).mean(axis=0, keepdims=True) Y_train_ctd = [Yi - Y_mean for Yi in Y_train] Y_test_ctd = Y_test - Y_mean # make DCA object # compute cross-cov mats for DCA dca_model = DCA(T=T_pi) dca_model.estimate_data_statistics(X_train_ctd) # loop over dimensionalities for dim_idx in range(len(dim_vals)): dim = dim_vals[dim_idx] if verbose: print("dim", dim_idx + 1, "of", len(dim_vals)) dca_model.fit_projection(d=dim, n_init=n_init) V_dca = dca_model.coef_ # compute DCA R2 over offsets X_train_dca = [np.dot(Xi, V_dca) for Xi in X_train_ctd] X_test_dca = np.dot(X_test_ctd, V_dca) r2_dca = linear_decode_r2(X_train_dca, Y_train_ctd, X_test_dca, Y_test_ctd, decoding_window=decoding_window, offset=offset) results[fold_idx, dim_idx] = r2_dca if dim_idx < len(dim_vals) - 2: comp_vecs = random_complement(V_dca, n_null, rng) for ii in range(n_null): vec = comp_vecs[:, [ii]] Vp = np.concatenate([V_dca, vec], axis=1) X_train_dca = [np.dot(Xi, Vp) for Xi in X_train_ctd] X_test_dca = np.dot(X_test_ctd, Vp) r2_dca = linear_decode_r2(X_train_dca, Y_train_ctd, X_test_dca, Y_test_ctd, decoding_window=decoding_window, offset=offset) null_results[fold_idx, dim_idx, ii] = r2_dca return results, null_results def gen_pi_heatmap(calc_pi_fn, N=100): theta_vals = np.linspace(0, np.pi, N) phi_vals = np.linspace(0, np.pi, N) heatmap = np.zeros((N, N)) for theta_idx in range(N): if theta_idx % 10 == 0: print("theta_idx =", theta_idx) for phi_idx in range(N): theta, phi = theta_vals[theta_idx], phi_vals[phi_idx] x = np.cos(phi) * np.sin(theta) y = np.sin(phi) * np.sin(theta) z = np.cos(theta) V = np.array([x, y, z]).reshape((3, 1)) heatmap[theta_idx, phi_idx] = calc_pi_fn(V) return heatmap def make_pi_fn_gp(cross_cov_mats): def calc_pi_fn_gp(V): pi = calc_pi_from_cross_cov_mats(cross_cov_mats, proj=V) return pi return calc_pi_fn_gp def make_pi_fn_knn(X, T_pi, n_jobs=-1): from info_measures.continuous import kraskov_stoegbauer_grassberger as ksg def calc_pi_fn_knn(V): X_proj = np.dot(X, V) X_proj_lags = form_lag_matrix(X_proj, 2 * T_pi) mi = ksg.MutualInformation(X_proj_lags[:, :T_pi], X_proj_lags[:, T_pi:], add_noise=True) pi = mi.mutual_information(n_jobs=n_jobs) return pi return calc_pi_fn_knn def random_proj_pi_comparison(calc_pi_fn_1, cal_pi_fn_2, N, d=1, n_samples=10000, seed=20210412): rng = np.random.RandomState(seed) pi_1, pi_2 = np.zeros(n_samples), np.zeros(n_samples) for i in range(n_samples): if i % 100 == 0: print("sample {} of {}".format(i, n_samples)) V = init_coef(N, d, rng=rng, init='random_ortho') pi_1[i] = calc_pi_fn_1(V) pi_2[i] = cal_pi_fn_2(V) pi_12 = np.vstack((pi_1, pi_2)).T # (n_samples, 2) return pi_12 def gp_knn_trajectories(num_traj, cross_cov_mats, X, T_pi, d): f_gp = make_pi_fn_gp(cross_cov_mats) f_knn = make_pi_fn_knn(X, T_pi=T_pi) trajectories = [] for traj_idx in range(num_traj): print("traj_idx =", traj_idx) opt = DCA(d=d, T=T_pi) opt.cross_covs = cross_cov_mats opt.fit_projection(record_V=True) V_seq = opt.V_seq num_dca_iter = len(V_seq) pi_gp_knn_traj = np.zeros((num_dca_iter, 2)) for i in range(num_dca_iter): if i % 50 == 0: print("{} of {}".format(i, num_dca_iter)) V = V_seq[i] pi_gp_knn_traj[i, 0] = f_gp(V) pi_gp_knn_traj[i, 1] = f_knn(V) trajectories.append(pi_gp_knn_traj) return trajectories def dca_deflation(cross_cov_mats, n_proj, n_init=1): N = cross_cov_mats.shape[1] T = cross_cov_mats.shape[0] // 2 F = np.eye(N) cov_proj = np.copy(cross_cov_mats) basis = np.zeros((N, n_proj)) opt = DCA(T=T) for i in range(n_proj): if i % 10 == 0: print(i) # run DCA opt.cross_covs = cov_proj opt.fit_projection(d=1, n_init=n_init) v = opt.coef_.flatten() # get full-dim v v_full = np.dot(F, v) basis[:, i] = v_full # update U, F, cov_proj U = scipy.linalg.orth(np.eye(N - i) - np.outer(v, v)) F = np.dot(F, U) cov_proj = np.array([U.T.dot(C).dot(U) for C in cov_proj]) return basis def dca_fft_deflation(X, T, n_proj, n_init=1): N = X.shape[1] F = np.eye(N) X_proj = np.copy(X) basis = np.zeros((N, n_proj)) opt = DCAFFT(T=T, d=1) for i in range(n_proj): if i % 10 == 0: print(i) # run DCA opt.fit(X_proj, n_init=n_init) v = opt.coef_.flatten() # get full-dim v v_full = np.dot(F, v) basis[:, i] = v_full # update U, F, X U = scipy.linalg.orth(np.eye(N - i) - np.outer(v, v)) F = np.dot(F, U) X_proj = np.dot(X_proj, U) return basis def dca_full(cross_cov_mats, n_proj, n_init=1): T = cross_cov_mats.shape[0] // 2 opt = DCA(T=T) opt.cross_covs = cross_cov_mats V_seq = [] for i in range(n_proj): if i % 10 == 0: print(i) opt.fit_projection(d=i + 1, n_init=n_init) V = opt.coef_ V_seq.append(V) return V_seq def calc_pi_vs_dim(cross_cov_mats, V=None, V_seq=None): if V_seq is None: V_seq = [V[:, :i + 1] for i in range(V.shape[1])] pi_vals = np.zeros(len(V_seq)) for i in range(len(V_seq)): V = V_seq[i] pi_vals[i] = calc_pi_from_cross_cov_mats(cross_cov_mats, proj=V) return pi_vals