"""This module contains the solver that is available for Pastas Metran.
All solvers inherit from the BaseSolver class, which contains methods to
obtain the object function value and numerical approximation of the
parameter covariance matrix.
To solve a model the following syntax can be used:
>>> mt.solve(solver=ps.LmfitSolve)
"""
from logging import getLogger
import numpy as np
from pandas import DataFrame
from pastas.utils import initialize_logger
from scipy.optimize import approx_fprime, minimize
logger = getLogger(__name__)
initialize_logger(logger)
[docs]
class BaseSolver:
_name = "BaseSolver"
__doc__ = """All solver instances inherit from the BaseSolver class.
Attributes
----------
mt : Metran instance
"""
def __init__(self, mt, **kwargs):
self.mt = mt
# Initialize objects
self.pcov = None
self.pcor = None
self.nfev = None
self.result = None
[docs]
def objfunction(self, p, callback):
"""Method to get objective function used by solver.
Parameters
----------
p : type required for callback function
Parameters to be coverted by callback function into
proper type and format.
callback: ufunc
Function that is called after each iteration.
The parameters are provided to the func,
e.g. "callback(parameters)".
Returns
-------
obj : float
Objective function value.
"""
if callback is not None:
p = callback(p)
obj = self.mt.get_mle(p)
return obj
def _get_covariance(self, x0, f, callback, epsilon=None, diff="forward"):
"""Estimate covariance matrix of parameter estimates.
Uses a numerical approximation to the Hessian matrix of cost
function at location x0.
Parameters
----------
x0 : array_like
Parameter values at objective function minimum
f : callable
The objective function
callback : ufunc
Function that is called after each iteration.
The parameters are provided to the func,
e.g. "callback(parameters)".
epsilon : float, optional
Pertubation in finite difference scheme.
The default is EPS ** (1. / 4).
If finite difference approximation results in non-positive
values on the diagonal of the covariance matrix,
epsilon will be increase by factor 10.
diff : str, optional
forward or central difference scheme. The default is "forward"
Returns
-------
cov : numpy.ndarray
covariance matrix of parameter estimates
"""
n = x0.shape[0]
if epsilon is None:
EPS = np.finfo(float).eps
epsilon = EPS ** (1.0 / 4)
cov_ok = False
while not (cov_ok or epsilon > 1000.0 * epsilon):
# allocate space for the hessian
hessian = np.zeros((n, n))
# the next loop fill the hessian matrix
xx = x0
if diff == "forward":
f0 = approx_fprime(x0, f, epsilon, callback)
for j in range(n):
xx0 = xx[j]
d = epsilon * max(np.abs(xx0), 0.1)
# forward difference
xx[j] = xx0 + d
ff = approx_fprime(xx, f, epsilon, callback)
if diff == "central":
# backward difference
xx[j] = xx0 - d
fb = approx_fprime(xx, f, epsilon, callback)
hessian[: j + 1, j] = (ff[: j + 1] - fb[: j + 1]) / (2 * d)
else:
hessian[: j + 1, j] = (ff[: j + 1] - f0[: j + 1]) / d
hessian[j, : j + 1] = hessian[: j + 1, j]
# restore initial value of x0
xx[j] = xx0
# try to calculate covariance matrix from Hessian
if not np.isnan(hessian).any():
cov = np.linalg.pinv(hessian)
if np.amin(np.diag(cov)) <= 0:
# find nearest positive semi-definite matrix
try:
cov = np.linalg.pinv(self._nearPSD(hessian))
except Exception as e:
logger.debug(f"Could not calculate 'cov': {e}")
if np.amin(np.diag(cov)) > 0:
cov_ok = True
if not (cov_ok):
epsilon *= 10.0
return cov
@staticmethod
def _get_correlations(pcov):
"""Internal method to obtain the parameter correlations.
Parameter correlations are derived from the covariance matrix.
Parameters
----------
pcov: pandas.DataFrame
n x n Pandas DataFrame with the covariances.
Returns
-------
pcor: pandas.DataFrame
n x n Pandas DataFrame with the correlations.
"""
pcor = pcov.loc[pcov.index, pcov.index].copy()
for i in pcor.index:
for j in pcor.columns:
pcor.loc[i, j] = pcov.loc[i, j] / np.sqrt(
pcov.loc[i, i] * pcov.loc[j, j]
)
return pcor
@staticmethod
def _nearPSD(A, epsilon=0):
"""Get the nearest Positive Semi-Definite matrix of A.
Parameters
----------
A : numpy.ndarray
square matrix
epsilon : float, optional
cut-off value for eigenvalues. The default is 0.
Returns
-------
out : numpy.ndarray
Nearest Positive Semi-Definite matrix of A.
"""
n = A.shape[0]
eigval, eigvec = np.linalg.eig(A)
val = np.array(np.maximum(eigval, epsilon))
vec = np.array(eigvec)
with np.errstate(divide="ignore", invalid="ignore"):
T = 1 / (vec**2 @ val.T)
T = np.array(np.sqrt(np.diag(np.array(T).reshape((n)))))
B = T @ vec * np.diag(np.array(np.sqrt(val)).reshape((n)))
out = B @ B.T
return out
[docs]
class ScipySolve(BaseSolver):
"""Solver based on Scipy's least_squares method [scipy_ref]_.
This class is the default solver class in the Metran solve method.
Parameters
----------
mt : Metran instance
**kwargs : dict, optional
All keyword arguments will be passed onto minimization method
from the solver.
Examples
--------
>>> mt.solve(solver=ps.ScipySolve)
References
----------
.. [scipy_ref] https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.least_squares.html
"""
_name = "ScipySolve"
def __init__(self, mt, **kwargs):
BaseSolver.__init__(self, mt=mt, **kwargs)
[docs]
def solve(self, method="l-bfgs-b", **kwargs):
"""Method to run solver and optimize parameters.
Parameters
----------
method : str, optional
Name of the fitting method to use. The default is "l-bfgs-b".
**kwargs : dict, optional
All keyword arguments will be passed onto minimization method
from the solver.
Returns
-------
success : boolean
True if optimization routine terminated successfully.
params : lmfit.Parameters instance
Ordered dictionary of Parameter objects.
"""
# Deal with the parameters
self.vary = self.mt.parameters.vary.values.astype(bool)
self.initial = self.mt.parameters.initial.values.copy()
parameters = self.mt.parameters.loc[self.vary]
bounds = [(b[0], b[1]) for b in parameters.loc[:, ["pmin", "pmax"]].values]
# Create the Minimizer object and minimize
self.result = minimize(
fun=self.objfunction,
method=method,
x0=parameters.initial.values,
bounds=bounds,
args=(self._array_todict,),
**kwargs,
)
_stderr = np.zeros(parameters.shape[0]) * np.NaN
if hasattr(self.result, "hess_inv"):
pcov = self.result.hess_inv.todense()
_stderr = np.sqrt(np.diag(pcov))
if np.isnan(_stderr).any():
# calculate covariance matrix using finite differences
pcov = self._get_covariance(
self.result.x, self.objfunction, self._array_todict
)
_stderr = np.sqrt(np.diag(pcov))
optimal = self.initial
optimal[self.vary] = self.result.x
stderr = np.zeros(len(optimal)) * np.NaN
stderr[self.vary] = _stderr
# Set all parameter attributes
names = parameters.index.values
self.pcov = DataFrame(pcov, index=names, columns=names)
self.pcor = self._get_correlations(self.pcov)
# Set all optimization attributes
self.nfev = self.result.nfev
self.aic = 2 * parameters.shape[0] + self.result.fun
self.obj_func = self.result.fun
if hasattr(self.result, "success"):
success = self.result.success
else:
success = True
return success, optimal, stderr
def _array_todict(self, p):
"""Update parameter dictionary with array of varying parameters.
Parameters
----------
p : array_like
Model parameters (varying)
Returns
-------
dict
Model parameters as input for objective function
"""
par = self.initial
par[self.vary] = p
return par
[docs]
class LmfitSolve(BaseSolver):
"""Class for solving the model using the LmFit solver [LM]_.
Lmfit is basically a wrapper around the scipy solvers, adding some
functionality for boundary conditions.
Parameters
----------
mt : Metran instance
**kwargs : dict, optional
All keyword arguments will be passed onto minimization method
from the solver.
Examples
--------
>>> mt.solve(solver=ps.LmfitSolve)
References
----------
.. [LM] https://github.com/lmfit/lmfit-py/
"""
_name = "LmfitSolve"
def __init__(self, mt, **kwargs):
try:
# Import Lmfit here, so it is no dependency
global lmfit
import lmfit as lmfit
except ImportError:
msg = "lmfit not installed. Please install lmfit first."
logger.error(msg)
raise ImportError(msg)
self.mt = mt
# Initialize objects
self.pcov = None
self.pcor = None
self.nfev = None
self.result = None
[docs]
def solve(self, method="lbfgsb", **kwargs):
"""Method to run solver and optimize parameters.
Parameters
----------
method : str, optional
Name of the fitting method to use. The default is "lbfgsb".
**kwargs : dict, optional
All keyword arguments will be passed onto minimization method
from the solver.
Returns
-------
success : boolean
True if optimization routine terminated successfully.
params : lmfit.Parameters instance
Ordered dictionary of Parameter objects.
"""
# Deal with the parameters
parameters = lmfit.Parameters()
self.vary = self.mt.parameters.vary.values.astype(bool)
self.initial = self.mt.parameters.initial.values.copy()
p = self.mt.parameters.loc[:, ["initial", "pmin", "pmax", "vary"]]
for k in p.index:
pp = np.where(p.loc[k].isnull(), None, p.loc[k])
if method == "lbfgsb":
parameters.add(k, value=pp[0], min=None, max=None, vary=pp[3])
else:
parameters.add(k, value=pp[0], min=pp[1], max=pp[2], vary=pp[3])
if method == "lbfgsb":
bounds = [
(b[0], b[1]) for b in self.mt.parameters.loc[:, ["pmin", "pmax"]].values
]
kwargs["bounds"] = bounds
# Create the Minimizer object and minimize
self.mini = lmfit.Minimizer(
userfcn=self.objfunction,
params=parameters,
scale_covar=False,
fcn_args=(self._lmfit_todict,),
**kwargs,
)
self.result = self.mini.minimize(method=method)
optimal = np.array([p.value for p in self.result.params.values()])
# Set all parameter attributes
stderr = np.zeros(len(optimal)) * np.NaN
pcov = None
if hasattr(self.result, "covar"):
if self.result.covar is not None:
pcov = self.result.covar
stderr = np.sqrt(np.diag(pcov))
if pcov is None:
# calculate covariance matrix using finite differences
pcov = self._get_covariance(optimal, self.objfunction, self._array_todict)
stderr[self.vary] = np.sqrt(np.diag(pcov))
names = self.result.var_names
self.pcov = DataFrame(pcov, index=names, columns=names)
self.pcor = self._get_correlations(self.pcov)
# Set all optimization attributes
self.nfev = self.result.nfev
self.obj_func = self.objfunction(optimal, self._array_todict)
self.aic = 2 * p[p["vary"]].shape[0] + self.obj_func
if hasattr(self.result, "success"):
success = self.result.success
else:
success = True
return success, optimal, stderr
@staticmethod
def _lmfit_todict(p):
"""Convert lmfit.Parameter instance to dictionary.
Parameters
----------
p : lmfit.Parameter instance
Model parameters
Returns
-------
dict
Model parameters as input for objective function
"""
return p.valuesdict()
def _array_todict(self, p):
"""Update parameter dictionary with array of varying parameters.
Parameters
----------
p : array_like
Model parameters (varying)
Returns
-------
dict
Model parameters as input for objective function
"""
par = self.initial
par[self.vary] = p
return par