Source code for mico.mico
"""
This module contains a scikit-learn compatible implementation of mutual information based feature selection methods.
"""
from scipy import signal
from sklearn.utils import check_X_y
from sklearn.preprocessing import StandardScaler
from psutil import cpu_count
from sklearn.base import BaseEstimator
from sklearn.feature_selection.base import SelectorMixin
import bottleneck as bn
from . import mico_utils
#from sklearn.feature_selection import VarianceThreshold
from scipy import sparse, stats
from scipy.stats import rankdata
import copy
from abc import ABCMeta, abstractmethod
import math
DEBUG = False
###############################################################################
# IO #
###############################################################################
import errno, os, logging
def setup_logging(level, filename=None):
logging.basicConfig(filename=filename, format='%(message)s', level=level)
# logging.basicConfig(filename=filename, format='[%(asctime)s] %(message)s', datefmt='%Y-%m-%d %I:%M:%S %p', level=level)
def append_message(full_msg, curr_msg, check=True, new_line=True):
if check:
return (full_msg + (curr_msg + "\n" if new_line else curr_msg)) if curr_msg not in full_msg else full_msg
else:
return full_msg + (curr_msg + "\n" if new_line else curr_msg)
def make_dir(path):
"""Private function for creating the output directory.
@ref https://stackoverflow.com/questions/20666764/python-logging-how-to-ensure-logfile-directory-is-created
"""
try:
os.makedirs(path, exist_ok=True) # Python>3.2
except TypeError:
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise FileNotFoundError("Failed to make directory")
###############################################################################
# Time stamp #
###############################################################################
from time import gmtime, strftime
def get_time_stamp():
return strftime("%Y-%m-%d-%H-%M-%S", gmtime())
###############################################################################
# MI. #
###############################################################################
import numpy as np
#from sklearn.externals.joblib import Parallel, delayed
from joblib import Parallel, delayed
def _replace_vec_nan_with_zero(vec):
for i, e in enumerate(vec):
if np.isnan(e):
vec[i] = 0.0
return vec
def _replace_mat_nan_with_zero(mat):
for i in range(mat.shape[0]):
for j in range(mat.shape[1]):
if np.isnan(mat[i][j]):
mat[i][j] = 0.0
return mat
def _clean_up_MI(MI, use_nan_for_invalid_mi):
if MI == np.inf:
MI = 999999.0
if MI >= 0:
return MI
else:
return np.nan if use_nan_for_invalid_mi else 0.0
def get_first_mi_vector(MI_FS, k, are_data_binned, n_jobs=1):
"""
Calculates the Mututal Information between each feature in X and y.
This function is for when |S| = 0. We select the first feature in S.
"""
n, p = MI_FS.X.shape
if n_jobs <= 1:
MIs = [_get_first_mi(i, k, MI_FS, are_data_binned) for i in range(p)]
else:
MIs = Parallel(n_jobs=MI_FS.n_jobs)(delayed(_get_first_mi)(i, k, MI_FS, are_data_binned) for i in range(p))
return MIs
def _get_first_mi(i, k, MI_FS, are_data_binned, use_nan_for_invalid_mi=True):
n, p = MI_FS.X.shape
x = MI_FS.X[:, i].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n, 1)
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(x, y)
else:
MI = mico_utils.get_mutual_information_cd(y, x, k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(x, y, k)
else:
MI = mico_utils.get_mutual_information_cc(x, y, k)
# MI must be non-negative
return _clean_up_MI(MI, use_nan_for_invalid_mi)
def get_mi_vector(MI_FS, k, F, s, are_data_binned, n_jobs=1):
"""
Calculates the Mututal Information between each feature in F and s.
This function is for when |S| > 1. s is the previously selected feature.
We exploit the fact that this step is embarrassingly parallel.
"""
if n_jobs <= 1:
MIs = [_get_mi(f, s, k, MI_FS, are_data_binned) for f in F]
else:
MIs = Parallel(n_jobs=MI_FS.n_jobs)(delayed(_get_mi)(f, s, k, MI_FS, are_data_binned) for f in F)
return MIs
def _get_mi(f, s, k, MI_FS, are_data_binned, use_nan_for_invalid_mi=True):
"""
A Semidefinite Programming Based Search Strategy for Feature Selection with Mutual Information Measure
Tofigh Naghibi , Sarah Hoffmann and Beat Pfister
Section 2.3.1.
:param f:
:param s:
:param MI_FS:
:param are_data_binned:
:return:
"""
n, p = MI_FS.X.shape
if MI_FS.method in ['JMI', 'JMIM']:
# JMI & JMIM
if s != f:
# Off-diagonal.
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n, 1)
joint = MI_FS.X[:, (s, f)]
if are_data_binned:
# Encoding.
joint = mico_utils.encode_discrete_x(joint).reshape(n, 1)
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(joint, y)
else:
MI = mico_utils.get_mutual_information_cd(y, joint, k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(joint, y, k)
else:
MI = mico_utils.get_mutual_information_cc(joint, y, k)
else:
# Diagonal.
x = MI_FS.X[:, f].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n)
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(x, y)
else:
MI = mico_utils.get_mutual_information_cd(y, x, k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(x, y, k)
else:
MI = mico_utils.get_mutual_information_cc(x, y, k)
else:
# MRMR
if s != f:
# Off-diagonal-- Did not use any information from y here.
x1 = MI_FS.X[:, f].reshape(n, 1)
x2 = MI_FS.X[:, s].reshape(n, 1)
if are_data_binned:
MI = mico_utils.get_mutual_information_dd(x1, x2)
else:
MI = mico_utils.get_mutual_information_cc(x1, x2, k)
else:
# Diagonal-- Did use information from y here.
x = MI_FS.X[:, f].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n)
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(x, y)
else:
MI = mico_utils.get_mutual_information_cd(y, x, k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(x, y, k)
else:
MI = mico_utils.get_mutual_information_cc(x, y, k)
# MI must be non-negative
return _clean_up_MI(MI, use_nan_for_invalid_mi)
def get_mico_vector(MI_FS, k, F, s, offdiagonal_param, are_data_binned, Nx=None, n_jobs=1):
"""
Calculates the Mututal Information between each feature in F and s.
This function is for when |S| > 1. s is the previously selected feature.
We exploit the fact that this step is embarrassingly parallel.
"""
#print("Start {}".format(s))
if n_jobs <= 1:
MIs = [_get_mico(f, s, k, MI_FS, offdiagonal_param, are_data_binned, Nx=Nx) for f in F]
else:
MIs = Parallel(n_jobs=MI_FS.n_jobs)(delayed(_get_mico)(f, s, k, MI_FS, offdiagonal_param, are_data_binned, Nx=Nx) for f in F)
#print("Done {}".format(s))
return MIs
def _get_mico(f, s, k, MI_FS, offdiagonal_param, are_data_binned, use_nan_for_invalid_mi=False, Nx=None):
"""
A Semidefinite Programming Based Search Strategy for Feature Selection with Mutual Information Measure
Tofigh Naghibi , Sarah Hoffmann and Beat Pfister
Section 2.3.1.
:param f:
:param s:
:param MI_FS:
:param are_data_binned:
:return:
"""
n, p = MI_FS.X.shape
clean_up = True
if MI_FS.method in ['JMI', 'JMIM']:
if False:
# Three-way interaction info.
# JMI & JMIM
if s != f:
# Off-diagonal -- -1/2(P-1) I(X_{.s}; X_{.k}; y).
x1 = MI_FS.X[:, f].reshape(n, 1)
x2 = MI_FS.X[:, s].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
clean_up = False # No need to clean-up as the 3-way interaction info can be negative.
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_interaction_information_3way(x1, x2, y, "ddd", k) * offdiagonal_param
else:
MI = mico_utils.get_interaction_information_3way(x1, x2, y, "ddc", k) * offdiagonal_param
else:
if MI_FS.categorical:
MI = mico_utils.get_interaction_information_3way(x1, x2, y, "ccd", k) * offdiagonal_param
else:
MI = mico_utils.get_interaction_information_3way(x1, x2, y, "ccc", k) * offdiagonal_param
else:
# Diagonal -- I(X_{.s}; y).
x = MI_FS.X[:, f].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n)
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(x, y)
else:
MI = mico_utils.get_mutual_information_cd(y, x, k)
#MI = mico_utils.get_mutual_information_cd(y.reshape(n, 1), x.reshape(n), k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(x, y, k, Nx)
else:
MI = mico_utils.get_mutual_information_cc(x, y, k)
else:
# MI
# JMI & JMIM
if s != f:
# Off-diagonal.
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n, 1)
joint = MI_FS.X[:, (s, f)]
if are_data_binned:
# Encoding.
joint = mico_utils.encode_discrete_x(joint).reshape(n, 1)
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(joint, y)
else:
MI = mico_utils.get_mutual_information_cd(y, joint, k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(joint, y, k, Nx)
else:
MI = mico_utils.get_mutual_information_cc(joint, y, k)
else:
# Diagonal.
x = MI_FS.X[:, f].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n)
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(x, y)
else:
MI = mico_utils.get_mutual_information_cd(y, x, k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(x, y, k, Nx)
else:
MI = mico_utils.get_mutual_information_cc(x, y, k)
else:
# MRMR
if s != f:
# Off-diagonal -- Did not use any information from y here.
x1 = MI_FS.X[:, f].reshape(n, 1)
x2 = MI_FS.X[:, s].reshape(n, 1)
if are_data_binned:
MI = mico_utils.get_mutual_information_dd(x1, x2) * offdiagonal_param
else:
MI = mico_utils.get_mutual_information_cc(x1, x2, k) * offdiagonal_param
else:
# Diagonal -- Did use information from y here.
x = MI_FS.X[:, f].reshape(n, 1)
y = MI_FS.y.reshape(n, 1)
#y = MI_FS.y.flatten().reshape(n)
if are_data_binned:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_dd(x, y)
else:
MI = mico_utils.get_mutual_information_cd(y, x, k, Nx)
#MI = mico_utils.get_mutual_information_cd(y.reshape(n, 1), x.reshape(n), k)
else:
if MI_FS.categorical:
MI = mico_utils.get_mutual_information_cd(x, y, k)
else:
MI = mico_utils.get_mutual_information_cc(x, y, k)
# MI must be non-negative
if clean_up:
MI = _clean_up_MI(MI, use_nan_for_invalid_mi)
if s == f:
MI = max(1.E-4, MI)
return MI
class MutualInformationBase(BaseEstimator, SelectorMixin, metaclass=ABCMeta):
"""
Base class for feature selection implementation.
This base class contains routines for selecting features implementation.
Parameters
----------
method : string, optional (default='JMI')
A string that specifies the mutual information measure. Possible options are:
- **JMI** : Joint Mutual Information [1]_
- **JMIM** : Joint Mutual Information Maximisation [2]_
- **MRMR** : Max-Relevance Min-Redundancy [3]_
k : int, optional (default=5)
An integer to specify number of samples for the kernel density estimation using KNN. A small integer between 3 and 10 is recommended. Note that this parameter is applicable only if `num_bins` is set to 0.
n_features : int or string, optional (default='auto')
A positive integer number that specifies the number of features to be selected.
If 'auto' is used, then this number will be determined automatically.
categorical : boolean, optional (default=True)
If `categorical` is `True`, then the dependent variable is assumed to be a categorical class label. Otherwise, the dependent variable is assumed to be a continuous variable.
n_jobs : int, optional (default=1)
The number of threads allowed for the computations. If `n_jobs` :math:`<= 0`, then it will be determined internally.
verbose : int, optional (default=0)
An integer number to specify the verbosity level. Available options are:
- 0: Disable output.
- 1: Display summary results.
- 2: Displays all results.
scale_data : boolean, optional (default=True)
An boolean flag that specifies if the input data input shall be scaled first.
num_bins : int, optional (default=0)
An integer number to specify the number of bins allowed for data binning (bucketing). If :math:`0`, then data binning will be disabled, and KNN will be used for kernal density estimation.
References
----------
.. [1] H Yang and J Moody, "Data Visualization and Feature Selection: New Algorithms for Nongaussian Data", NIPS 1999. [`Pre-print <https://papers.nips.cc/paper/1779-data-visualization-and-feature-selection-new-algorithms-for-nongaussian-data.pdf>`_]
.. [2] M Bennasar, Y Hicks, abd R Setchi, "Feature selection using Joint Mutual Information Maximisation", Expert Systems with Applications, 42(22), pp. 8520--8532, 2015 [`pre-print <https://core.ac.uk/download/pdf/82448198.pdf>`_]
.. [3] H Peng, F Long, and C Ding, "Feature selection based on mutual information criteria of max-dependency, max-relevance, and min-redundancy", IEEE Transactions on Pattern Analysis and Machine Intelligence, 27(8), pp. 1226--1238, 2005. [`Pre-print <http://ranger.uta.edu/~chqding/papers/mRMR_PAMI.pdf>`_]
"""
def __init__(self,
method='JMI',
k=5,
n_features='auto',
categorical=True,
n_jobs=0,
verbose=0,
scale_data=True,
num_bins=0):
self.method = method
self.k = k
self.n_features = n_features
self.categorical = categorical
self.n_jobs = n_jobs
self.verbose = verbose
self.scale_data = scale_data
self.num_bins = num_bins
self._support_mask = None
# Check if n_jobs is negative
if self.n_jobs <= 0:
self.n_jobs = cpu_count()
# Logger.
#make_dir("./log/")
#instance = get_time_stamp()
#filename = "./log/{}.log".format(instance)
filename = None
if self.verbose == 0:
lv = logging.CRITICAL
elif self.verbose == 1:
lv = logging.INFO
else:
lv = logging.DEBUG
setup_logging(level=lv, filename=filename)
def _get_support_mask(self):
if self._support_mask is None:
raise ValueError('mRMR has not been fitted yet!')
return self._support_mask
def _scale_data(sefl, X):
""""
Scale each column of the data using SKL.
Todo
----
Shall we ignore binning the encoded features and discrete features?
References
----------
[1] http://www.faqs.org/faqs/ai-faq/neural-nets/part2/section-16.html
"""
from sklearn.preprocessing import MinMaxScaler, StandardScaler, MaxAbsScaler
# X must be stored in a row-wise order.
if sparse.issparse(X):
scalar = MaxAbsScaler()
else:
scalar = MinMaxScaler() # MinMaxScaler() # StandardScaler
return scalar.fit_transform(X)
def _bin_data(sefl, X, num_bins):
""""
Bin the data using SKL.
"""
# X must be stored in a row-wise order.
from sklearn.preprocessing import KBinsDiscretizer
if type(num_bins) is int:
# bin_set = [ min(num_bins, len(array))]
# @todo Check unique value in each feature.
bin_set = [num_bins for _ in range(X.shape[1])]
else:
bin_set = num_bins
est = KBinsDiscretizer(n_bins=bin_set, encode='ordinal', strategy='uniform').fit(X)
return est.transform(X)
def _are_data_binned(self):
return self.num_bins > 0
def _is_integer(self, x):
return np.all(np.equal(np.mod(x, 1), 0))
def _init_data(self, X, y):
# checking input data and scaling it if y is continuous
X, y = check_X_y(X, y)
if not self.categorical:
ss = StandardScaler()
X = ss.fit_transform(X)
y = ss.fit_transform(y.reshape(-1, 1))
# Bin the data if needed.
if self.num_bins > 0:
logging.info("Started binning data.")
X = self._bin_data(X, self.num_bins)
else:
self.num_bins = 0
# Scale the data if needed.
if self.scale_data:
logging.info("Started scaling data.")
X = self._scale_data(X)
# sanity checks
methods = ['JMI', 'JMIM', 'MRMR']
if self.method not in methods:
raise ValueError('Please choose one of the following methods:\n' +
'\n'.join(methods))
if not isinstance(self.k, int):
raise ValueError("k must be an integer.")
if self.k < 1:
raise ValueError('k must be larger than 0.')
if self.categorical and np.any(self.k > np.bincount(y)):
raise ValueError('k must be smaller than your smallest class.')
if not isinstance(self.categorical, bool):
raise ValueError('Categorical must be Boolean.')
if self.categorical and np.unique(y).shape[0] > 5:
logging.warning('Are you sure y is categorical? It has more than 5 levels.')
if not self.categorical and self._is_integer(y):
logging.warning('Are you sure y is continuous? It seems to be discrete.')
if self._is_integer(X):
logging.warning('The values of X seem to be discrete. MI_FS will treat them as continuous.')
return X, y
def _add_remove(self, S, F, i):
"""
Helper function: removes ith element from F and adds it to S.
"""
S.append(i)
F.remove(i)
return S, F
def _remove(self, S, i):
"""
Helper function: removes ith element from F and adds it to S.
"""
S.remove(i)
return S
def _calc_xQx(self, Q, curr_soln):
score = 0
n = Q.shape[0]
for i in range(n):
if curr_soln[i] > 0:
for j in range(n):
if curr_soln[j] > 0 and not np.isnan(Q[i, j]):
score += Q[i, j]
return score
def _convert_idx_array_to_mask_array(self, idx_array, n):
mask_array = np.zeros(n, dtype=int)
mask_array[idx_array] = 1
return mask_array
def _convert_mask_array_to_idx_array(self, mask_array):
return np.where(mask_array)
def _generate_feature_importances(self, best_soln, Q, num_features):
feature_importances = []
require_clean_run = False
best_soln_binary = self._convert_idx_array_to_mask_array(best_soln, num_features)
best_score = self._calc_xQx(Q, best_soln_binary)
for i in range(len(best_soln_binary)):
if best_soln_binary[i] == 1:
# Remove the feature and recalculate the score.
best_soln_binary[i] = 0
curr_score = self._calc_xQx(Q, best_soln_binary)
diff_score = best_score - curr_score
best_soln_binary[i] = 1
if diff_score < 0:
logging.info("Warning: Feature {} has negative score {1}".format(i, diff_score))
best_soln_binary[i] = 0
require_clean_run = True
feature_importances.append(diff_score)
else:
feature_importances.append(0.0)
if require_clean_run:
best_soln = self._convert_mask_array_to_idx_array(best_soln_binary)
best_score = self._calc_xQx(Q, best_soln_binary)
# Normalize and rank.
if sum(feature_importances) <= 0:
raise ValueError("Failed: all features have zero importance score. Try using a smaller value for parameter k.")
feature_importances = feature_importances / sum(feature_importances)
feature_raking = list(map(int, rankdata(-feature_importances)))
return best_soln, best_score, feature_importances, feature_raking
[docs]class MutualInformationForwardSelection(MutualInformationBase):
"""
Forward selection approach for feature selection.
This class implements a forward selection approach for feature selection with mutual information (MI) measure.
Parameters
----------
method : string, optional (default='JMI')
A string that specifies the mutual information measure. Possible options are:
- **JMI** : Joint Mutual Information [1]_
- **JMIM** : Joint Mutual Information Maximisation [2]_
- **MRMR** : Max-Relevance Min-Redundancy [3]_
k : int, optional (default=5)
An integer to specify number of samples for the kernel density estimation using KNN. A small integer between 3 and 10 is recommended. Note that this parameter is applicable only if `num_bins` is set to 0.
n_features : int or string, optional (default='auto')
A positive integer number that specifies the number of features to be selected.
If 'auto' is used, then this number will be determined automatically.
categorical : boolean, optional (default=True)
If `categorical` is `True`, then the dependent variable is assumed to be a categorical class label. Otherwise, the dependent variable is assumed to be a continuous variable.
n_jobs : int, optional (default=1)
The number of threads allowed for the computations. If `n_jobs` :math:`<= 0`, then it will be determined internally.
verbose : int, optional (default=0)
An integer number to specify the verbosity level. Available options are:
- 0: Disable output.
- 1: Display summary results.
- 2: Displays all results.
scale_data : boolean, optional (default=True)
An boolean flag that specifies if the input data input shall be scaled first.
num_bins : int, optional (default=0)
An integer number to specify the number of bins allowed for data binning (bucketing). If :math:`0`, then data binning will be disabled, and KNN will be used for kernal density estimation.
early_stop_steps : int, optional (default=10)
An positive integer to specify the iteration limit allowed without improvement on the MI measure.
Attributes
----------
n_features_ : int
The number of selected features.
feature_importances_ : array of shape n_features
The feature importance scores of the selected features.
ranking_ : array of shape n_features
The feature ranking of the selected features, with the first being the first feature selected with largest marginal MI with y, followed by
the others with decreasing MI.
mi_ : array of shape n_features
The MI measure of the selected features. Usually this a monotone decreasing array of numbers converging to 0. One can use this to estimate the
number of features to select. In fact this is what n_features='auto' tries to do heuristically.
Examples
--------
This implementation follows the scikit-learn API convention:
- ``fit(X, y)``
- ``transform(X)``
- ``fit_transform(X, y)``
References
----------
.. [1] H Yang and J Moody, "Data Visualization and Feature Selection: New Algorithms for Nongaussian Data", NIPS 1999. [`Pre-print <https://papers.nips.cc/paper/1779-data-visualization-and-feature-selection-new-algorithms-for-nongaussian-data.pdf>`_]
.. [2] M Bennasar, Y Hicks, abd R Setchi, "Feature selection using Joint Mutual Information Maximisation", Expert Systems with Applications, 42(22), pp. 8520--8532, 2015 [`pre-print <https://core.ac.uk/download/pdf/82448198.pdf>`_]
.. [3] H Peng, F Long, and C Ding, "Feature selection based on mutual information criteria of max-dependency, max-relevance, and min-redundancy", IEEE Transactions on Pattern Analysis and Machine Intelligence, 27(8), pp. 1226--1238, 2005. [`Pre-print <http://ranger.uta.edu/~chqding/papers/mRMR_PAMI.pdf>`_]
"""
def __init__(self,
method='JMI',
k=5,
n_features='auto',
categorical=True,
n_jobs=0,
verbose=0,
scale_data=True,
num_bins=0,
early_stop_steps=10):
# Call base constructor.
super(MutualInformationForwardSelection, self).__init__(
method, k, n_features, categorical,
n_jobs, verbose, scale_data, num_bins)
self.early_stop_steps = early_stop_steps
# Attributes.
self.n_features_ = 0
self.feature_importances_ = []
self.ranking_ = []
self.mi_ = []
[docs] def fit(self, X, y):
"""
Fits the MI_FS feature selection with the chosen MI_FS method.
Parameters
----------
X : array-like, shape = [n_samples, n_features]
The training input samples.
y : array-like, shape = [n_samples]
The target values.
"""
self.X, y = self._init_data(X, y)
n, p = X.shape
num_features = p
self.y = y.reshape((n, 1))
# list of selected features
S = []
# list of all features
F = list(range(p))
# Calculate max features
if self.n_features == 'auto':
n_features = p
else:
n_features = min(p, self.n_features)
feature_mi_matrix = np.zeros((n_features, p))
feature_mi_matrix[:] = np.nan
S_mi = []
# Populate information.
self._print_init_result()
#-------------------------------------------------------------------#
# Find first feature #
#-------------------------------------------------------------------#
xy_MI = np.array(get_first_mi_vector(self, self.k, self._are_data_binned(), n_jobs=1))
xy_MI = _replace_vec_nan_with_zero(xy_MI)
if sum(xy_MI) <= 0:
logging.warning("Warning: The first MI are all zeros. Try using a different value of k.")
# choose the best, add it to S, remove it from F
selected = bn.nanargmax(xy_MI)
S, F = self._add_remove(S, F, selected)
S_mi.append(selected)
# Populate information.
self._print_curr_result(S, selected, S_mi)
self.ranking_.append(selected)
#-------------------------------------------------------------------#
# Find Subsequent features #
#-------------------------------------------------------------------#
while len(S) < n_features:
# loop through the remaining unselected features and calculate MI
s = len(S) - 1
feature_mi_matrix[s, F] = get_mi_vector(self, self.k, F, S[-1], self._are_data_binned(), n_jobs=1)
# make decision based on the chosen FS algorithm
fmm = feature_mi_matrix[:len(S), F]
if self.method == 'JMI':
JMI = bn.nansum(fmm, axis=0)
selected = F[bn.nanargmax(JMI)]
S_mi.append(bn.nanmax(JMI))
elif self.method == 'JMIM':
JMIM = bn.nanmin(fmm, axis=0)
if bn.allnan(JMIM):
break
selected = F[bn.nanargmax(JMIM)]
S_mi.append(bn.nanmax(JMIM))
elif self.method == 'MRMR':
if bn.allnan(bn.nanmean(fmm, axis=0)):
break
MRMR = xy_MI[F] - bn.nanmean(fmm, axis=0)
selected = F[bn.nanargmax(MRMR)]
S_mi.append(bn.nanmax(MRMR))
# record the newly selected feature and add it to S.
S, F = self._add_remove(S, F, selected)
self.ranking_.append(selected)
# Populate information.
self._print_curr_result(S, selected, S_mi)
# Perform early stops.
if self.n_features == 'auto' and \
(self.early_stop_steps > 0 and len(S) > self.early_stop_steps):
# smooth the 1st derivative of the MI values of previously sel
MI_dd = signal.savgol_filter(S_mi[1:], 9, 2, 1)
# does the mean of the last 5 converge to 0?
if np.abs(np.mean(MI_dd[-5:])) < 1e-3:
break
S = sorted(S)
#-------------------------------------------------------------------#
# Calculate final MIs #
#-------------------------------------------------------------------#
logging.info("Started calculating final MI matrix.")
num_features_S = len(S)
feature_mi_matrix = np.zeros((p, p))
feature_mi_matrix[:] = np.nan
if True:
# Parallelize the outer loop - faster.
feature_mi_matrix_partial_tri = Parallel(n_jobs=self.n_jobs)(delayed(get_mi_vector)(
self, self.k, S[i:], s, self._are_data_binned(), n_jobs=1) for i, s in enumerate(S))
# Get the MI matrix.
for i in range(num_features_S):
for j in range(i, num_features_S):
feature_mi_matrix[S[i], S[j]] = feature_mi_matrix_partial_tri[i][j - i]
if i != j:
feature_mi_matrix[S[j], S[i]] = feature_mi_matrix[S[i], S[j]]
else:
# Parallelize the inner loop - slower.
all_S = list(range(p))
for s in all_S:
feature_mi_matrix[s, all_S] = get_mi_vector(self, self.k, all_S, s, self._are_data_binned(), n_jobs=self.n_jobs)
# Ensure the MI matrix is symmetric.
feature_mi_matrix = mico_utils.make_mat_sym(feature_mi_matrix)
#-------------------------------------------------------------------#
# Generate feature importance scores. #
#-------------------------------------------------------------------#
S, best_score, feature_importances, feature_raking = \
self._generate_feature_importances(best_soln=S, Q=feature_mi_matrix, num_features=num_features)
#-------------------------------------------------------------------#
# Save results #
#-------------------------------------------------------------------#
num_features_sel = n_features
self.n_features_ = len(S)
self.feature_importances_ = feature_importances
self._support_mask = np.zeros(p, dtype=np.bool)
self._support_mask[S] = True
self.mi_ = S_mi
self._print_end_result(num_features_sel=num_features_sel, num_features=num_features, best_soln=S)
return self
def _print_init_result(self):
logging.info("Started MIFS.")
logging.info(" - Method : {}".format(self.method))
logging.info(" - Num. threads : {}".format(self.n_jobs))
logging.info(" - Num. features : {}".format(self.n_features))
logging.info("{0:>5}{1:>5}{2:15}".format("Iter", "Sel", " Current MI"))
def _print_curr_result(self, S, selected, MIs):
logging.info("{0:>5}{1:>5}{2:+14.6E}".format(len(S), selected, MIs[-1]))
def _print_end_result(self, num_features_sel, num_features, best_soln):
logging.info("Done MIFS.")
logging.info(" - Total feat. : {}".format(num_features))
logging.info(" - Target feat. : {}".format(num_features_sel))
logging.info(" - Actual feat. : {}".format(len(best_soln)))
[docs]class MutualInformationBackwardElimination(MutualInformationBase):
"""
Backward elimination approach for feature selection.
This class implements a backward selection approach for feature selection with mutual information (MI) measure.
Parameters
----------
method : string, optional (default='JMI')
A string that specifies the mutual information measure. Possible options are:
- **JMI** : Joint Mutual Information [1]_
- **JMIM** : Joint Mutual Information Maximisation [2]_
- **MRMR** : Max-Relevance Min-Redundancy [3]_
k : int, optional (default=5)
An integer to specify number of samples for the kernel density estimation using KNN. A small integer between 3 and 10 is recommended. Note that this parameter is applicable only if `num_bins` is set to 0.
n_features : int or string, optional (default='auto')
A positive integer number that specifies the number of features to be selected.
If 'auto' is used, then this number will be determined automatically.
categorical : boolean, optional (default=True)
If `categorical` is `True`, then the dependent variable is assumed to be a categorical class label. Otherwise, the dependent variable is assumed to be a continuous variable.
n_jobs : int, optional (default=1)
The number of threads allowed for the computations. If `n_jobs` :math:`<= 0`, then it will be determined internally.
verbose : int, optional (default=0)
An integer number to specify the verbosity level. Available options are:
- 0: Disable output.
- 1: Display summary results.
- 2: Displays all results.
scale_data : boolean, optional (default=True)
An boolean flag that specifies if the input data input shall be scaled first.
num_bins : int, optional (default=0)
An integer number to specify the number of bins allowed for data binning (bucketing). If :math:`0`, then data binning will be disabled, and KNN will be used for kernal density estimation.
Attributes
----------
n_features_ : int
The number of selected features.
feature_importances_ : array of shape n_features
The feature importance scores of the selected features.
ranking_ : array of shape n_features
The feature ranking of the selected features, with the first being the first feature selected with largest marginal MI with y, followed by
the others with decreasing MI.
mi_ : array of shape n_features
The MI measure of the selected features. Usually this a monotone decreasing array of numbers converging to 0. One can use this to estimate the
number of features to select. In fact this is what n_features='auto' tries to do heuristically.
Examples
--------
This implementation follows the scikit-learn API convention:
- ``fit(X, y)``
- ``transform(X)``
- ``fit_transform(X, y)``
References
----------
.. [1] H Yang and J Moody, "Data Visualization and Feature Selection: New Algorithms for Nongaussian Data", NIPS 1999. [`Pre-print <https://papers.nips.cc/paper/1779-data-visualization-and-feature-selection-new-algorithms-for-nongaussian-data.pdf>`_]
.. [2] M Bennasar, Y Hicks, abd R Setchi, "Feature selection using Joint Mutual Information Maximisation", Expert Systems with Applications, 42(22), pp. 8520--8532, 2015 [`pre-print <https://core.ac.uk/download/pdf/82448198.pdf>`_]
.. [3] H Peng, F Long, and C Ding, "Feature selection based on mutual information criteria of max-dependency, max-relevance, and min-redundancy", IEEE Transactions on Pattern Analysis and Machine Intelligence, 27(8), pp. 1226--1238, 2005. [`Pre-print <http://ranger.uta.edu/~chqding/papers/mRMR_PAMI.pdf>`_]
"""
def __init__(self,
method='JMI',
k=5,
n_features='auto',
categorical=True,
n_jobs=0,
verbose=0,
scale_data=True,
num_bins=0):
# Call base constructor.
super(MutualInformationBackwardElimination, self).__init__(
method, k, n_features, categorical,
n_jobs, verbose, scale_data, num_bins)
# Deprecated.
self.early_stop_steps = 0
# Attributes.
self.n_features_ = 0
self.feature_importances_ = []
[docs] def fit(self, X, y):
"""
Fits the MI_FS feature selection with the chosen MI_FS method.
Parameters
----------
X : array-like, shape = [n_samples, n_features]
The training input samples.
y : array-like, shape = [n_samples]
The target values.
"""
self.X, y = self._init_data(X, y)
n, p = X.shape
num_features = p
self.y = y.reshape(n, 1)
# list of selected features
S = list(range(p))
# Calculate max features.
if self.n_features == 'auto':
n_features = int(p * 0.2)
else:
n_features = min(p, self.n_features)
feature_mi_matrix = np.zeros((p, p))
feature_mi_matrix[:] = np.nan
S_mi = []
# Populate information.
self._print_init_result()
#-------------------------------------------------------------------#
# Calculate full MIs #
#-------------------------------------------------------------------#
logging.info("Started calculating full MI matrix.")
if self.method == 'MRMR':
xy_MI = np.array(get_first_mi_vector(self, self.k, self._are_data_binned(), n_jobs=1))
xy_MI = _replace_vec_nan_with_zero(xy_MI)
if True:
# Parallelize the outer loop - faster.
feature_mi_matrix_tri = Parallel(n_jobs=self.n_jobs)(delayed(get_mi_vector)(
self, self.k, list(range(i, num_features, 1)), s, self._are_data_binned(), n_jobs=1) for i, s in enumerate(S))
for i, s in enumerate(S):
feature_mi_matrix[s, list(range(i, num_features, 1))] = feature_mi_matrix_tri[s]
# Ensure the MI matrix is symmetric.
for i in range(num_features):
for j in range(i + 1, num_features):
feature_mi_matrix[j, i] = feature_mi_matrix[i, j]
else:
# Parallelize the inner loop - slower.
for s in S:
feature_mi_matrix[s, S] = get_mi_vector(self, self.k, S, s, self._are_data_binned(), n_jobs=self.n_jobs)
# Ensure the MI matrix is symmetric.
feature_mi_matrix = mico_utils.make_mat_sym(feature_mi_matrix)
#-------------------------------------------------------------------#
# Find subsequent features #
#-------------------------------------------------------------------#
logging.info("Started backward selection.")
while len(S) >= n_features + 1:
fmm = np.zeros((len(S) - 1, len(S)))
# make decision based on the chosen FS algorithm
for i, s_remove in enumerate(S):
S_keep = copy.deepcopy(S)
S_keep.pop(i)
fmm[:, i] = feature_mi_matrix[S_keep, s_remove]
if bn.allnan(fmm[:, i]):
fmm[:, i] = [0 for _ in range(fmm.shape[0])]
logging.warning("Feature [{0}] has all 0 MI.".format(i))
if self.method == 'JMI':
values = bn.nansum(fmm, axis=0)
removed_feature = S[bn.nanargmin(values)]
S_mi.append(bn.nanmin(values))
elif self.method == 'JMIM':
values = bn.nanmin(fmm, axis=0)
if bn.allnan(values):
break
removed_feature = S[bn.nanargmin(values)]
S_mi.append(bn.nanmin(values))
elif self.method == 'MRMR':
if bn.allnan(bn.nanmean(fmm, axis=0)):
break
values = xy_MI[S] - bn.nanmean(fmm, axis=0)
removed_feature = S[bn.nanargmin(values)]
S_mi.append(bn.nanmin(values))
# Update removed_feature feature list.
S = self._remove(S, removed_feature)
# Populate information.
self._print_curr_result(S, removed_feature, S_mi)
# if n_features == 'auto', let's check the S_mi to stop
# Deprecated.
if False and self.n_features == 'auto' and \
(self.early_stop_steps > 0 and p - len(S) > self.early_stop_steps):
# smooth the 1st derivative of the MI values of previously sel
MI_dd = signal.savgol_filter(S_mi[1:], 9, 2, 1)
# does the mean of the last 5 converge to 0?
if np.abs(np.mean(MI_dd[-5:])) < 1e-3:
logging.info("Early stopping criteria reached.")
break
S = sorted(S)
#-------------------------------------------------------------------#
# Generate feature importance scores. #
#-------------------------------------------------------------------#
S, best_score, feature_importances, feature_raking = \
self._generate_feature_importances(best_soln=S, Q=feature_mi_matrix, num_features=num_features)
#-------------------------------------------------------------------#
# Save results #
#-------------------------------------------------------------------#
num_features_sel = n_features
self.n_features_ = len(S)
self.feature_importances_ = feature_importances
self._support_mask = np.zeros(p, dtype=np.bool)
self._support_mask[S] = True
#self.mi_ = S_mi
self._print_end_result(num_features_sel=num_features_sel, num_features=num_features, best_soln=S)
return self
def _print_init_result(self):
logging.info("Started MIBS.")
logging.info(" - Method : {}".format(self.method))
logging.info(" - Num. threads : {}".format(self.n_jobs))
logging.info(" - Num. features : {}".format(self.n_features))
logging.info("{0:>5}{1:>5}{2:15}".format("Iter", "Rmv", " Current MI"))
def _print_curr_result(self, S, selected, MIs):
logging.info("{0:>5}{1:>5}{2:+14.6E}".format(len(S), selected, MIs[-1]))
def _print_end_result(self, num_features_sel, num_features, best_soln):
logging.info("Done MIBS.")
logging.info(" - Total feat. : {}".format(num_features))
logging.info(" - Target feat. : {}".format(num_features_sel))
logging.info(" - Actual feat. : {}".format(len(best_soln)))
[docs]class MutualInformationConicOptimization(MutualInformationBase):
"""
Conic optimization approach for feature selection.
This class implements a conic optimization based feature selection method with mutual information (MI) measure [1]_. This idea is to formulate the selection problem as a pure-binary quadratic optimization problem, which can be heuristically solved by an efficient randomization algorithm via semidefinite programming [2]_. Optimization software **Colin** [6]_ is used for solving the underlying conic optimization problems.
Parameters
----------
method : string, optional (default='JMI')
A string that specifies the mutual information measure. Possible options are:
- **JMI** : Joint Mutual Information [3]_
- **JMIM** : Joint Mutual Information Maximisation [4]_
- **MRMR** : Max-Relevance Min-Redundancy [5]_
k : int, optional (default=5)
An integer to specify number of samples for the kernel density estimation using KNN. A small integer between
3 and 10 is recommended. Note that this parameter is applicable only if `num_bins` is set to 0.
n_features : int or string, optional (default='auto')
A positive integer number that specifies the number of features to be selected.
If 'auto' is used, then this number will be determined automatically.
categorical : boolean, optional (default=True)
If `categorical` is `True`, then the dependent variable is assumed to be a categorical class label. Otherwise, the dependent variable is assumed to be a continuous variable.
n_jobs : int, optional (default=1)
The number of threads allowed for the computations. If `n_jobs` :math:`<= 0`, then it will be determined internally.
verbose : int, optional (default=0)
An integer number to specify the verbosity level. Available options are:
- 0: Disable output.
- 1: Display summary results.
- 2: Displays all results.
scale_data : boolean, optional (default=True)
An boolean flag that specifies if the input data input shall be scaled first.
num_bins : int, optional (default=0)
An integer number to specify the number of bins allowed for data binning (bucketing). If :math:`0`, then data binning will be disabled, and KNN will be used for kernal density estimation.
random_state : int, optional (default=0)
An integer number that specifies the random seed number.
max_roundings : int, optional (default=0)
An positive integer to specify the number of iterations allowed for the rounding solution process in [2]_. If `max_roundings` is 0, then this number will be determined internally.
Attributes
----------
n_features_ : int
The number of selected features.
feature_importances_ : array of shape n_features
The feature importance scores of the selected features.
Examples
--------
This implementation follows the scikit-learn API convention:
- ``fit(X, y)``
- ``transform(X)``
- ``fit_transform(X, y)``
References
----------
.. [1] T Naghibi, S Hoffmann and B Pfister, "A semidefinite programming based search strategy for feature selection with mutual information measure", IEEE Transactions on Pattern Analysis and Machine Intelligence, 37(8), pp. 1529--1541, 2015. [`Pre-print <http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.739.8516&rep=rep1&type=pdf>`_]
.. [2] M Goemans and D Williamson, "Improved approximation algorithms for maximum cut and satisfiability problems using semidefinite programming", J. ACM, 42(6), pp. 1115--1145, 1995 [`Pre-print <http://www-math.mit.edu/~goemans/PAPERS/maxcut-jacm.pdf>`_]
.. [3] H Yang and J Moody, "Data Visualization and Feature Selection: New Algorithms for Nongaussian Data", NIPS 1999. [`Pre-print <https://papers.nips.cc/paper/1779-data-visualization-and-feature-selection-new-algorithms-for-nongaussian-data.pdf>`_]
.. [4] M Bennasar, Y Hicks, abd R Setchi, "Feature selection using Joint Mutual Information Maximisation", Expert Systems with Applications, 42(22), pp. 8520--8532, 2015 [`pre-print <https://core.ac.uk/download/pdf/82448198.pdf>`_]
.. [5] H Peng, F Long, and C Ding, "Feature selection based on mutual information criteria of max-dependency, max-relevance, and min-redundancy", IEEE Transactions on Pattern Analysis and Machine Intelligence, 27(8), pp. 1226--1238, 2005. [`Pre-print <http://ranger.uta.edu/~chqding/papers/mRMR_PAMI.pdf>`_]
.. [6] Colin: Conic-form Linear Optimizer (`www.colinopt.org <www.colinopt.org>`_).
"""
def __init__(self,
method='JMI',
k=5,
n_features='auto',
categorical=True,
n_jobs=0,
verbose=0,
scale_data=True,
num_bins=0,
random_state=0,
max_roundings=0):
# Call base constructor.
super(MutualInformationConicOptimization, self).__init__(
method, k, n_features, categorical,
n_jobs, verbose, scale_data, num_bins)
self.random_state = random_state
self.max_roundings = max_roundings
# Attributes.
self.n_features_ = 0
self.feature_importances_ = []
[docs] def fit(self, X, y):
"""
Fits the feature selection with conic optimization approach.
Parameters
----------
X : array-like, shape = [n_samples, n_features]
The training input samples.
y : array-like, shape = [n_samples]
The target values.
"""
# Colin
import colinpy
#from colinpy import *
from colinpy import ClnModel, ClnError
#-------------------------------------------------------------------#
# Initialize the parameters. #
#-------------------------------------------------------------------#
self.X, y = self._init_data(X, y)
n, num_features = X.shape
self.y = y.reshape(n, 1)
# list of features
S = list(range(num_features))
#-------------------------------------------------------------------#
# Create the MI matrix. #
#-------------------------------------------------------------------#
# Notation:
# - N : num_features
# - P : num_features_sel
Q = np.zeros((num_features, num_features))
Q[:] = 0.0
# Calculate number of the selected features and the parameter.
if self.n_features == 'auto':
num_features_sel = min(1000, max(1, num_features * 0.2))
else:
num_features_sel = min(num_features, self.n_features)
offdiagonal_param = -0.5 / (num_features_sel - 1)
# Calculate the number of rounding iterations.
if self.max_roundings <= 0:
self.max_roundings = num_features_sel
# Populate info.
self._print_init_result(num_features_sel, num_features, offdiagonal_param)
# Calculate the MI matrix.
logging.info("Started calculating full MI matrix.")
if self.categorical:
Nx = []
classes = np.unique(y)
sum_y = {}
for yi in classes:
sum_y[yi] = np.sum(y == yi)
for yi in y.flatten():
Nx.append(sum_y[yi])
if True:
# Parallelize the outer loop - faster.
Qtri = Parallel(n_jobs=self.n_jobs)(delayed(get_mico_vector)(
self, self.k, list(range(i, num_features, 1)), s, offdiagonal_param, self._are_data_binned(), Nx if self.categorical else None, 1
) for i, s in enumerate(S))
for i, s in enumerate(S):
Q[s, list(range(i, num_features, 1))] = Qtri[s]
else:
# Parallelize the outer loop - slower.
for i, s in enumerate(S):
S2 = list(range(i, num_features, 1))
Q[s, S2] = get_mico_vector(self, self.k, S2, s, offdiagonal_param, self._are_data_binned(), Nx if self.categorical else None, self.n_jobs)
# Ensure the MI matrix is symmetric.
for i in range(num_features):
for j in range(i + 1, num_features):
Q[j, i] = Q[i, j]
# Calculate Q^T e.
QTe = np.zeros(num_features)
for s in S:
QTe[s] = Q[s, :].sum()
assert(QTe[s] == Q[:, s].sum())
# Create Q^u matrix.
# The first row/column is the auxiliary variable.
Qu = np.zeros((num_features + 1, num_features + 1))
Qu[0, 0] = 0.0
Qu[1:(num_features + 1), 1:(num_features + 1)] = Q
Qu[0, 1:(num_features + 1)] = QTe
Qu[1:(num_features + 1), 0] = QTe
#-------------------------------------------------------------------#
# Input optimization model. #
# #
# See #
# A Semidefinite Programming Based Search Strategy for Feature #
# Selection with Mutual Information Measure, Eq (26) #
#-------------------------------------------------------------------#
logging.info("Started creating semidefinite optimization model.")
def map_ij_to_k(i, j, size):
return i * size + j
def map_k_to_ij(k, size):
return int(k/size), int(k%size)
# Cone information.
semidefinite_cone = list(range((num_features + 1) * (num_features + 1)))
# Constraint 1.
row1_idx = []
row1_val = []
for i in range(1, num_features + 1):
for j in range(1, num_features + 1):
row1_idx += [map_ij_to_k(i, j, num_features + 1)]
row1_val += [1.0]
row1_rhs = (2.0 * num_features_sel - num_features) ** 2
# Constraint 2.
row2_idx = []
row2_val = []
for i in range(1, num_features + 1):
row2_idx += [map_ij_to_k(0, i, num_features + 1)]
row2_val += [1.0]
for i in range(1, num_features + 1):
row2_idx += [map_ij_to_k(i, 0, num_features + 1)]
row2_val += [1.0]
row2_rhs = 2.0 * (2.0 * num_features_sel - num_features)
# Model.
CLN_INFINITY = ClnModel.get_infinity()
model = ClnModel()
try:
# Step 1. Create a model and change the parameters.
# Create an empty model.
model.create_mdl()
# Set parameters.
model.set_ipa("Model/Verbose", 3 if self.verbose >= 2 else self.verbose)
model.set_ipa("Model/Presolver/PresolveLv", 1)
model.set_ipa("Ips/Solver/NumThreads", self.n_jobs)
model.set_ipa("Ips/Solver/Type", 2)
# Step 2. Input model.
# Change to maximization problem.
model.set_max_obj_sense()
# Add variables.
# One semidefinite block.
for i in range(0, num_features + 1):
for j in range(0, num_features + 1):
model.add_col(-CLN_INFINITY, CLN_INFINITY, Qu[i, j], [], [])
# Add constraints.
# - Cnstraint 1:
# \sum_{i,j}^N Y_{ij} = (2P - N)^2.
model.add_row(row1_rhs, row1_rhs, row1_idx, row1_val)
# - Constraint 2:
# \sum_{i}^N Y_{i0} + \sum_{i}^N Y_{0i} = 2 * (2P - N).
model.add_row(row2_rhs, row2_rhs, row2_idx, row2_val)
# - Constraint 3:
# diag(Y) = e.
for i in range(0, num_features + 1):
model.add_row(1.0, 1.0, [map_ij_to_k(i, i, num_features + 1)], [1.0])
# - Add semidefinite conic constraint.
model.add_semi_definite_cone(semidefinite_cone)
# Step 3. Solve the problem and populate the result.
# Solve the problem.
model.solve_prob()
if not model.has_solution():
logging.warning("Resolving the problem again.")
model.set_ipa("Ips/Solver/Type", 3)
model.solve_prob()
except ClnError as e:
logging.error("Received Colin exception.")
logging.error(" - Explanation : {}".format(e.message))
logging.error(" - Code : {}".format(e.code))
except Exception as e:
logging.error("Received exception.")
logging.error(" - Explanation : {}".format(e))
finally:
# Step 4. Display the result and free the model.
if self.verbose >= 1:
model.display_results()
# Retrieve result code, optimization status, and solution status.
result = int(model.get_solver_result())
opt_status = model.get_solver_opt_status()
sol_status = model.get_solver_sol_status()
has_solution = model.has_solution()
# Note: Non-optimal status (ENM_OPT_STATUS_OPTIMAL).
make_dir("./log/")
if opt_status != 1:
if has_solution:
# TODO Support SDP in LP/MPS output.
# Can continue.
logging.warning("Colin failed to solve the problem to optimality, but at least a primal solution is available.")
instance = get_time_stamp()
filename = "./log/warn_code_{0}_time_{1}.lp".format(opt_status, instance)
#model.write_prob(filename)
else:
# Cannot continue.
instance = get_time_stamp()
filename = "./log/err_code_{0}_time_{1}.lp".format(opt_status, instance)
#model.write_prob(filename)
msg = "Colin failed to solve the problem to optimality. Terminated."
logging.error(msg)
raise ValueError(msg)
logging.info(" - Result : {0} ({1})".format(model.explain_result(result), result))
logging.info(" - Opt status : {0} ({1})".format(model.explain_opt_status(opt_status), opt_status))
logging.info(" - Sol status : {0} ({1})".format(model.explain_sol_status(sol_status), sol_status))
logging.info(" - Has solution : {0}".format(has_solution))
else:
logging.info("Colin solved the problem to optimality.")
if has_solution:
prim_soln = model.get_prim_soln()
model.free_mdl()
# Create covariance matrix.
logging.info("Started generating covariance matrix.")
mean_vec = np.zeros(num_features + 1)
cov_mat = []
for i in range(0, num_features + 1):
for j in range(0, num_features + 1):
cov_mat.append(prim_soln[map_ij_to_k(i, j, num_features + 1)])
#cov_mat = list(map(lambda x : round(x, 4), cov_mat))
#print(cov_mat)
# Perturbation.
for k in range(0, (num_features + 1) * (num_features + 1)):
i, j = map_k_to_ij(k, num_features + 1)
if i == j:
# Add eps to diagonal.
pass
cov_mat[k] += 1.E-4
cov_mat = np.matrix(cov_mat).reshape((num_features + 1), (num_features + 1))
#print("cov_mat", cov_mat)
# Rounding solution.
best_diff = math.inf
best_score = -math.inf
best_soln = []
pdf = stats.multivariate_normal(mean_vec, cov_mat)
num_roundings = 0
if DEBUG:
print("Primal soln: {}".format(prim_soln))
dbg_opt_val = 0
for i in range(0, num_features + 1):
for j in range(0, num_features + 1):
dbg_opt_val += prim_soln[i * (num_features + 1) + j] * Qu[i, j]
print("Obj: {}", dbg_opt_val)
for i in range(0, num_features + 1):
print(Qu[i, 0:(num_features + 1)])
logging.info("Started rounding process.")
logging.info(" - Max roundings : {}".format(self.max_roundings))
logging.info("{0:>5}{1:>5}{2:15}".format("Iter", "Diff", " ObjValue"))
seed = self.random_state
while num_roundings < self.max_roundings:
sampled_pt = pdf.rvs(1, random_state=seed)
seed += 1
ref_pt = sampled_pt[0]
if abs(ref_pt) < 0.1:
#print("ref_pt", ref_pt)
continue
# Solution rounding.
if ref_pt >= 0:
curr_soln = [i for i, e in enumerate(sampled_pt[1:len(sampled_pt)]) if e >= 0]
else:
curr_soln = [i for i, e in enumerate(sampled_pt[1:len(sampled_pt)]) if e <= 0]
curr_soln_binary = self._convert_idx_array_to_mask_array(curr_soln, num_features)
msg = ""
if True:
# Calculate feature difference.
curr_features_sel = np.sum(curr_soln_binary)
curr_diff = abs(curr_features_sel - num_features_sel)
curr_score = self._calc_xQx(Q, curr_soln_binary)
# Update statistics.
if curr_diff < best_diff:
best_diff = curr_diff
best_score = curr_score
best_soln = curr_soln
msg += "*"
elif curr_diff == best_diff and best_diff == 0:
# Check objective value.
# Update statistics.
if curr_score > best_score:
best_score = curr_score
best_soln = curr_soln
msg += "**"
else:
# Calculate the score.
curr_score = self._calc_xQx(Q, curr_soln_binary)
# Update statistics.
if curr_score > best_score or best_score == -math.inf:
best_score = curr_score
best_soln = curr_soln
msg += "*"
# Populate information.
num_roundings += 1
logging.info("{0:>5}{1:>5}{2:+14.6E} {3}".format(num_roundings, int(curr_diff), curr_score, msg))
#-------------------------------------------------------------------#
# Generate feature importance scores. #
#-------------------------------------------------------------------#
best_soln, best_score, feature_importances, feature_raking = \
self._generate_feature_importances(best_soln, Q, num_features)
#-------------------------------------------------------------------#
# Save results #
#-------------------------------------------------------------------#
self.n_features_ = len(best_soln)
self.feature_importances_ = feature_importances
self._support_mask = np.zeros(num_features, dtype=np.bool)
self._support_mask[best_soln] = True
self._print_end_result(num_features_sel, num_features, best_soln)
return self
def _print_init_result(self, num_features_sel, num_features, offdiagonal_param):
logging.info("Started MICO.")
logging.info(" - Method : {}".format(self.method))
logging.info(" - Num. threads : {}".format(self.n_jobs))
logging.info(" - Tot. features : {}".format(num_features))
logging.info(" - Sel. features : {}".format(num_features_sel))
logging.info(" - Off-diag param: {}".format(offdiagonal_param))
def _print_end_result(self, num_features_sel, num_features, best_soln):
logging.info("Done MICO.")
logging.info(" - Total feat. : {}".format(num_features))
logging.info(" - Target feat. : {}".format(num_features_sel))
logging.info(" - Actual feat. : {}".format(len(best_soln)))
def test_mi():
import pandas as pd
import numpy as np
from pyitlib import discrete_random_variable as drv
#X = np.array((1.0, 2.0, 1.0, 2.0, 4.12, 4.123, 6.123))
#print(drv.entropy(X))
X = np.array((1.0, 2.0, 1.0, 2.0, 4.123, 4.123, 10.123, 1.1))
#print(drv.entropy(X))
#print(pd.qcut(np.array(X), 4, retbins=True))
#print(pd.qcut(np.array(X), 4, retbins=True)[1])
if False:
# Test bin.
print("=" * 80)
print("Testing bin")
print("-" * 80)
num_bins = 3
# - Binned with pandas.
pd_bins = pd.cut(np.array(X), num_bins, retbins=True)[1]
labels = [i for i in range(num_bins)]
pd_data = np.array(pd.cut(X, bins=pd_bins , labels=labels, include_lowest=True))
print("Binned with PD.")
print(" - Bin : {}".format(pd_bins))
print(" - Data : {}".format(pd_data))
#print(type(pd.cut(X, bins=bins, labels=labels, include_lowest=True)))
# - Binned with NP.
np_bin_res = np.histogram(X, bins=num_bins)
np_data = np_bin_res[0]
np_bins = np_bin_res[1]
print("Binned with NP.")
print(" - Bin : {}".format(np_bins))
print(" - Data : {}".format(np_data))
# - Binned with SKL.
X = np.array([[ -3., 5., 15 ],
[ 0., 6., 14 ],
[ 6., 3., 11 ],
[ 16., -3.,13 ],
[ 12., -3.,10 ],
[ 16.4, -2.,11.2 ],
[ 26., 3., 211]
]
)
skl_data = bin_data(X, num_bins=3)
print("Binned with SKL.")
print(" - Num features : {}".format(X.shape[1]))
#print(" - Bin : {}".format(pd_bins))
print(" - Data :\n {}".format(skl_data))
print("Scaled dense matrix with SKL.")
den_data = scale_data(X)
print(" - Num features : {}".format(den_data.shape[1]))
print(" - Data :\n {}".format(den_data))
print(" - Mean :\n {}".format(den_data.mean(axis=0)))
print(" - Std :\n {}".format(den_data.std(axis=0)))
print("Scaled sparse matrix with SKL.")
csc_data = sparse.csc_matrix(X)
csr_data = csc_data.tocsr()
spa_data = scale_data(csr_data)
print(" - Num features : {}".format(spa_data.shape[1]))
print(" - Mean :\n {}".format(spa_data.mean(axis=0)))
print("=" * 80)
#h = np.histogram2d(X, Y, bins=bins)[0]
#print(X)
#print(np.array(df['qcut4']))
#print(drv.entropy(data))
#print(np.hstack(np.array([X, X])))
def scale_data(X):
""""
Scale the data using SKL.
References
----------
[1] http://www.faqs.org/faqs/ai-faq/neural-nets/part2/section-16.html
"""
from sklearn.preprocessing import MinMaxScaler, StandardScaler, MaxAbsScaler
# X must be stored in a row-wise order.
if sparse.issparse(X):
scalar = MaxAbsScaler()
else:
scalar = StandardScaler()#MinMaxScaler() # StandardScaler
return scalar.fit_transform(X)
def bin_data(X, num_bins):
""""
Bin the data using SKL.
"""
# X must be stored in a row-wise order.
from sklearn.preprocessing import KBinsDiscretizer
if type(num_bins) is int:
#bin_set = [ min(num_bins, len(array))]
# @todo Check unique value in each feature.
bin_set = [ num_bins for i in range(X.shape[1]) ]
else:
bin_set = num_bins
est = KBinsDiscretizer(n_bins=bin_set, encode='ordinal', strategy='uniform').fit(X)
return est.transform(X)
if __name__ == '__main__':
test_mi()