# Source code for pyVHR.realtime.GUI

from pyVHR.realtime.params import Params
from pyVHR.realtime.VHRroutine import *
import PySimpleGUI as sg
import threading
import ast
import pickle
import time

# Exceptions that ast.literal_eval (and the len()/iteration checks applied to
# its result) can raise on malformed user input.
_LITERAL_ERRORS = (ValueError, TypeError, SyntaxError, MemoryError,
                   RecursionError)


def _build_menu_layout():
    """Return the layout (two side-by-side columns) of the parameter window."""
    col1 = [
        [sg.Text("Set Parameters:")],
        [sg.Text("Video file path, or device number:")],
        [sg.In("", key="-VideoFileName-"),
         sg.FileBrowse(target=("-VideoFileName-"))],
        [sg.Text("Window Size:"),
         sg.In(str(Params.winSize), key="-winSize-", size=(5, 1))],
        [sg.Text("Stride:"),
         sg.In(str(Params.stride), key="-stride-", size=(5, 1))],
        [sg.Text("FPS fixed:"),
         sg.In("None", key="-fpsfixed-", size=(5, 1))],
        [sg.Text(" ")],
        [sg.Text("Skin Extractor:"),
         sg.Radio("Convex Hull", "RADIO2", default=True, key="-skinConvex-"),
         sg.Radio("FaceParsing", "RADIO2", default=False,
                  key="-skinFaceParsing-")],
        [sg.Text("ROI:"),
         sg.Radio("Holistic", "RADIO3", default=False, key="-holistic-"),
         sg.Radio("Patches", "RADIO3", default=True, key="-patches-")],
        [sg.Text("Method for Signal computation:"),
         sg.Radio("mean", "RADIO0", default=True, key="-mean-"),
         sg.Radio("median", "RADIO0", default=False, key="-median-")],
        [sg.Text(" ")],
        [sg.Text("Patches type:"),
         sg.Radio("Squares", "RADIO4", default=True, key="-squares-"),
         sg.Radio("Rectangles", "RADIO4", default=False, key="-rects-")],
        [sg.Text("Landmarks list (ex: [1,2,3]):")],
        [sg.In("Default", key="-ldmkslist-")],
        [sg.Text("Squares dimension:"),
         sg.In(str(Params.squares_dim), key="-squares_dim-", size=(5, 1))],
        [sg.Text("Rects dimension (ex for two landmarks: [[20,15],[10,10]]):")],
        [sg.In("[[],]", key="-rectsdim-")],
        [sg.Text(" ")],
        [sg.Button('Apply Parameters', key='-APPLY-')],
        # START stays hidden until the parameters have been applied once.
        [sg.Button('START', key='-START-', visible=False)],
    ]
    col2 = [
        [sg.Text(" ")],
        [sg.Text("Skin RGB color low threshold (ex: input V -> RGB(V,V,V)): "),
         sg.In(str(Params.skin_color_low_threshold),
               key="-skin_color_low_threshold-", size=(5, 1))],
        [sg.Text("Skin RGB color high threshold (ex: input V -> RGB(V,V,V)): "),
         sg.In(str(Params.skin_color_high_threshold),
               key="-skin_color_high_threshold-", size=(5, 1))],
        [sg.Text("ROI RGB color low threshold (ex: input V -> RGB(V,V,V)): "),
         sg.In(str(Params.sig_color_low_threshold),
               key="-sig_color_low_threshold-", size=(5, 1))],
        [sg.Text("ROI RGB color high threshold (ex: input V -> RGB(V,V,V)): "),
         sg.In(str(Params.sig_color_high_threshold),
               key="-sig_color_high_threshold-", size=(5, 1))],
        [sg.Text(" ")],
        [sg.Text("Signal RGB color low threshold (ex: input V -> RGB(V,V,V)): "),
         sg.In(str(Params.color_low_threshold),
               key="-color_low_threshold-", size=(5, 1))],
        [sg.Text("Signal RGB color high threshold (ex: input V -> RGB(V,V,V)): "),
         sg.In(str(Params.color_high_threshold),
               key="-color_high_threshold-", size=(5, 1))],
        [sg.Text(" ")],
        [sg.Text("Visualize Skin:"),
         sg.Radio("True", "RADIO5", default=True, key="-visualizeskintrue-"),
         sg.Radio("False", "RADIO5", default=False,
                  key="-visualizeskinfalse-")],
        [sg.Text("Visualize Landmarks:"),
         sg.Radio("True", "RADIO6", default=True, key="-visualizeldmkstrue-"),
         sg.Radio("False", "RADIO6", default=False,
                  key="-visualizeldmksfalse-")],
        [sg.Text("\t - Visualize Patches:"),
         sg.Radio("True", "RADIO7", default=True,
                  key="-visualizepatchestrue-"),
         sg.Radio("False", "RADIO7", default=False,
                  key="-visualizepatchesfalse-")],
        [sg.Text("\t - Visualize Landmarks number:"),
         sg.Radio("True", "RADIO8", default=True,
                  key="-visualizeldmksnumtrue-"),
         sg.Radio("False", "RADIO8", default=False,
                  key="-visualizeldmksnumfalse-")],
        [sg.Text(" \n Font Color (R,G,B,A)")],
        [sg.In(str(Params.font_color), key='-FontColor-', size=(25, 1))],
        [sg.Text("Font Size: "),
         sg.In(str(Params.font_size), key='-FontSize-', size=(4, 1))],
        [sg.Text("For Editing PRE-POST filters, BVP method and BPM params edit:\n Params.pre_filter, Params.method, Params.post_filter,\n Params.minHz, Params.maxHz")],
    ]
    return [[sg.Column(col1, element_justification='l'),
             sg.Column(col2, element_justification='l')]]


def _create_run_window():
    """Build, finalize and return the 'pyVHR - RUN' window.

    The window is pre-filled with a black frame and an all-zero plot so the
    image pipeline is exercised once before real data arrives.
    """
    fps = Params.fps_fixed if Params.fps_fixed is not None else get_fps(
        Params.videoFileName)
    # One radio per view; the optional ones exist only when the matching
    # feature is enabled at the time the window is built.
    visualize_list = [[sg.Radio('Original Video', "RADIO11", default=True,
                                key="-IN_Visualize_original-")]]
    if Params.visualize_skin:
        visualize_list.append(
            [sg.Radio('Skin', "RADIO11", default=False,
                      key="-IN_Visualize_skin-")])
    if Params.approach == 'patches':
        visualize_list.append(
            [sg.Radio('Patches', "RADIO11", default=False,
                      key="-IN_Visualize_patches-")])
    run_col1 = [[sg.Text(Params.videoFileName)],
                [sg.Text("Video FPS: "+str(fps)),
                 sg.Text("Video resolution: ", key="-res-")],
                [sg.Text("Visualize: ")],
                *visualize_list,
                [sg.Text('BPMs')],
                [sg.Image(filename='', key='-BPM_PLOT-')],
                [sg.Button('Stop')],
                [sg.Button('Save BPMs')]]
    run_col2 = [[sg.Text('Visualize')],
                [sg.Image(filename='', key='-VIDEO-')]]
    run_layout = [[sg.Column(run_col1, element_justification='l'),
                   sg.Column(run_col2, element_justification='c')]]
    wr = sg.Window('pyVHR - RUN', run_layout,
                   finalize=True, location=(0, 0))

    # blank plots for compiling kernels
    blank = np.zeros((480, 640, 3), dtype=np.uint8)
    imgbytes = cv2.imencode(
        '.png', cv2.cvtColor(blank, cv2.COLOR_BGR2RGB))[1].tobytes()
    wr['-VIDEO-'].update(data=imgbytes)
    fig = go.Figure()
    # x and y deliberately have the same length (the placeholder trace
    # previously paired 10 x values with 9 y values).
    fig.add_trace(go.Scatter(x=np.arange(0, 10, 1), y=np.zeros(10),
                             mode='lines+markers', name='lines+markers'))
    fig.update_layout(autosize=False, width=400, height=300,
                      margin=dict(l=1, r=1, b=1, t=1, pad=1),
                      paper_bgcolor="LightSteelBlue")
    wr['-BPM_PLOT-'].update(data=fig.to_image(format="png"))
    return wr


def _render_bpm_plot(bpm_values):
    """Return PNG bytes of the BPM history plot for ``bpm_values``."""
    fig = go.Figure()
    # One x position per estimate, spaced by the stride, so x and y always
    # have equal length (arange(0, len, stride) is too short for stride > 1).
    # NOTE(review): assumes consecutive estimates are Params.stride apart.
    fig.add_trace(go.Scatter(x=np.arange(len(bpm_values)) * Params.stride,
                             y=np.array(bpm_values),
                             mode='lines+markers', name='lines+markers'))
    fig.update_layout(autosize=False, width=600, height=400,
                      margin=dict(l=1, r=1, b=1, t=1, pad=1),
                      paper_bgcolor="LightSteelBlue")
    return fig.to_image(format="png")


def _save_bpms(bpm_values):
    """Pickle ``bpm_values`` to '<timestamp>_BPMs' in the output directory.

    The directory is ``Params.out_path`` or, when that is None, the current
    working directory. Failures are reported on stdout, never raised.
    """
    try:
        out_dir = Params.out_path if Params.out_path is not None else os.getcwd()
        # os.path.join copes with out_path given with or without a trailing
        # separator.
        target = os.path.join(str(out_dir), str(time.time()) + '_BPMs')
        with open(target, 'wb') as f:
            pickle.dump(bpm_values, f)
        print("BPMs saved!")
    except (OSError, pickle.PicklingError):
        print("[ERROR] wrong path for saving BPMs!")


def _parse_finite_float(text):
    """Return ``text`` as a finite float, or None when it is not one."""
    try:
        number = float(text)
    except (TypeError, ValueError):
        return None
    # NaN fails every comparison, so this one test rejects NaN and +/-inf.
    if not (float('-inf') < number < float('inf')):
        return None
    return number


def _apply_byte_threshold(window, values, attr, fallback):
    """Update ``Params.<attr>`` from the input field keyed '-<attr>-'.

    Integer text within 0..255 is stored as is, integer text outside that
    range stores ``fallback``, and anything else leaves the parameter alone.
    The field is then rewritten with the effective value.
    """
    key = '-' + attr + '-'
    text = values[key]
    # isdecimal(), unlike isnumeric(), only accepts text that int() can parse.
    if text.isdecimal():
        number = int(text)
        setattr(Params, attr, number if 0 <= number <= 255 else fallback)
    window[key].update(str(getattr(Params, attr)))


def _apply_parameters(window, values):
    """Validate the parameter form and copy its contents into ``Params``.

    Args:
        window: the parameter window; fields holding invalid text are
            rewritten with the value that is actually in effect.
        values: the values dictionary read from ``window``.
    """
    # --- video source ---------------------------------------------------
    source = values['-VideoFileName-']
    if source == '':
        # An empty field means capture device 0.
        source = '0'
        window['-VideoFileName-'].update('0')
    if source.isdecimal():
        Params.videoFileName = int(source)
        Params.fake_delay = False
    else:
        Params.videoFileName = source
        Params.fake_delay = True

    # --- windowing ------------------------------------------------------
    if values['-winSize-'].isdecimal():
        win_size = int(values['-winSize-'])
        Params.winSize = win_size if win_size > 1 else 3
    window['-winSize-'].update(str(Params.winSize))

    if values['-stride-'].isdecimal():
        stride = int(values['-stride-'])
        Params.stride = stride if stride > 0 else 1
    window['-stride-'].update(str(Params.stride))

    Params.cuda = False

    if values['-fpsfixed-'].isdecimal():
        fps = int(values['-fpsfixed-'])
        Params.fps_fixed = fps if fps > 0 else None
    window['-fpsfixed-'].update(str(Params.fps_fixed))

    # --- extraction choices --------------------------------------------
    Params.skin_extractor = ('convexhull' if bool(values['-skinConvex-'])
                             else 'faceparsing')
    Params.approach = 'holistic' if bool(values['-holistic-']) else 'patches'
    Params.type = 'mean' if bool(values['-mean-']) else 'median'
    Params.patches = 'squares' if bool(values['-squares-']) else 'rects'

    # --- landmarks and patch sizes -------------------------------------
    try:
        ldlist = ast.literal_eval(values['-ldmkslist-'])
        if not isinstance(ldlist, (list, tuple)):
            raise TypeError("landmarks must be given as a list")
        if len(ldlist) > 0:
            Params.landmarks_list = ldlist
    except _LITERAL_ERRORS:
        # Unparsable text (including the initial "Default"): keep the
        # current list and show it.
        window['-ldmkslist-'].update(str(Params.landmarks_list))

    square = _parse_finite_float(values['-squares_dim-'])
    if square is not None:
        Params.squares_dim = square if square > 0 else None
    else:
        window['-squares_dim-'].update(str(Params.squares_dim))

    try:
        rects = ast.literal_eval(values['-rectsdim-'])
        rects_ok = len(rects) == len(Params.landmarks_list)
    except _LITERAL_ERRORS:
        rects_ok = False
    if rects_ok:
        Params.rects_dims = rects
    else:
        # Wrong text or wrong number of entries: fall back to one square
        # per landmark.
        default_dims = [[Params.squares_dim, Params.squares_dim]
                        for _ in Params.landmarks_list]
        Params.rects_dims = default_dims
        window['-rectsdim-'].update(str(default_dims))

    # --- color thresholds ----------------------------------------------
    _apply_byte_threshold(window, values, 'skin_color_low_threshold', 2)
    _apply_byte_threshold(window, values, 'skin_color_high_threshold', 254)
    _apply_byte_threshold(window, values, 'sig_color_low_threshold', 2)
    _apply_byte_threshold(window, values, 'sig_color_high_threshold', 254)
    _apply_byte_threshold(window, values, 'color_low_threshold', -1)
    _apply_byte_threshold(window, values, 'color_high_threshold', 255)

    # --- visualization --------------------------------------------------
    Params.visualize_skin = bool(values['-visualizeskintrue-'])
    Params.visualize_landmarks = bool(values['-visualizeldmkstrue-'])
    Params.visualize_patches = bool(values['-visualizepatchestrue-'])
    Params.visualize_landmarks_number = bool(
        values['-visualizeldmksnumtrue-'])

    try:
        colorfont = ast.literal_eval(values['-FontColor-'])
        if len(colorfont) == 4 and all(0 <= c <= 255 for c in colorfont):
            Params.font_color = colorfont
    except _LITERAL_ERRORS:
        # Deliberately ignored: the field is reset to the current color below.
        pass
    window['-FontColor-'].update(str(Params.font_color))

    font_size = _parse_finite_float(values['-FontSize-'])
    if font_size is not None:
        Params.font_size = font_size if font_size > 0.0 else 0.3
    window['-FontSize-'].update(str(Params.font_size))


def GUI_MENU() -> None:
    """Run the pyVHR real-time GUI until the parameter window is closed.

    A parameter window lets the user fill in ``Params``. After
    'Apply Parameters' the START button appears; START opens a run window and
    launches ``VHRroutine`` in a worker thread, exchanging frames, BPM
    estimates and stop requests through a ``SharedData`` instance. Stopping
    or closing the run window ends the worker and returns to the parameter
    window.
    """
    window = sg.Window('pyVHR', _build_menu_layout(),
                       finalize=True, location=(0, 0))

    bpm_plot = []        # sliding window of BPM values shown in the plot
    bpm_save = []        # every BPM value of the current run ('Save BPMs')
    bpm_plot_max = 60
    image = None         # frame currently selected for display
    original_image = None
    skin_image = None
    patches_image = None
    vhr_t = None         # worker thread running VHRroutine
    sd = SharedData()
    run_window = None

    close_events = (None, 'Exit', sg.WIN_CLOSED)

    while True:
        _, event, values = sg.read_all_windows(timeout=16)

        if run_window is None and event in close_events:
            break

        stop_requested = (run_window is not None
                          and (event in close_events or event == 'Stop'))

        if run_window is not None and not stop_requested:
            run_win_event, run_win_values = run_window.read(timeout=16)
            # Both read calls poll the run window, so a click on one of its
            # buttons may be returned by either of them; honour both.
            if (run_win_values is None or run_win_event == 'Stop'
                    or run_win_event in close_events):
                stop_requested = True
            else:
                # Video plot
                if not sd.q_video_image.empty():
                    original_image = sd.q_video_image.get()
                if not sd.q_skin_image.empty():
                    skin_image = sd.q_skin_image.get()
                if not sd.q_patches_image.empty():
                    patches_image = sd.q_patches_image.get()

                # .get(): the optional radios exist only if their feature was
                # enabled when the run window was built, while Params can be
                # changed through 'Apply Parameters' during a run.
                if (original_image is not None
                        and bool(run_win_values["-IN_Visualize_original-"])):
                    image = original_image
                elif (Params.visualize_skin and skin_image is not None
                        and bool(run_win_values.get("-IN_Visualize_skin-"))):
                    image = skin_image
                elif (Params.visualize_landmarks and patches_image is not None
                        and bool(run_win_values.get("-IN_Visualize_patches-"))):
                    image = patches_image

                if image is not None:
                    # Convert into a separate variable: overwriting `image`
                    # would swap the channels again on every tick in which
                    # the selected frame is not refreshed.
                    frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                    imgbytes = cv2.imencode('.png', frame)[1].tobytes()
                    run_window['-VIDEO-'].update(data=imgbytes)
                    run_window['-res-'].update(
                        "Video resolution: " + str(frame.shape[1])
                        + " x " + str(frame.shape[0]))

                # BPM plot
                if not sd.q_bpm.empty():
                    bpm = sd.q_bpm.get()
                    bpm_plot.append(bpm)
                    bpm_save.append(bpm)
                    if len(bpm_plot) >= bpm_plot_max:
                        bpm_plot = bpm_plot[1:]
                    run_window['-BPM_PLOT-'].update(
                        data=_render_bpm_plot(bpm_plot))

                if 'Save BPMs' in (event, run_win_event):
                    _save_bpms(bpm_save)

        if stop_requested:
            sd.q_stop_cap.put(0)  # stop cap, it will stop VHR model thread
            sd.q_stop.put(0)  # stop VHR model thread if cap can't
            if vhr_t is not None:
                vhr_t.join()
                vhr_t = None
            sd = SharedData()
            run_window.close()
            run_window = None

        if event == '-APPLY-':
            _apply_parameters(window, values)
            window['-START-'].update(visible=True)

        if event == '-START-' and not run_window:
            bpm_plot = []
            bpm_save = []
            # Drop the frames of any previous run so they are not shown again.
            image = original_image = skin_image = patches_image = None
            sd = SharedData()
            run_window = _create_run_window()
            if vhr_t is None:
                vhr_t = threading.Thread(target=VHRroutine, args=(sd,))
                vhr_t.daemon = False
                vhr_t.start()

    window.close()
if __name__ == '__main__':
    # Script entry point. The settings below have no widget in the GUI and
    # can therefore only be changed by editing this block.

    # Directory in which 'Save BPMs' stores its files, for example
    # "/home/user/Downloads/". None selects the working directory.
    Params.out_path = None

    # Downscale input video to width 640 (preserve aspect-ratio)
    Params.resize = False

    # BPM extraction type: 'psd_clustering' or 'welch'.
    Params.BPM_extraction_type = 'welch'

    # Pre and post filters are lists of dictionaries shaped like
    #   {'filter_func': method_name, 'params': {}}
    Params.pre_filter = [
        dict(filter_func=BPfilter,
             params=dict(minHz=0.7, maxHz=3.0, fps='adaptive', order=6)),
    ]
    Params.post_filter = [
        dict(filter_func=BPfilter,
             params=dict(minHz=0.7, maxHz=3.0, fps='adaptive', order=6)),
    ]

    # An rPPG method is a dictionary shaped like
    #   {'method_func': method_name, 'device_type': 'device_name', 'params': {}}
    # where device_type can be 'cpu' or 'torch'.
    Params.method = dict(method_func=cpu_CHROM, device_type='cpu', params={})

    GUI_MENU()