"""The Metran model class."""
from logging import getLogger
from os import getlogin
import numpy as np
from pandas import DataFrame, DatetimeIndex, Series, Timedelta, Timestamp, concat
from pandas.tseries.frequencies import to_offset
from pastas.timeseries import TimeSeries
from pastas.utils import initialize_logger, validate_name
from pastas.version import __version__
if __version__ > "0.22.0":
from pastas.timeseries_utils import (
_frequency_is_supported as frequency_is_supported,
)
else:
from pastas.utils import frequency_is_supported
from scipy.stats import norm
from .factoranalysis import FactorAnalysis
from .kalmanfilter import SPKalmanFilter
from .plots import MetranPlot
from .solver import ScipySolve
logger = getLogger(__name__)
initialize_logger(logger)
[docs]
class Metran:
"""Class for the Pastas Metran model.
Parameters
----------
oseries: pandas.DataFrame, list of pandas.Series or pastas.TimeSeries
Time series to be analyzed. Index must be DatetimeIndex.
The series can be non-equidistant.
name: str, optional
String with the name of the model. The default is 'Cluster'
freq: str, optional
String with the frequency the stressmodels are simulated. Must
be one of the following (D, h, m, s, ms, us, ns) or a multiple of
that e.g. "7D".
tmin: str, optional
String with a start date for the simulation period (E.g. '1980').
If none is provided, the tmin from the oseries is used.
tmax: str, optional
String with an end date for the simulation period (E.g. '2010').
If none is provided, the tmax from the oseries is used.
Returns
-------
mt: metran.Metran
Metran instance.
"""
def __init__(self, oseries, name="Cluster", freq=None, tmin=None, tmax=None):
    """Initialize the Metran model.

    Stores settings, reworks ``oseries`` into a standardized DataFrame,
    initializes the parameter table and attaches plotting helpers.
    """
    # Default settings
    self.settings = {
        "tmin": None,
        "tmax": None,
        "freq": "D",
        "min_pairs": 20,
        "solver": None,
        "warmup": 1,
    }
    if tmin is not None:
        self.settings["tmin"] = tmin
    if tmax is not None:
        self.settings["tmax"] = tmax
    if freq is not None:
        # frequency_is_supported validates and normalizes the freq string
        self.settings["freq"] = frequency_is_supported(freq)
    # Initialize and rework observations
    self.nfactors = 0
    self.set_observations(oseries)
    self.parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"])
    self.set_init_parameters()
    # initialize attributes
    self.masked_observations = None
    self.fit = None
    self.name = validate_name(name)
    # File Information
    self.file_info = self._get_file_info()
    # add plots
    self.plots = MetranPlot(self)
@property
def nparam(self):
    """int: number of rows in the parameters DataFrame."""
    return self.parameters.index.size
@property
def nstate(self):
    """int: size of the state vector (specific + common dynamic factors)."""
    return self.nseries + self.nfactors
[docs]
def standardize(self, oseries):
"""Method to standardize series.
Standardized by subtracting mean and dividing by standard deviation.
Parameters
----------
oseries : pandas.DataFrame
series to be standardized
Returns
-------
pandas.DataFrame
standardized series
"""
std = oseries.std()
mean = oseries.mean()
self.oseries_std = np.array(std.values)
self.oseries_mean = np.array(mean.values)
return (oseries - mean) / std
[docs]
def truncate(self, oseries):
"""Method to set start and end of series.
If tmin and/or tmax have been defined in self.settings, use
these dates to trucate series. Dates with only NaN are being removed.
Parameters
----------
oseries : pandas.DataFrame
series to be tructated
Returns
-------
pandas.DataFrame
truncated series
"""
if self.settings["tmin"] is None:
tmin = oseries.index.min()
else:
tmin = self.settings["tmin"]
if self.settings["tmax"] is None:
tmax = oseries.index.max()
else:
tmax = self.settings["tmax"]
oseries = oseries.loc[tmin:tmax]
return oseries.dropna(how="all")
[docs]
def test_cross_section(self, oseries=None, min_pairs=None):
"""Method to test whether series have enough cross-sectional data.
Default threshold value is defined by self.settings["min_pairs"].
Parameters
----------
oseries : pandas.DataFrame, optional
Time series to be evaluated. The default is None.
min_pairs : int, optional
Minimum number of cross-sectional data for each series.
Should be greater than 1. The default is None.
Raises
------
Exception
If one of the series has less than min_pairs of cross-sectional
data and exception is raised.
Returns
-------
None.
"""
if min_pairs is None:
min_pairs = self.settings["min_pairs"]
if min_pairs == 0:
logger.warning("min_pairs must be greater than 0.")
if oseries is None:
oseries = self.oseries.copy()
pairs = Series(index=oseries.columns, dtype=int)
oseries["count"] = oseries.count(axis=1)
for s in pairs.index:
pairs[s] = oseries.dropna(
subset=[
s,
]
)["count"].count()
oseries = oseries.drop(["count"], axis=1)
if pairs.min() < max(min_pairs, 1):
err = pairs[pairs < min_pairs].index.tolist()
msg = (
"Number of cross-sectional data is less than "
+ str(min_pairs)
+ " for series "
+ (", ").join([str(e) for e in err])
)
logger.error(msg)
raise Exception(msg)
[docs]
def get_factors(self, oseries=None):
    """Method to get factor loadings based on factor analysis.

    Also stores relevant results of the factor analysis: the
    eigenvalues and the percentage explained by factors (fep).

    Parameters
    ----------
    oseries : pandas.DataFrame, optional
        Series to be analyzed. The default is None, in which case
        self.oseries is analyzed.

    Returns
    -------
    factors : numpy.ndarray
        Factor loadings as estimated using factor analysis
    """
    if oseries is None:
        oseries = self.oseries
    analysis = FactorAnalysis()
    self.factors = analysis.solve(oseries)
    self.eigval = analysis.eigval
    if self.factors is None:
        # Factor analysis failed: model has no common dynamic factors.
        self.nfactors = 0
    else:
        self.nfactors = self.factors.shape[1]
        self.fep = analysis.fep
    return self.factors
[docs]
def _init_kalmanfilter(self, oseries, engine="numba"):
    """Internal method, initialize Kalmanfilter for sequential processing.

    Parameters
    ----------
    oseries : pandas.DataFrame
        Series being processed by the Kalmanfilter.
    engine: str, optional
        Engine used for the Kalman filter, by default 'numba' which is the
        fastest choice but 'numpy' is also available, but is slower.

    Returns
    -------
    None.
    """
    # The filter instance is stored on the model; observations must be
    # set before matrices/states are used.
    self.kf = SPKalmanFilter(engine=engine)
    self.kf.set_observations(oseries)
[docs]
def _phi(self, alpha):
"""Internal method to calculate autoregressive model parameter.
Autoregressive model parameter is calculated based on parameter
alpha.
Parameters
----------
alpha : float
model parameter
Returns
-------
float
autoregressive model parameter
"""
a = Timedelta(to_offset(self.settings["freq"])) / Timedelta(1, "D")
return np.exp(-a / alpha)
[docs]
def get_transition_matrix(self, p=None, initial=False):
"""Method to get transition matrix of the Metran dynamic factor model.
Parameters
----------
p : pandas.Series, optional
Model parameters. The default is None.
initial : bool, optional
Determines whether to use initial (True)
or optimal (False) parameters. The default is False.
Returns
-------
transition_matrix : numpy.ndarray
Transition matrix
"""
if p is None:
p = self.get_parameters(initial)
transition_matrix = np.zeros((self.nstate, self.nstate), dtype=np.float64)
for n in range(self.nseries):
name = self.snames[n] + "_sdf" + "_alpha"
transition_matrix[n, n] = self._phi(p[name])
for n in range(self.nfactors):
name = "cdf" + str(n + 1) + "_alpha"
transition_matrix[self.nseries + n, self.nseries + n] = self._phi(p[name])
return transition_matrix
[docs]
def get_transition_covariance(self, p=None, initial=False):
"""Get transition covariance matrix of the Metran dynamic factor model.
Parameters
----------
p : pandas.Series, optional
Model parameters. The default is None.
initial : bool, optional
Determines whether to use initial (True)
or optimal (False) parameters. The default is False.
Returns
-------
transition_covariance : numpy.ndarray
Transition covariance matrix
"""
if p is None:
p = self.get_parameters(initial)
transition_covariance = np.eye(self.nstate, dtype=np.float64)
factor_load = np.sum(np.square(self.factors), axis=1)
for n in range(self.nseries):
name = self.snames[n] + "_sdf" + "_alpha"
transition_covariance[n, n] = (1 - self._phi(p[name]) ** 2) * (
1 - factor_load[n]
)
for n in range(self.nfactors):
name = "cdf" + str(n + 1) + "_alpha"
transition_covariance[self.nseries + n, self.nseries + n] = (
1 - self._phi(p[name]) ** 2
)
return transition_covariance
[docs]
def get_transition_variance(self, p=None, initial=False):
"""Get the transition variance vector.
The transition variance vector is obtained by extracting the diagonal
of the transition covariance matrix.
Parameters
----------
p : pandas.Series, optional
Model parameters. The default is None.
initial : bool, optional
Determines whether to use initial (True)
or optimal (False) parameters. The default is False.
Returns
-------
transition_variance : numpy.ndarray
Transition variance vector
"""
if p is None:
p = self.get_parameters(initial)
return np.diag(self.get_transition_covariance(p))
[docs]
def get_observation_matrix(self, p=None, initial=False):
"""Method to get observation matrix of the Metran dynamic factor model.
Parameters
----------
p : pandas.Series, optional
Model parameters. The default is None.
initial : bool, optional
Determines whether to use initial (True)
or optimal (False) parameters. The default is False.
Returns
-------
observation_matrix : numpy.ndarray
Observation matrix
"""
if p is None:
p = self.get_parameters(initial)
observation_matrix = np.zeros((self.nseries, self.nstate), dtype=np.float64)
observation_matrix[:, : self.nseries] = np.eye(self.nseries)
for n in range(self.nseries):
for k in range(self.nfactors):
observation_matrix[n, self.nseries + k] = self.factors[n, k]
return observation_matrix
[docs]
def get_observation_variance(self):
"""Method to get observation matrix.
Currently the observation variance is zero by default.
Returns
-------
observation_variance : numpy.ndarray
Observation variance vector
"""
(self.nseries, _) = self.factors.shape
observation_variance = np.zeros(self.nseries, dtype=np.float64)
return observation_variance
[docs]
def _get_matrices(self, p, initial=False):
"""Internal method to get all matrices.
Returns all matrices required to define the Metran dynamic
factor model.
Parameters
----------
p : pandas.Series
Model parameters.
initial : bool, optional
Determines whether to use initial (True)
or optimal (False) parameters. The default is False.
Returns
-------
numpy.ndarray
Transition matrix.
numpy.ndarray
Transition covariance matrix.
numpy.ndarray
Observation matrix.
numpy.ndarray
Observation variance vector.
"""
return (
self.get_transition_matrix(p, initial),
self.get_transition_covariance(p, initial),
self.get_observation_matrix(p, initial),
self.get_observation_variance(),
)
[docs]
def get_parameters(self, initial=False):
"""Method to get all parameters from the individual objects.
Parameters
----------
initial: bool, optional
True to get initial parameters, False to get optimized parameters.
If optimized parameters do not exist, return initial parameters.
Returns
-------
parameters: pandas.Series
initial or optimal parameters.
"""
if not (initial) and "optimal" in self.parameters:
parameters = self.parameters["optimal"]
else:
parameters = self.parameters["initial"]
return parameters
[docs]
def set_init_parameters(self):
"""Method to initialize parameters to be optimized.
Returns
-------
None
"""
pinit_alpha = 10.0
for n in range(self.nfactors):
self.parameters.loc["cdf" + str(n + 1) + "_alpha"] = (
pinit_alpha,
1e-5,
None,
True,
"cdf",
)
for n in range(self.nseries):
self.parameters.loc[self.snames[n] + "_sdf" + "_alpha"] = (
pinit_alpha,
1e-5,
None,
True,
"sdf",
)
[docs]
def mask_observations(self, mask):
    """Mask observations for processing with Kalman filter or smoother.

    This method does NOT change the oseries itself. It only
    masks (hides) observations while running the Kalman
    filter or smoother, so that those observations are not
    used to update the filter/smoother states.

    Parameters
    ----------
    mask : pandas.DataFrame
        DataFrame with shape of oseries containing 0 or False
        for observation to be kept and 1 or True for observation
        to be masked (hidden).

    Returns
    -------
    None.
    """
    # Guard clause: a mask with the wrong shape cannot be applied.
    if mask.shape != self.oseries.shape:
        logger.error(
            "Dimensions of mask %s do not equal dimensions of series %s."
            " Mask cannot be applied.",
            str(mask.shape),
            str(self.oseries.shape),
        )
        return
    self.masked_observations = self.oseries.mask(mask.astype(bool))
    # Reset the filter so it runs on the masked observations.
    self.kf.init_states()
    self.kf.set_observations(self.masked_observations)
    self.kf.mask = True
[docs]
def unmask_observations(self):
    """Method to unmask observations and reset the Kalman filter.

    Returns
    -------
    None
    """
    # Restore the original (unmasked) observations and reset filter state.
    self.masked_observations = None
    self.kf.init_states()
    self.kf.set_observations(self.oseries)
    self.kf.mask = False
[docs]
def set_observations(self, oseries):
    """Rework oseries to pandas.DataFrame for further use in Metran class.

    Parameters
    ----------
    oseries : pandas.DataFrame
        or list/tuple of pandas.Series/pandas.DataFrame/pastas.TimeSeries
        Time series to be analyzed.

    Raises
    ------
    Exception
        - if a DataFrame within a list/tuple has more than one column
        - if input type is not correct
        - if number of series is less than 2
        - if index of Series/DataFrame is not a DatetimeIndex

    Returns
    -------
    None.
    """
    # combine series to DataFrame
    if isinstance(oseries, (list, tuple)):
        _oseries = []
        _names = []
        if len(oseries) > 1:
            for i, os in enumerate(oseries):
                if isinstance(os, TimeSeries):
                    _oseries.append(os.series)
                    _names.append(os.name)
                elif isinstance(os, (Series, DataFrame)):
                    if isinstance(os, DataFrame):
                        # only single-column DataFrames can be squeezed
                        # into a Series
                        if os.shape[1] > 1:
                            msg = (
                                "One or more series have "
                                + "DataFrame with multiple columns"
                            )
                            logger.error(msg)
                            raise Exception(msg)
                        os = os.squeeze()
                    # generate a name for unnamed series so parameters
                    # can be labeled per series
                    if os.name is None:
                        os.name = "Series" + str(i + 1)
                    _oseries.append(os)
                    _names.append(os.name)
            self.snames = _names
            oseries = concat(_oseries, axis=1)
        else:
            # fewer than 2 inputs: empty frame triggers the size check below
            oseries = DataFrame()
    elif isinstance(oseries, DataFrame):
        self.snames = oseries.columns
    else:
        msg = "Input type should be either a " + "list, tuple, or pandas.DataFrame"
        logger.error(msg)
        raise TypeError(msg)
    if oseries.shape[1] < 2:
        msg = "Metran requires at least 2 series, found " + str(oseries.shape[1])
        logger.error(msg)
        raise Exception(msg)
    oseries = self.truncate(oseries)
    if isinstance(oseries.index, DatetimeIndex):
        # NOTE(review): resampling is hard-coded to daily frequency here,
        # even though self.settings["freq"] may differ -- confirm intended.
        oseries = oseries.asfreq("D")
        self.nseries = oseries.shape[1]
        # keep the unstandardized series for reporting/back-transforms
        self.oseries_unstd = oseries
        self.oseries = self.standardize(oseries)
        self.test_cross_section()
    else:
        msg = "Index of series must be DatetimeIndex"
        logger.error(msg)
        raise TypeError(msg)
[docs]
def get_observations(self, standardized=False, masked=False):
"""Returns series as available in Metran class.
Parameters
----------
standardized : bool, optional
If True, obtain standardized observations. If False,
obtain unstandardized observations. The default is False.
masked : boolean
If True, return masked observations. The default is False.
Returns
-------
pandas.DataFrame
Time series.
"""
if masked:
oseries = self.masked_observations
else:
oseries = self.oseries
if not standardized:
oseries = oseries * self.oseries_std + self.oseries_mean
return oseries
[docs]
def get_mle(self, p):
    """Method to obtain maximum likelihood estimate based on Kalman filter.

    Parameters
    ----------
    p : pandas.Series or array-like
        Model parameters (in the order of self.parameters.index).

    Returns
    -------
    mle: float
        Maximum likelihood estimate.
    """
    # Re-wrap p so array-valued input from the optimizer gets named
    # entries matching the parameter index.
    p = Series(p, index=self.parameters.index)
    self.kf.set_matrices(*self._get_matrices(p))
    self.kf.run_filter()
    mle = self.kf.get_mle()
    return mle
[docs]
def get_specificity(self):
"""Get fraction that is explained by the specific dynamic factor.
Calculate specificity for each series. The specificity is
equal to (1 - communality).
Returns
-------
numpy.ndarray
For each series the specificity, a value between 0 and 1.
A value of 0 means that the series has all variation
in common with other series. A value of 1 means that the
series has no variation in common.
"""
return 1 - self.get_communality()
[docs]
def get_communality(self):
"""Get fraction that is explained by the common dynamic factor(s).
Calculate communality for each series.
Returns
-------
numpy.ndarray
For each series the communality, a value between 0 and 1.
A value of 0 means that the series has no variation
in common with other series. A value of 1 means that the
series has all variation in common.
"""
return np.sum(np.square(self.factors), axis=1)
[docs]
def get_state_means(self, p=None, method="smoother"):
    """Method to get filtered or smoothed state means.

    Parameters
    ----------
    p : pandas.Series, optional
        Model parameters. The default is None.
    method : str, optional
        Use "filter" to obtain filtered states, and
        "smoother" to obtain smoothed states. The default is "smoother".

    Returns
    -------
    state_means : pandas.DataFrame
        Filtered or smoothed states. Column names refer to
        specific dynamic factors (sdf) and common dynamic factors (cdf)
    """
    # Ensure the Kalman filter/smoother has been run for these parameters.
    self._run_kalman(method, p=p)
    # Column order matches the state vector: sdf's first, then cdf's.
    columns = [name + "_sdf" for name in self.snames]
    columns.extend(["cdf" + str(i + 1) for i in range(self.nfactors)])
    if method == "filter":
        means = self.kf.filtered_state_means
    else:
        means = self.kf.smoothed_state_means
    state_means = DataFrame(means, index=self.oseries.index, columns=columns)
    return state_means
[docs]
def get_state_variances(self, p=None, method="smoother"):
    """Method to get filtered or smoothed state variances.

    Parameters
    ----------
    p : pandas.Series, optional
        Model parameters. The default is None.
    method : str, optional
        Use "filter" to obtain filtered variances, and
        "smoother" to obtain smoothed variances.
        The default is "smoother".

    Returns
    -------
    state_variances : pandas.DataFrame
        Filtered or smoothed variances. Column names refer to
        specific dynamic factors (sdf) and common dynamic factors (cdf)
    """
    # Ensure the Kalman filter/smoother has been run for these parameters.
    self._run_kalman(method, p=p)
    # Covariances may contain NaN; suppress invalid-value warnings while
    # extracting the per-timestep diagonals.
    with np.errstate(invalid="ignore"):
        if method == "filter":
            cov = self.kf.filtered_state_covariances
        else:
            cov = self.kf.smoothed_state_covariances
        n_timesteps = cov.shape[0]
        # Variances are the diagonal of each timestep's covariance matrix.
        var = np.vstack([np.diag(cov[i]) for i in range(n_timesteps)])
    columns = [name + "_sdf" for name in self.snames]
    columns.extend(["cdf" + str(i + 1) for i in range(self.nfactors)])
    state_variances = DataFrame(var, index=self.oseries.index, columns=columns)
    return state_variances
[docs]
def get_state(self, i, p=None, alpha=0.05, method="smoother"):
"""Get filtered or smoothed mean for specific state.
Optionally including the 1-alpha confidence interval.
Parameters
----------
i : int
index of state vector to be obtained
p : pandas.Series
Model parameters. The default is None.
alpha : float, optional
Include (1-alpha) confidence interval in DataFrame. The value
of alpha must be between 0 and 1.
If None, no confidence interval is returned. The default is 0.05.
method : str, optional
Use "filter" to obtain filtered variances, and
"smoother" to obtain smoothed variances.
The default is "smoother".
Returns
-------
state : pandas.DataFrame
ith filtered or smoothed state (mean),
optionally with 'lower' and 'upper' as
lower and upper bounds of 95% confidence interval.
"""
state = None
if i < 0 or i >= self.nstate:
logger.error("Value of i must be >=0 and <" + self.nstate)
else:
state = self.get_state_means(p=p, method=method).iloc[:, i]
if alpha is not None:
if alpha > 0 and alpha < 1:
z = norm.ppf(1 - alpha / 2.0)
else:
msg = "The value of alpha must be between 0 and 1."
logger.error(msg)
raise Exception(msg)
variances = self.get_state_variances(p=p, method=method).iloc[:, i]
iv = z * np.sqrt(variances)
state = concat([state, state - iv, state + iv], axis=1)
state.columns = ["mean", "lower", "upper"]
return state
[docs]
def get_simulated_means(self, p=None, standardized=False, method="smoother"):
    """Method to calculate simulated means.

    Simulated means are the filtered/smoothed mean estimates for
    the observed series.

    Parameters
    ----------
    p : pandas.Series, optional
        Model parameters. The default is None.
    standardized : bool, optional
        If True, obtain estimates for standardized series.
        If False, obtain estimates for unstandardized series.
        The default is False.
    method : str, optional
        Use "filter" to obtain filtered estimates, and
        "smoother" to obtain smoothed estimates.
        The default is "smoother".

    Returns
    -------
    simulated_means : pandas.DataFrame
        Filtered or smoothed estimates for observed series.
    """
    # Ensure the Kalman filter/smoother has been run for these parameters.
    self._run_kalman(method, p=p)
    if standardized:
        observation_matrix = self.get_observation_matrix(p=p)
        # standardized series have zero mean by construction
        observation_means = np.zeros(observation_matrix.shape[0])
    else:
        # scale back to original units using std and mean of oseries
        observation_matrix = self.get_scaled_observation_matrix(p=p)
        observation_means = self.oseries_mean
    (means, _) = self.kf.simulate(observation_matrix, method=method)
    simulated_means = (
        DataFrame(means, index=self.oseries.index, columns=self.oseries.columns)
        + observation_means
    )
    return simulated_means
[docs]
def get_simulated_variances(self, p=None, standardized=False, method="smoother"):
    """Method to calculate simulated variances.

    The simulated variances are the filtered/smoothed variances
    for the observed series.

    Parameters
    ----------
    p : pandas.Series, optional
        Model parameters. The default is None.
    standardized : bool, optional
        If True, obtain estimates for standardized series.
        If False, obtain estimates for unstandardized series.
        The default is False.
    method : str, optional
        Use "filter" to obtain filtered estimates, and
        "smoother" to obtain smoothed estimates.
        The default is "smoother".

    Returns
    -------
    simulated_variances : pandas.DataFrame
        Filtered or smoothed variances for observed series.
    """
    # Ensure the Kalman filter/smoother has been run for these parameters.
    self._run_kalman(method, p=p)
    if standardized:
        observation_matrix = self.get_observation_matrix(p=p)
    else:
        # scaling the observation matrix propagates the series' standard
        # deviations into the simulated variances
        observation_matrix = self.get_scaled_observation_matrix(p=p)
    (_, variances) = self.kf.simulate(observation_matrix, method=method)
    simulated_variances = DataFrame(
        variances, index=self.oseries.index, columns=self.oseries.columns
    )
    return simulated_variances
[docs]
def get_simulation(
    self, name, p=None, alpha=0.05, standardized=False, method="smoother"
):
    """Method to calculate simulated means for a specific series.

    Optionally including 1-alpha confidence interval.

    Parameters
    ----------
    name : str
        name of series to be obtained
    p : pandas.Series, optional
        Model parameters. The default is None.
    alpha : float, optional
        Include (1-alpha) confidence interval in DataFrame. The value
        of alpha must be between 0 and 1.
        If None, no confidence interval is returned. The default is 0.05.
    standardized : bool, optional
        If True, obtain estimates for standardized series.
        If False, obtain estimates for unstandardized series.
        The default is False.
    method : str, optional
        Use "filter" to obtain filtered estimates, and
        "smoother" to obtain smoothed estimates.
        The default is "smoother".

    Returns
    -------
    sim : pandas.DataFrame or None
        filtered or smoothed estimate (mean) for series 'name',
        optionally with 'lower' and 'upper' as
        lower and upper bounds of the confidence interval.
        None if the name is unknown.
    """
    sim = None
    means = self.get_simulated_means(p=p, standardized=standardized, method=method)
    if name in means.columns:
        sim = means.loc[:, name]
        if alpha is not None:
            if alpha > 0 and alpha < 1:
                # two-sided standard-normal quantile for the interval
                z = norm.ppf(1 - alpha / 2.0)
            else:
                msg = "The value of alpha must be between 0 and 1."
                logger.error(msg)
                raise Exception(msg)
            variances = self.get_simulated_variances(
                p=p, standardized=standardized, method=method
            ).loc[:, name]
            iv = z * np.sqrt(variances)
            sim = concat([sim, sim - iv, sim + iv], axis=1)
            sim.columns = ["mean", "lower", "upper"]
    else:
        logger.error("Unknown name: " + name)
    return sim
[docs]
def decompose_simulation(self, name, p=None, standardized=False, method="smoother"):
    """Decompose simulation into specific and common dynamic components.

    Method to get for an observed series the filtered/smoothed estimate
    decomposed into the specific dynamic component (sdf) and each of the
    common dynamic components (cdf).

    Parameters
    ----------
    name : str
        name of series to be obtained.
    p : pandas.Series, optional
        Model parameters. The default is None.
    standardized : bool, optional
        If True, obtain estimates for standardized series.
        If False, obtain estimates for unstandardized series.
        The default is False.
    method : str, optional
        Use "filter" to obtain filtered estimates, and
        "smoother" to obtain smoothed estimates.
        The default is "smoother".

    Returns
    -------
    df : pandas.DataFrame or None
        DataFrame with specific and common dynamic component
        for series with name 'name'. None if the name is unknown.
    """
    df = None
    # Ensure the Kalman filter/smoother has been run for these parameters.
    self._run_kalman(method, p=p)
    if standardized:
        observation_matrix = self.get_observation_matrix(p=p)
        observation_means = np.zeros(observation_matrix.shape[0])
    else:
        observation_matrix = self.get_scaled_observation_matrix(p=p)
        observation_means = self.oseries_mean
    (sdf_means, cdf_means) = self.kf.decompose(observation_matrix, method=method)
    if name in self.oseries.columns:
        # The series mean is added to the specific component only, so the
        # components sum to the simulated mean.
        sdf = (
            DataFrame(
                sdf_means, index=self.oseries.index, columns=self.oseries.columns
            )
            + observation_means[self.oseries.columns.tolist().index(name)]
        )
        df = sdf.loc[:, name]
        cols = [
            "sdf",
        ]
        # Append each common dynamic factor's contribution as a column.
        for k in range(self.nfactors):
            cdf = DataFrame(
                cdf_means[k], index=self.oseries.index, columns=self.oseries.columns
            )
            df = concat([df, cdf.loc[:, name]], axis=1)
            cols.append("cdf" + str(k + 1))
        df.columns = cols
    else:
        logger.error("Unknown name: " + name)
    return df
[docs]
def get_scaled_observation_matrix(self, p=None):
"""Method scale observation matrix by standard deviations of oseries.
Returns
-------
observation_matrix: numpy.ndarray
scaled observation matrix
p : pandas.Series
Model parameters. The default is None.
"""
scale = self.oseries_std
observation_matrix = self.get_observation_matrix(p=p)
np.fill_diagonal(observation_matrix[:, : self.nseries], scale)
for i in range(self.nfactors):
observation_matrix[:, self.nseries + i] = np.multiply(
scale, observation_matrix[:, self.nseries + i]
)
return observation_matrix
[docs]
def _run_kalman(self, method, p=None):
    """Internal method to (re)run Kalman filter or smoother.

    Parameters
    ----------
    method : str
        Use "filter" to run Kalman filter, and
        "smoother" to run Kalman smoother.
    p : pandas.Series, optional
        Model parameters. The default is None.

    Returns
    -------
    None.
    """
    if method == "filter":
        # New parameters force a rerun; otherwise rerun only if the
        # filter has never been run (results are cached on self.kf).
        if p is not None:
            self.kf.set_matrices(*self._get_matrices(p))
            self.kf.run_filter()
        elif self.kf.filtered_state_means is None:
            self.kf.run_filter()
    else:
        # Same caching logic for the smoother.
        if p is not None:
            self.kf.set_matrices(*self._get_matrices(p))
            self.kf.run_smoother()
        elif self.kf.smoothed_state_means is None:
            self.kf.run_smoother()
[docs]
def solve(self, solver=None, report=True, engine="numba", **kwargs):
    """Method to solve the time series model.

    Parameters
    ----------
    solver: metran.solver.BaseSolver class, optional
        Class used to solve the model. Options are: mt.ScipySolve
        (default) or mt.LmfitSolve. A class is needed, not an instance
        of the class!
    report: bool or str, optional
        Print reports to the screen after optimization finished. This
        can also be manually triggered after optimization by calling
        print(mt.fit_report()) or print(mt.metran_report())
        on the Metran instance. A string is passed through as the
        `output` argument of fit_report.
    engine: str, optional
        Engine used for the Kalman filter, by default 'numba' which is the
        fastest choice but 'numpy' is also available, but is slower.
    **kwargs: dict, optional
        All keyword arguments will be passed onto minimization method
        from the solver.

    Notes
    -----
    - The solver object including some results are stored as mt.fit.
      From here one can access the covariance (mt.fit.pcov) and
      correlation matrix (mt.fit.pcor).
    - The solver returns a number of results after optimization. These
      are stored in mt.fit.result and can be accessed from there.
    """
    # Perform factor analysis to get factors
    factors = self.get_factors(self.oseries)
    if factors is not None:
        # Initialize Kalmanfilter
        self._init_kalmanfilter(self.oseries, engine=engine)
        # Initialize parameters
        self.set_init_parameters()
        # Store the solve instance; reuse an existing fit of the same
        # solver class, otherwise instantiate the requested solver.
        if solver is None:
            if self.fit is None:
                self.fit = ScipySolve(mt=self)
        elif not issubclass(solver, self.fit.__class__):
            self.fit = solver(mt=self)
        self.settings["solver"] = self.fit._name
        # Solve model
        success, optimal, stderr = self.fit.solve(**kwargs)
        self.parameters["optimal"] = optimal
        self.parameters["stderr"] = stderr
        if not success:
            logger.warning("Model parameters could not be estimated well.")
        if report:
            # a string report selects the fit_report output mode
            if isinstance(report, str):
                output = report
            else:
                output = "full"
            print("\n" + self.fit_report(output=output))
            print("\n" + self.metran_report())
[docs]
def _get_file_info(self):
"""Internal method to get the file information.
Returns
-------
file_info: dict
dictionary with file information.
"""
# Check if file_info already exists
if hasattr(self, "file_info"):
file_info = self.file_info
else:
file_info = {"date_created": Timestamp.now()}
file_info["date_modified"] = Timestamp.now()
file_info["pastas_version"] = __version__
try:
file_info["owner"] = getlogin()
except Exception:
file_info["owner"] = "Unknown"
return file_info
[docs]
def fit_report(self, output="full"):
    """Method that reports on the fit after a model is optimized.

    Parameters
    ----------
    output: str, optional
        If any other value than "full" is provided, the parameter
        correlations will be removed from the output.

    Returns
    -------
    report: str
        String with the report.

    Examples
    --------
    This method is called by the solve method if report=True, but can
    also be called on its own::

    >>> print(mt.fit_report())
    """
    model = {
        "tmin": str(self.settings["tmin"]),
        "tmax": str(self.settings["tmax"]),
        "freq": self.settings["freq"],
        "solver": self.settings["solver"],
    }
    fit = {
        "obj": "{:.2f}".format(self.fit.obj_func),
        "nfev": self.fit.nfev,
        "AIC": "{:.2f}".format(self.fit.aic),
        "": "",
    }
    parameters = self.parameters.loc[
        :, ["optimal", "stderr", "initial", "vary"]
    ].copy()
    # relative standard error per parameter; only shown for varied ones
    stderr = parameters.loc[:, "stderr"] / parameters.loc[:, "optimal"]
    parameters["stderr"] = "-"
    parameters.loc[parameters["vary"], "stderr"] = stderr.abs().apply(
        "\u00B1{:.2%}".format
    )
    parameters["initial"] = parameters["initial"].astype(str)
    parameters.loc[~parameters["vary"].astype(bool), "initial"] = "-"
    # Determine the width of the fit_report based on the parameters
    width = len(parameters.__str__().split("\n")[1])
    # reusable format template: fill/align/width filled in per use
    string = "{:{fill}{align}{width}}"
    # Create the first header with model information and stats
    w = max(width - 45, 0)
    header = "Fit report {name:<16}{string}Fit Statistics\n" "{line}\n".format(
        name=self.name[:14],
        string=string.format("", fill=" ", align=">", width=w),
        line=string.format("", fill="=", align=">", width=width),
    )
    basic = ""
    vw = max(width - 45, 0)
    # model info (left column) and fit statistics (right column) side by side
    for (val1, val2), (val3, val4) in zip(model.items(), fit.items()):
        val4 = string.format(val4, fill=" ", align=">", width=w)
        space = string.format("", fill=" ", align=">", width=vw)
        basic += "{:<8} {:<16} {:} {:<7} {:}\n".format(
            val1, val2, space, val3, val4
        )
    # Create the parameters block
    parameters = (
        "\nParameters ({n_param} were optimized)\n{line}\n{parameters}".format(
            n_param=parameters.vary.sum(),
            line=string.format("", fill="=", align=">", width=width),
            parameters=parameters,
        )
    )
    if output == "full":
        # collect parameter pairs whose correlation exceeds 0.5, each
        # unordered pair reported once
        cor = {}
        pcor = self.fit.pcor
        for idx in pcor:
            for col in pcor:
                if (
                    (np.abs(pcor.loc[idx, col]) > 0.5)
                    and (idx != col)
                    and ((col, idx) not in cor.keys())
                ):
                    cor[(idx, col)] = pcor.loc[idx, col].round(2)
        cor = DataFrame(data=cor.values(), index=cor.keys(), columns=["rho"])
        if cor.shape[0] > 0:
            cor = cor.to_string(header=False)
        else:
            cor = "None"
        correlations = "\n\nParameter correlations |rho| > 0.5\n{}" "\n{}".format(
            string.format("", fill="=", align=">", width=width), cor
        )
    else:
        correlations = ""
    report = "{header}{basic}{parameters}{correlations}".format(
        header=header, basic=basic, parameters=parameters, correlations=correlations
    )
    return report
[docs]
def metran_report(self, output="full"):
    """Method that reports on the metran model results.

    Parameters
    ----------
    output: str, optional
        If any other value than "full" is provided, the state
        correlations will be removed from the output.

    Returns
    -------
    report: str
        String with the report.

    Examples
    --------
    This method is called by the solve method if report=True, but can
    also be called on its own::

    >>> print(mt.metran_report())
    """
    model = {
        "tmin": str(self.settings["tmin"]),
        "tmax": str(self.settings["tmax"]),
        "freq": self.settings["freq"],
    }
    fit = {"nfct": str(self.nfactors), "fep": "{:.2f}%".format(self.fep), "": ""}
    # Create the state parameters block
    # phi: AR(1) coefficients; q: transition (noise) variances
    phi = np.diag(self.get_transition_matrix())
    q = self.get_transition_variance()
    names = [name + "_sdf" for name in self.snames]
    names.extend(["cdf" + str(i + 1) for i in range(self.nfactors)])
    transition = DataFrame(np.array([phi, q]).T, index=names, columns=["phi", "q"])
    # get width of index to align state parameters index
    idx_width = int(max([len(n) for n in transition.index]))
    # Create the communality block
    communality = Series(
        self.get_communality(), index=self.oseries.columns, name=""
    )
    communality.index = [idx.ljust(idx_width) for idx in communality.index]
    communality = communality.apply("{:.2%}".format).to_frame()
    # Create the observation parameters block
    gamma = self.factors
    names = ["gamma" + str(i + 1) for i in range(self.nfactors)]
    observation = DataFrame(gamma, index=self.oseries.columns, columns=names)
    observation.index = [idx.ljust(idx_width) for idx in observation.index]
    # scale/mean are the standardization constants per series
    observation.loc[:, "scale"] = self.oseries_std
    observation.loc[:, "mean"] = self.oseries_mean
    # Determine the width of the metran_report based on the parameters
    width = max(
        len(transition.__str__().split("\n")[1]),
        len(observation.__str__().split("\n")[1]),
        44,
    )
    # reusable format template: fill/align/width filled in per use
    string = "{:{fill}{align}{width}}"
    # # Create the first header with results factor analysis
    w = max(width - 43, 0)
    header = "Metran report {name:<14}{string}Factor Analysis\n" "{line}\n".format(
        name=self.name[:14],
        string=string.format("", fill=" ", align=">", width=w),
        line=string.format("", fill="=", align=">", width=width),
    )
    factors = ""
    # model info (left) and factor analysis results (right) side by side
    for (val1, val2), (val3, val4) in zip(model.items(), fit.items()):
        val4 = string.format(val4, fill=" ", align=">", width=w)
        factors += "{:<8} {:<19} {:<7} {:}\n".format(val1, val2, val3, val4)
    # Create the transition block
    transition = "\nState parameters\n{line}\n" "{transition}\n".format(
        line=string.format("", fill="=", align=">", width=width),
        transition=transition,
    )
    # Create the observation block
    observation = "\nObservation parameters\n{line}\n" "{observation}\n".format(
        line=string.format("", fill="=", align=">", width=width),
        observation=observation,
    )
    # Create the communality block
    communality = "\nCommunality\n{line}\n" "{communality}\n".format(
        line=string.format("", fill="=", align=">", width=width),
        communality=communality,
    )
    if output == "full":
        # collect state pairs whose correlation exceeds 0.5, each
        # unordered pair reported once
        cor = {}
        pcor = self.get_state_means().corr()
        for idx in pcor:
            for col in pcor:
                if (
                    (np.abs(pcor.loc[idx, col]) > 0.5)
                    and (idx != col)
                    and ((col, idx) not in cor.keys())
                ):
                    cor[(idx, col)] = pcor.loc[idx, col].round(2)
        cor = DataFrame(data=cor.values(), index=cor.keys(), columns=["rho"])
        if cor.shape[0] > 0:
            cor = cor.to_string(header=False)
        else:
            cor = "None"
        correlations = "\nState correlations |rho| > 0.5\n{}" "\n{}\n".format(
            string.format("", fill="=", align=">", width=width), cor
        )
    else:
        correlations = ""
    report = (
        "{header}{factors}{communality}{transition}"
        "{observation}{correlations}".format(
            header=header,
            factors=factors,
            communality=communality,
            transition=transition,
            observation=observation,
            correlations=correlations,
        )
    )
    return report