import numpy as np
import jax.numpy as jnp
from jax.scipy.special import logsumexp
from skscope import ScopeSolver
from sklearn.base import BaseEstimator
from sklearn.covariance import LedoitWolf
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.utils.validation import (
check_array,
check_random_state,
check_X_y,
check_is_fitted,
)
from sklearn.utils._param_validation import Hidden, Interval, StrOptions
from numbers import Integral, Real
def check_data(X, y=None, sample_weight=None):
if y is None:
X = check_array(X)
else:
X, y = check_X_y(X, y, dtype="numeric", y_numeric=True)
n, p = X.shape
if sample_weight is None:
sample_weight = np.ones(n)
else:
sample_weight = np.array(sample_weight, dtype="float")
if sample_weight.ndim > 1:
raise ValueError("sample_weight should be a 1-D array.")
if sample_weight.size != n:
raise ValueError("X.shape[0] should be equal to sample_weight.size")
if sample_weight.sum() == 0:
raise ValueError("Weights sum to zero, can't be normalized.")
useful_index = list()
for i, w in enumerate(sample_weight):
if w > 0:
useful_index.append(i)
if len(useful_index) < n:
X = X[useful_index, :]
if not (y is None):
y = y[useful_index, :] if len(y.shape) > 1 else y[useful_index]
sample_weight = sample_weight[useful_index]
n = len(useful_index)
return X, y, sample_weight
[docs]
class PortfolioSelection(BaseEstimator):
r"""
Construct a sparse portfolio using ``skscope`` with ``MinVar`` or ``MeanVar`` measure.
Parameters
------------
sparsity : int, default=10
The number of stocks to be chosen, i.e., the sparsity level
obj : {"MinVar", "MeanVar"}, default="MinVar"
The objective of the portfolio optimization
alpha: float, default=0
The penalty coefficient of the return
cov_matrix : {"empirical", "lw"}, default="lw"
Specify the estimator of covariance matrix.
If ``empirical``, it will be the empirical estimator. If ``lw``, it will be the LedoitWolf estimator.
random_state : {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, default=None
The seed to initialize the parameter ``init_params`` in ``ScopeSolver``
"""
def __init__(
self,
sparsity=1,
obj="MinVar",
alpha=0,
cov_matrix="lw",
random_state=None,
):
self.sparsity = sparsity
self.obj = obj
self.alpha = alpha
self.cov_matrix = cov_matrix
self.random_state = random_state
_parameter_constraints: dict = {
"sparsity": [Interval(Integral, 1, None, closed="left")],
"obj": [StrOptions({"MinVar", "MeanVar"})],
"alpha": [Interval(Real, 0, None, closed="left")],
"cov_matrix": [StrOptions({"empirical", "lw"})],
"random_state": ["random_state"],
}
# def _more_tags(self):
# return {'non_deterministic': True}
[docs]
def fit(self, X, y=None, sample_weight=None):
r"""
The fit function is used to comupte the weight of the desired sparse portfolio with a certain objective.
Parameters
----------
X : array-like of shape (n_periods, n_assets)
Return data of n_assets assets spanning n_periods periods
y : ignored
Not used, present here for API consistency by convention.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
--------
self : object
Fitted Estimator.
"""
self._validate_params()
X, y, sample_weight = check_data(X, y, sample_weight)
T, N = X.shape
self.n_features_in_ = N
self.random_state_ = check_random_state(self.random_state)
# rng = np.random.default_rng(self.random_state)
# init_params = rng.standard_normal(N)
init_params = self.random_state_.randn(N)
mu = np.mean(X, axis=0)
if self.cov_matrix == "empirical":
Sigma = np.cov(X.T)
elif self.cov_matrix == "lw":
Sigma = LedoitWolf().fit(X).covariance_
else:
raise ValueError("{} estimator is not supported.".format(self.cov_matrix))
if self.obj == "MinVar":
def custom_objective(params):
params = params / jnp.sum(params)
var = params @ Sigma @ params
return var
elif self.obj == "MeanVar":
def custom_objective(params):
params = params / jnp.sum(params)
var = params @ Sigma @ params - self.alpha * mu @ params
return var
else:
raise ValueError("{} objective is not supported.".format(self.obj))
solver = ScopeSolver(N, self.sparsity)
params = solver.solve(custom_objective, init_params=init_params, jit=True)
self.weight = params / params.sum()
self.coef_ = self.weight
return self
[docs]
def score(self, X, y=None, sample_weight=None, measure="Sharpe"):
r"""
Give data, and it return the Sharpe ratio of the portfolio constructed with the weight ``self.coef_``
Parameters
-----------
X : array-like of shape (n_periods, n_assets)
Return data of n_assets assets spanning n_periods periods
y : ignored
Not used, present here for API consistency by convention.
sample_weight : ignored
Not used, present here for API consistency by convention.
measure : {"Sharpe"}, default="Sharpe"
The measure of the performance of a portfolio.
Returns
--------
score : float
The Sharpe ratio of the constructed portfolio.
"""
check_is_fitted(self)
X, y, sample_weight = check_data(X, y, sample_weight)
n, p = X.shape
if p != self.n_features_in_:
raise ValueError("X.shape[1] should be " + str(self.n_features_in_))
return_ = X @ self.coef_
if measure == "Sharpe":
score = np.mean(return_) / np.std(return_)
else:
raise ValueError("{} measure is not supported.".format(measure))
return score
[docs]
class NonlinearSelection(BaseEstimator):
r"""
Select relevant features which may have nonlinear dependence on the target.
Parameters
-----------
sparsity : int, default=5
The number of features to be selected, i.e., the sparsity level.
gamma_x : float, default=0.7
The width parameter of Gaussian kernel for X.
gamma_y : float, default=0.7
The width parameter of Gaussian kernel for y.
"""
_parameter_constraints: dict = {
"sparsity": [Interval(Integral, 1, None, closed="left")],
"gamma_x": [Interval(Real, 0, None, closed="neither")],
"gamma_y": [Interval(Real, 0, None, closed="neither")],
}
def __init__(
self,
sparsity=1,
gamma_x=0.7,
gamma_y=0.7,
):
self.sparsity = sparsity
self.gamma_x = gamma_x
self.gamma_y = gamma_y
[docs]
def fit(
self,
X,
y,
sample_weight=None,
):
r"""
The fit function is used to comupte the coeffifient vector ``coef_`` and
those features corresponding to larger coefficients are considered having
stronger dependence on the target.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Feature matrix.
y : array-like of shape (n_samples,)
Target values.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
--------
self : object
Fitted Estimator.
"""
self._validate_params()
X, y, sample_weight = check_data(X, y, sample_weight)
n, p = X.shape
self.n_features_in_ = p
Gamma = np.eye(n) - np.ones((n, 1)) @ np.ones((1, n)) / n
L = rbf_kernel(y.reshape(-1, 1), gamma=self.gamma_y)
L_bar = Gamma @ L @ Gamma
response = L_bar.reshape(-1)
K_bar = np.zeros((n**2, p))
for k in range(p):
x = X[:, k]
tmp = rbf_kernel(x.reshape(-1, 1), gamma=self.gamma_x)
K_bar[:, k] = (Gamma @ tmp @ Gamma).reshape(-1)
covariate = K_bar
def custom_objective(alpha):
loss = jnp.mean((response - covariate @ jnp.abs(alpha)) ** 2)
return loss
solver = ScopeSolver(p, sparsity=self.sparsity)
alpha = solver.solve(custom_objective, jit=True)
self.coef_ = np.abs(alpha)
return self
[docs]
def score(self, X, y, sample_weight=None):
r"""
Give test data, and it return the test score of this fitted model.
Parameters
----------
X : array-like, shape(n_samples, n_features)
Feature matrix.
y : array-like, shape(n_samples,)
Target values.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
-------
score : float
The negative loss on the given data.
"""
check_is_fitted(self)
X, y, sample_weight = check_data(X, y, sample_weight)
n, p = X.shape
if p != self.n_features_in_:
raise ValueError("X.shape[1] should be " + str(self.n_features_in_))
Gamma = np.eye(n) - np.ones((n, 1)) @ np.ones((1, n)) / n
L = rbf_kernel(y.reshape(-1, 1), gamma=self.gamma_y)
L_bar = Gamma @ L @ Gamma
response = L_bar.reshape(-1)
K_bar = np.zeros((n**2, p))
for k in range(p):
x = X[:, k]
tmp = rbf_kernel(x.reshape(-1, 1), gamma=self.gamma_x)
K_bar[:, k] = (Gamma @ tmp @ Gamma).reshape(-1)
covariate = K_bar
score = -np.mean((response - covariate @ self.coef_) ** 2)
return score
[docs]
class RobustRegression(BaseEstimator):
r"""
A robust regression procedure via sparsity constrained exponential loss minimization.
Specifically, ``RobustRegression`` solves the following problem:
:math:`\min_{\beta}-\sum_{i=1}^n\exp\{-(y_i-x_i^{\top}\beta)^2/\gamma\} \text{ s.t. } \|\beta\|_0 \leq s`
where :math:`\gamma` is a hyperparameter controlling the degree of robustness and
:math:`s` is a hyperparameter controlling the sparsity level of :math:`\beta`.
Note: When :math:`\gamma` is large, the exponential loss is approximately equivalent to :math:`|y_i-x_i^{\top}\beta|^2/\gamma` and thus similar to
the least square estimator. When :math:`\gamma` is small, the sample :math:`i` with large error :math:`|y_i-x_i^{\top}\beta|`
will obtain small impact on the estimation of :math:`\beta` and thus limiting the impact of outlier (i.e., improve the robustness
but reduce the efficiency).
Therefore, :math:`\gamma` need to be selected carefully with prior knowledge of the data or via some data-dirven methods (e.g. cross validation) to
achieve a appropriate trade-off between robustness and efficiency of the resulting estimator.
Parameters
-----------
sparsity : int, default=1
The number of features to be selected, i.e., the sparsity level.
gamma : float, default=1
The parameter controlling the degree of robustness for the estimator.
"""
_parameter_constraints: dict = {
"sparsity": [Interval(Integral, 1, None, closed="left")],
"gamma": [Interval(Real, 0, None, closed="neither")],
}
def __init__(self, sparsity=1, gamma=1):
self.sparsity = sparsity
self.gamma = gamma
def _more_tags(self):
return {"requires_y": False}
[docs]
def fit(self, X, y=None, sample_weight=None):
r"""
The fit function is used to comupte the coeffifient vector ``coef_``.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Feature matrix.
y : array-like of shape (n_samples,)
Target values.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
--------
self : object
Fitted Estimator.
"""
self._validate_params()
X, y, sample_weight = check_data(X, y, sample_weight)
n, p = X.shape
self.n_features_in_ = p
def custom_objective(params):
err = -jnp.exp(-jnp.square(y - X @ params) / self.gamma)
loss = jnp.average(err, weights=sample_weight)
return loss
solver = ScopeSolver(p, self.sparsity)
self.coef_ = solver.solve(custom_objective, jit=True)
return self
[docs]
def score(self, X, y, sample_weight=None):
r"""
Give test data, and it return the test score of this fitted model.
Parameters
----------
X : array-like, shape(n_samples, n_features)
Feature matrix.
y : array-like, shape(n_samples,)
Target values.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
-------
score : float
The weighted exponential loss of the given data.
"""
check_is_fitted(self)
X, y, sample_weight = check_data(X, y, sample_weight)
n, p = X.shape
if p != self.n_features_in_:
raise ValueError("X.shape[1] should be " + str(self.n_features_in_))
err = -np.exp(-np.square(y - X @ self.coef_) / self.gamma)
loss = np.average(err, weights=sample_weight)
score = -loss
return score
[docs]
class MultivariateFailure(BaseEstimator):
r"""
Multivariate failure time model.
Parameters
----------
sparsity : int, default=5
The number of features to be selected, i.e., the sparsity level.
"""
_parameter_constraints: dict = {
"sparsity": [Interval(Integral, 1, None, closed="left")],
}
def __init__(self, sparsity=5):
self.sparsity = sparsity
[docs]
def fit(self, X, y, delta, sample_weight=None):
r"""
Minimize negative partial log-likelihood with sparsity constraint for provided data.
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Data matrix
y : array-like, shape = (n_samples, n_events)
Observed time of multiple events.
delta : array-like, shape = (n_samples, n_events)
Indicator matrix of censoring.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
--------
self : object
Fitted Estimator.
"""
self._validate_params()
n, p = X.shape
K = delta.shape[1]
self.n_features_in_ = p
self.n_events = K
def multivariate_failure_objective(params):
Xbeta_expanded = jnp.matmul(X, params)[:, None]
sum_exp_Xbeta = logsumexp(
Xbeta_expanded + jnp.log(y >= y[:, None, :]), axis=1
)
loss = -jnp.mean((Xbeta_expanded - sum_exp_Xbeta) * delta)
return loss
solver = ScopeSolver(p, self.sparsity)
self.coef_ = solver.solve(multivariate_failure_objective, jit=True)
return self
[docs]
def predict(self, X):
r"""
Given the features, predict the hazard function up to some constant independent of the sample.
Parameters
----------
X : array-like, shape(n_samples, n_features)
Feature matrix.
Returns
--------
hazard : array, shape = (n_samples,)
the quantity :math:`e^{\beta^{\top}X_i}` proportional to the harzard function up to
some constant independent of the sample index :math:`i` such that
:math:`\lambda_k(t;X_{i})=\lambda_{0k}(t)e^{\beta^{\top}X_i}`.
"""
check_is_fitted(self)
X, _, _ = check_data(X)
Xbeta = X @ self.coef_
return np.exp(Xbeta)
[docs]
def score(self, X, y, delta, sample_weight=None):
r"""
Give test data, and it return the test score of this fitted model.
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Data matrix
y : array-like, shape = (n_samples, n_events)
Observed time of multiple events.
delta : array-like, shape = (n_samples, n_events)
Indicator matrix of censoring.
sample_weight : ignored
Not used, present here for API consistency by convention.
Returns
-------
score : float
The log likelihood of the given data.
"""
check_is_fitted(self)
n, p = X.shape
K = delta.shape[1]
if p != self.n_features_in_:
raise ValueError("X.shape[1] should be " + str(self.n_features_in_))
if K != self.n_events:
raise ValueError(
"y.shape[1] and delta.shape[1] should be " + str(self.events)
)
Xbeta = np.matmul(X, self.coef_)
tmp = np.ones((n, K))
for k in range(K):
tmp[:, k] = X @ self.coef_ - np.log(
np.matmul(
y[:, k].reshape(1, -1) >= y[:, k].reshape(-1, 1), np.exp(Xbeta)
)
)
score = np.mean(tmp * delta)
return score