diff --git a/frigate/app.py b/frigate/app.py index fce3467ab..1e5140603 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -183,8 +183,7 @@ class FrigateApp: if self.config.mqtt.enabled: comms.append(MqttClient(self.config)) - self.ws_client = WebSocketClient(self.config) - comms.append(self.ws_client) + comms.append(WebSocketClient(self.config)) self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) def start_detectors(self) -> None: @@ -417,7 +416,12 @@ class FrigateApp: logger.info(f"Stopping...") self.stop_event.set() - self.ws_client.stop() + # Set the events for the camera processor processes because + # they may be waiting on the event coming out of the detection process + for name in self.config.cameras.keys(): + self.detection_out_events[name].set() + + self.dispatcher.stop() self.detected_frames_processor.join() self.event_processor.join() self.event_cleanup.join() @@ -434,3 +438,15 @@ class FrigateApp: shm = self.detection_shms.pop() shm.close() shm.unlink() + + for queue in [ + self.event_queue, + self.event_processed_queue, + self.video_output_queue, + self.detected_frames_queue, + self.recordings_info_queue, + ]: + while not queue.empty(): + queue.get_nowait() + queue.close() + queue.join_thread() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index aab011fe9..d304509e4 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -27,6 +27,11 @@ class Communicator(ABC): """Pass receiver so communicators can pass commands.""" pass + @abstractmethod + def stop(self) -> None: + """Stop the communicator.""" + pass + class Dispatcher: """Handle communication between Frigate and communicators.""" @@ -72,6 +77,10 @@ class Dispatcher: for comm in self.comms: comm.publish(topic, payload, retain) + def stop(self) -> None: + for comm in self.comms: + comm.stop() + def _on_detect_command(self, camera_name: str, payload: str) -> None: """Callback for detect topic.""" detect_settings = self.config.cameras[camera_name].detect diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index b8c1a0ea6..d106aae71 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -35,6 +35,9 @@ class MqttClient(Communicator): # type: ignore[misc] f"{self.mqtt_config.topic_prefix}/{topic}", payload, retain=retain ) + def stop(self) -> None: + self.client.disconnect() + def _set_initial_topics(self) -> None: """Set initial state topics.""" for camera_name, camera in self.config.cameras.items(): diff --git a/frigate/comms/ws.py b/frigate/comms/ws.py index 0a3aea169..c9cc3988a 100644 --- a/frigate/comms/ws.py +++ b/frigate/comms/ws.py @@ -95,3 +95,4 @@ class WebSocketClient(Communicator): # type: ignore[misc] self.websocket_server.manager.join() self.websocket_server.shutdown() self.websocket_thread.join() + logger.info("Exiting websocket client...") diff --git a/frigate/events.py b/frigate/events.py index 5f30f8633..f502c4ded 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -67,7 +67,7 @@ class EventProcessor(threading.Thread): while not self.stop_event.is_set(): try: - event_type, camera, event_data = self.event_queue.get(timeout=10) + event_type, camera, event_data = self.event_queue.get(timeout=1) except queue.Empty: continue diff --git a/frigate/log.py b/frigate/log.py index 20827cc1d..a8041592f 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -2,11 +2,16 @@ import logging import threading import os +import signal import queue +import multiprocessing as mp from multiprocessing.queues import Queue from logging import handlers +from typing import Optional +from types import FrameType from setproctitle import setproctitle -from typing import Deque +from typing import Deque, Optional +from types import FrameType from collections import deque from frigate.util import clean_camera_user_pass @@ -34,10 +39,21 @@ def log_process(log_queue: Queue) -> None: threading.current_thread().name = f"logger" setproctitle("frigate.logger") listener_configurer() + + stop_event = mp.Event() + + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + while True: try: - record = log_queue.get(timeout=5) + record = log_queue.get(timeout=1) except (queue.Empty, KeyboardInterrupt): + if stop_event.is_set(): + break continue logger = logging.getLogger(record.name) logger.handle(record) diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 2fc080329..206c47839 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -88,6 +88,7 @@ def run_detector( stop_event = mp.Event() def receiveSignal(signalNumber, frame): + logger.info("Signal to exit detection process...") stop_event.set() signal.signal(signal.SIGTERM, receiveSignal) @@ -104,7 +105,7 @@ def run_detector( while not stop_event.is_set(): try: - connection_id = detection_queue.get(timeout=5) + connection_id = detection_queue.get(timeout=1) except queue.Empty: continue input_frame = frame_manager.get( @@ -125,6 +126,8 @@ def run_detector( avg_speed.value = (avg_speed.value * 9 + duration) / 10 + logger.info("Exited detection process...") + class ObjectDetectProcess: def __init__( @@ -144,6 +147,9 @@ class ObjectDetectProcess: self.start_or_restart() def stop(self): + # if the process has already exited on its own, just return + if self.detect_process and self.detect_process.exitcode: + return self.detect_process.terminate() logging.info("Waiting for detection process to exit gracefully...") self.detect_process.join(timeout=30) @@ -151,6 +157,7 @@ class ObjectDetectProcess: logging.info("Detection process didnt exit. Force killing...") self.detect_process.kill() self.detect_process.join() + logging.info("Detection process has exited...") def start_or_restart(self): self.detection_start.value = 0.0 diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 5477f57b9..97ef6f1ef 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -901,7 +901,7 @@ class TrackedObjectProcessor(threading.Thread): current_tracked_objects, motion_boxes, regions, - ) = self.tracked_objects_queue.get(True, 10) + ) = self.tracked_objects_queue.get(True, 1) except queue.Empty: continue diff --git a/frigate/output.py b/frigate/output.py index 9f02a7afd..3ddf4f2f6 100644 --- a/frigate/output.py +++ b/frigate/output.py @@ -109,14 +109,15 @@ class FFMpegConverter: class BroadcastThread(threading.Thread): - def __init__(self, camera, converter, websocket_server): + def __init__(self, camera, converter, websocket_server, stop_event): super(BroadcastThread, self).__init__() self.camera = camera self.converter = converter self.websocket_server = websocket_server + self.stop_event = stop_event def run(self): - while True: + while not self.stop_event.is_set(): buf = self.converter.read(65536) if buf: manager = self.websocket_server.manager @@ -426,7 +427,7 @@ def output_frames(config: FrigateConfig, video_output_queue): cam_config.live.quality, ) broadcasters[camera] = BroadcastThread( - camera, converters[camera], websocket_server + camera, converters[camera], websocket_server, stop_event ) if config.birdseye.enabled: @@ -439,7 +440,7 @@ def output_frames(config: FrigateConfig, video_output_queue): config.birdseye.restream, ) broadcasters["birdseye"] = BroadcastThread( - "birdseye", converters["birdseye"], websocket_server + "birdseye", converters["birdseye"], websocket_server, stop_event ) websocket_thread.start() @@ -463,7 +464,7 @@ def output_frames(config: FrigateConfig, video_output_queue): current_tracked_objects, motion_boxes, regions, - ) = video_output_queue.get(True, 10) + ) = video_output_queue.get(True, 1) except queue.Empty: continue diff --git a/frigate/stats.py b/frigate/stats.py index 459457d6e..60f805c62 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -283,8 +283,10 @@ class StatsEmitter(threading.Thread): def run(self) -> None: time.sleep(10) while not self.stop_event.wait(self.config.mqtt.stats_interval): + logger.debug("Starting stats collection") stats = stats_snapshot( self.config, self.stats_tracking, self.hwaccel_errors ) self.dispatcher.publish("stats", json.dumps(stats), retain=False) - logger.info(f"Exiting watchdog...") + logger.debug("Finished stats collection") + logger.info(f"Exiting stats emitter...") diff --git a/frigate/video.py b/frigate/video.py index f733bb357..1b64b21a4 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -601,7 +601,7 @@ def process_frames( break try: - frame_time = frame_queue.get(True, 10) + frame_time = frame_queue.get(True, 1) except queue.Empty: continue @@ -723,6 +723,9 @@ def process_frames( object_filters, ) ) + # if frigate is exiting + if stop_event.is_set(): + return ######### # merge objects, check for clipped objects and look again up to 4 times @@ -787,6 +790,9 @@ def process_frames( refining = True else: selected_objects.append(obj) + # if frigate is exiting + if stop_event.is_set(): + return # set the detections list to only include top, complete objects # and new detections detections = selected_objects