Source code for pyVHR.BPM.utils

from sklearn.metrics import silhouette_score, pairwise_distances
from numpy import linalg as LA
from sklearn.cluster import KMeans
#from spherecluster import VonMisesFisherMixture
from sklearn.neighbors import KernelDensity
from lmfit import Model
from scipy.stats import iqr
import numpy as np
from scipy.signal import welch

"""
This module contains usefull methods used in pyVHR.BPM.BPM and
pyVHR.plot.visualize.
"""

[docs]def Welch(bvps, fps, minHz=0.65, maxHz=4.0, nfft=2048): """ This function computes Welch'method for spectral density estimation. Args: bvps(flaot32 numpy.ndarray): BVP signal as float32 Numpy.ndarray with shape [num_estimators, num_frames]. fps (float): frames per seconds. minHz (float): frequency in Hz used to isolate a specific subband [minHz, maxHz] (esclusive). maxHz (float): frequency in Hz used to isolate a specific subband [minHz, maxHz] (esclusive). nfft (int): number of DFT points, specified as a positive integer. Returns: Sample frequencies as float32 numpy.ndarray, and Power spectral density or power spectrum as float32 numpy.ndarray. """ _, n = bvps.shape if n < 256: seglength = n overlap = int(0.8*n) # fixed overlapping else: seglength = 256 overlap = 200 # -- periodogram by Welch F, P = welch(bvps, nperseg=seglength, noverlap=overlap, fs=fps, nfft=nfft) F = F.astype(np.float32) P = P.astype(np.float32) # -- freq subband (0.65 Hz - 4.0 Hz) band = np.argwhere((F > minHz) & (F < maxHz)).flatten() Pfreqs = 60*F[band] Power = P[:, band] return Pfreqs, Power
[docs]def circle_clustering(W, eps=0.01, theta0=None, normalize=False): """ TODO:documentare Args: W (tipo): spiegare W Returns: cosa ritorna (tipo): spiegare , questa funzione restituisce: P, kjs, Q, hahdh """ # general vars PI = np.pi n = W.shape[0] # param check if normalize: W = W / LA.norm(W) if theta0 is None: theta = 2*PI*np.random.rand(n) # init. values in [0, 2*PI] else: theta = theta0 # preliminar computations sin_t = np.sin(theta) cos_t = np.cos(theta) A = np.dot(W, cos_t) B = np.dot(W, sin_t) # main loop ok = True rounds = 0 while ok: ok = False rounds += 1 nchanges = 0 # loop on angles for i in range(n): old = theta[i] # change i-th theta #print(A[i], np.arctan(B[i]/A[i])) if np.abs(A[i])==0: if np.sign(A[i]*B[i])>0: theta[i] = PI/2 else: theta[i] = -PI/2 else: theta[i] = np.arctan(B[i]/A[i]) # within [-PI/2, PI/2] if A[i] >= 0: theta[i] += PI elif B[i] > 0: theta[i] += 2*PI # update Ak & Bk by elementwise product and diff A += np.multiply(W[i,:], np.repeat(np.cos(theta[i]) - np.cos(old), n)) B += np.multiply(W[i,:], np.repeat(np.sin(theta[i]) - np.sin(old), n)) if min(abs(old-theta[i]),abs(2*PI-old+theta[i])) > eps: ok = True nchanges += 1 return theta
[docs]def optimize_partition(theta, out_fact=1): n = theta.shape[0] T = theta[:,None] - theta # x[:,None] adds a second axis to the array T = np.cos(T) - np.eye(n) OK = True # compute partitions P,Q P = [] Q = [] Tmean = np.mean(theta) for idx,th in enumerate(theta): if np.sign(np.sin(Tmean-th)) > 0: P.append(idx) else: Q.append(idx) # adjust partitions P,Q while OK: OK = False for i in range(n): if i in P: if np.sum(T[i,P]) < np.sum(T[i,Q]) and len(P) > 1: Q.append(i) P.remove(i) OK = True else: if np.sum(T[i,P]) > np.sum(T[i,Q]) and len(Q) > 1: Q.remove(i) P.append(i) OK = True # pull out outliers from P and Q A = [] B = [] Z = [] for i in P: A.append(np.sum(T[i,P]) / (T[i,P].shape[0]-1)) M = np.max(A) M_idx = np.argmax(A) S = np.std(A) max_theta_P = theta[P[M_idx]] L = M - S # prune outliers in P for idx,i in enumerate(P): if M_idx != idx and A[idx] < L: Z.append(i) for i in Q: B.append(np.sum(T[i,Q]) / (T[i,Q].shape[0]-1)) M = np.max(B) M_idx = np.argmax(B) S = np.std(B) max_theta_Q = theta[Q[M_idx]] L = M - S # prune outliers in Q for idx,i in enumerate(Q): if M_idx != idx and B[idx] < L: Z.append(i) P = list(set(P) - set(Z)) Q = list(set(Q) - set(Z)) return P, Q, Z, max_theta_P, max_theta_Q
[docs]def gaussian(x,a,mu,sigma): return a*np.exp(-(x-mu)**2/(2*sigma**2))
[docs]def gaussian_fit(model,p, x, mu, max): result = model.fit(p, x=x, a=max, mu=mu, sigma=1) sigma = result.params['sigma'].value g = gaussian(x, max, mu, sigma) return result, g, sigma