Source code for pyVHR.BVP.methods

import numpy as np
import torch
from sklearn.decomposition import PCA
from pyVHR.BVP.utils import jadeR


"""
This module contains a collection of known rPPG methods.

rPPG METHOD SIGNATURE
An rPPG method must accept theese parameters:
    > signal -> RGB signal as float32 ndarray with shape [num_estimators, rgb_channels, num_frames], or a custom signal.
    > **kargs [OPTIONAL] -> usefull parameters passed to the filter method.
It must return a BVP signal as float32 ndarray.
"""


# ------------------------------------------------------------------------------------- #
#                                     rPPG METHODS                                      #
# ------------------------------------------------------------------------------------- #


[docs]def cpu_CHROM(signal): """ CHROM method on CPU using Numpy. De Haan, G., & Jeanne, V. (2013). Robust pulse rate from chrominance-based rPPG. IEEE Transactions on Biomedical Engineering, 60(10), 2878-2886. """ X = signal Xcomp = 3*X[:, 0] - 2*X[:, 1] Ycomp = (1.5*X[:, 0])+X[:, 1]-(1.5*X[:, 2]) sX = np.std(Xcomp, axis=1) sY = np.std(Ycomp, axis=1) alpha = (sX/sY).reshape(-1, 1) alpha = np.repeat(alpha, Xcomp.shape[1], 1) bvp = Xcomp - np.multiply(alpha, Ycomp) return bvp
[docs]def torch_CHROM(signal): """ CHROM method on CPU using Torch. De Haan, G., & Jeanne, V. (2013). Robust pulse rate from chrominance-based rPPG. IEEE Transactions on Biomedical Engineering, 60(10), 2878-2886. """ X = signal Xcomp = 3*X[:, 0] - 2*X[:, 1] Ycomp = (1.5*X[:, 0])+X[:, 1]-(1.5*X[:, 2]) sX = torch.std(Xcomp, axis=1) sY = torch.std(Ycomp, axis=1) alpha = (sX/sY).reshape(-1, 1) alpha = torch.repeat_interleave(alpha, Xcomp.shape[1], 1) bvp = Xcomp - torch.mul(alpha, Ycomp) return bvp
[docs]def cpu_LGI(signal): """ LGI method on CPU using Numpy. Pilz, C. S., Zaunseder, S., Krajewski, J., & Blazek, V. (2018). Local group invariance for heart rate estimation from face videos in the wild. In Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition Workshops (pp. 1254-1262). """ X = signal U, _, _ = np.linalg.svd(X) S = U[:, :, 0] S = np.expand_dims(S, 2) sst = np.matmul(S, np.swapaxes(S, 1, 2)) p = np.tile(np.identity(3), (S.shape[0], 1, 1)) P = p - sst Y = np.matmul(P, X) bvp = Y[:, 1, :] return bvp
[docs]def cpu_POS(signal, **kargs): """ POS method on CPU using Numpy. The dictionary parameters are: {'fps':float}. Wang, W., den Brinker, A. C., Stuijk, S., & de Haan, G. (2016). Algorithmic principles of remote PPG. IEEE Transactions on Biomedical Engineering, 64(7), 1479-1491. """ # Run the pos algorithm on the RGB color signal c with sliding window length wlen # Recommended value for wlen is 32 for a 20 fps camera (1.6 s) eps = 10**-9 X = signal e, c, f = X.shape # e = #estimators, c = 3 rgb ch., f = #frames w = int(1.6 * kargs['fps']) # window length # stack e times fixed mat P P = np.array([[0, 1, -1], [-2, 1, 1]]) Q = np.stack([P for _ in range(e)], axis=0) # Initialize (1) H = np.zeros((e, f)) for n in np.arange(w, f): # Start index of sliding window (4) m = n - w + 1 # Temporal normalization (5) Cn = X[:, :, m:(n + 1)] M = 1.0 / (np.mean(Cn, axis=2)+eps) M = np.expand_dims(M, axis=2) # shape [e, c, w] Cn = np.multiply(M, Cn) # Projection (6) S = np.dot(Q, Cn) S = S[0, :, :, :] S = np.swapaxes(S, 0, 1) # remove 3-th dim # Tuning (7) S1 = S[:, 0, :] S2 = S[:, 1, :] alpha = np.std(S1, axis=1) / (eps + +np.std(S2, axis=1)) alpha = np.expand_dims(alpha, axis=1) Hn = np.add(S1, alpha * S2) Hnm = Hn - np.expand_dims(np.mean(Hn, axis=1), axis=1) # Overlap-adding (8) H[:, m:(n + 1)] = np.add(H[:, m:(n + 1)], Hnm) return H
[docs]def cpu_PBV(signal): """ PBV method on CPU using Numpy. De Haan, G., & Van Leest, A. (2014). Improved motion robustness of remote-PPG by using the blood volume pulse signature. Physiological measurement, 35(9), 1913. """ sig_mean = np.mean(signal, axis = 2) signal_norm_r = signal[:,0,:] / np.expand_dims(sig_mean[:,0],axis=1) signal_norm_g = signal[:,1,:] / np.expand_dims(sig_mean[:,1],axis=1) signal_norm_b = signal[:,2,:] / np.expand_dims(sig_mean[:,2],axis=1) pbv_n = np.array([np.std(signal_norm_r, axis = 1), np.std(signal_norm_g, axis = 1), np.std(signal_norm_b, axis = 1)]) pbv_d = np.sqrt(np.var(signal_norm_r, axis = 1) + np.var(signal_norm_g, axis = 1) + np.var(signal_norm_b, axis = 1)) pbv = pbv_n / pbv_d C = np.swapaxes(np.array([signal_norm_r, signal_norm_g, signal_norm_b]),0,1) Ct =np.swapaxes(np.swapaxes(np.transpose(C),0,2),1,2) Q = np.matmul(C, Ct) W = np.linalg.solve(Q,np.swapaxes(pbv,0,1)) A = np.matmul(Ct, np.expand_dims(W,axis = 2)) B = np.matmul(np.swapaxes(np.expand_dims(pbv.T,axis=2),1,2),np.expand_dims(W,axis = 2)) bvp = A / B return bvp.squeeze(axis=2)
[docs]def cpu_PCA(signal,**kargs): """ PCA method on CPU using Numpy. The dictionary parameters are {'component':str}. Where 'component' can be 'second_comp' or 'all_comp'. Lewandowska, M., Rumiński, J., Kocejko, T., & Nowak, J. (2011, September). Measuring pulse rate with a webcam—a non-contact method for evaluating cardiac activity. In 2011 federated conference on computer science and information systems (FedCSIS) (pp. 405-410). IEEE. """ bvp = [] for i in range(signal.shape[0]): X = signal[i] pca = PCA(n_components=3) pca.fit(X) # selector if kargs['component']=='all_comp': bvp.append(pca.components_[0] * pca.explained_variance_[0]) bvp.append(pca.components_[1] * pca.explained_variance_[1]) elif kargs['component']=='second_comp': bvp.append(pca.components_[1] * pca.explained_variance_[1]) bvp = np.array(bvp) return bvp
[docs]def cpu_GREEN(signal): """ GREEN method on CPU using Numpy Verkruysse, W., Svaasand, L. O., & Nelson, J. S. (2008). Remote plethysmographic imaging using ambient light. Optics express, 16(26), 21434-21445. """ return signal[:,1,:]
[docs]def cpu_ICA(signal, **kargs): """ ICA method on CPU using Numpy. The dictionary parameters are {'component':str}. Where 'component' can be 'second_comp' or 'all_comp'. Poh, M. Z., McDuff, D. J., & Picard, R. W. (2010). Non-contact, automated cardiac pulse measurements using video imaging and blind source separation. Optics express, 18(10), 10762-10774. """ bvp = [] for X in signal: W = jadeR(X, verbose=False) bvp.append(np.dot(W,X)) # selector bvp = np.array(bvp) l, c, f = bvp.shape # l=#landmks c=#3chs, f=#frames if kargs['component']=='all_comp': bvp = np.reshape(bvp, (l*c, f)) # compact into 2D matrix elif kargs['component']=='second_comp': bvp = np.reshape(bvp[:,1,:], (l, f)) # collect return bvp
[docs]def cpu_SSR(raw_signal,**kargs): """ SSR method on CPU using Numpy. 'raw_signal' is a float32 ndarray with shape [num_frames, rows, columns, rgb_channels]; it can be obtained by using the :py:class:‵pyVHR.extraction.sig_processing.SignalProcessing‵ class ('extract_raw_holistic' method). The dictionary parameters are: {'fps':float}. Wang, W., Stuijk, S., & De Haan, G. (2015). A novel algorithm for remote photoplethysmography: Spatial subspace rotation. IEEE transactions on biomedical engineering, 63(9), 1974-1984. """ # utils functions # def __build_p(τ, k, l, U, Λ): """ builds P Parameters ---------- k: int The frame index l: int The temporal stride to use U: numpy.ndarray The eigenvectors of the c matrix (for all frames up to counter). Λ: numpy.ndarray The eigenvalues of the c matrix (for all frames up to counter). Returns ------- p: numpy.ndarray The p signal to add to the pulse. """ # SR' SR = np.zeros((3, l), np.float32) # dim: 3xl z = 0 for t in range(τ, k, 1): # 6, 7 a = Λ[0, t] b = Λ[1, τ] c = Λ[2, τ] d = U[:, 0, t].T e = U[:, 1, τ] f = U[:, 2, τ] g = U[:, 1, τ].T h = U[:, 2, τ].T x1 = a / b x2 = a / c x3 = np.outer(e, g) x4 = np.dot(d, x3) x5 = np.outer(f, h) x6 = np.dot(d, x5) x7 = np.sqrt(x1) x8 = np.sqrt(x2) x9 = x7 * x4 x10 = x8 * x6 x11 = x9 + x10 SR[:, z] = x11 # 8 | dim: 3 z += 1 # build p and add it to the final pulse signal s0 = SR[0, :] # dim: l s1 = SR[1, :] # dim: l p = s0 - ((np.std(s0) / np.std(s1)) * s1) # 10 | dim: l p = p - np.mean(p) # 11 return p # dim: l def __build_correlation_matrix(V): # V dim: (W×H)x3 #V = np.unique(V, axis=0) V_T = V.T # dim: 3x(W×H) N = V.shape[0] # build the correlation matrix C = np.dot(V_T, V) # dim: 3x3 C = C / N return C def __eigs(C): """ get eigenvalues and eigenvectors, sort them. Parameters ---------- C: numpy.ndarray The RGB values of skin-colored pixels. Returns ------- Λ: numpy.ndarray The eigenvalues of the correlation matrix U: numpy.ndarray The (sorted) eigenvectors of the correlation matrix """ # get eigenvectors and sort them according to eigenvalues (largest first) L, U = np.linalg.eig(C) # dim Λ: 3 | dim U: 3x3 idx = L.argsort() # dim: 3x1 idx = idx[::-1] # dim: 1x3 L_ = L[idx] # dim: 3 U_ = U[:, idx] # dim: 3x3 return L_, U_ # ----------------------------------- # fps = int(kargs['fps']) raw_sig = raw_signal K = len(raw_sig) l = int(fps) P = np.zeros(K) # 1 | dim: K # store the eigenvalues Λ and the eigenvectors U at each frame L = np.zeros((3, K), dtype=np.float32) # dim: 3xK U = np.zeros((3, 3, K), dtype=np.float32) # dim: 3x3xK for k in range(K): n_roi = len(raw_sig[k]) VV = [] V = raw_sig[k].astype(np.float32) idx = V!=0 idx2 = np.logical_and(np.logical_and(idx[:,:,0], idx[:,:,1]), idx[:,:,2]) V_skin_only = V[idx2] VV.append(V_skin_only) VV = np.vstack(VV) C = __build_correlation_matrix(VV) #dim: 3x3 # get: eigenvalues Λ, eigenvectors U L[:,k], U[:,:,k] = __eigs(C) # dim Λ: 3 | dim U: 3x3 # build p and add it to the pulse signal P if k >= l: # 5 tau = k - l # 5 p = __build_p(tau, k, l, U, L) # 6, 7, 8, 9, 10, 11 | dim: l P[tau:k] += p # 11 if np.isnan(np.sum(P)): print('NAN') print(raw_sig[k]) bvp = P bvp = np.expand_dims(bvp,axis=0) return bvp