# Source code for pyVHR.realtime.VHRroutine

from threading import Thread
import time
from time import sleep
from pyVHR.realtime.params import Params
from pyVHR.realtime.video_capture import VideoCapture
import cv2
import numpy as np
import plotly.graph_objects as go
from importlib import import_module, util
from pyVHR.datasets.dataset import datasetFactory
from pyVHR.utils.errors import getErrors, printErrors, displayErrors
from pyVHR.extraction.sig_processing import *
from pyVHR.extraction.sig_extraction_methods import *
from pyVHR.extraction.skin_extraction_methods import *
from pyVHR.BVP.BVP import *
from pyVHR.BPM.BPM import *
from pyVHR.BVP.methods import *
from pyVHR.BVP.filters import *
import PySimpleGUI as sg
import numpy as np
import plotly.graph_objects as go
import queue
import statistics


class SharedData:
    """Bundle of queues used to exchange data with the ``VHRroutine`` worker.

    Every attribute is an independent, unbounded ``queue.Queue`` created
    fresh for each instance.
    """

    def __init__(self):
        # BPM estimates published by VHRroutine, one per processed window.
        self.q_bpm = queue.Queue()
        # RGB video frames published by VHRroutine for visualization.
        self.q_video_image = queue.Queue()
        # Skin-only images, published when Params.visualize_skin is set.
        self.q_skin_image = queue.Queue()
        # Skin images annotated with landmarks/patches ('patches' approach).
        self.q_patches_image = queue.Queue()
        # Any item placed here makes VHRroutine leave its processing loop.
        self.q_stop = queue.Queue()
        # Not read in this module; presumably the stop signal for the
        # video capture side -- TODO confirm against VideoCapture.
        self.q_stop_cap = queue.Queue()
        # Frames consumed by VHRroutine; an int item means capture ended.
        # Presumably filled by VideoCapture -- TODO confirm.
        self.q_frames = queue.Queue()
def _select_signal_extractor():
    """Pick the RGB extraction function and patch sizes from ``Params``.

    Returns:
        tuple: ``(sig_ext_met, ldmks_regions)``. ``ldmks_regions`` is ``None``
        for the holistic approach.

    Raises:
        ValueError: if ``Params.approach`` or the ``Params.type`` /
            ``Params.patches`` combination is not one handled here. These
            configurations would otherwise only fail on the first frame.
    """
    if Params.approach == 'holistic':
        return holistic_mean, None

    if Params.approach != 'patches':
        raise ValueError(f"Unsupported Params.approach: {Params.approach!r}")

    sig_ext_met = None
    ldmks_regions = None
    # extraction method
    if Params.type == "mean" and Params.patches == "squares":
        sig_ext_met = landmarks_mean
    elif Params.type == "mean" and Params.patches == "rects":
        sig_ext_met = landmarks_mean_custom_rect
    elif Params.type == "median" and Params.patches == "squares":
        sig_ext_met = landmarks_median
    elif Params.type == "median" and Params.patches == "rects":
        sig_ext_met = landmarks_median_custom_rect
    # patches dims
    if Params.patches == "squares":
        ldmks_regions = np.float32(Params.squares_dim)
    elif Params.patches == "rects":
        ldmks_regions = np.float32(Params.rects_dims)

    if sig_ext_met is None:
        raise ValueError(
            "Unsupported patch configuration: "
            f"Params.type={Params.type!r}, Params.patches={Params.patches!r}")
    return sig_ext_met, ldmks_regions


def _apply_filter(filt, data, fps):
    """Run one filter descriptor (``{'filter_func', 'params'}``) on ``data``.

    An ``'fps': 'adaptive'`` entry is replaced by ``float(fps)`` on a private
    copy of the parameters, so the shared descriptor keeps its ``'adaptive'``
    marker and a later run with a different frame rate is not stuck with a
    stale value.
    """
    params = dict(filt['params'])
    if 'fps' in params and params['fps'] == 'adaptive' and fps is not None:
        params['fps'] = float(fps)
    return filt['filter_func'](data, **params)


def _draw_patches_overlay(full_skin_im, ldmks, magic_ldmks, color):
    """Return a copy of ``full_skin_im`` annotated with landmarks and patches.

    Landmark points (and optionally their indices) are drawn for every index
    in ``Params.landmarks_list``; patch rectangles are added when
    ``Params.visualize_patches`` is set.
    """
    annotated_image = full_skin_im.copy()
    for idx in Params.landmarks_list:
        # OpenCV points are built as (column 1, column 0) of the landmark row.
        point = (int(ldmks[idx, 1]), int(ldmks[idx, 0]))
        cv2.circle(annotated_image, point, radius=0,
                   color=Params.font_color, thickness=-1)
        if Params.visualize_landmarks_number:
            cv2.putText(annotated_image, str(idx), point,
                        cv2.FONT_HERSHEY_SIMPLEX, Params.font_size,
                        Params.font_color, 1)

    if Params.visualize_patches:
        if Params.patches == "squares":
            sides = np.array([Params.squares_dim, ] * len(magic_ldmks))
            annotated_image = draw_rects(
                annotated_image,
                np.array(magic_ldmks[:, 1]), np.array(magic_ldmks[:, 0]),
                sides, sides, color)
        elif Params.patches == "rects":
            rects_dims = np.array(Params.rects_dims)
            annotated_image = draw_rects(
                annotated_image,
                np.array(magic_ldmks[:, 1]), np.array(magic_ldmks[:, 0]),
                rects_dims[:, 0], rects_dims[:, 1], color)
    return annotated_image


def VHRroutine(sharedData):
    """Run the real-time heart-rate estimation loop configured by ``Params``.

    Frames are taken from ``sharedData.q_frames``. For each frame the face
    landmarks are detected, the skin is extracted and an RGB sample is
    appended to a sliding window. Once more than ``fps * Params.winSize``
    frames have been processed, a BPM estimate is computed every
    ``fps * Params.stride`` frames (pre-filter -> BVP method -> post-filter
    -> BPM) and put on ``sharedData.q_bpm``. Every few frames the video,
    skin and patches images are put on the visualization queues.

    The loop ends when an ``int`` is read from ``q_frames`` (capture
    stopped), when any item is found in ``q_stop``, or after
    ``Params.tot_sec * fps`` frames if that value is positive. Per-frame
    timing statistics are printed on exit.

    Args:
        sharedData: object exposing the queues ``q_frames``, ``q_stop``,
            ``q_bpm``, ``q_video_image``, ``q_skin_image`` and
            ``q_patches_image`` (see ``SharedData``).

    Raises:
        ValueError: if ``Params`` selects an unsupported approach or patch
            extraction combination.
    """
    sig_ext_met, ldmks_regions = _select_signal_extractor()

    SignalProcessingParams.RGB_LOW_TH = np.int32(
        Params.sig_color_low_threshold)
    SignalProcessingParams.RGB_HIGH_TH = np.int32(
        Params.sig_color_high_threshold)
    SkinProcessingParams.RGB_LOW_TH = np.int32(
        Params.skin_color_low_threshold)
    SkinProcessingParams.RGB_HIGH_TH = np.int32(
        Params.skin_color_high_threshold)

    color = np.array([Params.font_color[0],
                      Params.font_color[1],
                      Params.font_color[2]], dtype=np.uint8)

    # NOTE(review): any other Params.skin_extractor value leaves skin_ex as
    # None, which fails as soon as a face is detected.
    skin_ex = None
    if Params.skin_extractor == 'convexhull':
        skin_ex = SkinExtractionConvexHull()
    elif Params.skin_extractor == 'faceparsing':
        skin_ex = SkinExtractionFaceParsing()

    mp_drawing = mp.solutions.drawing_utils
    mp_face_mesh = mp.solutions.face_mesh
    PRESENCE_THRESHOLD = 0.5
    VISIBILITY_THRESHOLD = 0.5

    if Params.fps_fixed is not None:
        fps = Params.fps_fixed
    else:
        fps = get_fps(Params.videoFileName)
    tot_frames = int(Params.tot_sec*fps)

    sig = []
    processed_frames_count = 0
    sig_buff_dim = int(fps * Params.winSize)
    sig_stride = int(fps * Params.stride)
    sig_buff_counter = sig_stride

    BPM_obj = None
    timeCount = []

    # Kept referenced for the whole run; presumably this is what feeds
    # sharedData.q_frames -- TODO confirm against VideoCapture.
    cap = VideoCapture(Params.videoFileName, sharedData, fps=fps,
                       sleep=Params.fake_delay, resize=Params.resize)

    send_images_count = 0
    send_images_stride = 3
    idle_sleep_sec = 0.001  # yield while waiting instead of spinning

    # The post-filtering stage originally iterated Params.pre_filter; fall
    # back to it only if Params does not define post_filter.
    post_filters = getattr(Params, 'post_filter', Params.pre_filter)

    with mp_face_mesh.FaceMesh(
            max_num_faces=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5) as face_mesh:
        while True:
            start_time = time.perf_counter()*1000
            frame = None
            if not sharedData.q_frames.empty():  # read frames from shared data
                frame = sharedData.q_frames.get()
                if isinstance(frame, int):  # cap stopped
                    break
            if not sharedData.q_stop.empty():  # GUI stopped
                sharedData.q_stop.get()
                break
            if frame is None:
                sleep(idle_sleep_sec)
                continue

            # convert the BGR image to RGB.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            processed_frames_count += 1
            width = image.shape[1]
            height = image.shape[0]

            # One row per landmark, 5 values each; the first two hold the
            # pixel position and stay at -1 when the landmark is not found.
            ldmks = np.zeros((468, 5), dtype=np.float32)
            ldmks[:, 0] = -1.0
            ldmks[:, 1] = -1.0
            magic_ldmks = []

            ### face landmarks ###
            results = face_mesh.process(image)
            if results.multi_face_landmarks:
                face_landmarks = results.multi_face_landmarks[0]
                for idx, landmark in enumerate(face_landmarks.landmark):
                    low_visibility = (
                        landmark.HasField('visibility')
                        and landmark.visibility < VISIBILITY_THRESHOLD)
                    low_presence = (
                        landmark.HasField('presence')
                        and landmark.presence < PRESENCE_THRESHOLD)
                    if low_visibility or low_presence:
                        continue
                    coords = mp_drawing._normalized_to_pixel_coordinates(
                        landmark.x, landmark.y, width, height)
                    if coords:
                        ldmks[idx, 0] = coords[1]
                        ldmks[idx, 1] = coords[0]
                ### skin extraction ###
                cropped_skin_im, full_skin_im = skin_ex.extract_skin(
                    image, ldmks)
            else:
                cropped_skin_im = np.zeros_like(image)
                full_skin_im = np.zeros_like(image)

            ### SIG ###
            if Params.approach == 'patches':
                magic_ldmks = np.array(
                    ldmks[Params.landmarks_list], dtype=np.float32)
                temp = sig_ext_met(
                    magic_ldmks, full_skin_im, ldmks_regions,
                    np.int32(SignalProcessingParams.RGB_LOW_TH),
                    np.int32(SignalProcessingParams.RGB_HIGH_TH))
                temp = temp[:, 2:]  # keep only rgb mean
            elif Params.approach == 'holistic':
                temp = sig_ext_met(
                    cropped_skin_im,
                    np.int32(SignalProcessingParams.RGB_LOW_TH),
                    np.int32(SignalProcessingParams.RGB_HIGH_TH))
            sig.append(temp)

            # visualize original, patches and skin
            if send_images_count == send_images_stride:
                send_images_count = 0
                sharedData.q_video_image.put(image)
                if Params.visualize_skin:
                    sharedData.q_skin_image.put(full_skin_im)
                if Params.approach == 'patches' and Params.visualize_landmarks:
                    # visualize patches
                    sharedData.q_patches_image.put(_draw_patches_overlay(
                        full_skin_im, ldmks, magic_ldmks, color))
            else:
                send_images_count += 1

            if processed_frames_count > sig_buff_dim:
                # slide the window: drop the oldest sample
                del sig[0]
                if sig_buff_counter == 0:
                    sig_buff_counter = sig_stride
                    # Move the frame axis (first in `sig`) to the last position.
                    copy_sig = np.array(sig, dtype=np.float32)
                    copy_sig = np.swapaxes(copy_sig, 0, 1)
                    copy_sig = np.swapaxes(copy_sig, 1, 2)

                    ### Pre_filtering ###
                    if Params.approach == 'patches':
                        # NOTE(review): every other threshold read in this
                        # function uses a sig_/skin_ prefix; confirm that
                        # Params really defines color_low_threshold and
                        # color_high_threshold.
                        copy_sig = rgb_filter_th(
                            copy_sig,
                            RGB_LOW_TH=np.int32(Params.color_low_threshold),
                            RGB_HIGH_TH=np.int32(Params.color_high_threshold))
                    for filt in Params.pre_filter:
                        if filt != {}:
                            copy_sig = _apply_filter(filt, copy_sig, fps)

                    ### BVP ###
                    bvp = np.zeros((0, 1), dtype=np.float32)
                    if Params.method['device_type'] == 'cpu':
                        bvp = signals_to_bvps_cpu(
                            copy_sig, Params.method['method_func'],
                            Params.method['params'])
                    elif Params.method['device_type'] == 'torch':
                        bvp = signals_to_bvps_torch(
                            copy_sig, Params.method['method_func'],
                            Params.method['params'])

                    ### Post_filtering ###
                    for filt in post_filters:
                        if filt != {}:
                            # A singleton axis is inserted at position 1 for
                            # the filter call and removed again afterwards.
                            bvp = np.expand_dims(bvp, axis=1)
                            bvp = _apply_filter(filt, bvp, fps)
                            bvp = np.squeeze(bvp, axis=1)

                    ### BPM ###
                    if BPM_obj is None:
                        BPM_obj = BPM(bvp, fps, minHz=Params.minHz,
                                      maxHz=Params.maxHz)
                    else:
                        BPM_obj.data = bvp
                    if Params.BPM_extraction_type == "welch":
                        bpm = BPM_obj.BVP_to_BPM()
                    elif Params.BPM_extraction_type == "psd_clustering":
                        bpm = BPM_obj.BVP_to_BPM_PSD_clustering()
                    if Params.approach == 'patches':
                        # Median of multi BPMs
                        if len(bpm.shape) > 0 and bpm.shape[0] == 0:
                            bpm = np.float32(0.0)
                        else:
                            bpm = np.float32(np.median(bpm))
                    sharedData.q_bpm.put(bpm)
                else:
                    sig_buff_counter -= 1

            end_time = time.perf_counter()*1000
            timeCount.append(end_time-start_time)
            if len(timeCount) > 100:
                # keep only the 100 most recent timings
                del timeCount[0]

            ### loop break ###
            if tot_frames > 0 and processed_frames_count >= tot_frames:
                break

    if len(timeCount) > 2:
        print("Times in milliseconds of the computation of a frame:")
        print("mean:   ",statistics.mean(timeCount))
        print("median: ",statistics.median(timeCount))
        print("max:    ",max(timeCount))
        print("min:    ",min(timeCount))
""" if __name__ == "__main__": sd = SharedData() Params.videoFileName = "/home/frea/Documents/VHR/LGI_PPGI/lgi_alex/alex_resting/cv_camera_sensor_stream_handler.avi" Params.tot_sec = 0 t = Thread(target=VHRroutine, args=(sd,)) t.start() t.join() """