Source code for pyexplainer.pyexplainer_pyexplainer

import copy
import math
import os
import random
import string
import warnings
import ipywidgets as widgets
import numpy as np
import pandas as pd
import scipy as sp
import sklearn
from IPython.core.display import display, HTML
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
from sklearn.ensemble import RandomForestClassifier
from pyexplainer.rulefit import RuleFit


def data_validation(data):
    """Validate that the given data format is a list of dictionaries.

    Parameters
    ----------
    data : :obj:`Any`
        Data to be validated.

    Returns
    -------
    :obj:`bool`
        True: The data is a list of dictionaries.
        False: The data is not a list of dictionaries.
    """
    valid = True
    if isinstance(data, list):
        for i in range(len(data)):
            if not isinstance(data[i], dict):
                print("Data Format Error - the input data should be a list of dictionaries")
                valid = False
                break
    else:
        valid = False
    return valid
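
# Illustrative usage sketch (not part of the original module) showing how
# data_validation treats a few literal inputs; the values are hypothetical.
def _demo_data_validation():
    assert data_validation([{"title": "a"}, {"title": "b"}])  # list of dicts -> True
    assert not data_validation([{"title": "a"}, 42])          # mixed list -> False (and prints a format error)
    assert not data_validation({"title": "a"})                # bare dict -> False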
def id_generator(size=15, random_state=check_random_state(None)):
    """Generate unique ids for the div tag which will contain the visualisation stuff from d3.

    Parameters
    ----------
    size : :obj:`int`
        An integer that specifies the length of the returned id, default = 15.
        Size should be in range 1 - 30 (both included).
    random_state : :obj:`np.random.RandomState`
        A RandomState instance, default = check_random_state(None).

    Returns
    -------
    :obj:`str`
        A random identifier.
    """
    if not isinstance(size, int):
        size = 15
    if size <= 0 or size > 30:
        size = 15
    if not isinstance(random_state, np.random.mtrand.RandomState):
        random_state = check_random_state(None)
    chars = list(string.ascii_uppercase + string.digits)
    return ''.join(random_state.choice(chars, size, replace=True))
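
# Illustrative sketch (not part of the original module): a seeded RandomState
# makes the generated id reproducible, which is handy in tests.
def _demo_id_generator():
    rs = check_random_state(0)
    uid = id_generator(size=10, random_state=rs)
    assert len(uid) == 10 and uid.isalnum()
    # out-of-range sizes silently fall back to the default of 15
    assert len(id_generator(size=99)) == 15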
def to_js_data(list_of_dict):
    """Transform a python list of dicts into a str to be used inside the html <script></script> tag.

    Parameters
    ----------
    list_of_dict : :obj:`list`
        Data to be transformed.

    Returns
    -------
    :obj:`str`
        A str representing a list of dicts, ending with ';'.
    """
    if data_validation(list_of_dict):
        return str(list_of_dict) + ";"
    else:
        print("Data to be transformed to the javascript format is not a python list of dicts, hence '[{}];' is returned")
        return '[{}];'
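
# Illustrative sketch (not part of the original module): the returned string is
# a javascript array literal terminated with ';', ready to be spliced into a
# <script> block; malformed input falls back to an empty array literal.
def _demo_to_js_data():
    assert to_js_data([{'riskScore': ['25%']}]) == "[{'riskScore': ['25%']}];"
    assert to_js_data("not a list of dicts") == '[{}];'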
class PyExplainer:
    """A PyExplainer object is able to load training data and an ML model to generate human-centric explanation and visualisation.

    Parameters
    ----------
    X_train : :obj:`pandas.core.frame.DataFrame`
        Training data X (Features)
    y_train : :obj:`pandas.core.series.Series`
        Training data y (Label)
    indep : :obj:`pandas.core.indexes.base.Index`
        Independent variables (column names)
    dep : :obj:`str`
        Dependent variable (column name)
    blackbox_model : :obj:`sklearn.ensemble.RandomForestClassifier`
        A global random forest model trained from sklearn
    class_label : :obj:`list`
        Classification labels, default = ['Clean', 'Defect']
    top_k_rules : :obj:`int`
        Number of top positive and negative rules to be retrieved
    """

    def __init__(self, X_train, y_train, indep, dep, blackbox_model,
                 class_label=['Clean', 'Defect'], top_k_rules=3):
        if isinstance(X_train, pd.core.frame.DataFrame):
            self.X_train = X_train
        else:
            print("X_train should be type 'pandas.core.frame.DataFrame'")
            raise TypeError
        if isinstance(y_train, pd.core.series.Series):
            self.y_train = y_train
        else:
            print("y_train should be type 'pandas.core.series.Series'")
            raise TypeError
        if isinstance(indep, pd.core.indexes.base.Index):
            self.indep = indep
        else:
            print("indep (feature column names) should be type 'pandas.core.indexes.base.Index'")
            raise TypeError
        if isinstance(dep, str):
            self.dep = dep
        else:
            print("dep (label column name) should be type 'str'")
            raise TypeError
        if isinstance(blackbox_model, sklearn.ensemble.RandomForestClassifier):
            self.blackbox_model = blackbox_model
        else:
            print("The blackbox_model should be a Random Forest model trained from sklearn "
                  "(sklearn.ensemble.RandomForestClassifier)")
            raise TypeError
        if isinstance(class_label, list):
            if len(class_label) == 2:
                self.class_label = class_label
            else:
                print("class_label should be a list with length of 2")
                raise ValueError
        else:
            print("class_label should be type 'list'")
            raise TypeError
        if isinstance(top_k_rules, int):
            if top_k_rules <= 0 or top_k_rules > 15:
                print("top_k_rules should be in range 1 - 15 (both included)")
                raise ValueError
            else:
                self.top_k_rules = top_k_rules
        else:
            print("top_k_rules should be type 'int'")
            raise TypeError
        self.bullet_data = [{}]
        self.risk_data = [{}]
        self.bullet_output = widgets.Output(layout={'border': '3px solid black'})
        self.hbox_items = []
        self.X_explain = None
        self.y_explain = None
    def explain(self, X_explain, y_explain, top_k=3, max_rules=10, max_iter=10, cv=5,
                search_function='CrossoverInterpolation', debug=False):
        """Generate a rule object manually by passing X_explain and y_explain.

        Parameters
        ----------
        X_explain : :obj:`pandas.core.frame.DataFrame`
            Features to be explained by the local RuleFit model, can be seen as X_test
        y_explain : :obj:`pandas.core.series.Series`
            Label to be explained by the local RuleFit model, can be seen as y_test
        top_k : :obj:`int`, default is 3
            Number of top rules to be retrieved
        max_rules : :obj:`int`, default is 10
            Maximum number of rules to be generated
        max_iter : :obj:`int`, default is 10
            Maximum number of iterations to be tuned in to the local RuleFit model
        cv : :obj:`int`, default is 5
            Number of cross-validation folds to be tuned in to the local RuleFit model
        search_function : :obj:`str`, default is 'CrossoverInterpolation'
            Name of the search function used to generate the instances fed to RuleFit.fit(),
            either 'CrossoverInterpolation' or 'RandomPerturbation' (case-insensitive)
        debug : :obj:`bool`, default is False
            True for debugging mode, False otherwise.

        Returns
        -------
        :obj:`dict`
            A rule object dict holding all of the data related to the local RuleFit model,
            with the following keys: 'synthetic_data', 'synthetic_predictions', 'X_explain',
            'y_explain', 'indep', 'dep', 'top_k_positive_rules', 'top_k_negative_rules'.

        Examples
        --------
        >>> from pyexplainer.pyexplainer_pyexplainer import PyExplainer
        >>> import pandas as pd
        >>> from sklearn.ensemble import RandomForestClassifier
        >>> data = pd.read_csv('../tests/pyexplainer_test_data/activemq-5.0.0.csv', index_col = 'File')
        >>> dep = data.columns[-4]
        >>> indep = data.columns[0:(len(data.columns) - 4)]
        >>> X_train = data.loc[:, indep]
        >>> y_train = data.loc[:, dep]
        >>> blackbox_model = RandomForestClassifier(max_depth=3, random_state=0)
        >>> blackbox_model.fit(X_train, y_train)
        >>> class_label = ['Clean', 'Defect']
        >>> py_explainer = PyExplainer(X_train, y_train, indep, dep, blackbox_model, class_label)
        >>> sample_test_data = pd.read_csv('../tests/pyexplainer_test_data/activemq-5.0.0.csv', index_col = 'File')
        >>> X_test = sample_test_data.loc[:, indep]
        >>> y_test = sample_test_data.loc[:, dep]
        >>> sample_explain_index = 0
        >>> X_explain = X_test.iloc[[sample_explain_index]]
        >>> y_explain = y_test.iloc[[sample_explain_index]]
        >>> py_explainer.explain(X_explain, y_explain, search_function = 'crossoverinterpolation', top_k = 3, max_rules=30, max_iter =5, cv=5, debug = False)
        """
        # check if X_explain is a DataFrame
        if not isinstance(X_explain, pd.core.frame.DataFrame):
            print("X_explain (X_test) should be type 'pandas.core.frame.DataFrame'")
            raise ValueError
        # check if X_explain has the same number of columns as X_train
        if len(X_explain.columns) != len(self.X_train.columns):
            print("X_explain should have the same number of columns as X_train")
            raise ValueError
        # check if y_explain is a Series
        if not isinstance(y_explain, pd.core.series.Series):
            print("y_explain (y_test) should be type 'pandas.core.series.Series'")
            raise ValueError

        self.set_top_k_rules(top_k)

        # Step 1 - Generate synthetic instances
        if search_function.lower() == 'crossoverinterpolation':
            synthetic_object = self.generate_instance_crossover_interpolation(X_explain, y_explain, debug=debug)
        elif search_function.lower() == 'randomperturbation':
            # This random perturbation approach to generating instances is also
            # used by LIME to generate synthetic instances
            synthetic_object = self.generate_instance_random_perturbation(X_explain=X_explain, debug=debug)
        else:
            print("search_function should be either 'CrossoverInterpolation' or 'RandomPerturbation'")
            raise ValueError

        # Step 2 - Generate predictions of the synthetic instances using the global model
        synthetic_instances = synthetic_object['synthetic_data'].loc[:, self.indep]
        synthetic_predictions = self.blackbox_model.predict(synthetic_instances)
        if 1 in synthetic_predictions and 0 in synthetic_predictions:
            one_class_problem = False
        else:
            one_class_problem = True
        if one_class_problem:
            print("Random Perturbation only generated one class for the prediction column, which means "
                  "Random Perturbation is not compatible with the current data. "
                  "The 'Crossover and Interpolation' approach is used as the alternative.")
            synthetic_object = self.generate_instance_crossover_interpolation(X_explain, y_explain, debug=debug)
            synthetic_instances = synthetic_object['synthetic_data'].loc[:, self.indep]
            synthetic_predictions = self.blackbox_model.predict(synthetic_instances)
        if debug:
            n_defect_class = np.sum(synthetic_predictions)
            print('nDefect=', n_defect_class, 'from', len(synthetic_predictions))

        # Step 3 - Build a RuleFit local model with the synthetic instances
        # indep_index = [list(synthetic_instances.columns).index(i) for i in self.indep]
        local_rulefit_model = RuleFit(rfmode='classify',
                                      exp_rand_tree_size=False,
                                      random_state=0,
                                      max_rules=max_rules,
                                      cv=cv,
                                      max_iter=max_iter,
                                      n_jobs=-1)
        local_rulefit_model.fit(synthetic_instances.values, synthetic_predictions, feature_names=self.indep)
        if debug:
            print('Constructed a RuleFit model')

        # Step 4 - Get rules from the RuleFit local model
        rules = local_rulefit_model.get_rules()
        rules = rules[rules.coef != 0].sort_values("importance", ascending=False)
        rules = rules[rules.type == 'rule'].sort_values("importance", ascending=False)
        top_k_positive_rules = rules[rules.coef > 0].sort_values("importance", ascending=False).head(top_k)
        top_k_positive_rules['Class'] = self.class_label[1]
        top_k_negative_rules = rules[rules.coef < 0].sort_values("importance", ascending=False).head(top_k)
        top_k_negative_rules['Class'] = self.class_label[0]

        rule_obj = {'synthetic_data': synthetic_instances,
                    'synthetic_predictions': synthetic_predictions,
                    'X_explain': X_explain,
                    'y_explain': y_explain,
                    'indep': self.indep,
                    'dep': self.dep,
                    'top_k_positive_rules': top_k_positive_rules,
                    'top_k_negative_rules': top_k_negative_rules}
        return rule_obj
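
    # Illustrative doctest-style sketch (not part of the original module):
    # inspecting the rule object returned by explain(). `py_explainer`,
    # `X_explain`, and `y_explain` are assumed to be set up as in the
    # docstring example above.
    # >>> rule_obj = py_explainer.explain(X_explain, y_explain, top_k=3)
    # >>> rule_obj['top_k_positive_rules'][['rule', 'coef', 'importance', 'Class']]
    # >>> rule_obj['top_k_negative_rules'][['rule', 'coef', 'importance', 'Class']]
    # >>> rule_obj['synthetic_predictions'].mean()  # fraction of synthetic instances predicted defective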
    def generate_bullet_data(self, parsed_rule_object):
        """Generate bullet chart data (a list of dicts) to be rendered with the d3.js chart.

        Parameters
        ----------
        parsed_rule_object : :obj:`dict`
            Top rules parsed from the rule object.

        Returns
        -------
        :obj:`list`
            A list of dicts that contains the data needed to generate a bullet chart.
        """
        X_explain = self.__get_X_explain()
        min_max_values = self.retrieve_X_explain_min_max_values()
        # Version 01 - only visualise what to follow (Rules => Clean)
        bullet_data = []
        for i in range(len(parsed_rule_object['top_tofollow_rules'])):
            # example rule: {'variable': 'MAJOR_COMMIT', 'lessthan': True, 'value': '1.550000011920929'}
            tmp_rule = parsed_rule_object['top_tofollow_rules'][i]
            tmp_min = int((min_max_values['min_values'][tmp_rule['variable']]))
            tmp_max = int(round(min_max_values['max_values'][tmp_rule['variable']]))
            tmp_interval = (tmp_max - tmp_min) / 10.0
            tmp_threshold_value = round(float(tmp_rule['value']), 2)
            tmp_actual_value = round(X_explain[tmp_rule['variable']][0], 2)
            tmp_markers = [tmp_actual_value]
            plot_min = int(round(max(tmp_min, tmp_threshold_value - tmp_interval), 0)) * 1.0
            plot_max = int(round(min(tmp_max, tmp_threshold_value + tmp_interval), 0)) * 1.0
            # keep the marker within the plotted range
            if tmp_markers[0] > plot_max:
                plot_max = tmp_markers[0]
            elif tmp_markers[0] < plot_min:
                plot_min = tmp_markers[0]
            diff_plot_max_min = plot_max - plot_min
            tmp_subtitle_text = 'Actual = ' + str(tmp_actual_value)
            tmp_ticks = [plot_min, plot_max]
            if plot_max - plot_min <= 10:
                tmp_step = [0.1]
            else:
                tmp_step = [1]
            bullet_total_width = 450
            # map the threshold onto the 450px bar, guarding against a zero-width range
            tmp_start_points = [0,
                                round((tmp_threshold_value - plot_min) / diff_plot_max_min * bullet_total_width
                                      if diff_plot_max_min * bullet_total_width else 0, 0)]
            tmp_widths = [round((tmp_threshold_value - plot_min) / diff_plot_max_min * bullet_total_width
                                if diff_plot_max_min * bullet_total_width else 0, 0),
                          round((plot_max - tmp_threshold_value) / diff_plot_max_min * bullet_total_width
                                if diff_plot_max_min * bullet_total_width else 0, 0)]
            id = '#' + str(i + 1)
            var_name = str(tmp_rule['variable'])
            if tmp_rule['lessthan']:
                # lessthan == True: the rule suggests decreasing the value to less than a certain threshold
                tmp_title_text = id + ' Decrease the values of ' + \
                                 var_name + ' to less than ' + \
                                 str(tmp_actual_value)
                tmp_colors = ["#a6d96a", "#d7191c"]
            else:
                # lessthan == False: the rule suggests increasing the value to more than a certain threshold
                tmp_title_text = id + ' Increase the values of ' + \
                                 var_name + ' to more than ' + \
                                 str(tmp_actual_value)
                tmp_colors = ["#d7191c", "#a6d96a"]
            bullet_data.append({
                "title": tmp_title_text,
                "subtitle": tmp_subtitle_text,
                "ticks": tmp_ticks,
                "step": tmp_step,
                "startPoints": tmp_start_points,
                "widths": tmp_widths,
                "colors": tmp_colors,
                "markers": tmp_markers,
                "varRef": var_name,
            })
        return bullet_data
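
    # Illustrative sketch (not part of the original module): the threshold-to-pixel
    # mapping used above, with hypothetical numbers. For plot_min=0, plot_max=20
    # and a threshold of 5, the 450px bar is split at
    #     (5 - 0) / (20 - 0) * 450 = 112.5
    # which Python's round() (banker's rounding) turns into 112.0, so
    # startPoints becomes [0, 112.0] and widths becomes [112.0, 338.0],
    # and the two segments together span the full 450px bar.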
    def generate_html(self):
        """Generate the d3 bullet chart html and return it as a string.

        Returns
        -------
        :obj:`str`
            html string
        """
        this_dir, _ = os.path.split(__file__)
        with open(os.path.join(this_dir, 'css/styles.css'), encoding="utf8") as f:
            style_css = f.read()
        with open(os.path.join(this_dir, 'js/d3.min.js'), encoding="utf8") as f:
            d3_js = f.read()
        with open(os.path.join(this_dir, 'js/bullet.js'), encoding="utf8") as f:
            bullet_js = f.read()
        css_stylesheet = """
        <style>%s</style>
        """ % style_css
        d3_script = """
        <script>%s</script>
        <script>%s</script>
        """ % (d3_js, bullet_js)
        main_title = "What to do to decrease the risk of having defects?"
        title = """
        <div style="position: relative; top: 0; width: 100vw; left: 27vw;">
            <b>%s</b>
        </div>
        """ % main_title
        unique_id = id_generator()
        bullet_data = to_js_data(self.__get_bullet_data())
        risk_data = to_js_data(self.__get_risk_data())
        d3_operation_script = """
        <script>
        var margin = { top: 5, right: 40, bottom: 20, left: 500 },
            width = 990 - margin.left - margin.right,
            height = 50 - margin.top - margin.bottom;
        var chart = d3.bullet().width(width).height(height);
        var bulletData = %s
        var riskData = %s
        // define the color of the box
        var boxColor = "box green";
        var riskPred = riskData[0].riskPred[0];
        if (riskPred.localeCompare("Yes") == 0) {
            boxColor = "box orange";
        }
        // append risk prediction and risk score
        d3.select("#d3-target-bullet-%s")
            .append("div")
            .attr("class", "riskPred")
            .data(riskData)
            .text((d) => d.riskPred)
            .append("div")
            .attr("class", boxColor);
        d3.select("#d3-target-bullet-%s")
            .append("div")
            .attr("class", "riskScore")
            .data(riskData)
            .text((d) => "Risk Score: " + d.riskScore);
        var svg = d3
            .select("#d3-target-bullet-%s")
            .selectAll("svg")
            .data(bulletData)
            .enter()
            .append("svg")
            .attr("class", "bullet")
            .attr("width", width + margin.left + margin.right)
            .attr("height", height + margin.top + margin.bottom)
            .append("g")
            .attr("transform", "translate(" + margin.left + "," + margin.top + ")")
            .call(chart);
        var title = svg
            .append("g")
            .style("text-anchor", "end")
            .attr("transform", "translate(-6," + height / 2 + ")");
        title
            .append("text")
            .attr("class", "title")
            .text((d) => d.title);
        title
            .append("text")
            .attr("class", "subtitle")
            .attr("dy", "1em")
            .text((d) => d.subtitle);
        </script>
        """ % (bullet_data, risk_data, unique_id, unique_id, unique_id)
        html = """
        <!DOCTYPE html>
        <html>
        <meta http-equiv="content-type" content="text/html; charset=UTF8">
        <head>
        %s
        %s
        </head>
        <body>
        <div class="bullet-chart">
        %s
        <div class="d3-target-bullet" id="d3-target-bullet-%s" />
        </div>
        %s
        </body>
        </html>
        """ % (css_stylesheet, d3_script, title, unique_id, d3_operation_script)
        return html
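
    # Illustrative doctest-style sketch (not part of the original module): the
    # returned html inlines the css and js, so it can also be written to disk
    # and opened in a browser outside Jupyter. The file name is hypothetical.
    # >>> html = py_explainer.generate_html()
    # >>> with open('bullet_chart.html', 'w', encoding='utf8') as f:
    # ...     f.write(html)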
    def generate_instance_crossover_interpolation(self, X_explain, y_explain, debug=False):
        """An approach to generating instances using crossover and interpolation.

        Parameters
        ----------
        X_explain : :obj:`pandas.core.frame.DataFrame`
            X_explain (Testing Features)
        y_explain : :obj:`pandas.core.series.Series`
            y_explain (Testing Label)
        debug : :obj:`bool`
            True for debugging mode, False otherwise.

        Returns
        -------
        :obj:`dict`
            A dict with two keys, 'synthetic_data' and 'sampled_class_frequency', generated via crossover and interpolation.
        """
        categorical_vars = []

        X_train_i = self.X_train.copy()
        # y_train_i = self.y_train.copy()
        X_explain = X_explain.copy()
        y_explain = y_explain.copy()

        X_train_i.reset_index(inplace=True)
        X_explain.reset_index(inplace=True)
        X_train_i = X_train_i.loc[:, self.indep]
        # y_train_i = y_train_i.reset_index()[[self.dep]]
        X_explain = X_explain.loc[:, self.indep]
        y_explain = y_explain.reset_index()[[self.dep]]

        # get the global model predictions for the training set
        target_train = self.blackbox_model.predict(X_train_i)

        # class variables
        # ori_dataset = pd.concat([X_train_i.reset_index(drop=True), y_train_i], axis=1)

        # Do feature scaling for continuous data and one-hot encoding for categorical data
        scaler = StandardScaler()
        trainset_normalize = X_train_i.copy()
        if debug:
            print(list(X_train_i), "columns")
        cases_normalize = X_explain.copy()

        train_objs_num = len(trainset_normalize)
        dataset = pd.concat(objs=[trainset_normalize, cases_normalize], axis=0)
        if debug:
            print(self.indep, "continuous")
            print(type(self.indep))
        dataset[self.indep] = scaler.fit_transform(dataset[self.indep])
        # dataset = pd.get_dummies(dataset, prefix_sep="__", columns=self.__categorical_vars)
        trainset_normalize = copy.copy(dataset[:train_objs_num])
        cases_normalize = copy.copy(dataset[train_objs_num:])

        # make a dataframe to store the similarities of the training instances to the explained instance
        dist_df = pd.DataFrame(index=trainset_normalize.index.copy())
        width = math.sqrt(len(X_train_i.columns)) * 0.75
        # similarity
        for count, case in cases_normalize.iterrows():
            # Calculate the euclidean distance from the instance to be explained
            dist = np.linalg.norm(trainset_normalize.sub(np.array(case)), axis=1)
            # Convert the distance to a similarity score
            similarity = np.sqrt(np.exp(-(dist ** 2) / (width ** 2)))
            dist_df['dist'] = similarity
            dist_df['t_target'] = target_train
            # get the unique classes of the training set
            unique_classes = dist_df.t_target.unique()
            # Sort the similarity scores in descending order
            dist_df.sort_values(by=['dist'], ascending=False, inplace=True)
            # dist_df.reset_index(inplace=True)

            # Make a dataframe with the top 40 elements of each class
            top_forty_df = pd.DataFrame([])
            for clz in unique_classes:
                top_forty_df = top_forty_df.append(dist_df[dist_df['t_target'] == clz].head(40))
            # top_forty_df.reset_index(inplace=True)

            # get the minimum similarity of the top 40 elements and return its index
            cutoff_similarity = top_forty_df.nsmallest(1, 'dist', keep='last').index.values.astype(int)[0]

            # Get the location of the given index with the minimum similarity
            min_loc = dist_df.index.get_loc(cutoff_similarity)
            # whole neighbourhood without undersampling the majority class
            train_neigh_sampling_b = dist_df.iloc[0:min_loc + 1]
            # get the size of the neighbourhood for each class
            target_details = train_neigh_sampling_b.groupby(['t_target']).size()
            if debug:
                print(target_details, "target_details")
            target_details_df = pd.DataFrame({'target': target_details.index, 'target_count': target_details.values})

            # Get the majority class and undersample it
            final_neighbours_similarity_df = pd.DataFrame([])
            for index, row in target_details_df.iterrows():
                if row["target_count"] > 200:
                    filtered_class_set = train_neigh_sampling_b \
                        .loc[train_neigh_sampling_b['t_target'] == row['target']] \
                        .sample(n=200)
                    final_neighbours_similarity_df = final_neighbours_similarity_df.append(filtered_class_set)
                else:
                    filtered_class_set = train_neigh_sampling_b \
                        .loc[train_neigh_sampling_b['t_target'] == row['target']]
                    final_neighbours_similarity_df = final_neighbours_similarity_df.append(filtered_class_set)
            if debug:
                print(final_neighbours_similarity_df, "final_neighbours_similarity_df")

            # Get the original training set instances whose indexes match those of the selected neighbours
            train_set_neigh = X_train_i[X_train_i.index.isin(final_neighbours_similarity_df.index)]
            if debug:
                print(train_set_neigh, "train set neigh")
            train_class_neigh = y_explain[y_explain.index.isin(final_neighbours_similarity_df.index)]
            # train_neigh_df = train_set_neigh.join(train_class_neigh)
            # class_neigh = train_class_neigh.groupby([self.dep]).size()

            new_con_df = pd.DataFrame([])
            sample_classes_arr = []
            sample_indexes_list = []

            # Generate 1000 instances using the interpolation technique
            for num in range(0, 1000):
                rand_rows = train_set_neigh.sample(2)
                sample_indexes_list = sample_indexes_list + rand_rows.index.values.tolist()
                similarity_both = dist_df[dist_df.index.isin(rand_rows.index)]
                sample_classes = train_class_neigh[train_class_neigh.index.isin(rand_rows.index)]
                sample_classes = np.array(sample_classes.to_records().view(type=np.matrix))
                sample_classes_arr.append(sample_classes[0].tolist())

                alpha_n = np.random.uniform(low=0, high=1.0)
                x = rand_rows.iloc[0]
                y = rand_rows.iloc[1]
                new_ins = x + (y - x) * alpha_n
                new_ins = new_ins.to_frame().T

                # For categorical variables, copy the value of the more similar parent
                for cat in categorical_vars:
                    x_df = x.to_frame().T
                    y_df = y.to_frame().T
                    # Check whether the similarity of x > the similarity of y
                    if similarity_both.iloc[0]['dist'] > similarity_both.iloc[1]['dist']:
                        new_ins[cat] = x_df.iloc[0][cat]
                    # Check whether the similarity of y > the similarity of x
                    elif similarity_both.iloc[0]['dist'] < similarity_both.iloc[1]['dist']:
                        new_ins[cat] = y_df.iloc[0][cat]
                    else:
                        new_ins[cat] = random.choice([x_df.iloc[0][cat], y_df.iloc[0][cat]])
                new_ins.name = num
                new_con_df = new_con_df.append(new_ins, ignore_index=True)

            # Generate 1000 instances using the crossover technique
            for num in range(1000, 2000):
                rand_rows = train_set_neigh.sample(3)
                sample_indexes_list = sample_indexes_list + rand_rows.index.values.tolist()
                sample_classes = train_class_neigh[train_class_neigh.index.isin(rand_rows.index)]
                sample_classes = np.array(sample_classes.to_records().view(type=np.matrix))
                sample_classes_arr.append(sample_classes[0].tolist())

                mu_f = np.random.uniform(low=0.5, high=1.0)
                x = rand_rows.iloc[0]
                y = rand_rows.iloc[1]
                z = rand_rows.iloc[2]
                new_ins = x + (y - z) * mu_f
                new_ins = new_ins.to_frame().T

                # For categorical variables, get the value of the closest instance to the explained instance
                for cat in categorical_vars:
                    x_df = x.to_frame().T
                    y_df = y.to_frame().T
                    z_df = z.to_frame().T
                    new_ins[cat] = random.choice([x_df.iloc[0][cat], y_df.iloc[0][cat], z_df.iloc[0][cat]])
                new_ins.name = num
                new_con_df = new_con_df.append(new_ins, ignore_index=True)

            # get the global model predictions of the generated instances and the instances in the neighbourhood
            predict_dataset = train_set_neigh.append(new_con_df, ignore_index=True)
            target = self.blackbox_model.predict(predict_dataset)
            target_df = pd.DataFrame(target)

            # neighbor_frequency = Counter(tuple(sorted(entry)) for entry in sample_classes_arr)

            new_df_case = pd.concat([predict_dataset, target_df], axis=1)
            new_df_case = np.round(new_df_case, 2)
            new_df_case.rename(columns={0: y_explain.columns[0]}, inplace=True)
            sampled_class_frequency = new_df_case.groupby([self.dep]).size()

            return {'synthetic_data': new_df_case, 'sampled_class_frequency': sampled_class_frequency}
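
    # Illustrative numpy sketch (not part of the original module) of the two
    # generation formulas used above, with hypothetical parent instances:
    #   interpolation: new = x + (y - x) * alpha,  alpha ~ U(0, 1)
    #   crossover:     new = x + (y - z) * mu,     mu    ~ U(0.5, 1)
    # >>> import numpy as np
    # >>> x, y, z = np.array([1.0, 4.0]), np.array([3.0, 8.0]), np.array([2.0, 2.0])
    # >>> x + (y - x) * 0.5   # interpolation with alpha = 0.5 -> array([2., 6.])
    # >>> x + (y - z) * 0.75  # crossover with mu = 0.75 -> array([1.75, 8.5])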
    def generate_instance_random_perturbation(self, X_explain, debug=False):
        """The random perturbation approach to generating synthetic instances, which is also used by LIME.

        Parameters
        ----------
        X_explain : :obj:`pandas.core.frame.DataFrame`
            X_explain (Testing Features)
        debug : :obj:`bool`
            True for debugging mode, False otherwise.

        Returns
        -------
        :obj:`dict`
            A dict with two keys, 'synthetic_data' and 'sampled_class_frequency', generated via random perturbation.
        """
        random_seed = 0
        data_row = X_explain.loc[:, self.indep].values
        num_samples = 1000
        sampling_method = 'gaussian'
        discretizer = None
        sample_around_instance = True
        scaler = sklearn.preprocessing.StandardScaler(with_mean=False)
        scaler.fit(self.X_train.loc[:, self.indep])
        # distance_metric = 'euclidean'
        random_state = check_random_state(random_seed)
        is_sparse = sp.sparse.issparse(data_row)
        if is_sparse:
            num_cols = data_row.shape[1]
            data = sp.sparse.csr_matrix((num_samples, num_cols), dtype=data_row.dtype)
        else:
            num_cols = data_row.shape[0]
            data = np.zeros((num_samples, num_cols))

        if discretizer is None:
            instance_sample = data_row
            scale = scaler.scale_
            mean = scaler.mean_
            if is_sparse:
                # Perturb only the non-zero values
                non_zero_indexes = data_row.nonzero()[1]
                num_cols = len(non_zero_indexes)
                instance_sample = data_row[:, non_zero_indexes]
                scale = scale[non_zero_indexes]
                # mean = mean[non_zero_indexes]
            if sampling_method == 'gaussian':
                data = random_state.normal(0, 1, num_samples * num_cols).reshape(num_samples, num_cols)
                data = np.array(data)
            else:
                warnings.warn('''Invalid input for sampling_method.
                                 Defaulting to Gaussian sampling.''', UserWarning)
                data = random_state.normal(0, 1, num_samples * num_cols).reshape(num_samples, num_cols)
                data = np.array(data)

            if sample_around_instance:
                data = data * scale + instance_sample
            # else:
            #     data = data * scale + mean

            if is_sparse:
                if num_cols == 0:
                    data = sp.sparse.csr_matrix((num_samples, data_row.shape[1]), dtype=data_row.dtype)
                else:
                    indexes = np.tile(non_zero_indexes, num_samples)
                    indptr = np.array(range(0, len(non_zero_indexes) * (num_samples + 1), len(non_zero_indexes)))
                    data_1d_shape = data.shape[0] * data.shape[1]
                    data_1d = data.reshape(data_1d_shape)
                    data = sp.sparse.csr_matrix((data_1d, indexes, indptr), shape=(num_samples, data_row.shape[1]))
            # first_row = data_row
        # else:
        #     first_row = discretizer.discretize(data_row)

        data[0] = data_row.copy()
        inverse = data.copy()

        # todo - this for-loop is for categorical columns in the future
        """
        for column in categorical_features:
            values = feature_values[column]
            freqs = feature_frequencies[column]
            inverse_column = random_state.choice(values, size=num_samples, replace=True, p=freqs)
            binary_column = (inverse_column == first_row[column]).astype(int)
            binary_column[0] = 1
            inverse_column[0] = data[0, column]
            data[:, column] = binary_column
            inverse[:, column] = inverse_column
        """
        # if discretizer is not None:
        #     inverse[1:] = discretizer.undiscretize(inverse[1:])
        inverse[0] = data_row

        if sp.sparse.issparse(data):
            # Note: in the sparse case we don't subtract the mean, since the data would become dense
            scaled_data = data.multiply(scaler.scale_)
            # Multiplying with a csr matrix can return a coo sparse matrix
            if not sp.sparse.isspmatrix_csr(scaled_data):
                scaled_data = scaled_data.tocsr()
        else:
            scaled_data = (data - scaler.mean_) / scaler.scale_
        # distances = sklearn.metrics.pairwise_distances(scaled_data,
        #                                                scaled_data[0].reshape(1, -1),
        #                                                metric=distance_metric).ravel()

        new_df_case = pd.DataFrame(data=scaled_data, columns=self.indep)
        sampled_class_frequency = 0

        n_defect_class = np.sum(self.blackbox_model.predict(new_df_case.loc[:, self.indep]))
        if debug:
            print('Random seed', random_seed, 'nDefective', n_defect_class)

        return {'synthetic_data': new_df_case, 'sampled_class_frequency': sampled_class_frequency}
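
    # Illustrative numpy sketch (not part of the original module): the core of
    # the perturbation above is standard-normal noise rescaled by the per-feature
    # scale from the training set and centred on the instance being explained.
    # The numbers below are hypothetical.
    # >>> import numpy as np
    # >>> rng = np.random.RandomState(0)
    # >>> scale = np.array([2.0, 10.0])          # per-feature scale from StandardScaler
    # >>> instance = np.array([5.0, 100.0])      # the row being explained
    # >>> noise = rng.normal(0, 1, (1000, 2))
    # >>> synthetic = noise * scale + instance   # 1000 samples around the instance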
    def generate_risk_data(self, X_explain):
        """Generate the risk prediction and risk score to be visualised.

        Parameters
        ----------
        X_explain : :obj:`pandas.core.frame.DataFrame`
            The explained DataFrame generated from the RuleFit model.

        Returns
        -------
        :obj:`list`
            A list of dicts that contains the data of the risk prediction and the risk score.
        """
        risk_pred = int(self.blackbox_model.predict(X_explain)[0])
        return [{"riskScore": [str(int(round(self.blackbox_model.predict_proba(X_explain)[0][1] * 100, 0))) + '%'],
                 "riskPred": [self.class_label[risk_pred]]}]
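
    # Illustrative doctest-style sketch (not part of the original module) of the
    # returned structure, with a hypothetical score: predict_proba(...)[0][1] is
    # the probability of the positive ('Defect') class, rendered as a percentage.
    # >>> py_explainer.generate_risk_data(X_explain)
    # [{'riskScore': ['65%'], 'riskPred': ['Defect']}]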
    def get_risk_pred(self):
        """Retrieve the risk prediction from risk_data.

        Returns
        -------
        :obj:`str`
            A string of the risk prediction
        """
        return self.__get_risk_data()[0]['riskPred'][0]
    def get_risk_score(self):
        """Retrieve the risk score from risk_data.

        Returns
        -------
        :obj:`float`
            A float of the risk score
        """
        risk_score = self.__get_risk_data()[0]['riskScore'][0].strip("%")
        return float(risk_score)
    def get_top_k_rules(self):
        """Getter of top_k_rules.

        Returns
        -------
        :obj:`int`
            Number of top positive and negative rules to be retrieved
        """
        return self.top_k_rules
    def generate_progress_bar_items(self):
        """Generate items to be set into the hbox (horizontal box)."""
        progress_bar = widgets.FloatProgress(value=0,
                                             min=0,
                                             max=100,
                                             bar_style='info',
                                             layout=widgets.Layout(width='40%'),
                                             orientation='horizontal')
        left_text = widgets.Label("Risk Score: ")
        right_text = widgets.Label("0")
        self.__set_hbox_items([left_text, progress_bar, right_text, widgets.Label("%")])
    def generate_sliders(self):
        """Generate one or more slider widgets and return them as a list.

        Each slider is either an IntSlider or a FloatSlider depending on the step value in the data.

        Returns
        -------
        :obj:`list`
            A list of slider widgets.
        """
        slider_widgets = []
        data = self.__get_bullet_data()
        style = {'description_width': '40%'}
        layout = widgets.Layout(width='99%', height='20px')
        for d in data:
            # decide whether to use an IntSlider or a FloatSlider
            # (d['step'] is a one-element list, so inspect its first element)
            if isinstance(d['step'][0], int):
                # create an IntSlider obj and store it in the list
                slider = widgets.IntSlider(value=d['markers'][0],
                                           min=d['ticks'][0],
                                           max=d['ticks'][-1],
                                           step=d['step'][0],
                                           description=d['title'],
                                           layout=layout,
                                           style=style,
                                           disabled=False,
                                           continuous_update=False,
                                           orientation='horizontal',
                                           readout=True,
                                           readout_format='d')
                slider_widgets.append(slider)
            else:
                # create a FloatSlider obj and store it in the list
                slider = widgets.FloatSlider(value=d['markers'][0],
                                             min=d['ticks'][0],
                                             max=d['ticks'][-1],
                                             step=d['step'][0],
                                             description=d['title'],
                                             layout=layout,
                                             style=style,
                                             disabled=False,
                                             continuous_update=False,
                                             orientation='horizontal',
                                             readout=True,
                                             readout_format='.1f')
                slider_widgets.append(slider)
        return slider_widgets
    def on_value_change(self, change):
        """The callback function for the interactive slider.

        Whenever the user interacts with the slider:
        if the slider is in the non-continuous update mode, this callback is triggered
        only when the mouse click is released;
        if the slider is in the continuous update mode (not recommended here), this
        callback is triggered continuously while the user is moving the slider.

        This callback first clears the output of the Risk Score Progress Bar and the
        Bullet Chart, then calls funcs to compute the new values to be visualised, and
        finally visualises the new values.

        Parameters
        ----------
        change : :obj:`dict`
            A dict that contains the former (before the change) and latter (after the change) data inside the slider
        """
        # step 1 - clear the bullet chart output and the risk score bar output
        bullet_out = self.bullet_output
        bullet_out.clear_output()

        # step 2 - compute the new values to be visualised
        # get the var that changed
        bullet_data = self.__get_bullet_data()
        id = int(change['owner'].description.split(" ")[0].strip("#"))
        var_changed = bullet_data[id - 1]['varRef']
        new_value = change.new
        # modify the changed var in X_explain
        X_explain = self.__get_X_explain()
        row_name = self.__get_X_explain().index[0]
        X_explain.at[row_name, var_changed] = new_value
        # modify the bullet data
        bullet_data[id - 1]['markers'][0] = new_value
        self.__set_bullet_data(bullet_data)
        # generate new risk data
        self.__set_risk_data(self.generate_risk_data(X_explain))

        # step 3 - visualise the new output
        # update the risk score progress bar
        self.run_bar_animation()
        # update the bullet chart
        with bullet_out:
            # display the d3 bullet chart
            html = self.generate_html()
            display(HTML(html))
    def parse_top_rules(self, top_k_positive_rules, top_k_negative_rules):
        """Parse the top k positive rules and the top k negative rules, given both as DataFrames.

        Parameters
        ----------
        top_k_positive_rules : :obj:`pandas.core.frame.DataFrame`
            Top positive rules DataFrame
        top_k_negative_rules : :obj:`pandas.core.frame.DataFrame`
            Top negative rules DataFrame

        Returns
        -------
        :obj:`dict`
            A dict containing two keys, 'top_tofollow_rules' and 'top_toavoid_rules'
        """
        if len(top_k_positive_rules) < len(top_k_negative_rules):
            smaller_top_rule = len(top_k_positive_rules)
        else:
            smaller_top_rule = len(top_k_negative_rules)
        if self.get_top_k_rules() > smaller_top_rule:
            self.set_top_k_rules(smaller_top_rule)

        top_variables = []
        top_k_toavoid_rules = []
        top_k_tofollow_rules = []

        for i in range(len(top_k_positive_rules)):
            tmp_rule = (top_k_positive_rules['rule'].iloc[i])
            tmp_rule = tmp_rule.strip()
            tmp_rule = str.split(tmp_rule, '&')
            for j in tmp_rule:
                j = j.strip()
                tmp_sub_rule = str.split(j, ' ')
                tmp_variable = tmp_sub_rule[0]
                tmp_condition_variable = tmp_sub_rule[1]
                tmp_value = tmp_sub_rule[2]
                if tmp_variable not in top_variables:
                    top_variables.append(tmp_variable)
                    top_k_toavoid_rules.append({'variable': tmp_variable,
                                                'lessthan': tmp_condition_variable[0] == '<',
                                                'value': tmp_value})
                if len(top_k_toavoid_rules) == self.get_top_k_rules():
                    break
            if len(top_k_toavoid_rules) == self.get_top_k_rules():
                break

        for i in range(len(top_k_negative_rules)):
            tmp_rule = (top_k_negative_rules['rule'].iloc[i])
            tmp_rule = tmp_rule.strip()
            tmp_rule = str.split(tmp_rule, '&')
            for j in tmp_rule:
                j = j.strip()
                tmp_sub_rule = str.split(j, ' ')
                tmp_variable = tmp_sub_rule[0]
                tmp_condition_variable = tmp_sub_rule[1]
                tmp_value = tmp_sub_rule[2]
                if tmp_variable not in top_variables:
                    top_variables.append(tmp_variable)
                    top_k_tofollow_rules.append({'variable': tmp_variable,
                                                 'lessthan': tmp_condition_variable[0] == '<',
                                                 'value': tmp_value})
                if len(top_k_tofollow_rules) == self.get_top_k_rules():
                    break
            if len(top_k_tofollow_rules) == self.get_top_k_rules():
                break

        return {'top_tofollow_rules': top_k_tofollow_rules,
                'top_toavoid_rules': top_k_toavoid_rules}
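
    # Illustrative sketch (not part of the original module): how a conjunctive
    # RuleFit rule string is decomposed above. The rule text is hypothetical.
    # A rule like 'LOC > 100.5 & CommentRatio <= 0.3' is split on '&' and then
    # on spaces, yielding one dict per condition, e.g.
    #     {'variable': 'LOC', 'lessthan': False, 'value': '100.5'}
    #     {'variable': 'CommentRatio', 'lessthan': True, 'value': '0.3'}
    # where 'lessthan' is True whenever the operator starts with '<'.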
    def retrieve_X_explain_min_max_values(self):
        """Retrieve the minimum and maximum values from X_train.

        Returns
        -------
        :obj:`dict`
            A dict containing two keys, 'min_values' and 'max_values'
        """
        min_values = self.X_train.min()
        max_values = self.X_train.max()
        return {'min_values': min_values, 'max_values': max_values}
    def run_bar_animation(self):
        """Run the animation of the Risk Score Progress Bar."""
        import time
        items_in_hbox = self.__get_hbox_items()
        progress_bar = items_in_hbox[1]

        risk_score = self.get_risk_score()
        risk_prediction = True
        if self.get_risk_pred().upper() == self.class_label[0].upper():
            risk_prediction = False
        if risk_prediction:
            progress_bar.style = {'bar_color': '#FA8128'}
        else:
            progress_bar.style = {'bar_color': '#00FF00'}

        # play speed of the animation
        play_speed = 1
        # progress bar animation - the count starts from the current value of the progress bar
        progress_bar.value = 0
        count = progress_bar.value
        right_text = items_in_hbox[2]
        while count < risk_score:
            progress_bar.value += play_speed  # signal to increment the progress bar
            new_progress_value = float(right_text.value) + play_speed
            if new_progress_value > risk_score:
                right_text.value = str(risk_score)
            else:
                right_text.value = str(new_progress_value)
            time.sleep(.01)
            count += play_speed
        # update the right text
        self.update_right_text(right_text)
    def set_top_k_rules(self, top_k_rules):
        """Setter of top_k_rules.

        Parameters
        ----------
        top_k_rules : :obj:`int`
            Number of top positive and negative rules to be retrieved
        """
        # check the type first so that non-int input fails validation
        # instead of raising a TypeError on the comparison
        if not isinstance(top_k_rules, int) or top_k_rules <= 0 or top_k_rules > 15:
            return print("set top_k_rules failed, top_k_rules should be int in range 1 - 15 (both included)")
        else:
            self.top_k_rules = top_k_rules
    def show_visualisation(self):
        """Display the items as follows,

        (1) Risk Score Progress Bar (made from ipywidgets)
        (2) Interactive Slider (made from ipywidgets)
        (3) Bullet Chart (generated by D3.js)
        """
        # display the risk score progress bar
        self.generate_progress_bar_items()
        items = self.__get_hbox_items()
        display(widgets.HBox(items))
        self.run_bar_animation()
        # display the sliders
        sliders = self.generate_sliders()
        for slider in sliders:
            slider.observe(self.on_value_change, names='value')
            display(slider)
        bullet_out = self.bullet_output
        bullet_out.clear_output()
        display(bullet_out)
        with bullet_out:
            # display the d3 bullet chart
            html = self.generate_html()
            display(HTML(html))
    def update_risk_score(self, risk_score):
        """Update the risk score value inside risk_data.

        Parameters
        ----------
        risk_score : :obj:`int`
            Value of the risk score
        """
        risk_score = str(risk_score) + '%'
        self.__get_risk_data()[0]['riskScore'][0] = risk_score
    def update_right_text(self, right_text):
        """Update the text on the right-hand side of the Risk Score Progress Bar.

        Parameters
        ----------
        right_text : :obj:`widgets.Label`
            Text on the right-hand side of the Risk Score Progress Bar
        """
        if isinstance(right_text, widgets.Label):
            self.__get_hbox_items()[2] = right_text
        else:
            print("The right_text to be set into hbox_items should be type 'ipywidgets.Label'")
            raise TypeError
    def visualise(self, rule_obj):
        """Given the rule object, show all of the visualisations as follows,

        (1) Risk Score Progress Bar (made from ipywidgets)
        (2) Interactive Slider (made from ipywidgets)
        (3) Bullet Chart (generated by D3.js)

        Parameters
        ----------
        rule_obj : :obj:`dict`
            A rule dict generated either by loading the .pyobject file or by the .explain(...) function

        Examples
        --------
        >>> from pyexplainer.pyexplainer_pyexplainer import PyExplainer
        >>> import pandas as pd
        >>> from sklearn.ensemble import RandomForestClassifier
        >>> data = pd.read_csv('../tests/pyexplainer_test_data/activemq-5.0.0.csv', index_col = 'File')
        >>> dep = data.columns[-4]
        >>> indep = data.columns[0:(len(data.columns) - 4)]
        >>> X_train = data.loc[:, indep]
        >>> y_train = data.loc[:, dep]
        >>> blackbox_model = RandomForestClassifier(max_depth=3, random_state=0)
        >>> blackbox_model.fit(X_train, y_train)
        >>> class_label = ['Clean', 'Defect']
        >>> pyExp = PyExplainer(X_train, y_train, indep, dep, blackbox_model, class_label)
        >>> sample_test_data = pd.read_csv('../tests/pyexplainer_test_data/activemq-5.0.0.csv', index_col = 'File')
        >>> X_test = sample_test_data.loc[:, indep]
        >>> y_test = sample_test_data.loc[:, dep]
        >>> sample_explain_index = 0
        >>> X_explain = X_test.iloc[[sample_explain_index]]
        >>> y_explain = y_test.iloc[[sample_explain_index]]
        >>> rule_obj = pyExp.explain(X_explain, y_explain, search_function = 'CrossoverInterpolation', top_k = 3, max_rules=30, max_iter =5, cv=5, debug = False)
        >>> pyExp.visualise(rule_obj)
        """
        self.visualisation_data_setup(rule_obj)
        self.show_visualisation()
    def visualisation_data_setup(self, rule_obj):
        """Set up the data before visualising it.

        Parameters
        ----------
        rule_obj : :obj:`dict`
            A rule dict generated either by loading the .pyobject file or by the .explain(...) function
        """
        top_rules = self.parse_top_rules(top_k_positive_rules=rule_obj['top_k_positive_rules'],
                                         top_k_negative_rules=rule_obj['top_k_negative_rules'])
        self.__set_X_explain(rule_obj['X_explain'])
        self.__set_y_explain(rule_obj['y_explain'])
        self.__set_bullet_data(self.generate_bullet_data(top_rules))
        self.__set_risk_data(self.generate_risk_data(self.__get_X_explain()))
    def __get_bullet_data(self):
        """Getter of bullet_data.

        Returns
        -------
        :obj:`list`
            A list of dicts that contains the data needed by the d3 bullet chart
        """
        return self.bullet_data

    def __get_bullet_output(self):
        """Getter of bullet_output.

        Returns
        -------
        :obj:`ipywidgets.Output`
            An Output object used to wrap and locate the contents of the visualisation
        """
        return self.bullet_output

    def __get_hbox_items(self):
        """Getter of hbox_items.

        Returns
        -------
        :obj:`list`
            A list of widgets to be placed in a horizontal box
        """
        return self.hbox_items

    def __get_risk_data(self):
        """Getter of risk_data.

        Returns
        -------
        :obj:`list`
            A list of dicts that contains the data needed by the d3 bullet chart
        """
        return self.risk_data

    def __get_X_explain(self):
        """Getter of X_explain.

        Returns
        -------
        :obj:`pandas.core.frame.DataFrame`
            An explained DataFrame containing the features
        """
        return self.X_explain

    def __get_y_explain(self):
        """Getter of y_explain.

        Returns
        -------
        :obj:`pandas.core.series.Series`
            An explained Series containing the label
        """
        return self.y_explain

    def __set_bullet_data(self, bullet_data):
        """Setter of bullet_data.

        Parameters
        ----------
        bullet_data : :obj:`list`
            A list of dicts that contains the data needed by the d3 bullet chart
        """
        if data_validation(bullet_data):
            self.bullet_data = bullet_data
        else:
            print('bullet_data is not in the format of a python list of dicts')
            raise ValueError

    def __set_bullet_output(self, bullet_output):
        """Setter of bullet_output.

        Parameters
        ----------
        bullet_output : :obj:`widgets.Output`
            An Output object used to wrap and locate the contents of the visualisation
        """
        if isinstance(bullet_output, widgets.Output):
            self.bullet_output = bullet_output
        else:
            print("bullet_output should be type 'ipywidgets.Output'")
            raise TypeError

    def __set_hbox_items(self, hbox_items):
        """Setter of hbox_items.

        Parameters
        ----------
        hbox_items : :obj:`list`
            A list of widgets to be placed in a horizontal box
        """
        if len(hbox_items) == 4:
            if isinstance(hbox_items[0], widgets.Label) and isinstance(hbox_items[1], widgets.FloatProgress) \
                    and isinstance(hbox_items[2], widgets.Label) and isinstance(hbox_items[3], widgets.Label):
                self.hbox_items = hbox_items
            else:
                print("""hbox_items should be in the format of
                      '[widgets.Label, widgets.FloatProgress, widgets.Label, widgets.Label]'""")
                raise TypeError
        else:
            print("""hbox_items should be in the format of
                  '[widgets.Label, widgets.FloatProgress, widgets.Label, widgets.Label]'""")
            raise TypeError

    def __set_risk_data(self, risk_data):
        """Setter of risk_data.

        Parameters
        ----------
        risk_data : :obj:`list`
            A list of dicts that contains the risk prediction and risk score info
        """
        if data_validation(risk_data):
            self.risk_data = risk_data
        else:
            print('risk_data is not in the format of a python list of dicts')
            raise ValueError

    def __set_X_explain(self, X_explain):
        """Setter of X_explain.

        Parameters
        ----------
        X_explain : :obj:`pandas.core.frame.DataFrame`
            An explained DataFrame containing the feature cols
        """
        if isinstance(X_explain, pd.core.frame.DataFrame):
            self.X_explain = X_explain
        else:
            print("X_explain should be type 'pandas.core.frame.DataFrame'")
            raise TypeError

    def __set_y_explain(self, y_explain):
        """Setter of y_explain.

        Parameters
        ----------
        y_explain : :obj:`pandas.core.series.Series`
            An explained Series containing the label col
        """
        if isinstance(y_explain, pd.core.series.Series):
            self.y_explain = y_explain
        else:
            print("y_explain should be type 'pandas.core.series.Series'")
            raise TypeError