#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: Zezhi Wang
# Copyright (C) 2023 abess-team
# Licensed under the MIT License.
from sklearn.base import BaseEstimator
from sklearn.model_selection import KFold
import numpy as np
import jax
from .numeric_solver import convex_solver_nlopt
import math
from . import utilities
[docs]
class BaseSolver(BaseEstimator):
r"""
Get sparse optimal solution of convex objective function by searching all possible combinations of variables.
Specifically, ``BaseSolver`` aims to tackle this problem: :math:`\min_{x \in R^p} f(x) \text{ s.t. } ||x||_0 \leq s`, where :math:`f(x)` is a convex objective function and :math:`s` is the sparsity level. Each element of :math:`x` can be seen as a variable, and the nonzero elements of :math:`x` are the selected variables.
Parameters
----------
dimensionality : int
Dimension of the optimization problem, which is also the total number of variables that will be considered to select or not, denoted as :math:`p`.
sparsity : int or array of int, optional
The sparsity level, which is the number of nonzero elements of the optimal solution, denoted as :math:`s`.
Default is ``range(int(p/log(log(p))/log(p)))``.
sample_size : int, default=1
sample size, denoted as :math:`n`.
preselect : array of int, default=[]
An array contains the indexes of variables which must be selected.
numeric_solver : callable, optional
A solver for the convex optimization problem. ``BaseSolver`` will call this function to solve the convex optimization problem in each iteration.
It should have the same interface as ``skscope.convex_solver_nlopt``.
max_iter : int, default=100
Maximum number of iterations taken for converging.
group : array of shape (dimensionality,), default=range(dimensionality)
The group index for each variable, and it must be an incremental integer array starting from 0 without gap.
The variables in the same group must be adjacent, and they will be selected together or not.
Here are wrong examples: ``[0,2,1,2]`` (not incremental), ``[1,2,3,3]`` (not start from 0), ``[0,2,2,3]`` (there is a gap).
It's worth mentioning that the concept "a variable" means "a group of variables" in fact. For example,``sparsity=[3]`` means there will be 3 groups of variables selected rather than 3 variables,
and ``always_include=[0,3]`` means the 0-th and 3-th groups must be selected.
ic_method : callable, optional
A function to calculate the information criterion for choosing the sparsity level.
``ic(loss, p, s, n) -> ic_value``, where ``loss`` is the value of the objective function, ``p`` is the dimensionality, ``s`` is the sparsity level and ``n`` is the sample size.
Used only when ``sparsity`` is array and ``cv`` is 1.
Note that ``sample_size`` must be given when using ``ic_method``.
cv : int, default=1
The folds number when use the cross-validation method.
- If ``cv`` = 1, the sparsity level will be chosen by the information criterion.
- If ``cv`` > 1, the sparsity level will be chosen by the cross-validation method.
split_method : callable, optional
A function to get the part of data used in each fold of cross-validation.
Its interface should be ``(data, index) -> part_data`` where ``index`` is an array of int.
cv_fold_id : array of shape (sample_size,), optional
An array indicates different folds in CV, which samples in the same fold should be given the same number.
The number of different elements should be equal to ``cv``.
Used only when `cv` > 1.
random_state : int, optional
The random seed used for cross-validation.
Attributes
----------
params : array of shape(dimensionality,)
The sparse optimal solution.
objective_value: float
The value of objective function on the solution.
support_set :array of int
The indices of selected variables, sorted in ascending order.
"""
def __init__(
self,
dimensionality,
sparsity=None,
sample_size=1,
*,
preselect=[],
numeric_solver=convex_solver_nlopt,
max_iter=100,
group=None,
ic_method=None,
cv=1,
cv_fold_id=None,
split_method=None,
random_state=None,
):
self.dimensionality = dimensionality
self.sample_size = sample_size
self.sparsity = sparsity
self.preselect = preselect
self.max_iter = max_iter
self.group = group
self.ic_method = ic_method
self.cv = cv
self.cv_fold_id = cv_fold_id
self.split_method = split_method
self.jax_platform = "cpu"
self.random_state = random_state
self.numeric_solver = numeric_solver
[docs]
def get_config(self, deep=True):
"""
Get parameters for this solver.
Parameters
----------
deep : bool, default=True
If True, will return the parameters for this solver and
contained subobjects that are estimators.
Returns
-------
params : dict
Parameter names mapped to their values.
"""
return super().get_params(deep)
[docs]
def set_config(self, **params):
"""Set the parameters of this solver.
Parameters
----------
**params : dict
Solver parameters.
Returns
-------
self :
Solver instance.
"""
return super().set_params(**params)
[docs]
def solve(
self,
objective,
data=None,
layers=[],
gradient=None,
init_support_set=None,
init_params=None,
jit=False,
):
r"""
Optimize the optimization objective function.
Parameters
----------
objective : callable
The objective function to be minimized: ``objective(params, data) -> loss``
where ``params`` is a 1-D array with shape (dimensionality,) and
``data`` is the fixed parameters needed to completely specify the function.
``objective`` must be written in ``JAX`` library if ``gradient`` is not provided.
data : optional
Extra arguments passed to the objective function and its derivatives (if existed).
layers : list of ``Layer`` objects, default=[]
``Layer`` is a "decorator" of the objective function.
The parameters will be processed by the ``Layer`` before entering the objective function.
The different layers can achieve different effects,
and they can be sequentially concatenated together to form a larger layer,
enabling the implementation of more complex functionalities.
The ``Layer`` objects can be found in ``skscope.layers``.
If ``layers`` is not empty, ``objective`` must be written in ``JAX`` library.
init_support_set : array of int, default=[]
The index of the variables in initial active set.
init_params : array of shape (dimensionality,), optional
The initial value of parameters, default is an all-zero vector.
gradient : callable, optional
A function that returns the gradient vector of parameters: ``gradient(params, data) -> array of shape (dimensionality,)``,
where ``params`` is a 1-D array with shape (dimensionality,) and ``data`` is the fixed parameters needed to completely specify the function.
If ``gradient`` is not provided, ``objective`` must be written in ``JAX`` library.
jit : bool, default=False
If ``objective`` or ``gradient`` are written in JAX, ``jit`` can be set to True to speed up the optimization.
Returns
-------
parameters : array of shape (dimensionality,)
The optimal solution of optimization.
"""
jax.config.update("jax_platform_name", self.jax_platform)
BaseSolver._check_positive_integer(self.dimensionality, "dimensionality")
BaseSolver._check_positive_integer(self.sample_size, "sample_size")
BaseSolver._check_non_negative_integer(self.max_iter, "max_iter")
# group
if self.group is None:
self.group = np.arange(self.dimensionality, dtype="int32")
group_num = self.dimensionality # len(np.unique(group))
else:
self.group = np.array(self.group)
if self.group.ndim > 1:
raise ValueError("Group should be an 1D array of integers.")
if self.group.size != self.dimensionality:
raise ValueError(
"The length of group should be equal to dimensionality."
)
group_num = len(np.unique(self.group))
if self.group[0] != 0:
raise ValueError("Group should start from 0.")
if any(self.group[1:] - self.group[:-1] < 0):
raise ValueError("Group should be an incremental integer array.")
if not group_num == max(self.group) + 1:
raise ValueError("There is a gap in group.")
# preselect
self.preselect = np.unique(np.array(self.preselect, dtype="int32"))
if self.preselect.size > 0 and (
self.preselect[0] < 0 or self.preselect[-1] >= group_num
):
raise ValueError("preselect should be between 0 and dimensionality.")
# default sparsity level
force_min_sparsity = self.preselect.size
default_max_sparsity = max(
force_min_sparsity,
(
group_num
if group_num <= 5
else int(group_num / np.log(np.log(group_num)) / np.log(group_num))
),
)
# sparsity
if self.sparsity is None:
self.sparsity = np.arange(
force_min_sparsity,
default_max_sparsity + 1,
dtype="int32",
)
else:
self.sparsity = np.unique(np.array(self.sparsity, dtype="int32"))
if self.sparsity.size == 0:
raise ValueError("Sparsity should not be empty.")
if self.sparsity[0] < force_min_sparsity or self.sparsity[-1] > group_num:
raise ValueError("There is an invalid sparsity.")
BaseSolver._check_positive_integer(self.cv, "cv")
if self.cv == 1:
if self.sparsity.size == 1 and self.ic_method is None:
self.ic_method = utilities.AIC
elif self.sparsity.size > 1 and self.ic_method is None:
raise ValueError(
"ic_method should be provided for choosing sparsity level with information criterion."
)
elif self.sample_size <= 1:
raise ValueError("sample_size should be given when using ic_method.")
elif self.cv > 1:
if self.cv > self.sample_size:
raise ValueError("cv should not be greater than sample_size.")
if self.split_method is None:
raise ValueError("split_method should be provided when cv > 1.")
if self.cv_fold_id is None:
kf = KFold(
n_splits=self.cv, shuffle=True, random_state=self.random_state
).split(np.zeros(self.sample_size))
self.cv_fold_id = np.zeros(self.sample_size)
for i, (_, fold_id) in enumerate(kf):
self.cv_fold_id[fold_id] = i
else:
self.cv_fold_id = np.array(self.cv_fold_id, dtype="int32")
if self.cv_fold_id.ndim > 1:
raise ValueError("cv_fold_id should be an 1D array of integers.")
if self.cv_fold_id.size != self.sample_size:
raise ValueError(
"The length of cv_fold_id should be equal to sample_size."
)
if len(set(self.cv_fold_id)) != self.cv:
raise ValueError(
"The number of different elements in cv_fold_id should be equal to cv."
)
sparsity = self.sparsity
group = self.group
preselect = self.preselect
if gradient is None and len(layers) > 0:
if len(layers) == 1:
assert layers[0].out_features == self.dimensionality
else:
for i in range(len(layers) - 1):
assert layers[i].out_features == layers[i + 1].in_features
assert layers[-1].out_features == self.dimensionality
loss_, grad_ = BaseSolver._set_objective(objective, gradient, jit, layers)
p = layers[0].in_features
for layer in layers[::-1]:
sparsity = layer.transform_sparsity(sparsity)
group = layer.transform_group(group)
preselect = layer.transform_preselect(preselect)
else:
p = self.dimensionality
loss_, grad_ = BaseSolver._set_objective(objective, gradient, jit)
def loss_fn(params, data):
value = loss_(params, data)
if not np.isfinite(value):
raise ValueError("The objective function returned {}.".format(value))
if isinstance(value, float):
return value
return value.item()
def value_and_grad(params, data):
value, grad = grad_(params, data)
if not np.isfinite(value):
raise ValueError("The objective function returned {}.".format(value))
if not np.all(np.isfinite(grad)):
raise ValueError("The gradient returned contains NaN or Inf.")
if isinstance(value, float):
return value, np.array(grad)
return value.item(), np.array(grad)
if init_support_set is None:
init_support_set = np.array([], dtype="int32")
else:
init_support_set = np.array(init_support_set, dtype="int32")
if init_support_set.ndim > 1:
raise ValueError(
"The initial active set should be an 1D array of integers."
)
if init_support_set.min() < 0 or init_support_set.max() >= p:
raise ValueError("init_support_set contains wrong index.")
# init_params
if init_params is None:
random_init = False
if len(layers) > 0:
random_init = np.any(
np.array([layer.random_initilization for layer in layers])
)
if random_init:
init_params = np.random.RandomState(self.random_state).randn(p)
else:
init_params = np.zeros(p, dtype=float)
else:
init_params = np.array(init_params, dtype=float)
if init_params.shape != (p,):
raise ValueError(
"The length of init_params should be equal to dimensionality."
)
if self.cv == 1:
is_first_loop: bool = True
for s in sparsity:
init_params, init_support_set = self._solve(
s,
loss_fn,
value_and_grad,
init_support_set,
init_params,
data,
preselect,
group,
) # warm start: use results of previous sparsity as initial value
objective_value = loss_fn(init_params, data)
eval = self.ic_method(
objective_value,
self.dimensionality,
s,
self.sample_size,
)
if is_first_loop or eval < self.eval_objective:
is_first_loop = False
self.params = init_params
self.support_set = init_support_set
self.objective_value = objective_value
self.eval_objective = eval
else: # self.cv > 1
cv_eval = {s: 0.0 for s in sparsity}
cache_init_support_set = {}
cache_init_params = {}
for s in sparsity:
for i in range(self.cv):
train_index = np.where(self.cv_fold_id != i)[0]
test_index = np.where(self.cv_fold_id == i)[0]
init_params, init_support_set = self._solve(
s,
loss_fn,
value_and_grad,
init_support_set,
init_params,
self.split_method(data, train_index),
preselect,
group,
) # warm start: use results of previous sparsity as initial value
cv_eval[s] += loss_fn(
init_params, self.split_method(data, test_index)
)
cache_init_support_set[s] = init_support_set
cache_init_params[s] = init_params
best_sparsity = min(cv_eval, key=cv_eval.get)
self.params, self.support_set = self._solve(
best_sparsity,
loss_fn,
value_and_grad,
cache_init_support_set[best_sparsity],
cache_init_params[best_sparsity],
data,
preselect,
group,
)
self.objective_value = loss_fn(self.params, data)
self.eval_objective = cv_eval[best_sparsity]
if len(layers) > 0:
for layer in layers:
self.params = layer.transform_params(self.params)
self.support_set = np.sort(np.nonzero(self.params)[0])
return self.params
@staticmethod
def _set_objective(objective, gradient, jit, layers=[]):
# objective function
if objective.__code__.co_argcount == 1:
if len(layers) == 0:
def loss_(params, data):
return objective(params)
else:
def loss_(params, data):
for layer in layers:
params = layer.transform_params(params)
return objective(params)
else:
if len(layers) == 0:
def loss_(params, data):
return objective(params, data)
else:
def loss_(params, data):
for layer in layers:
params = layer.transform_params(params)
return objective(params, data)
if jit:
loss_ = jax.jit(loss_)
if gradient is None:
def grad_(params, data):
return jax.value_and_grad(loss_)(params, data)
elif gradient.__code__.co_argcount == 1:
def grad_(params, data):
return (loss_(params, data), gradient(params))
else:
def grad_(params, data):
return (loss_(params, data), gradient(params, data))
if jit:
grad_ = jax.jit(grad_)
return loss_, grad_
def _solve(
self,
sparsity,
loss_fn,
value_and_grad,
init_support_set,
init_params,
data,
preselect,
group,
):
if sparsity == 0:
return np.zeros_like(init_params), np.array([], dtype=int)
if sparsity < preselect.size:
raise ValueError(
"The number of always selected variables is larger than the sparsity."
)
group_num = len(np.unique(group))
group_indices = [np.where(group == i)[0] for i in range(group_num)]
if (
math.comb(
group_num - preselect.size,
sparsity - preselect.size,
)
> self.max_iter
):
raise ValueError(
"The number of subsets is too large, please reduce the sparsity, dimensionality or increase max_iter."
)
def all_subsets(p: int, s: int, preselect: np.ndarray = np.zeros(0)):
universal_set = np.setdiff1d(np.arange(group_num), preselect)
p = p - preselect.size
s = s - preselect.size
def helper(start: int, s: str, curr_selection: np.ndarray):
if s == 0:
yield curr_selection
else:
for i in range(start, p - s + 1):
yield from helper(
i + 1, s - 1, np.append(curr_selection, universal_set[i])
)
yield from helper(0, s, preselect)
result = {"params": None, "support_set": None, "objective_value": math.inf}
params = init_params.copy()
for support_set_group in all_subsets(group_num, sparsity, preselect):
support_set = np.concatenate([group_indices[i] for i in support_set_group])
inactive_set = np.ones_like(init_params, dtype=bool)
inactive_set[support_set] = False
params[inactive_set] = 0.0
params[support_set] = init_params[support_set]
loss, params = self._numeric_solver(
loss_fn, value_and_grad, params, support_set, data
)
if loss < result["objective_value"]:
result["params"] = params.copy()
result["support_set"] = support_set
result["objective_value"] = loss
return result["params"], result["support_set"]
def _numeric_solver(
self,
loss_fn,
value_and_grad,
params,
optim_variable_set,
data,
):
"""
Solve the optimization problem with given support set.
Parameters
----------
loss_fn: Callable[[Sequence[float], Any], float]
The loss function.
value_and_grad: Callable[[Sequence[float], Any], Tuple[float, Sequence[float]]]
The function to compute the loss and gradient.
params: Sequence[float]
The complete initial parameters.
optim_variable_set: Sequence[int]
The index of variables to be optimized.
data: Any
The data passed to loss_fn and value_and_grad.
Returns
-------
loss: float
The loss of the optimized parameters, i.e., `loss_fn(params, data)`.
optimized_params: Sequence[float]
The optimized parameters.
"""
if not isinstance(params, np.ndarray) or params.ndim != 1:
raise ValueError("params should be a 1D np.ndarray.")
if (
not isinstance(optim_variable_set, np.ndarray)
or optim_variable_set.ndim != 1
):
raise ValueError("optim_variable_set should be a 1D np.ndarray.")
if optim_variable_set.size == 0:
return loss_fn(params, data)
return self.numeric_solver(
loss_fn, value_and_grad, params, optim_variable_set, data
)
[docs]
def get_result(self):
r"""
Get the result of optimization.
Returns
-------
results : dict
The result of optimization, including the following keys:
+ ``params`` : array of shape (dimensionality,)
The optimal parameters.
+ ``support_set`` : array of int
The support set of the optimal parameters.
+ ``objective_value`` : float
The value of objective function at the optimal parameters.
+ ``eval_objective`` : float
The value of information criterion or mean loss of cross-validation.
"""
return {
"params": self.params,
"support_set": self.support_set,
"objective_value": self.objective_value,
"eval_objective": self.eval_objective,
}
[docs]
def get_estimated_params(self):
r"""
Get the optimal parameters of the objective function.
Returns
-------
parameters : array of shape (dimensionality,)
The optimal solution of optimization.
"""
return self.params
[docs]
def get_support(self):
r"""
Get the support set of the optimal parameters.
Returns
-------
support_set : array of int
The indices of selected variables, sorted in ascending order.
"""
return self.support_set
@staticmethod
def _check_positive_integer(var, name: str):
if not isinstance(var, int) or var <= 0:
raise ValueError("{} should be an positive integer.".format(name))
@staticmethod
def _check_non_negative_integer(var, name: str):
if not isinstance(var, int) or var < 0:
raise ValueError("{} should be an non-negative integer.".format(name))