Source code for metran.factoranalysis

"""FactorAnalysis class for Metran in Pastas."""

from logging import getLogger

import numpy as np
import scipy.optimize as scopt
from pastas.utils import initialize_logger

logger = getLogger(__name__)
initialize_logger(logger)


[docs] class FactorAnalysis: """Class to perform a factor analysis for the Pastas Metran model. Parameters ---------- maxfactors : int, optional. maximum number of factors to select. The default is None. Examples -------- A minimal working example of the FactorAnalysis class is shown below: >>> fa = FactorAnalysis() >>> factors = fa.solve(oseries) """ def __init__(self, maxfactors=None): self.maxfactors = maxfactors
[docs] def get_eigval_weight(self): """Method to get the relative weight of each eigenvalue. Returns ------- numpy.ndarray All eigenvalues as a fraction of the sum of eigenvalues. """ return self.eigval / np.sum(self.eigval)
[docs] def solve(self, oseries): """Method to perform factor analysis. Factor analysis is based on the minres algorithm. The number of eigenvalues is determined by MAP test. If more than one eigenvalue is used, the factors are rotated using orthogonal rotation. Parameters ---------- oseries : pandas.DataFrame Object containing the time series. The series can be non-equidistant. Raises ------ Exception If no proper factors can be derived from the series. Returns ------- factors : numpy.ndarray Factor loadings. """ correlation = self._get_correlations(oseries) self.eigval, eigvec = self._get_eigval(correlation) # Velicer's MAP test try: nfactors, _ = self._maptest(correlation, eigvec, self.eigval) msg = "Number of factors according to Velicer's MAP test: " + f"{nfactors}" logger.info(msg) if nfactors == 0: nfactors = sum(self.eigval > 1) msg = ( "Number of factors according to Kaiser criterion: " + f"{nfactors}" ) logger.info(msg) if self.maxfactors is not None: nfactors = min(nfactors, self.maxfactors) except Exception: nfactors = 0 factors = self._minres(correlation, nfactors) if (nfactors > 0) and (factors is not None) and (np.count_nonzero(factors) > 0): # factors is not None and does not contain nonzero elements if nfactors > 1: # perform varimax rotation comm = np.zeros(factors.shape[0]) for i in range(factors.shape[0]): for j in range(nfactors): comm[i] = comm[i] + factors[i, j] ** 2 factors[i, :] = factors[i, :] / np.sqrt(comm[i]) factors = self._rotate(factors[:, :nfactors]) for i in range(factors.shape[0]): factors[i, :] = factors[i, :] * np.sqrt(comm[i]) # swap sign if dominant sign is negative for j in range(factors.shape[1]): facsign = 0 for i in range(factors.shape[0]): facsign = facsign + factors[i, j] if facsign < 0: for i in range(factors.shape[0]): if np.sign(factors[i, j]) != 0: factors[i, j] = -1.0 * factors[i, j] self.factors = np.atleast_2d(factors[:, :nfactors]) self.fep = 100 * np.sum(self.get_eigval_weight()[:nfactors]) else: msg = "No proper common factors could be derived from series." logger.warning(msg) self.factors = None return self.factors
@staticmethod def _rotate(phi, gamma=1, maxiter=20, tol=1e-6): """Internal method to rotate factor loadings. Uses varimax, quartimax, equamax, or parsimax rotation. Parameters ---------- phi : numpy.ndarray Eigenvectors to be rotated gamma : float, optional Coefficient for rotation. The default is 1. Varimax: gamma = 1. Quartimax: gamma = 0. Equamax: gamma = nfac/2. Parsimax: gamma = nvar(nfac - 1)/(nvar + nfac - 2). maxiter : integer, optional Maximum number of iterations. The default is 20. tol : float, optional Stop criterion. The default is 1e-6. Returns ------- phi_rot : 2-dimensional array rotated eigenvectors References ---------- Kaiser, H.F. (1958): The varimax criterion for analytic rotation in factor analysis. Psychometrika 23: 187–200. """ p, k = phi.shape R = np.eye(k) d = 0 for _ in range(maxiter): d_old = d Lambda = np.dot(phi, R) u, s, vh = np.linalg.svd( np.dot( phi.T, np.asarray(Lambda) ** 3 - (gamma / p) * np.dot(Lambda, np.diag(np.diag(np.dot(Lambda.T, Lambda)))), ) ) R = np.dot(u, vh) d = np.sum(s) if (d_old != 0) and (d / d_old < 1 + tol): break phi_rot = np.dot(phi, R) return phi_rot def _minres(self, s, nf, covar=False): """Internal method for estimating factor loadings. Uses the minimum residuals (minres) algorithm. Parameters ---------- s : numpy.ndarray Correlation matrix nf : integer Number of factors covar : boolean True if S is covar Returns ------- loadings : numpy.ndarray Estimated factor loadings """ sorg = np.copy(s) try: ssmc = 1 - 1 / np.diag(np.linalg.inv(s)) if (not (covar) and np.sum(ssmc) == nf) and (nf > 1): start = 0.5 * np.ones(nf, dtype=float) else: start = np.diag(s) - ssmc except: return bounds = list() for _ in range(len(start)): bounds.append((0.005, 1)) res = scopt.minimize( self._minresfun, start, method="L-BFGS-B", jac=self._minresgrad, bounds=bounds, args=(s, nf), ) loadings = self._get_loadings(res.x, sorg, nf) return loadings @staticmethod def _maptest(cov, eigvec, eigval): """Internal method to run Velicer's MAP test. Determines the number of factors to be used. This method includes two variations of the MAP test: the orginal and the revised MAP test. Parameters ---------- cov : numpy.ndarray Covariance matrix. eigvec : numpy.ndarray Matrix with columns eigenvectors associated with eigenvalues. eigval : numpy.ndarray Vector with eigenvalues in descending order. Returns ------- nfacts : integer Number factors according to MAP test. nfacts4 : integer Number factors according to revised MAP test. References ---------- The original MAP test: Velicer, W. F. (1976). Determining the number of components from the matrix of partial correlations. Psychometrika, 41, 321-327. The revised (2000) MAP test i.e., with the partial correlations raised to the 4rth power (rather than squared): Velicer, W. F., Eaton, C. A., and Fava, J. L. (2000). Construct explication through factor or component analysis: A review and evaluation of alternative procedures for determining the number of factors or components. Pp. 41-71 in R. D. Goffin and E. Helmes, eds., Problems and solutions in human assessment. Boston: Kluwer. """ nvars = len(eigval) fm = np.array([np.arange(nvars, dtype=float), np.arange(nvars, dtype=float)]).T np.put( fm, [0, 1], ((np.sum(np.sum(np.square(cov))) - nvars) / (nvars * (nvars - 1))), ) fm4 = np.copy(fm) np.put( fm4, [0, 1], ( (np.sum(np.sum(np.square(np.square(cov)))) - nvars) / (nvars * (nvars - 1)) ), ) for m in range(nvars - 1): biga = np.atleast_2d(eigvec[:, : m + 1]) partcov = cov - np.dot(biga, biga.T) # exit function with nfacts=1 if diag partcov contains negatives if np.amin(np.diag(partcov)) < 0: return 1, 1 d = np.diag((1 / np.sqrt(np.diag(partcov)))) pr = np.dot(d, np.dot(partcov, d)) np.put( fm, [m + 1, 1], ((np.sum(np.sum(np.square(pr))) - nvars) / (nvars * (nvars - 1))), ) np.put( fm4, [m + 1, 1], ( (np.sum(np.sum(np.square(np.square(pr)))) - nvars) / (nvars * (nvars - 1)) ), ) minfm = fm[0, 1] nfacts = 0 minfm4 = fm4[0, 1] nfacts4 = 0 for s in range(nvars): fm[s, 0] = s fm4[s, 0] = s if fm[s, 1] < minfm: minfm = fm[s, 1] nfacts = s if fm4[s, 1] < minfm4: minfm4 = fm4[s, 1] nfacts4 = s return nfacts, nfacts4 @staticmethod def _minresfun(psi, s, nf): """Function to be minimized in minimum residuals (minres) algorithm. Parameters ---------- psi : array Vector to be adjusted during optimization s : array Correlation matrix nf : integer Number of factors Returns ------- obj : array objective function defined as sum of residuals """ s2 = np.copy(s) np.fill_diagonal(s2, 1 - psi) eigval, eigvec = np.linalg.eigh(s2) EPS = np.finfo(float).eps eigval[eigval < EPS] = 100 * EPS if nf > 1: loadings = np.atleast_2d( np.dot(eigvec[:, :nf], np.diag(np.sqrt(eigval[:nf]))) ) else: loadings = eigvec[:, 0] * np.sqrt(eigval[0]) model = np.dot(loadings, loadings.T) residual = np.square(s2 - model) np.fill_diagonal(residual, 0) return np.sum(residual) def _minresgrad(self, psi, s, nf): """Internal method to calculate jacobian of function. Jacobian to be minimized in minimum residuals (minres) algorithm. Parameters ---------- psi : array Vector to be adjusted during optimization. s : array Correlation matrix. nf : integer Number of factors. Returns ------- jac : array Jacobian of minresfun. """ load = self._get_loadings(psi, s, nf) g = np.dot(load, load.T) + np.diag(psi) - s jac = np.diag(g) / np.square(psi) return jac @staticmethod def _get_loadings(psi, s, nf): """Internal method to estimate matrix of factor loadings. Based on minimum residuals (minres) algorithm. Parameters ---------- psi : numpy.ndarray Communality estimate. s : numpy.ndarray Correlation matrix. nf : integer Number of factors. Returns ------- load : npumy.ndarray Estimated factor loadings. """ sc = np.diag(1 / np.sqrt(psi)) sstar = np.dot(sc, np.dot(s, sc)) eigval, eigvec = np.linalg.eig(sstar) L = eigvec[:, :nf] load = np.dot(L, np.diag(np.sqrt(np.maximum(np.subtract(eigval[:nf], 1), 0)))) load = np.dot(np.diag(np.sqrt(psi)), load) return load @staticmethod def _get_correlations(oseries): """Internal method to calculate correlations for multivariate series. Parameters ---------- oseries : pandas.DataFrame Multivariate series Returns ------- corr : numpy.ndarray Correlation matrix """ corr = np.array(oseries.corr()) return corr @staticmethod def _get_eigval(correlation): """Internal method to get eigenvalues and eigenvectors. Get eigenvalues and eigenvectors based on correlation matrix. Parameters ---------- correlation : numpy.ndarray Correlation matrix for which eigenvalues and eigenvectors need to be derived. Raises ------ Exception If method results in complex eigenvalues and eigenvectors. Returns ------- eigval : numpy.ndarray Vector with eigenvalues. eigvec : numpy.ndarray Matrix with eigenvectors. """ # perform eigenvalue decomposition eigval, eigvec = np.linalg.eig(correlation) if isinstance(eigval[0], np.complex128): msg = ( "Serial correlation matrix has " + "complex eigenvalues and eigenvectors. " + "Factors cannot be estimated for these series." ) logger.error(msg) raise Exception(msg) # sort eigenvalues and eigenvectors evals_order = np.argsort(-eigval) eigval = eigval[evals_order] eigval[eigval < 0] = 0.0 eigvec = eigvec[:, evals_order] eigvec = np.atleast_2d(np.dot(eigvec, np.sqrt(np.diag(eigval)))) return eigval, eigvec