diff --git a/detect_objects.py b/detect_objects.py index ba1bc24b7..2c700f2a3 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -2,6 +2,7 @@ import cv2 import time import queue import yaml +import threading import multiprocessing as mp import subprocess as sp import numpy as np @@ -50,10 +51,40 @@ GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {}) WEB_PORT = CONFIG.get('web_port', 5000) DEBUG = (CONFIG.get('debug', '0') == '1') +# TODO: make CPU/Coral switching more seamless # MODEL_PATH = CONFIG.get('tflite_model', '/lab/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite') MODEL_PATH = CONFIG.get('tflite_model', '/lab/detect.tflite') LABEL_MAP = CONFIG.get('label_map', '/lab/labelmap.txt') + +class CameraWatchdog(threading.Thread): + def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue): + threading.Thread.__init__(self) + self.camera_processes = camera_processes + self.config = config + self.tflite_process = tflite_process + self.tracked_objects_queue = tracked_objects_queue + + def run(self): + time.sleep(10) + while True: + # wait a bit before checking + time.sleep(10) + + for name, camera_process in self.camera_processes.items(): + process = camera_process['process'] + if not process.is_alive(): + print(f"Process for {name} is not alive. Starting again...") + camera_process['fps'].value = 10.0 + camera_process['skipped_fps'].value = 0.0 + process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, + self.tflite_process.detect_lock, self.tflite_process.detect_ready, self.tflite_process.frame_ready, self.tracked_objects_queue, + camera_process['fps'], camera_process['skipped_fps'])) + process.daemon = True + camera_process['process'] = process + process.start() + print(f"Camera_process started for {name}: {process.pid}") + def main(): # connect to mqtt and setup last will def on_connect(client, userdata, flags, rc): @@ -101,22 +132,24 @@ def main(): tflite_process = EdgeTPUProcess(MODEL_PATH) # start the camera processes - camera_processes = [] - camera_stats_values = {} + camera_processes = {} for name, config in CONFIG['cameras'].items(): - camera_stats_values[name] = { + camera_processes[name] = { 'fps': mp.Value('d', 10.0), 'skipped_fps': mp.Value('d', 0.0) } camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue, - camera_stats_values[name]['fps'], camera_stats_values[name]['skipped_fps'])) + camera_processes[name]['fps'], camera_processes[name]['skipped_fps'])) camera_process.daemon = True - camera_processes.append(camera_process) + camera_processes[name]['process'] = camera_process - for camera_process in camera_processes: - camera_process.start() - print(f"Camera_process started {camera_process.pid}") + for name, camera_process in camera_processes.items(): + camera_process['process'].start() + print(f"Camera_process started for {name}: {camera_process['process'].pid}") + + camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue) + camera_watchdog.start() object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue) object_processor.start() @@ -138,7 +171,7 @@ def main(): } } - for name, camera_stats in camera_stats_values.items(): + for name, camera_stats in camera_processes.items(): stats[name] = { 'fps': camera_stats['fps'].value, 'skipped_fps': camera_stats['skipped_fps'].value @@ -183,8 +216,7 @@ def main(): app.run(host='0.0.0.0', port=WEB_PORT, debug=False) - for camera_process in camera_processes: - camera_process.join() + camera_watchdog.join() plasma_process.terminate() diff --git a/frigate/video.py b/frigate/video.py index f06e2a9f4..b67b0172d 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -55,534 +55,6 @@ def get_ffmpeg_input(ffmpeg_input): frigate_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')} return ffmpeg_input.format(**frigate_vars) -<<<<<<< HEAD -class CameraWatchdog(threading.Thread): - def __init__(self, camera): - threading.Thread.__init__(self) - self.camera = camera - - def run(self): - prctl.set_name(self.__class__.__name__) - while True: - # wait a bit before checking - time.sleep(10) - - if self.camera.frame_time.value != 0.0 and (datetime.datetime.now().timestamp() - self.camera.frame_time.value) > self.camera.watchdog_timeout: - print(self.camera.name + ": last frame is more than 5 minutes old, restarting camera capture...") - self.camera.start_or_restart_capture() - time.sleep(5) - -# Thread to read the stdout of the ffmpeg process and update the current frame -class CameraCapture(threading.Thread): - def __init__(self, camera): - threading.Thread.__init__(self) - self.camera = camera - - def run(self): - prctl.set_name(self.__class__.__name__) - frame_num = 0 - while True: - if self.camera.ffmpeg_process.poll() != None: - print(self.camera.name + ": ffmpeg process is not running. exiting capture thread...") - break - - raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size) - - if len(raw_image) == 0: - print(self.camera.name + ": ffmpeg didnt return a frame. something is wrong. exiting capture thread...") - break - - frame_num += 1 - if (frame_num % self.camera.take_frame) != 0: - continue - - with self.camera.frame_lock: - # TODO: use frame_queue instead - self.camera.frame_time.value = datetime.datetime.now().timestamp() - self.camera.frame_cache[self.camera.frame_time.value] = ( - np - .frombuffer(raw_image, np.uint8) - .reshape(self.camera.frame_shape) - ) - self.camera.frame_queue.put(self.camera.frame_time.value) - # Notify with the condition that a new frame is ready - with self.camera.frame_ready: - self.camera.frame_ready.notify_all() - - self.camera.fps.update() - -class VideoWriter(threading.Thread): - def __init__(self, camera): - threading.Thread.__init__(self) - self.camera = camera - - def run(self): - prctl.set_name(self.__class__.__name__) - while True: - (frame_time, tracked_objects) = self.camera.frame_output_queue.get() - # if len(tracked_objects) == 0: - # continue - # f = open(f"/debug/output/{self.camera.name}-{str(format(frame_time, '.8f'))}.jpg", 'wb') - # f.write(self.camera.frame_with_objects(frame_time, tracked_objects)) - # f.close() - -class Camera: - def __init__(self, name, ffmpeg_config, global_objects_config, config, prepped_frame_queue, mqtt_client, mqtt_prefix): - self.name = name - self.config = config - self.detected_objects = defaultdict(lambda: []) - self.frame_cache = {} - self.last_processed_frame = None - # queue for re-assembling frames in order - self.frame_queue = queue.Queue() - # track how many regions have been requested for a frame so we know when a frame is complete - self.regions_in_process = {} - # Lock to control access - self.regions_in_process_lock = mp.Lock() - self.finished_frame_queue = queue.Queue() - self.refined_frame_queue = queue.Queue() - self.frame_output_queue = queue.Queue() - - self.ffmpeg = config.get('ffmpeg', {}) - self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input']) - self.ffmpeg_global_args = self.ffmpeg.get('global_args', ffmpeg_config['global_args']) - self.ffmpeg_hwaccel_args = self.ffmpeg.get('hwaccel_args', ffmpeg_config['hwaccel_args']) - self.ffmpeg_input_args = self.ffmpeg.get('input_args', ffmpeg_config['input_args']) - self.ffmpeg_output_args = self.ffmpeg.get('output_args', ffmpeg_config['output_args']) - - camera_objects_config = config.get('objects', {}) - - self.take_frame = self.config.get('take_frame', 1) - self.watchdog_timeout = self.config.get('watchdog_timeout', 300) - self.snapshot_config = { - 'show_timestamp': self.config.get('snapshots', {}).get('show_timestamp', True) - } - self.regions = self.config['regions'] - if 'width' in self.config and 'height' in self.config: - self.frame_shape = (self.config['height'], self.config['width'], 3) - else: - self.frame_shape = get_frame_shape(self.ffmpeg_input) - self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2] - self.mqtt_client = mqtt_client - self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name) - - # create shared value for storing the frame_time - self.frame_time = mp.Value('d', 0.0) - # Lock to control access to the frame - self.frame_lock = mp.Lock() - # Condition for notifying that a new frame is ready - self.frame_ready = mp.Condition() - # Condition for notifying that objects were tracked - self.objects_tracked = mp.Condition() - - # Queue for prepped frames, max size set to (number of regions * 5) - self.resize_queue = queue.Queue() - - # Queue for raw detected objects - self.detected_objects_queue = queue.Queue() - self.detected_objects_processor = DetectedObjectsProcessor(self) - self.detected_objects_processor.start() - - # initialize the frame cache - self.cached_frame_with_objects = { - 'frame_bytes': [], - 'frame_time': 0 - } - - self.ffmpeg_process = None - self.capture_thread = None - self.fps = EventsPerSecond() - self.skipped_region_tracker = EventsPerSecond() - - # combine tracked objects lists - self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', [])) - - # merge object filters - global_object_filters = global_objects_config.get('filters', {}) - camera_object_filters = camera_objects_config.get('filters', {}) - objects_with_config = set().union(global_object_filters.keys(), camera_object_filters.keys()) - self.object_filters = {} - for obj in objects_with_config: - self.object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})} - - # start a thread to track objects - self.object_tracker = ObjectTracker(self, 10) - self.object_tracker.start() - - # start a thread to write tracked frames to disk - self.video_writer = VideoWriter(self) - self.video_writer.start() - - # start a thread to queue resize requests for regions - self.region_requester = RegionRequester(self) - self.region_requester.start() - - # start a thread to cache recent frames for processing - self.frame_tracker = FrameTracker(self.frame_time, - self.frame_ready, self.frame_lock, self.frame_cache) - self.frame_tracker.start() - - # start a thread to resize regions - self.region_prepper = RegionPrepper(self, self.frame_cache, self.resize_queue, prepped_frame_queue) - self.region_prepper.start() - - # start a thread to store the highest scoring recent frames for monitored object types - self.best_frames = BestFrames(self) - self.best_frames.start() - - # start a thread to expire objects from the detected objects list - self.object_cleaner = ObjectCleaner(self) - self.object_cleaner.start() - - # start a thread to refine regions when objects are clipped - self.dynamic_region_fps = EventsPerSecond() - self.region_refiner = RegionRefiner(self) - self.region_refiner.start() - self.dynamic_region_fps.start() - - # start a thread to publish object scores - mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self) - mqtt_publisher.start() - - # create a watchdog thread for capture process - self.watchdog = CameraWatchdog(self) - - # load in the mask for object detection - if 'mask' in self.config: - self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE) - else: - self.mask = None - - if self.mask is None: - self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8) - self.mask[:] = 255 - - - def start_or_restart_capture(self): - if not self.ffmpeg_process is None: - print("Terminating the existing ffmpeg process...") - self.ffmpeg_process.terminate() - try: - print("Waiting for ffmpeg to exit gracefully...") - self.ffmpeg_process.wait(timeout=30) - except sp.TimeoutExpired: - print("FFmpeg didnt exit. Force killing...") - self.ffmpeg_process.kill() - self.ffmpeg_process.wait() - - print("Waiting for the capture thread to exit...") - self.capture_thread.join() - self.ffmpeg_process = None - self.capture_thread = None - -# # Thread to read the stdout of the ffmpeg process and update the current frame -# class CameraCapture(threading.Thread): -# def __init__(self, camera): -# threading.Thread.__init__(self) -# self.camera = camera - -# def run(self): -# prctl.set_name(self.__class__.__name__) -# frame_num = 0 -# while True: -# if self.camera.ffmpeg_process.poll() != None: -# print(self.camera.name + ": ffmpeg process is not running. exiting capture thread...") -# break - -# raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size) - -# if len(raw_image) == 0: -# print(self.camera.name + ": ffmpeg didnt return a frame. something is wrong. exiting capture thread...") -# break - -# frame_num += 1 -# if (frame_num % self.camera.take_frame) != 0: -# continue - -# with self.camera.frame_lock: -# # TODO: use frame_queue instead -# self.camera.frame_time.value = datetime.datetime.now().timestamp() -# self.camera.frame_cache[self.camera.frame_time.value] = ( -# np -# .frombuffer(raw_image, np.uint8) -# .reshape(self.camera.frame_shape) -# ) -# self.camera.frame_queue.put(self.camera.frame_time.value) -# # Notify with the condition that a new frame is ready -# with self.camera.frame_ready: -# self.camera.frame_ready.notify_all() - -# self.camera.fps.update() - -# class VideoWriter(threading.Thread): -# def __init__(self, camera): -# threading.Thread.__init__(self) -# self.camera = camera - -# def run(self): -# prctl.set_name(self.__class__.__name__) -# while True: -# (frame_time, tracked_objects) = self.camera.frame_output_queue.get() -# # if len(tracked_objects) == 0: -# # continue -# # f = open(f"/debug/output/{self.camera.name}-{str(format(frame_time, '.8f'))}.jpg", 'wb') -# # f.write(self.camera.frame_with_objects(frame_time, tracked_objects)) -# # f.close() - -# class Camera: -# def __init__(self, name, ffmpeg_config, global_objects_config, config, tflite_process, mqtt_client, mqtt_prefix): -# self.name = name -# self.config = config -# self.detected_objects = defaultdict(lambda: []) -# self.frame_cache = {} -# self.last_processed_frame = None -# # queue for re-assembling frames in order -# self.frame_queue = queue.Queue() -# # track how many regions have been requested for a frame so we know when a frame is complete -# self.regions_in_process = {} -# # Lock to control access -# self.regions_in_process_lock = mp.Lock() -# self.finished_frame_queue = queue.Queue() -# self.refined_frame_queue = queue.Queue() -# self.frame_output_queue = queue.Queue() - -# self.ffmpeg = config.get('ffmpeg', {}) -# self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input']) -# self.ffmpeg_global_args = self.ffmpeg.get('global_args', ffmpeg_config['global_args']) -# self.ffmpeg_hwaccel_args = self.ffmpeg.get('hwaccel_args', ffmpeg_config['hwaccel_args']) -# self.ffmpeg_input_args = self.ffmpeg.get('input_args', ffmpeg_config['input_args']) -# self.ffmpeg_output_args = self.ffmpeg.get('output_args', ffmpeg_config['output_args']) - -# camera_objects_config = config.get('objects', {}) - -# self.take_frame = self.config.get('take_frame', 1) -# self.watchdog_timeout = self.config.get('watchdog_timeout', 300) -# self.snapshot_config = { -# 'show_timestamp': self.config.get('snapshots', {}).get('show_timestamp', True) -# } -# self.regions = self.config['regions'] -# self.frame_shape = get_frame_shape(self.ffmpeg_input) -# self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2] -# self.mqtt_client = mqtt_client -# self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name) - -# # create shared value for storing the frame_time -# self.frame_time = mp.Value('d', 0.0) -# # Lock to control access to the frame -# self.frame_lock = mp.Lock() -# # Condition for notifying that a new frame is ready -# self.frame_ready = mp.Condition() -# # Condition for notifying that objects were tracked -# self.objects_tracked = mp.Condition() - -# # Queue for prepped frames, max size set to (number of regions * 5) -# self.resize_queue = queue.Queue() - -# # Queue for raw detected objects -# self.detected_objects_queue = queue.Queue() -# self.detected_objects_processor = DetectedObjectsProcessor(self) -# self.detected_objects_processor.start() - -# # initialize the frame cache -# self.cached_frame_with_objects = { -# 'frame_bytes': [], -# 'frame_time': 0 -# } - -# self.ffmpeg_process = None -# self.capture_thread = None -# self.fps = EventsPerSecond() -# self.skipped_region_tracker = EventsPerSecond() - -# # combine tracked objects lists -# self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', [])) - -# # merge object filters -# global_object_filters = global_objects_config.get('filters', {}) -# camera_object_filters = camera_objects_config.get('filters', {}) -# objects_with_config = set().union(global_object_filters.keys(), camera_object_filters.keys()) -# self.object_filters = {} -# for obj in objects_with_config: -# self.object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})} - -# # start a thread to track objects -# self.object_tracker = ObjectTracker(self, 10) -# self.object_tracker.start() - -# # start a thread to write tracked frames to disk -# self.video_writer = VideoWriter(self) -# self.video_writer.start() - -# # start a thread to queue resize requests for regions -# self.region_requester = RegionRequester(self) -# self.region_requester.start() - -# # start a thread to cache recent frames for processing -# self.frame_tracker = FrameTracker(self.frame_time, -# self.frame_ready, self.frame_lock, self.frame_cache) -# self.frame_tracker.start() - -# # start a thread to resize regions -# self.region_prepper = RegionPrepper(self, self.frame_cache, self.resize_queue, prepped_frame_queue) -# self.region_prepper.start() - -# # start a thread to store the highest scoring recent frames for monitored object types -# self.best_frames = BestFrames(self) -# self.best_frames.start() - -# # start a thread to expire objects from the detected objects list -# self.object_cleaner = ObjectCleaner(self) -# self.object_cleaner.start() - -# # start a thread to refine regions when objects are clipped -# self.dynamic_region_fps = EventsPerSecond() -# self.region_refiner = RegionRefiner(self) -# self.region_refiner.start() -# self.dynamic_region_fps.start() - -# # start a thread to publish object scores -# mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self) -# mqtt_publisher.start() - -# # create a watchdog thread for capture process -# self.watchdog = CameraWatchdog(self) - -# # load in the mask for object detection -# if 'mask' in self.config: -# self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE) -# else: -# self.mask = None - -# if self.mask is None: -# self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8) -# self.mask[:] = 255 - - -# def start_or_restart_capture(self): -# if not self.ffmpeg_process is None: -# print("Terminating the existing ffmpeg process...") -# self.ffmpeg_process.terminate() -# try: -# print("Waiting for ffmpeg to exit gracefully...") -# self.ffmpeg_process.wait(timeout=30) -# except sp.TimeoutExpired: -# print("FFmpeg didnt exit. Force killing...") -# self.ffmpeg_process.kill() -# self.ffmpeg_process.wait() - -# print("Waiting for the capture thread to exit...") -# self.capture_thread.join() -# self.ffmpeg_process = None -# self.capture_thread = None - -# # create the process to capture frames from the input stream and store in a shared array -# print("Creating a new ffmpeg process...") -# self.start_ffmpeg() - -# print("Creating a new capture thread...") -# self.capture_thread = CameraCapture(self) -# print("Starting a new capture thread...") -# self.capture_thread.start() -# self.fps.start() -# self.skipped_region_tracker.start() - -# def start_ffmpeg(self): -# ffmpeg_cmd = (['ffmpeg'] + -# self.ffmpeg_global_args + -# self.ffmpeg_hwaccel_args + -# self.ffmpeg_input_args + -# ['-i', self.ffmpeg_input] + -# self.ffmpeg_output_args + -# ['pipe:']) - -# print(" ".join(ffmpeg_cmd)) - -# self.ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=self.frame_size) - -# def start(self): -# self.start_or_restart_capture() -# self.watchdog.start() - -# def join(self): -# self.capture_thread.join() - -# def get_capture_pid(self): -# return self.ffmpeg_process.pid - -# def get_best(self, label): -# return self.best_frames.best_frames.get(label) - -# def stats(self): -# # TODO: anything else? -# return { -# 'camera_fps': self.fps.eps(60), -# 'resize_queue': self.resize_queue.qsize(), -# 'frame_queue': self.frame_queue.qsize(), -# 'finished_frame_queue': self.finished_frame_queue.qsize(), -# 'refined_frame_queue': self.refined_frame_queue.qsize(), -# 'regions_in_process': self.regions_in_process, -# 'dynamic_regions_per_sec': self.dynamic_region_fps.eps(), -# 'skipped_regions_per_sec': self.skipped_region_tracker.eps(60) -# } - -# def frame_with_objects(self, frame_time, tracked_objects=None): -# if not frame_time in self.frame_cache: -# frame = np.zeros(self.frame_shape, np.uint8) -# else: -# frame = self.frame_cache[frame_time].copy() - -# detected_objects = self.detected_objects[frame_time].copy() - -# for region in self.regions: -# color = (255,255,255) -# cv2.rectangle(frame, (region['x_offset'], region['y_offset']), -# (region['x_offset']+region['size'], region['y_offset']+region['size']), -# color, 2) - -# # draw the bounding boxes on the screen - -# if tracked_objects is None: -# with self.object_tracker.tracked_objects_lock: -# tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects) - -# for obj in detected_objects: -# draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], "{}% {}".format(int(obj['score']*100), obj['area']), thickness=3) - -# for id, obj in tracked_objects.items(): -# color = (0, 255,0) if obj['frame_time'] == frame_time else (255, 0, 0) -# draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], id, color=color, thickness=1, position='bl') - -# # print a timestamp -# time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S") -# cv2.putText(frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2) - -# # print fps -# cv2.putText(frame, str(self.fps.eps())+'FPS', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2) - -# # convert to BGR -# frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) - -# # encode the image into a jpg -# ret, jpg = cv2.imencode('.jpg', frame) - -# return jpg.tobytes() - -# def get_current_frame_with_objects(self): -# frame_time = self.last_processed_frame -# if frame_time == self.cached_frame_with_objects['frame_time']: -# return self.cached_frame_with_objects['frame_bytes'] - -# frame_bytes = self.frame_with_objects(frame_time) - -# self.cached_frame_with_objects = { -# 'frame_bytes': frame_bytes, -# 'frame_time': frame_time -# } - -# return frame_bytes - -======= ->>>>>>> 2a2fbe7... cleanup old code def filtered(obj, objects_to_track, object_filters, mask): object_name = obj[0]