Greedy rule list. Greedily splits on one feature at a time along a single path. Tries to find rules which maximize the probability of class 1. Currently only supports binary classification.
Expand source code
'''Greedy rule list.
Greedily splits on one feature at a time along a single path.
Tries to find rules which maximize the probability of class 1.
Currently only supports binary classification.
'''
import math
from copy import deepcopy
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils import check_X_y
from sklearn.utils.multiclass import unique_labels
from sklearn.utils.validation import check_array, check_is_fitted
from sklearn.tree import DecisionTreeClassifier
from imodels.rule_list.rule_list import RuleList
from imodels.util.arguments import check_fit_arguments
class GreedyRuleListClassifier(BaseEstimator, RuleList, ClassifierMixin):
def __init__(self, max_depth: int = 5, class_weight=None,
criterion: str = 'gini'):
'''
Params
------
max_depth
Maximum depth the list can achieve
criterion: str
Criterion used to split
'gini', 'entropy', or 'log_loss'
'''
self.max_depth = max_depth
self.class_weight = class_weight
self.criterion = criterion
self.depth = 0 # tracks the fitted depth
def fit(self, X, y, depth: int = 0, feature_names=None, verbose=False):
"""
Params
------
X: array_like
Feature set
y: array_like
target variable
depth
the depth of the current layer (used to recurse)
"""
X, y, feature_names = check_fit_arguments(self, X, y, feature_names)
return self.fit_node_recursive(X, y, depth=0, verbose=verbose)
def fit_node_recursive(self, X, y, depth: int, verbose):
# base case 1: no data in this group
if y.size == 0:
return []
# base case 2: all y is the same in this group
elif np.all(y == y[0]):
return [{'val': y[0], 'num_pts': y.size}]
# base case 3: max depth reached
elif depth >= self.max_depth:
return []
# recursively generate rule list
else:
# find a split with the best value for the criterion
m = DecisionTreeClassifier(max_depth=1, criterion=self.criterion)
m.fit(X, y)
col = m.tree_.feature[0]
cutoff = m.tree_.threshold[0]
# col, cutoff, criterion_val = self._find_best_split(X, y)
y_left = y[X[:, col] < cutoff] # left-hand side data
y_right = y[X[:, col] >= cutoff] # right-hand side data
# put higher probability of class 1 on the right-hand side
if len(y_left) > 0 and np.mean(y_left) > np.mean(y_right):
flip = True
tmp = deepcopy(y_left)
y_left = deepcopy(y_right)
y_right = tmp
x_left = X[X[:, col] >= cutoff]
else:
flip = False
x_left = X[X[:, col] < cutoff]
# print
if verbose:
print(
f'{np.mean(100 * y):.2f} -> {self.feature_names_[col]} -> {np.mean(100 * y_left):.2f} ({y_left.size}) {np.mean(100 * y_right):.2f} ({y_right.size})')
# save info
par_node = [{
'col': self.feature_names_[col],
'index_col': col,
'cutoff': cutoff,
'val': np.mean(y), # values before splitting
'flip': flip,
'val_right': np.mean(y_right),
'num_pts': y.size,
'num_pts_right': y_right.size
}]
# generate tree for the non-leaf data
par_node = par_node + \
self.fit_node_recursive(x_left, y_left, depth + 1, verbose=verbose)
self.depth += 1 # increase the depth since we call fit once
self.rules_ = par_node
self.complexity_ = len(self.rules_)
self.classes_ = unique_labels(y)
return par_node
def predict_proba(self, X):
check_is_fitted(self)
X = check_array(X)
n = X.shape[0]
probs = np.zeros(n)
for i in range(n):
x = X[i]
for j, rule in enumerate(self.rules_):
if j == len(self.rules_) - 1:
probs[i] = rule['val']
elif x[rule['index_col']] >= rule['cutoff']:
probs[i] = rule['val_right']
break
return np.vstack((1 - probs, probs)).transpose() # probs (n, 2)
def predict(self, X):
check_is_fitted(self)
X = check_array(X)
return np.argmax(self.predict_proba(X), axis=1)
def __str__(self):
s = ''
for rule in self.rules_:
s += f"mean {rule['val'].round(3)} ({rule['num_pts']} pts)\n"
if 'col' in rule:
s += f"if {rule['col']} >= {rule['cutoff']} then {rule['val_right'].round(3)} ({rule['num_pts_right']} pts)\n"
return s
def _print_list(self):
'''Print out the list in a nice way
'''
s = ''
def red(s):
return f"\033[91m{s}\033[00m"
def cyan(s):
return f"\033[96m{s}\033[00m"
def rule_name(rule):
if rule['flip']:
return '~' + rule['col']
return rule['col']
# rule = self.rules_[0]
# s += f"{red((100 * rule['val']).round(3))}% IwI ({rule['num_pts']} pts)\n"
for rule in self.rules_:
s += f"\t{'':>35} => {cyan((100 * rule['val']).round(2)):>6}% risk ({rule['num_pts']} pts)\n"
# s += f"\t{'Else':>45} => {cyan((100 * rule['val']).round(2)):>6}% IwI ({rule['val'] * rule['num_pts']:.0f}/{rule['num_pts']} pts)\n"
if 'col' in rule:
# prefix = f"if {rule['col']} >= {rule['cutoff']}"
prefix = f"if {rule_name(rule)}"
val = f"{100 * rule['val_right'].round(3):.4}"
s += f"{prefix:>43} ===> {red(val)}% risk ({rule['num_pts_right']} pts)\n"
# rule = self.rules_[-1]
# s += f"{red((100 * rule['val']).round(3))}% IwI ({rule['num_pts']} pts)\n"
print(s)
######## HERE ONWARDS CUSTOM SPLITTING (DEPRECATED IN FAVOR OF SKLEARN STUMP) ########
######################################################################################
def _find_best_split(self, x, y):
"""
Find the best split from all features
returns: the column to split on, the cutoff value, and the actual criterion_value
"""
col = None
min_criterion_val = 1e10
cutoff = None
# iterating through each feature
for i, c in enumerate(x.T):
# find the best split of that feature
criterion_val, cur_cutoff = self._split_on_feature(c, y)
# found perfect cutoff
if criterion_val == 0:
return i, cur_cutoff, criterion_val
# check if it's best so far
elif criterion_val <= min_criterion_val:
min_criterion_val = criterion_val
col = i
cutoff = cur_cutoff
return col, cutoff, min_criterion_val
def _split_on_feature(self, col, y):
"""
col: the column we split on
y: target var
"""
min_criterion_val = 1e10
cutoff = 0.5
# iterate through each value in the column
for value in np.unique(col):
# separate y into 2 groups
y_predict = col < value
# get criterion val of this split
criterion_val = self._weighted_criterion(y_predict, y)
# check if it's the smallest one so far
if criterion_val <= min_criterion_val:
min_criterion_val = criterion_val
cutoff = value
return min_criterion_val, cutoff
def _weighted_criterion(self, split_decision, y_real):
"""Returns criterion calculated over a split
split decision, True/False, and y_true can be multi class
"""
if split_decision.shape[0] != y_real.shape[0]:
print('They have to be the same length')
return None
# choose the splitting criterion
if self.criterion == 'entropy':
criterion_func = self._entropy_criterion
elif self.criterion == 'gini':
criterion_func = self._gini_criterion
elif self.criterion == 'neg_corr':
return self._neg_corr_criterion(split_decision, y_real)
# left-hand side criterion
s_left = criterion_func(y_real[split_decision])
# right-hand side criterion
s_right = criterion_func(y_real[~split_decision])
# overall criterion, again weighted average
n = y_real.shape[0]
if self.class_weight is not None:
sample_weights = np.ones(n)
for c in self.class_weight.keys():
idxs_c = y_real == c
sample_weights[idxs_c] = self.class_weight[c]
total_weight = np.sum(sample_weights)
weight_left = np.sum(sample_weights[split_decision]) / total_weight
# weight_right = np.sum(sample_weights[~split_decision]) / total_weight
else:
tot_left_samples = np.sum(split_decision == 1)
weight_left = tot_left_samples / n
s = weight_left * s_left + (1 - weight_left) * s_right
return s
def _gini_criterion(self, y):
'''Returns gini index for one node
= sum(pc * (1 – pc))
'''
s = 0
n = y.shape[0]
classes = np.unique(y)
# for each class, get entropy
for c in classes:
# weights for each class
n_c = np.sum(y == c)
p_c = n_c / n
# weighted avg
s += p_c * (1 - p_c)
return s
def _entropy_criterion(self, y):
"""Returns entropy of a divided group of data
Data may have multiple classes
"""
s = 0
n = len(y)
classes = set(y)
# for each class, get entropy
for c in classes:
# weights for each class
weight = sum(y == c) / n
def _entropy_from_counts(c1, c2):
"""Returns entropy of a group of data
c1: count of one class
c2: count of another class
"""
if c1 == 0 or c2 == 0: # when there is only one class in the group, entropy is 0
return 0
def _entropy_func(p): return -p * math.log(p, 2)
p1 = c1 * 1.0 / (c1 + c2)
p2 = c2 * 1.0 / (c1 + c2)
return _entropy_func(p1) + _entropy_func(p2)
# weighted avg
s += weight * _entropy_from_counts(sum(y == c), sum(y != c))
return s
def _neg_corr_criterion(self, split_decision, y):
'''Returns negative correlation between y
and the binary splitting variable split_decision
y must be binary
'''
if np.unique(y).size < 2:
return 0
elif np.unique(y).size != 2:
print('y must be binary output for corr criterion')
# y should be 1 more often on the "right side" of the split
if y.sum() < y.size / 2:
y = 1 - y
return -1 * np.corrcoef(split_decision.astype(np.int), y)[0, 1]
Classes
class GreedyRuleListClassifier (max_depth: int = 5, class_weight=None, criterion: str = 'gini')
-
Base class for all estimators in scikit-learn.
Notes
All estimators should specify all the parameters that can be set at the class level in their
__init__
as explicit keyword arguments (no*args
or**kwargs
).Params
max_depth Maximum depth the list can achieve criterion: str Criterion used to split 'gini', 'entropy', or 'log_loss'
Expand source code
class GreedyRuleListClassifier(BaseEstimator, RuleList, ClassifierMixin): def __init__(self, max_depth: int = 5, class_weight=None, criterion: str = 'gini'): ''' Params ------ max_depth Maximum depth the list can achieve criterion: str Criterion used to split 'gini', 'entropy', or 'log_loss' ''' self.max_depth = max_depth self.class_weight = class_weight self.criterion = criterion self.depth = 0 # tracks the fitted depth def fit(self, X, y, depth: int = 0, feature_names=None, verbose=False): """ Params ------ X: array_like Feature set y: array_like target variable depth the depth of the current layer (used to recurse) """ X, y, feature_names = check_fit_arguments(self, X, y, feature_names) return self.fit_node_recursive(X, y, depth=0, verbose=verbose) def fit_node_recursive(self, X, y, depth: int, verbose): # base case 1: no data in this group if y.size == 0: return [] # base case 2: all y is the same in this group elif np.all(y == y[0]): return [{'val': y[0], 'num_pts': y.size}] # base case 3: max depth reached elif depth >= self.max_depth: return [] # recursively generate rule list else: # find a split with the best value for the criterion m = DecisionTreeClassifier(max_depth=1, criterion=self.criterion) m.fit(X, y) col = m.tree_.feature[0] cutoff = m.tree_.threshold[0] # col, cutoff, criterion_val = self._find_best_split(X, y) y_left = y[X[:, col] < cutoff] # left-hand side data y_right = y[X[:, col] >= cutoff] # right-hand side data # put higher probability of class 1 on the right-hand side if len(y_left) > 0 and np.mean(y_left) > np.mean(y_right): flip = True tmp = deepcopy(y_left) y_left = deepcopy(y_right) y_right = tmp x_left = X[X[:, col] >= cutoff] else: flip = False x_left = X[X[:, col] < cutoff] # print if verbose: print( f'{np.mean(100 * y):.2f} -> {self.feature_names_[col]} -> {np.mean(100 * y_left):.2f} ({y_left.size}) {np.mean(100 * y_right):.2f} ({y_right.size})') # save info par_node = [{ 'col': self.feature_names_[col], 'index_col': col, 'cutoff': cutoff, 'val': np.mean(y), # values before splitting 'flip': flip, 'val_right': np.mean(y_right), 'num_pts': y.size, 'num_pts_right': y_right.size }] # generate tree for the non-leaf data par_node = par_node + \ self.fit_node_recursive(x_left, y_left, depth + 1, verbose=verbose) self.depth += 1 # increase the depth since we call fit once self.rules_ = par_node self.complexity_ = len(self.rules_) self.classes_ = unique_labels(y) return par_node def predict_proba(self, X): check_is_fitted(self) X = check_array(X) n = X.shape[0] probs = np.zeros(n) for i in range(n): x = X[i] for j, rule in enumerate(self.rules_): if j == len(self.rules_) - 1: probs[i] = rule['val'] elif x[rule['index_col']] >= rule['cutoff']: probs[i] = rule['val_right'] break return np.vstack((1 - probs, probs)).transpose() # probs (n, 2) def predict(self, X): check_is_fitted(self) X = check_array(X) return np.argmax(self.predict_proba(X), axis=1) def __str__(self): s = '' for rule in self.rules_: s += f"mean {rule['val'].round(3)} ({rule['num_pts']} pts)\n" if 'col' in rule: s += f"if {rule['col']} >= {rule['cutoff']} then {rule['val_right'].round(3)} ({rule['num_pts_right']} pts)\n" return s def _print_list(self): '''Print out the list in a nice way ''' s = '' def red(s): return f"\033[91m{s}\033[00m" def cyan(s): return f"\033[96m{s}\033[00m" def rule_name(rule): if rule['flip']: return '~' + rule['col'] return rule['col'] # rule = self.rules_[0] # s += f"{red((100 * rule['val']).round(3))}% IwI ({rule['num_pts']} pts)\n" for rule in self.rules_: s += f"\t{'':>35} => {cyan((100 * rule['val']).round(2)):>6}% risk ({rule['num_pts']} pts)\n" # s += f"\t{'Else':>45} => {cyan((100 * rule['val']).round(2)):>6}% IwI ({rule['val'] * rule['num_pts']:.0f}/{rule['num_pts']} pts)\n" if 'col' in rule: # prefix = f"if {rule['col']} >= {rule['cutoff']}" prefix = f"if {rule_name(rule)}" val = f"{100 * rule['val_right'].round(3):.4}" s += f"{prefix:>43} ===> {red(val)}% risk ({rule['num_pts_right']} pts)\n" # rule = self.rules_[-1] # s += f"{red((100 * rule['val']).round(3))}% IwI ({rule['num_pts']} pts)\n" print(s) ######## HERE ONWARDS CUSTOM SPLITTING (DEPRECATED IN FAVOR OF SKLEARN STUMP) ######## ###################################################################################### def _find_best_split(self, x, y): """ Find the best split from all features returns: the column to split on, the cutoff value, and the actual criterion_value """ col = None min_criterion_val = 1e10 cutoff = None # iterating through each feature for i, c in enumerate(x.T): # find the best split of that feature criterion_val, cur_cutoff = self._split_on_feature(c, y) # found perfect cutoff if criterion_val == 0: return i, cur_cutoff, criterion_val # check if it's best so far elif criterion_val <= min_criterion_val: min_criterion_val = criterion_val col = i cutoff = cur_cutoff return col, cutoff, min_criterion_val def _split_on_feature(self, col, y): """ col: the column we split on y: target var """ min_criterion_val = 1e10 cutoff = 0.5 # iterate through each value in the column for value in np.unique(col): # separate y into 2 groups y_predict = col < value # get criterion val of this split criterion_val = self._weighted_criterion(y_predict, y) # check if it's the smallest one so far if criterion_val <= min_criterion_val: min_criterion_val = criterion_val cutoff = value return min_criterion_val, cutoff def _weighted_criterion(self, split_decision, y_real): """Returns criterion calculated over a split split decision, True/False, and y_true can be multi class """ if split_decision.shape[0] != y_real.shape[0]: print('They have to be the same length') return None # choose the splitting criterion if self.criterion == 'entropy': criterion_func = self._entropy_criterion elif self.criterion == 'gini': criterion_func = self._gini_criterion elif self.criterion == 'neg_corr': return self._neg_corr_criterion(split_decision, y_real) # left-hand side criterion s_left = criterion_func(y_real[split_decision]) # right-hand side criterion s_right = criterion_func(y_real[~split_decision]) # overall criterion, again weighted average n = y_real.shape[0] if self.class_weight is not None: sample_weights = np.ones(n) for c in self.class_weight.keys(): idxs_c = y_real == c sample_weights[idxs_c] = self.class_weight[c] total_weight = np.sum(sample_weights) weight_left = np.sum(sample_weights[split_decision]) / total_weight # weight_right = np.sum(sample_weights[~split_decision]) / total_weight else: tot_left_samples = np.sum(split_decision == 1) weight_left = tot_left_samples / n s = weight_left * s_left + (1 - weight_left) * s_right return s def _gini_criterion(self, y): '''Returns gini index for one node = sum(pc * (1 – pc)) ''' s = 0 n = y.shape[0] classes = np.unique(y) # for each class, get entropy for c in classes: # weights for each class n_c = np.sum(y == c) p_c = n_c / n # weighted avg s += p_c * (1 - p_c) return s def _entropy_criterion(self, y): """Returns entropy of a divided group of data Data may have multiple classes """ s = 0 n = len(y) classes = set(y) # for each class, get entropy for c in classes: # weights for each class weight = sum(y == c) / n def _entropy_from_counts(c1, c2): """Returns entropy of a group of data c1: count of one class c2: count of another class """ if c1 == 0 or c2 == 0: # when there is only one class in the group, entropy is 0 return 0 def _entropy_func(p): return -p * math.log(p, 2) p1 = c1 * 1.0 / (c1 + c2) p2 = c2 * 1.0 / (c1 + c2) return _entropy_func(p1) + _entropy_func(p2) # weighted avg s += weight * _entropy_from_counts(sum(y == c), sum(y != c)) return s def _neg_corr_criterion(self, split_decision, y): '''Returns negative correlation between y and the binary splitting variable split_decision y must be binary ''' if np.unique(y).size < 2: return 0 elif np.unique(y).size != 2: print('y must be binary output for corr criterion') # y should be 1 more often on the "right side" of the split if y.sum() < y.size / 2: y = 1 - y return -1 * np.corrcoef(split_decision.astype(np.int), y)[0, 1]
Ancestors
- sklearn.base.BaseEstimator
- RuleList
- sklearn.base.ClassifierMixin
Subclasses
Methods
def fit(self, X, y, depth: int = 0, feature_names=None, verbose=False)
-
Params
X: array_like Feature set y: array_like target variable depth the depth of the current layer (used to recurse)
Expand source code
def fit(self, X, y, depth: int = 0, feature_names=None, verbose=False): """ Params ------ X: array_like Feature set y: array_like target variable depth the depth of the current layer (used to recurse) """ X, y, feature_names = check_fit_arguments(self, X, y, feature_names) return self.fit_node_recursive(X, y, depth=0, verbose=verbose)
def fit_node_recursive(self, X, y, depth: int, verbose)
-
Expand source code
def fit_node_recursive(self, X, y, depth: int, verbose): # base case 1: no data in this group if y.size == 0: return [] # base case 2: all y is the same in this group elif np.all(y == y[0]): return [{'val': y[0], 'num_pts': y.size}] # base case 3: max depth reached elif depth >= self.max_depth: return [] # recursively generate rule list else: # find a split with the best value for the criterion m = DecisionTreeClassifier(max_depth=1, criterion=self.criterion) m.fit(X, y) col = m.tree_.feature[0] cutoff = m.tree_.threshold[0] # col, cutoff, criterion_val = self._find_best_split(X, y) y_left = y[X[:, col] < cutoff] # left-hand side data y_right = y[X[:, col] >= cutoff] # right-hand side data # put higher probability of class 1 on the right-hand side if len(y_left) > 0 and np.mean(y_left) > np.mean(y_right): flip = True tmp = deepcopy(y_left) y_left = deepcopy(y_right) y_right = tmp x_left = X[X[:, col] >= cutoff] else: flip = False x_left = X[X[:, col] < cutoff] # print if verbose: print( f'{np.mean(100 * y):.2f} -> {self.feature_names_[col]} -> {np.mean(100 * y_left):.2f} ({y_left.size}) {np.mean(100 * y_right):.2f} ({y_right.size})') # save info par_node = [{ 'col': self.feature_names_[col], 'index_col': col, 'cutoff': cutoff, 'val': np.mean(y), # values before splitting 'flip': flip, 'val_right': np.mean(y_right), 'num_pts': y.size, 'num_pts_right': y_right.size }] # generate tree for the non-leaf data par_node = par_node + \ self.fit_node_recursive(x_left, y_left, depth + 1, verbose=verbose) self.depth += 1 # increase the depth since we call fit once self.rules_ = par_node self.complexity_ = len(self.rules_) self.classes_ = unique_labels(y) return par_node
def predict(self, X)
-
Expand source code
def predict(self, X): check_is_fitted(self) X = check_array(X) return np.argmax(self.predict_proba(X), axis=1)
def predict_proba(self, X)
-
Expand source code
def predict_proba(self, X): check_is_fitted(self) X = check_array(X) n = X.shape[0] probs = np.zeros(n) for i in range(n): x = X[i] for j, rule in enumerate(self.rules_): if j == len(self.rules_) - 1: probs[i] = rule['val'] elif x[rule['index_col']] >= rule['cutoff']: probs[i] = rule['val_right'] break return np.vstack((1 - probs, probs)).transpose() # probs (n, 2)