Source code for dca.synth_data

import numpy as np
import scipy
from scipy.linalg import expm
from numpy.linalg import matrix_power
from scipy.signal import resample

from .cov_util import (calc_cov_from_cross_cov_mats, calc_cross_cov_mats_from_cov,
                       calc_cross_cov_mats_from_data, calc_pi_from_cov)


[docs]def gen_gp_cov(kernel, T, N): """Generates a T*N-by-T*N covariance matrix for a spatiotemporal Gaussian process (2D Gaussian random field) with a provided kernel. Parameters ---------- T : int Number of time-steps. N : int Number of spatial steps. kernel : function Should be of the form kernel = K(t1, t2, x1, x2). The kernel can choose to imlement temporal or spatial stationarity, however this is not enfored. Returns ------- C : np.ndarray, shape (T*N, T*N) Covariance matrix for the Gaussian process. Time is the "outer" variable and space is the "inner" variable. """ t1, t2, x1, x2 = np.arange(T), np.arange(T), np.arange(N), np.arange(N) t1, t2, x1, x2 = np.meshgrid(t1, t2, x1, x2, indexing="ij") C = kernel(t1, t2, x1, x2) C = C.swapaxes(1, 2).reshape(T * N, T * N) return C
[docs]def calc_pi_for_gp(kernel, T_pi, N): """Calculates the predictive information in a spatiotemporal Gaussian process with a given kernel. Parameters ---------- T : int Length of temporal windows accross which to compute mutual information. N : int Number of spatial steps in teh Gaussian process. kernel : function Should be of the form kernel = K(t1, t2, x1, x2). The kernel can choose to imlement temporal or spatial stationarity, however this is not enfored. Returns ------- PI : float (Temporal) predictive information in the Gaussian process. """ cov_2_T_pi = gen_gp_cov(kernel, 2 * T_pi, N) cov_T_pi = cov_2_T_pi[:T_pi * N, :T_pi * N] sgn_T, logdet_T = np.linalg.slogdet(cov_T_pi) sgn_2T, logdet_2T = np.linalg.slogdet(cov_2_T_pi) PI = logdet_T - 0.5 * logdet_2T return PI
[docs]def gen_gp_kernel(kernel_type, spatial_scale, temporal_scale, local_noise=0.): """Generates a specified type of Kernel for a spatiotemporal Gaussian process. Parameters ---------- kernel_type : string 'squared_exp' or 'exp' spatial_scale : float Spatial autocorrelation scale. temporal_scale : float Temporal autocorrelation scale. Returns ------- K : function Kernel of the form K(t1, t2, x1, x2). """ def squared_exp(t1, t2, x1, x2): rval = np.exp(-(t1 - t2)**2 / temporal_scale**2 - (x1 - x2)**2 / spatial_scale**2) if local_noise > 0.: local_mask = np.logical_and(np.equal(t1, t2), np.equal(x1, x2)) rval += local_noise * local_mask return rval def exp(t1, t2, x1, x2): rval = np.exp(-np.abs(t1 - t2) / temporal_scale - np.abs(x1 - x2) / spatial_scale) if local_noise > 0.: local_mask = np.logical_and(np.equal(t1, t2), np.equal(x1, x2)) rval += local_noise * local_mask return rval def switch(t1, t2, x1, x2): mask = abs(t1 - t2) >= temporal_scale ex = exp(t1, t2, x1, x2) sq = squared_exp(t1, t2, x1, x2) rval = mask * ex + (1. - mask) * sq if local_noise > 0.: local_mask = np.logical_and(np.equal(t1, t2), np.equal(x1, x2)) rval += local_noise * local_mask return rval if kernel_type == "squared_exp": K = squared_exp elif kernel_type == "exp": K = exp elif kernel_type == "switch": K = switch return K
[docs]def sample_gp(T, N, kernel, num_to_concat=1): """Draw a sample from a spatiotemporal Gaussian process. Parameters ---------- T : int Length in time of sample. N : int Size in space of sample. kernel : function Kernel of the form K(t1, t2, x1, x2). num_to_concat : int Number of samples of lenght T to concatenate before returning the result. Returns ------- sample : np.ndarray, size (T*num_to_concat, N) Sample from the Gaussian process. """ t1, t2, x1, x2 = np.arange(T), np.arange(T), np.arange(N), np.arange(N) t1, t2, x1, x2 = np.meshgrid(t1, t2, x1, x2, indexing="ij") C = kernel(t1, t2, x1, x2) C = C.swapaxes(1, 2).reshape(T * N, T * N) sample = np.concatenate(np.random.multivariate_normal(mean=np.zeros(C.shape[0]), cov=C, size=num_to_concat)) sample = sample.reshape(T * num_to_concat, N) return sample
[docs]def embed_gp(T, N, d, kernel, noise_cov, T_pi, num_to_concat=1): """Embed a d-dimensional Gaussian process into N-dimensional space, then add (potentially) spatially structured white noise. Parameters ---------- T : int Length in time. N : int Ambient dimension. d : int Gaussian process dimension. kernel : function Kernel of the form K(t1, t2, x1, x2). noise_cov : np.ndarray, shape (N, N) Covariance matrix from which to sampel Gaussian noise to add to each time point in an iid fashion. num_to_concat : int Number of samples of lenght T to concatenate before returning the result. Returns ------- X : np.ndarray, size (T*num_to_concat, N) Embedding of GP into high-dimensional space, plus noise. """ # Latent dynamics Y = sample_gp(T, d, kernel, num_to_concat) # Random orthogonal embedding matrix U U = scipy.stats.ortho_group.rvs(N)[:, :d] # Data matrix X X = np.dot(Y, U.T) # Corrupt data by spatially structured white noise X += np.random.multivariate_normal(mean=np.zeros(N), cov=noise_cov, size=T * num_to_concat) # Compute the PI of the high-dimensional, noisy process cov_low_d = gen_gp_cov(T=2 * T_pi, N=d, kernel=kernel) low_d_cross_cov_mats = calc_cross_cov_mats_from_cov(cov_low_d, 2 * T_pi, d) high_d_cross_cov_mats = np.array([np.dot(U, np.dot(C, U.T)) for C in low_d_cross_cov_mats]) high_d_cross_cov_mats[0] += noise_cov cov_high_d = calc_cov_from_cross_cov_mats(high_d_cross_cov_mats) full_pi = calc_pi_from_cov(cov_high_d) embedding_cross_cov_mats = np.array([np.dot(U.T, np.dot(C, U)) for C in high_d_cross_cov_mats]) cov_embedding = calc_cov_from_cross_cov_mats(embedding_cross_cov_mats) embedding_pi = calc_pi_from_cov(cov_embedding) return X, Y, U, full_pi, embedding_pi, high_d_cross_cov_mats
[docs]def gen_lorenz_system(T, seed, integration_dt=0.005): """ Period ~ 1 unit of time (total time is T) So make sure integration_dt << 1 Known-to-be-good chaotic parameters See sussillo LFADS paper """ rng = np.random.RandomState(seed) rho = 28.0 sigma = 10.0 beta = 8 / 3. def dx_dt(state, t): x, y, z = state x_dot = sigma * (y - x) y_dot = x * (rho - z) - y z_dot = x * y - beta * z return (x_dot, y_dot, z_dot) x_0 = rng.randn(3) t = np.arange(0, T, integration_dt) X = scipy.integrate.odeint(dx_dt, x_0, t) return X
def gen_lorenz_data(num_samples, normalize=True, seed=20210610): integration_dt = 0.005 data_dt = 0.025 skipped_samples = 1000 T = (num_samples + 2 * skipped_samples) * data_dt X = gen_lorenz_system(T, seed, integration_dt) if normalize: X -= X.mean(axis=0) X /= X.std(axis=0) X_dwn = resample(X, num_samples + 2 * skipped_samples, axis=0) X_dwn = X_dwn[skipped_samples:-skipped_samples] return X_dwn def oscillators_dynamics_mat(N=10, omega_sq=.01, alpha_sq=.2, gamma=.05, tau=1.): # spring matrix K K = np.zeros((N, N)) K += np.eye(N) * (0.5 * omega_sq + alpha_sq) K[1:, :-1] += np.eye(N - 1) * (-0.5 * alpha_sq) # lower diag K[:-1, 1:] += np.eye(N - 1) * (-0.5 * alpha_sq) # upper diag K[0, -1] = -0.5 * alpha_sq K[-1, 0] = -0.5 * alpha_sq # friction matrix gamma gamma_mat = gamma * np.eye(N) # continuous-time dynamics matrix B (z^dot = B * z) B_top = np.concatenate((np.zeros((N, N)), np.eye(N)), axis=1) B_bottom = np.concatenate((-K, -gamma_mat), axis=1) B = np.concatenate((B_top, B_bottom), axis=0) # discrete-time dynamics matrix A (z_t = A * z_{t-1}) A = expm(B * tau) return A def oscillators_cross_cov_mats(A, T=10, sigma=1.): # get N N = A.shape[0] // 2 # noise matrix sigma (just position noise) sigma_mat = np.zeros((2 * N, 2 * N)) sigma_mat[:N, :N] = np.eye(N) * sigma**2 # steady-state covariance matrix C C = np.ones(2 * N) for i in range(100000): C = A.dot(C).dot(A.T) + sigma_mat # function for generating cross-cov matrices <z_{t+k} z_t^T> def gen_cross_cov(k): A_k = matrix_power(A, k) A_k_C = A_k.dot(C) return A_k_C # make all cross_cov mats for delta t = {0, .., T-1} cross_cov_mats = np.array([gen_cross_cov(k) for k in range(T)]) return cross_cov_mats def sample_oscillators(A, T, sigma=1.): # generate sample of dynamics N = A.shape[0] // 2 z = np.zeros((T, 2 * N)) for i in range(1, T): z[i] = np.dot(A, z[i - 1]) z[i, :N] += np.random.normal(0, sigma, N) return z def gen_noise_cov(N, D, var, rng, V_noise=None): noise_spectrum = var * np.exp(-2 * np.arange(N) / D) if V_noise is None: V_noise = scipy.stats.ortho_group.rvs(N, random_state=rng) noise_cov = np.dot(V_noise, np.dot(np.diag(noise_spectrum), V_noise.T)) return noise_cov def random_basis(N, D, rng): return scipy.stats.ortho_group.rvs(N, random_state=rng)[:, :D] def median_subspace(N, D, rng, num_samples=5000, V_0=None): subspaces = np.zeros((num_samples, N, D)) angles = np.zeros((num_samples, min(D, V_0.shape[1]))) if V_0 is None: V_0 = np.eye(N)[:, :D] for i in range(num_samples): subspaces[i] = random_basis(N, D, rng) angles[i] = np.rad2deg(scipy.linalg.subspace_angles(V_0, subspaces[i])) median_angles = np.median(angles, axis=0) median_subspace_idx = np.argmin(np.sum((angles - median_angles)**2, axis=1)) median_subspace = subspaces[median_subspace_idx] return median_subspace
[docs]def embedded_lorenz_cross_cov_mats(N, T, snr=1., noise_dim=7, return_samples=False, num_lorenz_samples=10000, num_subspace_samples=5000, V_dynamics=None, V_noise=None, X_dynamics=None, seed=20200326): """Embed the Lorenz system into high dimensions with additive spatially structued white noise. Signal and noise subspaces are oriented with the median subspace angle. Parameters ---------- N : int Embedding dimension. T : int Number of timesteps (2 * T_pi) snr : float Signal-to-noise ratio. Specifically it is the ratio of the largest eigenvalue of the signal covariance to the largest eigenvalue of the noise covariance. noise_dim : int Dimension at which noise eigenvalues fall to 1/e. If noise_dim is np.inf then a flat spectrum is used. return_samples : bool Whether to return cross_cov_mats or data samples. num_lorenz_samples : int Number of data samples to use. num_subspace_samples : int Number of random subspaces used to calculate the median relative angle. seed : int Seed for Numpy random state. """ rng = np.random.RandomState(seed) # Generate Lorenz dynamics if X_dynamics is None: X_dynamics = gen_lorenz_data(num_lorenz_samples) dynamics_var = np.max(scipy.linalg.eigvalsh(np.cov(X_dynamics.T))) noise_var = dynamics_var / snr # Generate dynamics embedding matrix (will remain fixed) if V_dynamics is None: if N == 3: V_dynamics = np.eye(3) else: V_dynamics = random_basis(N, 3, rng) if noise_dim == np.inf: noise_cov = np.eye(N) * noise_var else: # Generate a subspace with median principal angles w.r.t. dynamics subspace if V_noise is None: V_noise = median_subspace(N, noise_dim, rng, num_samples=num_subspace_samples, V_0=V_dynamics) # Extend V_noise to a basis for R^N if V_noise.shape[1] < N: V_noise_comp = scipy.linalg.orth(np.eye(N) - np.dot(V_noise, V_noise.T)) V_noise = np.concatenate((V_noise, V_noise_comp), axis=1) # Add noise covariance noise_cov = gen_noise_cov(N, noise_dim, noise_var, rng, V_noise=V_noise) # Generate actual samples of high-D data cross_cov_mats = calc_cross_cov_mats_from_data(X_dynamics, T) cross_cov_mats = np.array([V_dynamics.dot(C).dot(V_dynamics.T) for C in cross_cov_mats]) cross_cov_mats[0] += noise_cov if return_samples: X_samples = (np.dot(X_dynamics, V_dynamics.T) + rng.multivariate_normal(mean=np.zeros(N), cov=noise_cov, size=len(X_dynamics))) return cross_cov_mats, X_samples else: return cross_cov_mats