diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py index d810e87c4..a45b10a9c 100644 --- a/frigate/edgetpu.py +++ b/frigate/edgetpu.py @@ -5,6 +5,7 @@ import multiprocessing as mp import os import queue import threading +import signal from abc import ABC, abstractmethod from multiprocessing.connection import Connection from typing import Dict @@ -109,6 +110,14 @@ def run_detector(name: str, detection_queue: mp.Queue, out_events: Dict[str, mp. threading.current_thread().name = f"detector:{name}" logging.info(f"Starting detection process: {os.getpid()}") listen() + + stop_event = mp.Event() + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + frame_manager = SharedMemoryFrameManager() object_detector = LocalObjectDetector(tf_device=tf_device) @@ -122,7 +131,13 @@ def run_detector(name: str, detection_queue: mp.Queue, out_events: Dict[str, mp. } while True: - connection_id = detection_queue.get() + if stop_event.is_set(): + break + + try: + connection_id = detection_queue.get(timeout=5) + except queue.Empty: + continue input_frame = frame_manager.get(connection_id, (1,300,300,3)) if input_frame is None: diff --git a/frigate/log.py b/frigate/log.py index 9e36a6be5..09536bc8d 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -1,6 +1,8 @@ # adapted from https://medium.com/@jonathonbao/python3-logging-with-multiprocessing-f51f460b8778 import logging import threading +import signal +import multiprocessing as mp from logging import handlers @@ -19,9 +21,21 @@ def root_configurer(queue): root.setLevel(logging.INFO) def log_process(queue): + stop_event = mp.Event() + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + threading.current_thread().name = f"logger" listener_configurer() while True: - record = queue.get() + if stop_event.is_set() and queue.empty(): + break + try: + record = queue.get(timeout=5) + except queue.Empty: + continue logger = logging.getLogger(record.name) logger.handle(record) diff --git a/frigate/video.py b/frigate/video.py index 2a53d97d5..e7862f899 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -9,6 +9,7 @@ import multiprocessing as mp import os import queue import subprocess as sp +import signal import threading import time from collections import defaultdict @@ -72,18 +73,21 @@ def create_tensor_input(frame, region): # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] return np.expand_dims(cropped_frame, axis=0) +def stop_ffmpeg(ffmpeg_process): + logger.info("Terminating the existing ffmpeg process...") + ffmpeg_process.terminate() + try: + logger.info("Waiting for ffmpeg to exit gracefully...") + ffmpeg_process.communicate(timeout=30) + except sp.TimeoutExpired: + logger.info("FFmpeg didnt exit. Force killing...") + ffmpeg_process.kill() + ffmpeg_process.communicate() + ffmpeg_process = None + def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size=None, ffmpeg_process=None): if not ffmpeg_process is None: - logger.info("Terminating the existing ffmpeg process...") - ffmpeg_process.terminate() - try: - logger.info("Waiting for ffmpeg to exit gracefully...") - ffmpeg_process.communicate(timeout=30) - except sp.TimeoutExpired: - logger.info("FFmpeg didnt exit. Force killing...") - ffmpeg_process.kill() - ffmpeg_process.communicate() - ffmpeg_process = None + stop_ffmpeg(ffmpeg_process) if frame_size is None: process = sp.Popen(ffmpeg_cmd, stdout = sp.DEVNULL, stdin = sp.DEVNULL, start_new_session=True) @@ -133,7 +137,7 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram frame_queue.put(current_frame.value) class CameraWatchdog(threading.Thread): - def __init__(self, camera_name, config, frame_queue, camera_fps, ffmpeg_pid): + def __init__(self, camera_name, config, frame_queue, camera_fps, ffmpeg_pid, stop_event): threading.Thread.__init__(self) self.name = f"watchdog:{camera_name}" self.camera_name = camera_name @@ -146,6 +150,7 @@ class CameraWatchdog(threading.Thread): self.frame_queue = frame_queue self.frame_shape = self.config.frame_shape_yuv self.frame_size = self.frame_shape[0] * self.frame_shape[1] + self.stop_event = stop_event def run(self): self.start_ffmpeg_detect() @@ -160,6 +165,12 @@ class CameraWatchdog(threading.Thread): time.sleep(10) while True: + if self.stop_event.is_set(): + stop_ffmpeg(self.ffmpeg_detect_process) + for p in self.ffmpeg_other_processes: + stop_ffmpeg(p['process']) + break + now = datetime.datetime.now().timestamp() if not self.capture_thread.is_alive(): @@ -212,12 +223,26 @@ class CameraCapture(threading.Thread): self.fps, self.skipped_fps, self.current_frame) def capture_camera(name, config: CameraConfig, process_info): + stop_event = mp.Event() + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + frame_queue = process_info['frame_queue'] - camera_watchdog = CameraWatchdog(name, config, frame_queue, process_info['camera_fps'], process_info['ffmpeg_pid']) + camera_watchdog = CameraWatchdog(name, config, frame_queue, process_info['camera_fps'], process_info['ffmpeg_pid'], stop_event) camera_watchdog.start() camera_watchdog.join() def track_camera(name, config: CameraConfig, detection_queue, result_connection, detected_objects_queue, process_info): + stop_event = mp.Event() + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + threading.current_thread().name = f"process:{name}" listen() @@ -236,7 +261,7 @@ def track_camera(name, config: CameraConfig, detection_queue, result_connection, frame_manager = SharedMemoryFrameManager() process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector, - object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask) + object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask, stop_event) logger.info(f"{name}: exiting subprocess") @@ -273,7 +298,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, frame_manager: FrameManager, motion_detector: MotionDetector, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, detected_objects_queue: mp.Queue, process_info: Dict, - objects_to_track: List[str], object_filters, mask, + objects_to_track: List[str], object_filters, mask, stop_event, exit_on_empty: bool = False): fps = process_info['process_fps'] @@ -284,6 +309,9 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, fps_tracker.start() while True: + if stop_event.is_set(): + break + if exit_on_empty and frame_queue.empty(): logger.info(f"Exiting track_objects...") break