import configparser
import ast
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import plotly.graph_objects as go
from importlib import import_module, util
from pyVHR.datasets.dataset import datasetFactory
from pyVHR.utils.errors import getErrors, printErrors, displayErrors
from pyVHR.extraction.sig_processing import *
from pyVHR.extraction.sig_extraction_methods import *
from pyVHR.extraction.skin_extraction_methods import *
from pyVHR.BVP.BVP import *
from pyVHR.BPM.BPM import *
from pyVHR.BVP.methods import *
from pyVHR.BVP.filters import *
class MultiMethodSuite():
"""
This class performs tests on a video dataset using multiple rPPG methods.
The suite uses these methods individually to calculate BVP and estimate BPM.
After this all the BVP signals of the methods are combined into a single n-estimator signal,
which is used to estimate a last BPM signal.
Using multiple methods together allows you to have many estimators,
so it can be useful to have a more precise BPM estimate.
You can customize all the parameters using a configuration file (.cfg); in the
analysis module you can find an example of cfg file named "multi_method_cfg.cfg".
"""
def __init__(self, configFilename='default'):
self.configFilename = configFilename
if configFilename == 'default':
self.configFilename = '../pyVHR/analysis/newcfg.cfg'
self.parse_cfg(self.configFilename)
    def start(self, verb=0):
"""
Runs the tests as specified in the loaded config file.
Args:
verb:
- 0 - not verbose
- 1 - show the main steps
(use also combinations)
"""
# -- cfg parser
parser = configparser.ConfigParser(
inline_comment_prefixes=('#', ';'))
parser.optionxform = str
if not parser.read(self.configFilename):
raise FileNotFoundError(self.configFilename)
# -- verbose prints
if '1' in str(verb):
self.__verbose('a')
# -- dataset & cfg params
if 'path' in self.datasetdict and self.datasetdict['path'] != 'None':
dataset = datasetFactory(
self.datasetdict['dataset'], videodataDIR=self.datasetdict['videodataDIR'], BVPdataDIR=self.datasetdict['BVPdataDIR'], path=self.datasetdict['path'])
else:
dataset = datasetFactory(
self.datasetdict['dataset'], videodataDIR=self.datasetdict['videodataDIR'], BVPdataDIR=self.datasetdict['BVPdataDIR'])
# -- catch data (object)
res = TestResult()
# -- SIG processing
sig_processing = SignalProcessing()
        # cfg values are strings; parse the literal instead of calling bool() on a
        # non-empty string, which would always be True
        use_cuda = ast.literal_eval(self.sigdict['cuda'])
        if use_cuda:
            sig_processing.display_cuda_device()
            sig_processing.choose_cuda_device(int(self.sigdict['cuda_device']))
        # set skin extractor
        target_device = 'GPU' if use_cuda else 'CPU'
if self.sigdict['skin_extractor'] == 'convexhull':
sig_processing.set_skin_extractor(
SkinExtractionConvexHull(target_device))
elif self.sigdict['skin_extractor'] == 'faceparsing':
sig_processing.set_skin_extractor(
SkinExtractionFaceParsing(target_device))
# set patches
if self.sigdict['approach'] == 'patches':
ldmks_list = ast.literal_eval(
self.sigdict['landmarks_list'])
if len(ldmks_list) > 0:
sig_processing.set_landmarks(ldmks_list)
if self.sigdict['patches'] == 'squares':
# set squares patches side dimension
sig_processing.set_square_patches_side(
float(self.sigdict['squares_dim']))
elif self.sigdict['patches'] == 'rects':
# set rects patches sides dimensions
rects_dims = ast.literal_eval(
self.sigdict['rects_dims'])
if len(rects_dims) > 0:
sig_processing.set_rect_patches_sides(
np.array(rects_dims, dtype=np.float32))
# set sig-processing and skin-processing params
SignalProcessingParams.RGB_LOW_TH = np.int32(
self.sigdict['sig_color_low_threshold'])
SignalProcessingParams.RGB_HIGH_TH = np.int32(
self.sigdict['sig_color_high_threshold'])
SkinProcessingParams.RGB_LOW_TH = np.int32(
self.sigdict['skin_color_low_threshold'])
SkinProcessingParams.RGB_HIGH_TH = np.int32(
self.sigdict['skin_color_high_threshold'])
        # load all the videos
        if self.videoIdx == []:
            self.videoIdx = list(range(len(dataset.videoFilenames)))
# -- loop on videos
for v in self.videoIdx:
# -- verbose prints
if '1' in str(verb):
print("\n## videoID: %d" % (v))
# -- ground-truth signal
try:
fname = dataset.getSigFilename(v)
sigGT = dataset.readSigfile(fname)
            except Exception:
                # skip videos that have no readable ground-truth signal
                continue
winSizeGT = int(self.sigdict['winSize'])
bpmGT, timesGT = sigGT.getBPM(winSizeGT)
# -- video file name
videoFileName = dataset.getVideoFilename(v)
print(videoFileName)
fps = get_fps(videoFileName)
sig_processing.set_total_frames(
int(self.sigdict['tot_sec'])*fps)
sig = []
if str(self.sigdict['approach']) == 'hol':
# SIG extraction with holistic
sig = sig_processing.extract_holistic(videoFileName)
elif str(self.sigdict['approach']) == 'patches':
# SIG extraction with patches
sig = sig_processing.extract_patches(
videoFileName, str(self.sigdict['patches']), str(self.sigdict['type']))
# -- sig windowing
windowed_sig, timesES = sig_windowing(
sig, int(self.sigdict['winSize']), 1, fps)
# -- loop on methods
bvps_collection = []
methods_collection = []
for m in self.methods:
if '1' in str(verb):
print("## Computing method: %s" % (str(m)))
# -- PRE FILTERING
filtered_windowed_sig = windowed_sig
# -- color threshold - applied only with patches
if str(self.sigdict['approach']) == 'patches':
filtered_windowed_sig = apply_filter(
windowed_sig,
rgb_filter_th,
params={'RGB_LOW_TH': np.int32(self.bvpdict['color_low_threshold']),
'RGB_HIGH_TH': np.int32(self.bvpdict['color_high_threshold'])})
# -- custom filters
prefilter_list = ast.literal_eval(
self.methodsdict[m]['pre_filtering'])
if len(prefilter_list) > 0:
for f in prefilter_list:
if '1' in str(verb):
print(" pre-filter: %s" % f)
fdict = dict(parser[f].items())
if fdict['path'] != 'None':
# custom path
spec = util.spec_from_file_location(
fdict['name'], fdict['path'])
mod = util.module_from_spec(spec)
spec.loader.exec_module(mod)
method_to_call = getattr(
mod, fdict['name'])
else:
# package path
module = import_module(
'pyVHR.BVP.filters')
method_to_call = getattr(
module, fdict['name'])
filtered_windowed_sig = apply_filter(
filtered_windowed_sig, method_to_call, fps=fps, params=ast.literal_eval(fdict['params']))
# -- BVP extraction
if self.methodsdict[m]['path'] != 'None':
# custom path
spec = util.spec_from_file_location(
self.methodsdict[m]['name'], self.methodsdict[m]['path'])
mod = util.module_from_spec(spec)
spec.loader.exec_module(mod)
method_to_call = getattr(mod, self.methodsdict[m]['name'])
else:
# package path
module = import_module(
'pyVHR.BVP.methods')
method_to_call = getattr(
module, self.methodsdict[m]['name'])
bvps = RGB_sig_to_BVP(filtered_windowed_sig, fps,
device_type=self.methodsdict[m]['device_type'], method=method_to_call, params=ast.literal_eval(self.methodsdict[m]['params']))
# POST FILTERING
postfilter_list = ast.literal_eval(
self.methodsdict[m]['post_filtering'])
if len(postfilter_list) > 0:
for f in postfilter_list:
if '1' in str(verb):
print(" post-filter: %s" % f)
fdict = dict(parser[f].items())
if fdict['path'] != 'None':
# custom path
spec = util.spec_from_file_location(
fdict['name'], fdict['path'])
mod = util.module_from_spec(spec)
spec.loader.exec_module(mod)
method_to_call = getattr(
mod, fdict['name'])
else:
# package path
module = import_module(
'pyVHR.BVP.filters')
method_to_call = getattr(
module, fdict['name'])
bvps = apply_filter(
bvps, method_to_call, fps=fps, params=ast.literal_eval(fdict['params']))
bvps_collection.append(bvps)
methods_collection.append(m)
# -- BPM extraction
            # first each method, then the multi-method combination
            multi_method_coll = list(bvps_collection)
methods_collection.append("Multi-Method")
for i in range(len(bvps_collection)+1):
if i == len(bvps_collection): # last element is the multi-method
bvp_element = concatenate_BVPs(multi_method_coll)
if bvp_element == 0:
print("[ERROR] Multi method Concatenation can't be executed because BVP signals have different shapes!")
continue
else: # use single methods
bvp_element = bvps_collection[i]
bpmES = None
if str(self.bpmdict['type']) == 'welch':
bpmES = BVP_to_BPM(bvp_element, fps, minHz=float(
self.bpmdict['minHz']), maxHz=float(self.bpmdict['maxHz']))
elif str(self.bpmdict['type']) == 'psd_clustering':
bpmES = BVP_to_BPM_PSD_clustering(bvp_element, fps, minHz=float(
self.bpmdict['minHz']), maxHz=float(self.bpmdict['maxHz']))
if bpmES is None:
print("[ERROR] BPM extraction error; check cfg params!")
continue
# median BPM from multiple estimators BPM
# this doesn't affect holistic approach
median_bpmES = multi_est_BPM_median(bpmES)
# -- error metrics
RMSE, MAE, MAX, PCC = getErrors(median_bpmES, bpmGT, timesES, timesGT)
# -- save results
method_name = methods_collection[i]
res.newDataSerie()
res.addData('dataset', str(self.datasetdict['dataset']))
res.addData('method', str(method_name))
res.addData('videoIdx', v)
res.addData('RMSE', RMSE)
res.addData('MAE', MAE)
res.addData('MAX', MAX)
res.addData('PCC', PCC)
res.addData('bpmGT', bpmGT)
res.addData('bpmES', median_bpmES)
res.addData('timeGT', timesGT)
res.addData('timeES', timesES)
res.addData('videoFilename', videoFileName)
res.addDataSerie()
if '1' in str(verb):
print("## Results for method: %s" % (str(method_name)))
printErrors(RMSE, MAE, MAX, PCC)
return res
    def parse_cfg(self, configFilename):
""" parses the given configuration file for loading the test's parameters.
Args:
configFilename: configuation file (.cfg) name of path .
"""
self.parser = configparser.ConfigParser(
inline_comment_prefixes=('#', ';'))
self.parser.optionxform = str
if not self.parser.read(configFilename):
raise FileNotFoundError(configFilename)
        # load params
self.datasetdict = dict(self.parser['DATASET'].items())
self.sigdict = dict(self.parser['SIG'].items())
self.bvpdict = dict(self.parser['BVP'].items())
self.bpmdict = dict(self.parser['BPM'].items())
        # video idx list extraction
        if isinstance(ast.literal_eval(self.datasetdict['videoIdx']), list):
            self.videoIdx = [int(v) for v in ast.literal_eval(
                self.datasetdict['videoIdx'])]
        else:
            # otherwise process all the videos (resolved in start())
            self.videoIdx = []
        # load parameters for each method
self.methodsdict = {}
self.methods = ast.literal_eval(self.bvpdict['methods'])
for x in self.methods:
self.methodsdict[x] = dict(self.parser[x].items())
def __merge(self, dict1, dict2):
for key in dict2:
if key not in dict1:
dict1[key] = dict2[key]
def __verbose(self, verb):
if verb == 'a':
print("** Run the test with the following config:")
print(" dataset: " + self.datasetdict['dataset'].upper())
print(" methods: " + str(self.methods))
class TestResult():
"""
This class is used by :py:class:`pyVHR.analysis.multi_method_suite.MultiMethodSuite` to manage the results
of a test for a given video dataset on multiple rPPG methods
"""
    def __init__(self, filename=None):
        if filename is None:
            self.dataFrame = pd.DataFrame()
        else:
            self.dataFrame = pd.read_hdf(filename)
        self.dict = None
    def addDataSerie(self):
        # -- store serie
        if self.dict is not None:
            # DataFrame.append was removed in pandas 2.0; use concat instead
            self.dataFrame = pd.concat(
                [self.dataFrame, pd.DataFrame([self.dict])], ignore_index=True)
    def newDataSerie(self):
        # -- new dict
        D = {}
        D['method'] = ''
        D['dataset'] = ''
        D['videoIdx'] = ''        # video index in the dataset
        D['sigFilename'] = ''     # GT signal filename
        D['videoFilename'] = ''   # video filename
        D['RMSE'] = ''
        D['MAE'] = ''
        D['PCC'] = ''
        D['MAX'] = ''
        D['bpmGT'] = ''           # GT BPM
        D['bpmES'] = ''           # estimated BPM
        D['timeGT'] = ''          # GT times
        D['timeES'] = ''          # estimated times
        self.dict = D
    def addData(self, key, value):
self.dict[key] = value
    def saveResults(self, outFilename=None):
        """
        Save the test results in an HDF5 file that can be opened using pandas.
        You can analyze the results using :py:class:`pyVHR.analysis.stats.StatAnalysis`
        """
        if outFilename is None:
            outFilename = "testResults.h5"
        self.outFilename = outFilename
        # -- save data
        self.dataFrame.to_hdf(outFilename, key='df', mode='w')
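    # Reading the saved results back (sketch; "testResults.h5" is the default name
    # used by saveResults):
    #
    #   import pandas as pd
    #   df = pd.read_hdf("testResults.h5", key='df')
    #   print(df[['method', 'videoIdx', 'RMSE', 'MAE', 'PCC']])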