Source code for pyVHR.analysis.stats

import pandas as pd
import numpy as np
import os
import re 
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import scipy.stats as ss
import scikit_posthocs as sp
from .stattests import friedman_aligned_ranks_test as ft
import math
from pyVHR.analysis import statsutils as pyVHRsu
from autorank import autorank, plot_stats, create_report
from matplotlib.colors import ListedColormap
from matplotlib.colorbar import ColorbarBase, Colorbar

os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

# -- Precompute Nemenyi critical values q(alpha, k) / sqrt(2) for k = 2..99
# compared methods; two leading zeros are inserted so the lists can be
# indexed directly by k.
d = {}

alpha_l = 0.05
list_nemenyi = [pyVHRsu.findQ(i, 1000, alpha=alpha_l) / math.sqrt(2) for i in range(2, 100)]
list_nemenyi.insert(0, 0)
list_nemenyi.insert(0, 0)
d[("nemenyi", "0.05")] = list_nemenyi

alpha_l = 0.1
list_nemenyi = [pyVHRsu.findQ(i, 1000, alpha=alpha_l) / math.sqrt(2) for i in range(2, 100)]
list_nemenyi.insert(0, 0)
list_nemenyi.insert(0, 0)
d[("nemenyi", "0.1")] = list_nemenyi

class StatAnalysis:
    """ Statistical analyses for multiple datasets and multiple rPPG methods """

    def __init__(self, filepath, join_data=False, remove_ouliers=False):
        """
        Args:
            filepath:
                - The path to the file containing the results to test
            join_data:
                - 'True'  - If filepath is a folder, join the dataframes contained in
                            the folder (to be used when merging multiple results from
                            the same pipeline on the same dataset)
                - 'False' - (default) To be used if you want to test the same pipeline
                            (possibly with multiple methods) on multiple datasets
            remove_ouliers:
                - 'True'  - Remove outliers from data before statistical testing
                - 'False' - (default) No outlier removal
        """
        if os.path.isdir(filepath):
            self.multidataset = True
            self.path = filepath + "/"
            self.datasetsList = os.listdir(filepath)
        elif os.path.isfile(filepath):
            self.multidataset = False
            self.datasetsList = [filepath]
            self.path = ""
        else:
            raise ValueError("Error: filepath is wrong!")

        self.join_data = join_data
        self.available_metrics = ['MAE', 'RMSE', 'PCC', 'CCC']
        self.remove_ouliers = remove_ouliers

        # -- get data
        self.__getMethods()

        self.metricSort = {'MAE': 'min', 'RMSE': 'min', 'PCC': 'max', 'CCC': 'max'}
        self.scale = {'MAE': 'log', 'RMSE': 'log', 'PCC': 'linear', 'CCC': 'linear'}
        self.use_stats_pipeline = False
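    # A usage sketch under assumptions: 'results/' is a hypothetical folder of
    # HDF5 result files produced by the pyVHR test pipeline, each with a
    # 'method' column and one column per metric.
    #
    #   st_multi = StatAnalysis(filepath='results/')                   # one file per dataset
    #   st_joined = StatAnalysis(filepath='results/', join_data=True)  # merge runs on one dataset
    #   st_single = StatAnalysis(filepath='results/ubfc.h5')           # single dataset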
    def run_stats(self, methods=None, metric='CCC', approach='frequentist', print_report=True):
        """
        Runs the statistical testing procedure by automatically selecting the
        appropriate test for the available data.

        Args:
            methods:
                - The rPPG methods to analyze
            metric:
                - 'MAE'  - Mean Absolute Error
                - 'RMSE' - Root Mean Squared Error
                - 'PCC'  - Pearson's Correlation Coefficient
                - 'CCC'  - Concordance Correlation Coefficient
            approach:
                - 'frequentist' - (default) Use frequentist hypothesis tests for the analysis
                - 'bayesian'    - Use Bayesian hypothesis tests for the analysis
            print_report:
                - 'True'  - (default) Print a report of the hypothesis testing procedure
                - 'False' - Don't print any report
        """
        metric = metric.upper()
        assert metric in self.available_metrics, 'Error! Available metrics are ' + str(self.available_metrics)

        # -- Method(s)
        if methods is not None:
            if not set(methods).issubset(set(self.methods)):
                raise ValueError("Some method is wrong!")
            else:
                self.methods = methods

        assert approach == 'frequentist' or approach == 'bayesian', \
            "Approach should be 'frequentist' or 'bayesian', not " + str(approach)

        # -- set metric
        self.metric = metric
        self.mag = self.metricSort[metric]

        # -- get data from dataset(s)
        if self.multidataset:
            Y = self.__getData()
        else:
            Y = self.__getDataMono()
        self.ndataset = Y.shape[0]

        # -- ascending order for error metrics (lower is better)
        if self.mag == 'min':
            order = 'ascending'
        else:
            order = 'descending'

        Y_df = pd.DataFrame(Y, columns=self.methods)
        results = autorank(Y_df, alpha=0.05, order=order, verbose=False, approach=approach)
        self.stat_result = results
        self.use_stats_pipeline = True

        if approach == 'bayesian':
            res_df = results.rankdf.iloc[:, [0, 1, 4, 5, 8]]
            print(res_df)
        if print_report:
            print(' ')
            create_report(results)
            print(' ')

        _ = self.computeCD(approach=approach)
        return Y_df
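    # A hedged usage sketch ('st' is a StatAnalysis instance as above): run the
    # autorank-based pipeline on the CCC metric, then on MAE with Bayesian tests.
    #
    #   Y_df = st.run_stats(metric='CCC')
    #   Y_df = st.run_stats(metric='MAE', approach='bayesian', print_report=False)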
    def FriedmanTest(self, methods=None, metric='MAE'):
        """ Runs the Friedman omnibus nonparametric test for the comparison of multiple populations """
        # -- Method(s)
        if methods is None:
            methods = self.methods
        else:
            if not set(methods).issubset(set(self.methods)):
                raise ValueError("Some method is wrong!")
            else:
                self.methods = methods

        # -- set metric
        self.metric = metric
        self.mag = self.metricSort[metric]

        # -- get data from dataset(s)
        # return Y = mat(n-datasets, k-methods)
        if self.multidataset:
            Y = self.__getData()
        else:
            Y = self.__getDataMono()
        self.ndataset = Y.shape[0]

        # -- Friedman test
        t, p, ranks, piv = ft(Y)
        self.avranks = list(np.divide(ranks, self.ndataset))

        return t, p, ranks, piv, self.ndataset
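    # Sketch: the returned ranks are summed over datasets, so self.avranks
    # divides them by self.ndataset to obtain average ranks, e.g.
    #
    #   t, p, ranks, piv, ndata = st.FriedmanTest(metric='MAE')
    #   if p < 0.05:              # omnibus test rejects: follow up with computeCD()
    #       cd = st.computeCD()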
    def SignificancePlot(self, methods=None, metric='MAE'):
        """ Plots the results of hypothesis testing with the significance plot """
        # -- Method(s)
        if methods is None:
            methods = self.methods
        else:
            if not set(methods).issubset(set(self.methods)):
                raise ValueError("Some method is wrong!")
            else:
                self.methods = methods

        # -- set metric
        self.metric = metric
        self.mag = self.metricSort[metric]

        # -- get data from dataset(s)
        if self.multidataset:
            Y = self.__getData()
        else:
            Y = self.__getDataMono()

        # -- Significance plot, a heatmap of p values
        methodNames = [x.upper() for x in self.methods]
        Ypd = pd.DataFrame(Y, columns=methodNames)
        ph = sp.posthoc_nemenyi_friedman(Ypd)
        cmap = ['1', '#fb6a4a', '#08306b', '#4292c6', '#c6dbef']
        heatmap_args = {'cmap': cmap,
                        'linewidths': 0.25,
                        'linecolor': '0.5',
                        'clip_on': False,
                        'square': True,
                        'cbar_ax_bbox': [0.85, 0.35, 0.04, 0.3]}

        plt.figure(figsize=(5, 4))
        sp.sign_plot(ph, cbar=True, **heatmap_args)
        plt.title('p-vals')
        fname = 'SP_' + self.metric + '.pdf'
        plt.savefig(fname)
        plt.show()
    def computeCD(self, avranks=None, numDatasets=None, alpha='0.05', display=True, approach='frequentist'):
        """
        Returns the critical difference for the Nemenyi or Bonferroni-Dunn test
        according to the given alpha (either alpha="0.05" or alpha="0.1"), the
        average ranks and the number of tested datasets N. The test can be either
        "nemenyi" for the two-tailed Nemenyi test or "bonferroni-dunn" for the
        Bonferroni-Dunn test. See the Orange package docs.
        """
        if self.use_stats_pipeline:
            cd = self.stat_result.cd
            if display and approach == 'frequentist':
                plot_stats(self.stat_result, allow_insignificant=True)
                plt.show()
            elif display and approach == 'bayesian':
                self.plot_bayesian_res(self.stat_result)
                plt.show()
        else:
            if not numDatasets:
                numDatasets = self.ndataset
            if not avranks:
                avranks = self.avranks
            cd = compute_CD(avranks, numDatasets, alpha=alpha)  # tested on 30 datasets

            if self.mag == 'min':
                reverse = True
            else:
                reverse = False

            methodNames = [x.upper() for x in self.methods]
            if display:
                graph_ranks(avranks, methodNames, cd=cd, width=6, textspace=1.5, reverse=reverse)
                name = 'CD Diagram (metric: ' + self.metric + ')'
                plt.title(name)
                fname = 'CD_' + self.metric + '.pdf'
                plt.savefig(fname)
                plt.show()
        return cd
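    # Sketch: after FriedmanTest(), computeCD() falls back to the Orange-style
    # compute_CD() defined below and draws a CD diagram; after run_stats(), it
    # reuses the autorank result instead.
    #
    #   cd = st.computeCD(alpha='0.05')   # also saves 'CD_<metric>.pdf'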
    def plot_bayesian_res(self, stat_result):
        """ Plots the results of Bayesian significance testing """
        dm = stat_result.decision_matrix.copy()
        cmap = ['1', '#fb6a4a', '#08306b', '#4292c6']  # , '#c6dbef']
        heatmap_args = {'cmap': cmap,
                        'linewidths': 0.25,
                        'linecolor': '0.5',
                        'clip_on': False,
                        'square': True,
                        'cbar_ax_bbox': [0.85, 0.35, 0.04, 0.3]}
        dm[dm == 'inconclusive'] = 0
        dm[dm == 'smaller'] = 1
        dm[dm == 'larger'] = 2
        np.fill_diagonal(dm.values, -1)

        pl, ax = plt.subplots()
        ax.imshow(dm.values.astype(int), cmap=ListedColormap(cmap))
        labels = list(dm.columns)
        # Major ticks
        ax.set_xticks(np.arange(0, len(labels), 1))
        ax.set_yticks(np.arange(0, len(labels), 1))
        # Labels for major ticks
        ax.set_xticklabels(labels, rotation='vertical')
        ax.set_yticklabels(labels)
        # Minor ticks
        ax.set_xticks(np.arange(-.5, len(labels), 1), minor=True)
        ax.set_yticks(np.arange(-.5, len(labels), 1), minor=True)
        ax.grid(which='minor', color='k', linestyle='-', linewidth=1)
        ax.set_title('Metric: ' + self.metric)

        cbar_ax = ax.figure.add_axes([0.85, 0.35, 0.04, 0.3])
        cbar = ColorbarBase(cbar_ax, cmap=ListedColormap(cmap), boundaries=[0, 1, 2, 3, 4])
        cbar.set_ticks(np.linspace(0.5, 3.5, 4))
        cbar.set_ticklabels(['None', 'equivalent', 'smaller', 'larger'])
        cbar.outline.set_linewidth(1)
        cbar.outline.set_edgecolor('0.5')
        cbar.ax.tick_params(size=0)
    def displayBoxPlot(self, methods=None, metric='MAE', scale=None, title=True):
        """ Shows the distribution of populations with box plots """
        metric = metric.upper()

        # -- Method(s)
        if methods is None:
            methods = self.methods
        else:
            if not set(methods).issubset(set(self.methods)):
                raise ValueError("Some method is wrong!")
            else:
                self.methods = methods
        # -- if method names are not unique (joined data), label boxes by dataset
        if sorted(list(set(self.methods))) != sorted(self.methods):
            methods = self.datasetsList
        else:
            methods = self.methods

        # -- set metric
        self.metric = metric
        self.mag = self.metricSort[metric]
        if scale is None:
            scale = self.scale[metric]

        # -- get data from dataset(s)
        if self.multidataset:
            Y = self.__getData()
        else:
            Y = self.__getDataMono()

        # -- display box plot
        fig = self.boxPlot(methods, metric, Y, scale=scale, title=title)
        return fig
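    # Sketch: the returned object is a plotly Figure, so it can be shown or
    # exported, e.g.
    #
    #   fig = st.displayBoxPlot(metric='MAE', scale='log')
    #   fig.show()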
    def boxPlot(self, methods, metric, Y, scale, title):
        """ Creates the box plot """
        # Y = mat(n-datasets, k-methods)
        k = len(methods)
        if not (k == Y.shape[1]):
            raise ValueError("error!")

        offset = 50
        fig = go.Figure()
        methodNames = [x.upper() for x in methods]
        for i in range(k):
            yd = Y[:, i]
            name = methodNames[i]
            if len(np.argwhere(np.isnan(yd)).flatten()) != 0:
                print(f"Warning! Video {self.dataFrame[0]['videoFilename'][np.argwhere(np.isnan(yd)).flatten()[0]]} contains NaN value for method {name}")
                continue
            # -- set color for box
            if metric == 'MAE' or metric == 'RMSE' or metric == 'TIME_REQUIREMENT':
                med = np.median(yd)
                col = str(min(200, 5 * int(med) + offset))
            if metric == 'CC' or metric == 'PCC' or metric == 'CCC':
                med = 1 - np.abs(np.median(yd))
                col = str(int(200 * med) + offset)
            # -- add box
            fig.add_trace(go.Box(
                y=yd,
                name=name,
                boxpoints='all',
                jitter=.7,
                # whiskerwidth=0.2,
                fillcolor="rgba(" + col + "," + col + "," + col + ",0.5)",
                line_color="rgba(0,0,255,0.5)",
                marker_size=2,
                line_width=2)
            )

        gwidth = np.max(Y) / 10
        if title:
            tit = "Metric: " + metric
            top = 40
        else:
            tit = ''
            top = 10

        fig.update_layout(
            title=tit,
            yaxis_type=scale,
            xaxis_type="category",
            yaxis=dict(
                autorange=True,
                showgrid=True,
                zeroline=True,
                # dtick=gwidth,
                gridcolor='rgb(255,255,255)',
                gridwidth=.1,
                zerolinewidth=2,
                titlefont=dict(size=30)),
            font=dict(
                family="monospace",
                size=16,
                color='rgb(20,20,20)'),
            margin=dict(l=20, r=10, b=20, t=top),
            paper_bgcolor='rgb(250, 250, 250)',
            plot_bgcolor='rgb(243, 243, 243)',
            showlegend=False)

        # fig.show()
        return fig
    def saveStatsData(self, methods=None, metric='MAE', outfilename='statsData.csv'):
        """ Saves statistics of data on disk """
        Y = self.getStatsData(methods=methods, metric=metric, printTable=False)
        np.savetxt(outfilename, Y)
    def getStatsData(self, methods=None, metric='MAE', printTable=True):
        """ Computes statistics of data """
        # -- Method(s)
        if methods is None:
            methods = self.methods
        else:
            if not set(methods).issubset(set(self.methods)):
                raise ValueError("Some method is wrong!")
            else:
                self.methods = methods

        # -- set metric
        self.metric = metric
        self.mag = self.metricSort[metric]

        # -- get data from dataset(s)
        # return Y = mat(n-datasets, k-methods)
        if self.multidataset:
            Y = self.__getData()
        else:
            Y = self.__getDataMono()

        # -- add median and IQR
        I = ss.iqr(Y, axis=0)
        M = np.median(Y, axis=0)
        Y = np.vstack((Y, M))
        Y = np.vstack((Y, I))

        if printTable:
            methodNames = [x.upper() for x in self.methods]
            dataseNames = list(self.datasetNames)  # copy, so the attribute isn't mutated
            dataseNames.append('Median')
            dataseNames.append('IQR')
            df = pd.DataFrame(Y, columns=methodNames, index=dataseNames)
            display(df)
        return Y
    def __remove_outliers(self, df, factor=3.5):
        """
        Removes the outliers. A data point is considered an outlier if it lies
        outside factor times the inter-quartile range of the data distribution
        """
        Q1 = df.quantile(0.25)
        Q3 = df.quantile(0.75)
        IQR = Q3 - Q1
        df_out = df[~((df < (Q1 - factor * IQR)) | (df > (Q3 + factor * IQR))).any(axis=1)]
        return df_out

    def __getDataMono(self):
        mag = self.mag
        metric = self.metric
        methods = self.methods
        frame = self.dataFrame[0]
        # -- loop on methods
        Y = []
        for method in methods:
            vals = frame[frame['method'] == method][metric]
            if mag == 'min':
                data = [v[np.argmin(v)] for v in vals]
            else:
                data = [v[np.argmax(v)] for v in vals]
            Y.append(data)
        if self.remove_ouliers:
            res = pd.DataFrame(np.array(Y).T)
            res = self.__remove_outliers(res)
            res = res.to_numpy()
        else:
            res = np.array(Y).T
        return res

    def __getData(self):
        mag = self.mag
        metric = self.metric
        methods = self.methods
        # -- loop on datasets
        Y = []
        m_list = []
        for i, frame in enumerate(self.dataFrame):
            # -- loop on methods
            y = []
            for method in methods:
                vals = frame[frame['method'] == method][metric]
                if vals.empty:
                    continue
                m_list.append(method)
                if mag == 'min':
                    data = [v[np.argmin(v)] for v in vals]
                else:
                    data = [v[np.argmax(v)] for v in vals]
                y.append(data)
            y = np.array(y)
            if not self.join_data:
                Y.append(np.mean(y, axis=1))
            else:
                Y.append(y.T)
        if not self.join_data:
            res = np.array(Y)
        else:
            self.methods = m_list
            n_dpoints = [curr_y.shape[0] for curr_y in Y]
            if len(set(n_dpoints)) != 1:
                raise ValueError("There should be the exact same number of elements in each dataset to join when 'join_data=True'")
            res = np.hstack(Y)
        if self.remove_ouliers:
            res = pd.DataFrame(res)
            res = self.__remove_outliers(res)
            res = res.to_numpy()
        return res

    def __getMethods(self):
        mets = []
        dataFrame = []
        N = len(self.datasetsList)

        # -- load dataframes
        self.datasetNames = []
        for file in self.datasetsList:
            filename = self.path + file
            self.datasetNames.append(file)
            data = pd.read_hdf(filename)
            mets.append(set(list(data['method'])))
            dataFrame.append(data)

        if not self.join_data:
            # -- method names intersection among datasets
            methods = set(mets[0])
            if N > 1:
                for m in range(1, N):
                    methods = methods.intersection(mets[m])
            methods = list(methods)
        else:
            methods = sum([list(m) for m in mets], [])
            if sorted(list(set(methods))) != sorted(methods):
                raise ValueError("Found multiple methods with the same name... Please ensure using different names for each method when 'join_data=True'")

        methods.sort()
        self.methods = methods
        self.dataFrame = dataFrame
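# End-to-end sketch of the class above (hypothetical file names; assumes the
# HDF5 results contain the methods being compared):
#
#   st = StatAnalysis(filepath='results/')
#   st.getStatsData(metric='MAE')           # median/IQR table per method
#   st.FriedmanTest(metric='MAE')           # omnibus test
#   st.SignificancePlot(metric='MAE')       # post-hoc Nemenyi heatmap
#   st.displayBoxPlot(metric='MAE').show()  # distributions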
# from Orange library
def graph_ranks(avranks, names, cd=None, cdmethod=None, lowv=None, highv=None,
                width=6, textspace=1, reverse=False, filename=None, **kwargs):
    """
    Draws a CD graph, which is used to display the differences in methods'
    performance. See Janez Demsar, Statistical Comparisons of Classifiers over
    Multiple Data Sets, Journal of Machine Learning Research 7(Jan):1--30, 2006.

    Needs matplotlib to work.

    The image is plotted on `plt` imported using `import matplotlib.pyplot as plt`.

    Args:
        avranks (list of float): average ranks of methods.
        names (list of str): names of methods.
        cd (float): Critical difference used for statistical significance of
            difference between methods.
        cdmethod (int, optional): the method that is compared with other methods.
            If omitted, show pairwise comparison of methods.
        lowv (int, optional): the lowest shown rank
        highv (int, optional): the highest shown rank
        width (int, optional): default width in inches (default: 6)
        textspace (int, optional): space on figure sides (in inches) for the
            method names (default: 1)
        reverse (bool, optional): if set to `True`, the lowest rank is on the
            right (default: `False`)
        filename (str, optional): output file name (with extension). If not
            given, the function does not write a file.
    """
    try:
        import matplotlib.pyplot as plt
        from matplotlib.backends.backend_agg import FigureCanvasAgg
    except ImportError:
        raise ImportError("Function graph_ranks requires matplotlib.")

    width = float(width)
    textspace = float(textspace)

    def nth(l, n):
        """
        Returns only the nth element in a list.
        """
        n = lloc(l, n)
        return [a[n] for a in l]

    def lloc(l, n):
        """
        List location in list of list structure.
        Enable the use of negative locations:
        -1 is the last element, -2 second last...
        """
        if n < 0:
            return len(l[0]) + n
        else:
            return n

    def mxrange(lr):
        """
        Multiple xranges. Can be used to traverse matrices.
        This function is very slow due to unknown number of parameters.

        >>> mxrange([3,5])
        [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]

        >>> mxrange([[3,5,1],[9,0,-3]])
        [(3, 9), (3, 6), (3, 3), (4, 9), (4, 6), (4, 3)]
        """
        if not len(lr):
            yield ()
        else:
            # it can work with single numbers
            index = lr[0]
            if isinstance(index, int):
                index = [index]
            for a in range(*index):
                for b in mxrange(lr[1:]):
                    yield tuple([a] + list(b))

    def print_figure(fig, *args, **kwargs):
        canvas = FigureCanvasAgg(fig)
        canvas.print_figure(*args, **kwargs)

    sums = avranks

    tempsort = sorted([(a, i) for i, a in enumerate(sums)], reverse=reverse)
    ssums = nth(tempsort, 0)
    sortidx = nth(tempsort, 1)
    nnames = [names[x] for x in sortidx]

    if lowv is None:
        lowv = min(1, int(math.floor(min(ssums))))
    if highv is None:
        highv = max(len(avranks), int(math.ceil(max(ssums))))

    cline = 0.4

    k = len(sums)

    lines = None

    linesblank = 0
    scalewidth = width - 2 * textspace

    def rankpos(rank):
        if not reverse:
            a = rank - lowv
        else:
            a = highv - rank
        return textspace + scalewidth / (highv - lowv) * a

    distanceh = 0.25

    if cd and cdmethod is None:
        # get pairs of non significant methods

        def get_lines(sums, hsd):
            # get all pairs
            lsums = len(sums)
            allpairs = [(i, j) for i, j in mxrange([[lsums], [lsums]]) if j > i]
            # remove not significant
            notSig = [(i, j) for i, j in allpairs
                      if abs(sums[i] - sums[j]) <= hsd]

            # keep only longest
            def no_longer(ij_tuple, notSig):
                i, j = ij_tuple
                for i1, j1 in notSig:
                    if (i1 <= i and j1 > j) or (i1 < i and j1 >= j):
                        return False
                return True

            longest = [(i, j) for i, j in notSig if no_longer((i, j), notSig)]

            return longest

        lines = get_lines(ssums, cd)
        linesblank = 0.2 + 0.2 + (len(lines) - 1) * 0.1

        # add scale
        distanceh = 0.25
        cline += distanceh

    # calculate the height needed for the image
    minnotsignificant = max(2 * 0.2, linesblank)
    height = cline + ((k + 1) / 2) * 0.2 + minnotsignificant

    fig = plt.figure(figsize=(width, height))
    fig.set_facecolor('white')
    ax = fig.add_axes([0, 0, 1, 1])  # reverse y axis
    ax.set_axis_off()

    hf = 1. / height  # height factor
    wf = 1. / width

    def hfl(l):
        return [a * hf for a in l]

    def wfl(l):
        return [a * wf for a in l]

    # Upper left corner is (0,0).
    ax.plot([0, 1], [0, 1], c="w")
    ax.set_xlim(0, 1)
    ax.set_ylim(1, 0)

    def line(l, color='k', **kwargs):
        """
        Input is a list of pairs of points.
        """
        ax.plot(wfl(nth(l, 0)), hfl(nth(l, 1)), color=color, **kwargs)

    def text(x, y, s, *args, **kwargs):
        ax.text(wf * x, hf * y, s, *args, **kwargs)

    line([(textspace, cline), (width - textspace, cline)], linewidth=0.7)

    bigtick = 0.1
    smalltick = 0.05

    tick = None
    for a in list(np.arange(lowv, highv, 0.5)) + [highv]:
        tick = smalltick
        if a == int(a):
            tick = bigtick
        line([(rankpos(a), cline - tick / 2),
              (rankpos(a), cline)],
             linewidth=0.7)

    for a in range(lowv, highv + 1):
        text(rankpos(a), cline - tick / 2 - 0.05, str(a),
             ha="center", va="bottom")

    k = len(ssums)

    for i in range(math.ceil(k / 2)):
        chei = cline + minnotsignificant + i * 0.2
        line([(rankpos(ssums[i]), cline),
              (rankpos(ssums[i]), chei),
              (textspace - 0.1, chei)],
             linewidth=0.7)
        text(textspace - 0.2, chei, nnames[i], ha="right", va="center")

    for i in range(math.ceil(k / 2), k):
        chei = cline + minnotsignificant + (k - i - 1) * 0.2
        line([(rankpos(ssums[i]), cline),
              (rankpos(ssums[i]), chei),
              (textspace + scalewidth + 0.1, chei)],
             linewidth=0.7)
        text(textspace + scalewidth + 0.2, chei, nnames[i],
             ha="left", va="center")

    if cd and cdmethod is None:
        # upper scale
        if not reverse:
            begin, end = rankpos(lowv), rankpos(lowv + cd)
        else:
            begin, end = rankpos(highv), rankpos(highv - cd)

        line([(begin, distanceh), (end, distanceh)], linewidth=0.7)
        line([(begin, distanceh + bigtick / 2),
              (begin, distanceh - bigtick / 2)],
             linewidth=0.7)
        line([(end, distanceh + bigtick / 2),
              (end, distanceh - bigtick / 2)],
             linewidth=0.7)
        text((begin + end) / 2, distanceh - 0.05, "CD",
             ha="center", va="bottom")

        # no-significance lines
        def draw_lines(lines, side=0.05, height=0.1):
            start = cline + 0.2
            for l, r in lines:
                line([(rankpos(ssums[l]) - side, start),
                      (rankpos(ssums[r]) + side, start)],
                     linewidth=2.5)
                start += height

        draw_lines(lines)

    elif cd:
        begin = rankpos(avranks[cdmethod] - cd)
        end = rankpos(avranks[cdmethod] + cd)
        line([(begin, cline), (end, cline)], linewidth=2.5)
        line([(begin, cline + bigtick / 2),
              (begin, cline - bigtick / 2)],
             linewidth=2.5)
        line([(end, cline + bigtick / 2),
              (end, cline - bigtick / 2)],
             linewidth=2.5)

    if filename:
        print_figure(fig, filename, **kwargs)
# from Orange library
def compute_CD(avranks, n, alpha="0.05", test="nemenyi"):
    """
    Returns the critical difference for the Nemenyi or Bonferroni-Dunn test
    according to the given alpha (either alpha="0.05" or alpha="0.1"), the
    average ranks and the number of tested datasets N. Test has to be
    "nemenyi" for the two-tailed Nemenyi test.
    """
    k = len(avranks)
    # Reference values from the Orange library (the table `d` computed at module
    # level from pyVHRsu.findQ should closely match these):
    # d = {("nemenyi", "0.05"): [0, 0, 1.959964, 2.343701, 2.569032, 2.727774,
    #                            2.849705, 2.94832, 3.030879, 3.101730, 3.163684,
    #                            3.218654, 3.268004, 3.312739, 3.353618, 3.39123,
    #                            3.426041, 3.458425, 3.488685, 3.517073,
    #                            3.543799],
    #      ("nemenyi", "0.1"): [0, 0, 1.644854, 2.052293, 2.291341, 2.459516,
    #                           2.588521, 2.692732, 2.779884, 2.854606, 2.919889,
    #                           2.977768, 3.029694, 3.076733, 3.119693, 3.159199,
    #                           3.195743, 3.229723, 3.261461, 3.291224, 3.319233],
    #      ("bonferroni-dunn", "0.05"): [0, 0, 1.960, 2.241, 2.394, 2.498, 2.576,
    #                                    2.638, 2.690, 2.724, 2.773],
    #      ("bonferroni-dunn", "0.1"): [0, 0, 1.645, 1.960, 2.128, 2.241, 2.326,
    #                                   2.394, 2.450, 2.498, 2.539]}
    q = d[(test, alpha)]
    cd = q[k] * (k * (k + 1) / (6.0 * n)) ** 0.5
    return cd
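# A minimal worked example of the CD formula above (a sketch, not part of the
# library API): with k = 5 methods and n = 10 datasets at alpha = 0.05, q[5] is
# roughly 2.728, so cd = 2.728 * sqrt(5 * 6 / 60) ~= 1.93.
if __name__ == "__main__":
    avranks = [1.9, 3.2, 2.8, 3.3, 3.8]  # hypothetical average ranks of 5 methods
    cd = compute_CD(avranks, 10, alpha="0.05")
    print("critical difference:", cd)
    # any two methods whose average ranks differ by more than cd are
    # significantly different under the Nemenyi test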