import numpy as np
from scipy.signal import welch
from scipy.signal import find_peaks, stft
from plotly.subplots import make_subplots
from plotly.colors import n_colors
import plotly.graph_objects as go
from scipy.interpolate import interp1d
from pyVHR.plot.visualize import VisualizeParams
from pyVHR.BPM.utils import *
"""
This module contains all the class and the methods for transforming
a BVP signal in a BPM signal.
"""
[docs]class BVPsignal:
"""
Manage (multi-channel, row-wise) BVP signals, and transforms it in BPM.
This class is used to obtain BPM signal from ground truth BVP signal.
"""
nFFT = 2048 # freq. resolution for STFTs
step = 1 # step in seconds
def __init__(self, data, fs, startTime=0, minHz=0.75, maxHz=4., verb=False):
if len(data.shape) == 1:
self.data = data.reshape(1, -1) # 2D array raw-wise
else:
self.data = data
self.fs = fs # sample rate
self.startTime = startTime
self.verb = verb
self.minHz = minHz
self.maxHz = maxHz
[docs] def spectrogram(self, winsize=5):
"""
Compute the BVP signal spectrogram restricted to the
band 42-240 BPM by using winsize (in sec) samples.
"""
# -- spect. Z is 3-dim: Z[#chnls, #freqs, #times]
F, T, Z = stft(self.data,
self.fs,
nperseg=self.fs*winsize,
noverlap=self.fs*(winsize-self.step),
boundary='even',
nfft=self.nFFT)
Z = np.squeeze(Z, axis=0)
# -- freq subband (0.75 Hz - 4.0 Hz)
minHz = 0.65
maxHz = 4.0
band = np.argwhere((F > minHz) & (F < maxHz)).flatten()
self.spect = np.abs(Z[band, :]) # spectrum magnitude
self.freqs = 60*F[band] # spectrum freq in bpm
self.times = T # spectrum times
# -- BPM estimate by spectrum
self.bpm = self.freqs[np.argmax(self.spect, axis=0)]
[docs] def displaySpectrum(self, display=False, dims=3):
"""Show the spectrogram of the BVP signal"""
# -- check if bpm exists
try:
bpm = self.bpm
except AttributeError:
self.spectrogram()
bpm = self.bpm
t = self.times
f = self.freqs
S = self.spect
fig = go.Figure()
fig.add_trace(go.Heatmap(z=S, x=t, y=f, colorscale="viridis"))
fig.add_trace(go.Scatter(
x=t, y=bpm, name='Frequency Domain', line=dict(color='red', width=2)))
fig.update_layout(autosize=False, height=420, showlegend=True,
title='Spectrogram of the BVP signal',
xaxis_title='Time (sec)',
yaxis_title='BPM (60*Hz)',
legend=dict(
x=0,
y=1,
traceorder="normal",
font=dict(
family="sans-serif",
size=12,
color="black"),
bgcolor="LightSteelBlue",
bordercolor="Black",
borderwidth=2)
)
fig.show(renderer=VisualizeParams.renderer)
[docs] def getBPM(self, winsize=5):
"""
Get the BPM signal extracted from the ground truth BVP signal.
"""
self.spectrogram(winsize)
return self.bpm, self.times
[docs]class BPM:
"""
This class transforms a BVP signal in a BPM signal using CPU.
BVP signal must be a float32 numpy.ndarray with shape [num_estimators, num_frames].
"""
def __init__(self, data, fps, startTime=0, minHz=0.65, maxHz=4., verb=False):
"""
Input 'data' is a BVP signal defined as a float32 Numpy.ndarray with shape [num_estimators, num_frames]
"""
self.nFFT = 2048//1 # freq. resolution for STFTs
if len(data.shape) == 1:
self.data = data.reshape(1, -1) # 2D array raw-wise
else:
self.data = data
self.fps = fps # sample rate
self.startTime = startTime
self.verb = verb
self.minHz = minHz
self.maxHz = maxHz
# PSD_clustering needs a gaussain fitting model
self.gmodel = Model(gaussian, independent_vars=['x', 'mu', 'a'])
[docs] def BVP_to_BPM(self):
"""
Return the BPM signal as a float32 Numpy.ndarray with shape [num_estimators, ].
This method use the Welch's method to estimate the spectral density of the BVP signal,
then it chooses as BPM the maximum Amplitude frequency.
"""
if self.data.shape[0] == 0:
return np.float32(0.0)
Pfreqs, Power = Welch(self.data, self.fps, self.minHz, self.maxHz, self.nFFT)
# -- BPM estimate
Pmax = np.argmax(Power, axis=1) # power max
return Pfreqs[Pmax.squeeze()]
[docs] def BVP_to_BPM_PSD_clustering(self, out_fact=1):
"""
Return the BPM signal as a numpy.float32.
TODO: cambiare descrizione.
This method use the Welch's method to estimate the spectral density of the BVP signal; in case
of multiple estimators the method sum all the Power Spectums, then it chooses as BPM the
maximum Amplitude frequency.
"""
# -- interpolation for less than 256 samples
if self.data.shape[0] == 0:
return np.float32(0.0)
Pfreqs, Power = Welch(self.data, self.fps, self.minHz, self.maxHz, self.nFFT)
# F are frequencies, PSD is Power Spectrum Density of all estimators
F = Pfreqs
PSD = Power
# Less then 3 estimators - don't need clustering
# we choose the maximum Amplitude frequency
if PSD.shape[0] < 3:
IDmax = np.unravel_index(np.argmax(PSD, axis=None), PSD.shape)
Fmax = F[IDmax[1]]
return np.float32(Fmax)
W = pairwise_distances(PSD, PSD, metric='cosine')
theta = circle_clustering(W, eps=0.01)
# bi-partition, sum and normalization
PSD0, PSD1, PSD_outlayers, median_PSD0, median_PSD1 = optimize_partition(theta, out_fact=out_fact)
P0 = np.sum(PSD[PSD0,:], axis=0)
max = np.max(P0, axis=0)
max = np.expand_dims(max, axis=0)
P0 = np.squeeze(np.divide(P0, max))
P1 = np.sum(PSD[PSD1,:], axis=0)
max = np.max(P1, axis=0)
max = np.expand_dims(max, axis=0)
P1 = np.squeeze(np.divide(P1, max))
# peaks
peak0_idx = np.argmax(P0)
P0_max = P0[peak0_idx]
F0 = F[peak0_idx]
peak1_idx = np.argmax(P1)
P1_max = P1[peak1_idx]
F1 = F[peak1_idx]
# Gaussian fitting
result0, G1, sigma0 = gaussian_fit(self.gmodel, P0, F, F0, 1) # Gaussian fit
result1, G2, sigma1 = gaussian_fit(self.gmodel, P1, F, F1, 1) # Gaussian fit
chis0 = result0.chisqr
chis1 = result1.chisqr
rchis0 = result0.redchi
rchis1 = result1.redchi
aic0 = result0.aic
aic1 = result1.aic
bic0 = result0.bic
bic1 = result1.bic
# ranking
rankP0 = 0
rankP1 = 0
rankP0 = rankP0 + 1 if chis0 < chis1 else rankP0
rankP1 = rankP1 + 1 if chis0 >= chis1 else rankP1
rankP0 = rankP0 + 1 if rchis0 < rchis1 else rankP0
rankP1 = rankP1 + 1 if rchis0 >= rchis1 else rankP1
rankP0 = rankP0 + 1 if aic0 < aic1 else rankP0
rankP1 = rankP1 + 1 if aic0 >= aic1 else rankP1
rankP0 = rankP0 + 1 if bic0 < bic1 else rankP0
rankP1 = rankP1 + 1 if bic0 >= bic1 else rankP1
rankP0 = rankP0 + 1 if sigma0 < sigma1 else rankP0
rankP1 = rankP1 + 1 if sigma0 >= sigma1 else rankP1
# best fit
bpm = None
if rankP0 > rankP1:
bpm = F0
else:
bpm = F1
return np.float32(bpm)
# --------------------------------------------------------------------- #
[docs]def BVP_to_BPM(bvps, fps, minHz=0.65, maxHz=4.):
"""
Transform a BVP windowed signal in a BPM signal using CPU.
This method use the Welch's method to estimate the spectral density of the BVP signal,
then it chooses as BPM the maximum Amplitude frequency.
Args:
bvps (list): list of length num_windows of BVP signal defined as float32 Numpy.ndarray with shape [num_estimators, num_frames].
fps (float): frames per seconds.
minHz (float): frequency in Hz used to isolate a specific subband [minHz, maxHz] (esclusive).
maxHz (float): frequency in Hz used to isolate a specific subband [minHz, maxHz] (esclusive).
Returns:
A list of length num_windows of BPM signals defined as a float32 Numpy.ndarray with shape [num_estimators, ].
If any BPM can't be found in a window, then the ndarray has num_estimators == 0.
"""
bpms = []
obj = None
for bvp in bvps:
if obj is None:
obj = BPM(bvp, fps, minHz=minHz, maxHz=maxHz)
else:
obj.data = bvp
bpm_es = obj.BVP_to_BPM()
bpms.append(bpm_es)
return bpms
[docs]def BVP_to_BPM_PSD_clustering(bvps, fps, minHz=0.65, maxHz=4., out_fact=1):
"""
Transform a BVP windowed signal in a BPM signal using CPU.
TODO: riscrivere descrizione
This method use the Welch's method to estimate the spectral density of the BVP signal; in case
of multiple estimators the method sum all the Power Spectums, then it chooses as BPM the
maximum Amplitude frequency.
Args:
bvps (list): list of length num_windows of BVP signal defined as float32 Numpy.ndarray with shape [num_estimators, num_frames].
fps (float): frames per seconds.
minHz (float): frequency in Hz used to isolate a specific subband [minHz, maxHz] (esclusive).
maxHz (float): frequency in Hz used to isolate a specific subband [minHz, maxHz] (esclusive).
Returns:
A list of length num_windows of BPM signals defined as a numpy.float32.
If any BPM can't be found in a window, then the BPM is 0.0.
"""
bpms = []
obj = None
for bvp in bvps:
if obj is None:
obj = BPM(bvp, fps, minHz=minHz, maxHz=maxHz)
else:
obj.data = bvp
bpm_es = obj.BVP_to_BPM_PSD_clustering(out_fact=out_fact)
bpms.append(bpm_es)
return bpms