Source code for pyVHR.realtime.video_capture

import cv2
import threading
import time
from pyVHR.extraction.utils import *


[docs]class VideoCapture: # bufferless VideoCapture def __init__(self, name, sharedData, fps=None, sleep=False, resize=True): self.cap = cv2.VideoCapture(name) self.sleep = sleep self.resize = resize self.fps = None if fps is not None: self.cap.set(cv2.CAP_PROP_FPS, fps) self.fps = fps self.sd = sharedData self.t = threading.Thread(target=self._reader) self.t.daemon = False # when process ends all deamon will be killed self.t.start() # read frames as soon as they are available def _reader(self): while True: if not self.sd.q_stop_cap.empty(): # GUI stopped self.sd.q_stop_cap.get() self.sd.q_stop.put(0) # stop VHR model self.sd.q_frames.put(0) break ret, frame = self.cap.read() if not ret: # IO error self.sd.q_stop.put(0) # stop VHR model self.sd.q_frames.put(0) # end extraction -> send number break if self.resize: h, w = frame.shape[0], frame.shape[1] if h != 480 or w != 640: frame = cv2.resize(frame, (640, int(640*h/w)), interpolation=cv2.INTER_NEAREST) self.sd.q_frames.put(frame) if self.sleep and self.fps is not None: time.sleep(self.fps / 1000.0) self.cap.release()
""" if __name__ == "__main__": import cv2 from pyVHR.realtime.VHRroutine import SharedData sd = SharedData() cap = VideoCapture( "/home/frea/Documents/VHR/LGI_PPGI/lgi_alex/alex_resting/cv_camera_sensor_stream_handler.avi", fps=25, sharedData=sd) # Check if the webcam is opened correctly for i in range(10): frame = sd.q_frames.get() #cv2.imshow('Input', frame) print(frame.shape) sd.q_stop_cap.put(0) time.sleep(2) """