# -*- coding: utf-8 -*-
#
#
# Python implementation of the fast ICA algorithms.
#
# Reference: Tables 8.3 and 8.4 page 196 in the book:
# Independent Component Analysis, by Hyvarinen et al.
# Authors: Pierre Lafaye de Micheaux, Stefan van der Walt, Gael Varoquaux,
# Bertrand Thirion, Alexandre Gramfort, Denis A. Engemann
# License: BSD 3 clause
#
#
# Code from sklearn.decomposition.fastica_.py adapted by The Virtual Brain team - August 2019
#
# -------------------------------------------------------------------------------------------
#
# TheVirtualBrain-Scientific Package. This package holds all simulators, and
# analysers necessary to run brain-simulations. You can use it stand alone or
# in conjunction with TheVirtualBrain-Framework Package. See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
.. moduleauthor:: Gabriel Florea <gabriel.florea@codemart.ro>
"""
import warnings
import numpy
from scipy import linalg
from scipy._lib._util import check_random_state  # note: private SciPy helper
from six import moves, string_types


def _sym_decorrelation(W):
""" Symmetric decorrelation
i.e. W <- (W * W.T) ^{-1/2} * W
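
    A quick sanity check (illustrative sketch): the returned matrix has
    orthonormal rows::

        >>> import numpy
        >>> rng = numpy.random.RandomState(0)
        >>> W = _sym_decorrelation(rng.normal(size=(3, 3)))
        >>> bool(numpy.allclose(numpy.dot(W, W.T), numpy.eye(3)))
        True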
"""
s, u = linalg.eigh(numpy.dot(W, W.T))
    # u (resp. s) contains the eigenvectors (resp. eigenvalues) of W * W.T;
    # 1 / sqrt(s) below applies the inverse square root to the spectrum.
return numpy.dot(numpy.dot(u * (1. / numpy.sqrt(s)), u.T), W)


def _ica_def(X, tol, g, fun_args, max_iter, w_init):
"""
Deflationary FastICA using fun approx to neg-entropy function
Used internally by FastICA.
"""
n_components = w_init.shape[0]
W = numpy.zeros((n_components, n_components), dtype=X.dtype)
n_iter = []
# j is the index of the extracted component
for j in range(n_components):
w = w_init[j, :].copy()
w /= numpy.sqrt((w ** 2).sum())
i = None
for i in moves.xrange(max_iter):
gwtx, g_wtx = g(numpy.dot(w.T, X), fun_args)
w1 = (X * gwtx).mean(axis=1) - g_wtx.mean() * w
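            # Deflationary orthogonalisation (Gram-Schmidt-like): subtract the
            # projections of w1 onto the already-extracted rows of W so the
            # new component stays orthogonal to them.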
w1 -= numpy.dot(numpy.dot(w1, W[:j].T), W[:j])
w1 /= numpy.sqrt((w1 ** 2).sum())
lim = numpy.abs(numpy.abs((w1 * w).sum()) - 1)
w = w1
if lim < tol:
break
n_iter.append(i + 1)
W[j, :] = w
return W, max(n_iter)


def _ica_par(X, tol, g, fun_args, max_iter, w_init):
"""
Parallel FastICA.
Used internally by FastICA --main loop
"""
W = _sym_decorrelation(w_init)
del w_init
p_ = float(X.shape[1])
ii = None
for ii in moves.xrange(max_iter):
gwtx, g_wtx = g(numpy.dot(W, X), fun_args)
W1 = _sym_decorrelation(numpy.dot(gwtx, X.T) / p_
- g_wtx[:, numpy.newaxis] * W)
del gwtx, g_wtx
        # builtin max and abs are faster than their numpy counterparts here.
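        # Convergence test: diag(W1 W.T) holds the dot product between each
        # updated unmixing vector and its previous value; stop once every
        # |dot product| is within tol of 1, i.e. the directions stop changing.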
lim = max(abs(abs(numpy.diag(numpy.dot(W1, W.T))) - 1))
W = W1
if lim < tol:
break
else:
warnings.warn('FastICA did not converge. Consider increasing '
'tolerance or the maximum number of iterations.')
return W, ii + 1


# Some standard non-linear functions.
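# Each returns the pair (g(x), E[g'(x)]): the non-linearity applied
# elementwise and the sample mean of its derivative along the last axis,
# as required by the FastICA fixed-point update.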
def _logcosh(x, fun_args=None):
    alpha = (fun_args or {}).get('alpha', 1.0)
x *= alpha
gx = numpy.tanh(x, x) # apply the tanh inplace
g_x = numpy.empty(x.shape[0])
# XXX compute in chunks to avoid extra allocation
for i, gx_i in enumerate(gx): # please don't vectorize.
g_x[i] = (alpha * (1 - gx_i ** 2)).mean()
return gx, g_x


def _exp(x, fun_args):
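    # Derivative pair for the contrast G(x) = -exp(-x**2 / 2):
    # g(x) = x * exp(-x**2 / 2), g'(x) = (1 - x**2) * exp(-x**2 / 2).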
exp = numpy.exp(-(x ** 2) / 2)
gx = x * exp
g_x = (1 - x ** 2) * exp
return gx, g_x.mean(axis=-1)


def _cube(x, fun_args):
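    # Derivative pair for the kurtosis-based contrast G(x) = x**4 / 4:
    # g(x) = x**3, g'(x) = 3 * x**2.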
return x ** 3, (3 * x ** 2).mean(axis=-1)


def fastica(X, n_components=None, algorithm="parallel", whiten=True,
            fun="logcosh", fun_args=None, max_iter=200, tol=1e-04, w_init=None,
            random_state=None, return_X_mean=False, compute_sources=True,
            return_n_iter=False):
"""Perform Fast Independent Component Analysis.
Read more in the :ref:`User Guide <ICA>`.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training vector, where n_samples is the number of samples and
n_features is the number of features.
n_components : int, optional
Number of components to extract. If None no dimension reduction
is performed.
    algorithm : {'parallel', 'deflation'}, optional
        Apply a parallel or deflational FastICA algorithm.
    whiten : boolean, optional
        If True, perform an initial whitening of the data.
        If False, the data is assumed to have already been
        preprocessed: it should be centered, normed and white,
        otherwise you will get incorrect results.
        When whiten is False, the parameter n_components is ignored.
    fun : string or function, optional. Default: 'logcosh'
        The functional form of the G function used in the
        approximation to neg-entropy. Could be either 'logcosh', 'exp',
        or 'cube'.
        You can also provide your own function. It should return a tuple
        containing the value of the function, and of its derivative, at the
        point. The derivative should be averaged along its last dimension.
        Example::

            def my_g(x):
                return x ** 3, np.mean(3 * x ** 2, axis=-1)
fun_args : dictionary, optional
Arguments to send to the functional form.
If empty or None and if fun='logcosh', fun_args will take value
{'alpha' : 1.0}
max_iter : int, optional
Maximum number of iterations to perform.
tol : float, optional
A positive scalar giving the tolerance at which the
un-mixing matrix is considered to have converged.
    w_init : (n_components, n_components) array, optional
        Initial un-mixing array of shape (n_components, n_components).
        If None (default), an array of normal random variates is used.
random_state : int, RandomState instance or None, optional (default=None)
If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by `np.random`.
return_X_mean : bool, optional
If True, X_mean is returned too.
compute_sources : bool, optional
If False, sources are not computed, but only the rotation matrix.
This can save memory when working with big data. Defaults to True.
return_n_iter : bool, optional
Whether or not to return the number of iterations.
Returns
-------
    K : array, shape (n_components, n_features) | None
        If whiten is True, K is the pre-whitening matrix that projects the
        data onto the first n_components principal components. If whiten is
        False, K is None.
    W : array, shape (n_components, n_components)
        Estimated un-mixing matrix.
        The mixing matrix can be obtained by::

            w = np.dot(W, K.T)
            A = w.T * (w * w.T).I
    S : array, shape (n_samples, n_components) | None
        Estimated source matrix.
    X_mean : array, shape (n_features, )
        The mean over features. Returned only if return_X_mean is True.
    n_iter : int
        If the algorithm is "deflation", n_iter is the
        maximum number of iterations run across all components. Otherwise
        it is the number of iterations taken to converge. This is
        returned only when return_n_iter is set to `True`.
    Notes
    -----
    The data matrix X is considered to be a linear combination of
    non-Gaussian (independent) components, i.e. X = AS where the columns of S
    contain the independent components and A is a linear mixing
    matrix. In short, ICA attempts to 'un-mix' the data by estimating an
    un-mixing matrix W where ``S = W K X``.
    This implementation was originally made for data of shape
    [n_features, n_samples]. Now the input is transposed
    before the algorithm is applied. This makes it slightly
    faster for Fortran-ordered input.
Implemented using FastICA:
`A. Hyvarinen and E. Oja, Independent Component Analysis:
Algorithms and Applications, Neural Networks, 13(4-5), 2000,
pp. 411-430`
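
    Examples
    --------
    A minimal sketch (synthetic data; the variable names below are
    illustrative only, not part of this module)::

        import numpy as np
        t = np.linspace(0, 8, 1000)
        sources = np.c_[np.sin(2 * t), np.sign(np.sin(3 * t))]
        mixed = np.dot(sources, np.array([[1., 1.], [0.5, 2.]]).T)
        K, W, S = fastica(mixed, n_components=2, random_state=0)
        # S has shape (1000, 2): one estimated source per column.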
"""
K = None
X_mean = None
random_state = check_random_state(random_state)
fun_args = {} if fun_args is None else fun_args
alpha = fun_args.get('alpha', 1.0)
if not 1 <= alpha <= 2:
raise ValueError('alpha must be in [1,2]')
if fun == 'logcosh':
g = _logcosh
elif fun == 'exp':
g = _exp
elif fun == 'cube':
g = _cube
elif callable(fun):
def g(x, fun_args):
return fun(x, **fun_args)
else:
exc = ValueError if isinstance(fun, string_types) else TypeError
raise exc("Unknown function %r;"
" should be one of 'logcosh', 'exp', 'cube' or callable"
% fun)
X = validate_and_transpose_source(X)
n, p = X.shape
if not whiten and n_components is not None:
n_components = None
warnings.warn('Ignoring n_components with whiten=False.')
if n_components is None:
n_components = min(n, p)
if n_components > min(n, p):
n_components = min(n, p)
warnings.warn('n_components is too large: it will be set to %s' % n_components)
if whiten:
        # Centering the columns (i.e. the variables)
X_mean = X.mean(axis=-1)
X -= X_mean[:, numpy.newaxis]
# Whitening and preprocessing by PCA
u, d, _ = linalg.svd(X, full_matrices=False)
del _
K = (u / d).T[:n_components] # see (6.33) p.140
del u, d
X1 = numpy.dot(K, X)
# see (13.6) p.267 Here X1 is white and data
# in X has been projected onto a subspace by PCA
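        # Multiplying by sqrt(p) gives each whitened row unit variance,
        # since the SVD leaves the rows of X1 with unit norm instead.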
X1 *= numpy.sqrt(p)
else:
X1 = X
if w_init is None:
w_init = numpy.asarray(random_state.normal(size=(n_components,
n_components)), dtype=X1.dtype)
else:
w_init = numpy.asarray(w_init)
if w_init.shape != (n_components, n_components):
raise ValueError('w_init has invalid shape -- should be %(shape)s'
% {'shape': (n_components, n_components)})
kwargs = {'tol': tol,
'g': g,
'fun_args': fun_args,
'max_iter': max_iter,
'w_init': w_init}
if algorithm == 'parallel':
W, n_iter = _ica_par(X1, **kwargs)
elif algorithm == 'deflation':
W, n_iter = _ica_def(X1, **kwargs)
else:
raise ValueError('Invalid algorithm: must be either `parallel` or'
' `deflation`.')
del X1
    if compute_sources:
        S = numpy.dot(numpy.dot(W, K), X).T if whiten else numpy.dot(W, X).T
    else:
        S = None

    # K and X_mean were initialised to None and are only set when whiten is
    # True, so the same return tuple covers both branches.
    returned_values = [K, W, S]
    if return_X_mean:
        returned_values.append(X_mean)
    if return_n_iter:
        returned_values.append(n_iter)
    return tuple(returned_values)


def validate_and_transpose_source(X):
    """
    Validate that X is a finite 2D array-like and return it transposed
    to shape (n_features, n_samples).
    """
    X = numpy.asarray(X)
    # Raise an error if the input is scalar or 1D.
if X.ndim == 0:
raise ValueError(
"Expected 2D array, got scalar array instead:\narray={}.\n"
"Reshape your data either using array.reshape(-1, 1) if "
"your data has a single feature or array.reshape(1, -1) "
"if it contains a single sample.".format(X))
if X.ndim == 1:
raise ValueError(
"Expected 2D array, got 1D array instead:\narray={}.\n"
"Reshape your data either using array.reshape(-1, 1) if "
"your data has a single feature or array.reshape(1, -1) "
"if it contains a single sample.".format(X))
if numpy.issubdtype(X.dtype, numpy.floating) and X.dtype.itemsize < 8:
sum_result = numpy.sum(X, dtype=numpy.float64)
else:
sum_result = numpy.sum(X)
is_finite = numpy.isfinite(sum_result)
if not is_finite:
raise ValueError("Input contains infinity or a value too large for {}.".format(X.dtype))
return X.T
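

if __name__ == "__main__":
    # Illustrative smoke test, not part of the public API: run both update
    # schemes on one synthetic mixture and print the output shapes. fastica
    # centres its input in place when whitening, hence the .copy() below.
    import numpy as np

    t = np.linspace(0, 8, 1000)
    sources = np.c_[np.sin(2 * t), np.sign(np.sin(3 * t))]
    mixed = np.dot(sources, np.array([[1.0, 1.0], [0.5, 2.0]]).T)

    for algo in ("parallel", "deflation"):
        K, W, S = fastica(mixed.copy(), algorithm=algo, random_state=0)
        print(algo, "-> W shape:", W.shape, "S shape:", S.shape)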