diff --git a/detect_objects.py b/detect_objects.py index 678bc86f1..9dbe662cf 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -1,4 +1,5 @@ import os +import signal import sys import traceback import signal @@ -71,13 +72,14 @@ def start_plasma_store(): return plasma_process class CameraWatchdog(threading.Thread): - def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process): + def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process, stop_event): threading.Thread.__init__(self) self.camera_processes = camera_processes self.config = config self.tflite_process = tflite_process self.tracked_objects_queue = tracked_objects_queue self.plasma_process = plasma_process + self.stop_event = stop_event def run(self): time.sleep(10) @@ -85,6 +87,10 @@ class CameraWatchdog(threading.Thread): # wait a bit before checking time.sleep(10) + if self.stop_event.is_set(): + print(f"Exiting watchdog...") + break + now = datetime.datetime.now().timestamp() # check the plasma process @@ -125,7 +131,7 @@ class CameraWatchdog(threading.Thread): frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2] ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size) camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'], - camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame']) + camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'], self.stop_event) camera_capture.start() camera_process['ffmpeg_process'] = ffmpeg_process camera_process['capture_thread'] = camera_capture @@ -142,6 +148,7 @@ class CameraWatchdog(threading.Thread): ffmpeg_process.communicate() def main(): + stop_event = threading.Event() # connect to mqtt and setup last will def on_connect(client, userdata, flags, rc): print("On connect called") @@ -176,7 +183,7 @@ def main(): } # Queue for cameras to push tracked objects to - tracked_objects_queue = mp.SimpleQueue() + tracked_objects_queue = mp.Queue() # Queue for clip processing event_queue = mp.Queue() @@ -232,10 +239,10 @@ def main(): detection_frame = mp.Value('d', 0.0) ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size) - frame_queue = mp.SimpleQueue() + frame_queue = mp.Queue() camera_fps = EventsPerSecond() camera_fps.start() - camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame) + camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame, stop_event) camera_capture.start() camera_processes[name] = { @@ -263,15 +270,31 @@ def main(): camera_process['process'].start() print(f"Camera_process started for {name}: {camera_process['process'].pid}") - event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue) + event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue, stop_event) event_processor.start() - object_processor = TrackedObjectProcessor(CONFIG['cameras'], CONFIG.get('zones', {}), client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue) + object_processor = TrackedObjectProcessor(CONFIG['cameras'], CONFIG.get('zones', {}), client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue,stop_event) object_processor.start() - camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process) + camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process, stop_event) camera_watchdog.start() + def receiveSignal(signalNumber, frame): + print('Received:', signalNumber) + stop_event.set() + event_processor.join() + object_processor.join() + camera_watchdog.join() + for name, camera_process in camera_processes.items(): + camera_process['capture_thread'].join() + rc = camera_watchdog.plasma_process.poll() + if rc == None: + camera_watchdog.plasma_process.terminate() + sys.exit() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + # create a flask app that encodes frames a mjpeg on demand app = Flask(__name__) log = logging.getLogger('werkzeug') diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py index b9a28976d..a5a216deb 100644 --- a/frigate/edgetpu.py +++ b/frigate/edgetpu.py @@ -87,7 +87,7 @@ def run_detector(detection_queue, avg_speed, start): class EdgeTPUProcess(): def __init__(self): - self.detection_queue = mp.SimpleQueue() + self.detection_queue = mp.Queue() self.avg_inference_speed = mp.Value('d', 0.01) self.detection_start = mp.Value('d', 0.0) self.detect_process = None diff --git a/frigate/events.py b/frigate/events.py index 19f8816ee..708ae3e45 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -9,7 +9,7 @@ import subprocess as sp import queue class EventProcessor(threading.Thread): - def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue): + def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue, stop_event): threading.Thread.__init__(self) self.config = config self.camera_processes = camera_processes @@ -18,6 +18,7 @@ class EventProcessor(threading.Thread): self.cached_clips = {} self.event_queue = event_queue self.events_in_process = {} + self.stop_event = stop_event def refresh_cache(self): cached_files = os.listdir(self.cache_dir) @@ -133,6 +134,10 @@ class EventProcessor(threading.Thread): def run(self): while True: + if self.stop_event.is_set(): + print(f"Exiting event processor...") + break + try: event_type, camera, event_data = self.event_queue.get(timeout=10) except queue.Empty: diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 5982e8279..b29eae912 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -5,6 +5,7 @@ import time import copy import cv2 import threading +import queue import numpy as np from collections import Counter, defaultdict import itertools @@ -51,7 +52,7 @@ def zone_filtered(obj, object_config): return False class TrackedObjectProcessor(threading.Thread): - def __init__(self, camera_config, zone_config, client, topic_prefix, tracked_objects_queue, event_queue): + def __init__(self, camera_config, zone_config, client, topic_prefix, tracked_objects_queue, event_queue, stop_event): threading.Thread.__init__(self) self.camera_config = camera_config self.zone_config = zone_config @@ -59,6 +60,7 @@ class TrackedObjectProcessor(threading.Thread): self.topic_prefix = topic_prefix self.tracked_objects_queue = tracked_objects_queue self.event_queue = event_queue + self.stop_event = stop_event self.camera_data = defaultdict(lambda: { 'best_objects': {}, 'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')), @@ -89,7 +91,7 @@ class TrackedObjectProcessor(threading.Thread): for i, zone in enumerate(self.zone_data.values()): zone['color'] = tuple(int(round(255 * c)) for c in colors(i)[:3]) - self.plasma_client = PlasmaManager() + self.plasma_client = PlasmaManager(self.stop_event) def get_best(self, camera, label): if label in self.camera_data[camera]['best_objects']: @@ -102,7 +104,14 @@ class TrackedObjectProcessor(threading.Thread): def run(self): while True: - camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get() + if self.stop_event.is_set(): + print(f"Exiting event processor...") + break + + try: + camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get(True, 10) + except queue.Empty: + continue camera_config = self.camera_config[camera] best_objects = self.camera_data[camera]['best_objects'] @@ -215,7 +224,7 @@ class TrackedObjectProcessor(threading.Thread): ### # Report over MQTT ### - + # get the zones that are relevant for this camera relevant_zones = [zone for zone, config in self.zone_config.items() if camera in config] # for each zone diff --git a/frigate/util.py b/frigate/util.py index 4e9a2307b..5a8ad3dd6 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -140,11 +140,14 @@ def listen(): signal.signal(signal.SIGUSR1, print_stack) class PlasmaManager: - def __init__(self): + def __init__(self, stop_event=None): + self.stop_event = stop_event self.connect() def connect(self): while True: + if self.stop_event != None and self.stop_event.is_set(): + return try: self.plasma_client = plasma.connect("/tmp/plasma") return @@ -155,6 +158,8 @@ class PlasmaManager: def get(self, name, timeout_ms=0): object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest()) while True: + if self.stop_event != None and self.stop_event.is_set(): + return try: return self.plasma_client.get(object_id, timeout_ms=timeout_ms) except: @@ -164,6 +169,8 @@ class PlasmaManager: def put(self, name, obj): object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest()) while True: + if self.stop_event != None and self.stop_event.is_set(): + return try: self.plasma_client.put(obj, object_id) return @@ -175,6 +182,8 @@ class PlasmaManager: def delete(self, name): object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest()) while True: + if self.stop_event != None and self.stop_event.is_set(): + return try: self.plasma_client.delete([object_id]) return diff --git a/frigate/video.py b/frigate/video.py index e4ec73510..3ea6d0258 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -116,7 +116,7 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None): return process class CameraCapture(threading.Thread): - def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame): + def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame, stop_event): threading.Thread.__init__(self) self.name = name self.frame_shape = frame_shape @@ -125,16 +125,21 @@ class CameraCapture(threading.Thread): self.take_frame = take_frame self.fps = fps self.skipped_fps = EventsPerSecond() - self.plasma_client = PlasmaManager() + self.plasma_client = PlasmaManager(stop_event) self.ffmpeg_process = ffmpeg_process self.current_frame = 0 self.last_frame = 0 self.detection_frame = detection_frame + self.stop_event = stop_event def run(self): frame_num = 0 self.skipped_fps.start() while True: + if self.stop_event.is_set(): + print(f"{self.name}: stop event set. exiting capture thread...") + break + if self.ffmpeg_process.poll() != None: print(f"{self.name}: ffmpeg process is not running. exiting capture thread...") break