Fast restart (#5378)

* dont wait so long for queues

* implement stop methods for comms

* set the detection events on exit and return early from processing

* handle the stop event in the broadcast threads

* short circuit the detection process exit code if it already exited

* some logging for stats thread

* just keep the log process alive 1 second after the last log message

* ensure the multiprocessing queues are emptied and closed

* Update frigate/log.py

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>

* Update frigate/log.py

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>

* mypy fixes

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
This commit is contained in:
Blake Blackshear 2023-02-03 20:15:47 -06:00 committed by GitHub
parent b33094207c
commit 624c314335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 76 additions and 15 deletions

View File

@ -183,8 +183,7 @@ class FrigateApp:
if self.config.mqtt.enabled: if self.config.mqtt.enabled:
comms.append(MqttClient(self.config)) comms.append(MqttClient(self.config))
self.ws_client = WebSocketClient(self.config) comms.append(WebSocketClient(self.config))
comms.append(self.ws_client)
self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms)
def start_detectors(self) -> None: def start_detectors(self) -> None:
@ -417,7 +416,12 @@ class FrigateApp:
logger.info(f"Stopping...") logger.info(f"Stopping...")
self.stop_event.set() self.stop_event.set()
self.ws_client.stop() # Set the events for the camera processor processes because
# they may be waiting on the event coming out of the detection process
for name in self.config.cameras.keys():
self.detection_out_events[name].set()
self.dispatcher.stop()
self.detected_frames_processor.join() self.detected_frames_processor.join()
self.event_processor.join() self.event_processor.join()
self.event_cleanup.join() self.event_cleanup.join()
@ -434,3 +438,15 @@ class FrigateApp:
shm = self.detection_shms.pop() shm = self.detection_shms.pop()
shm.close() shm.close()
shm.unlink() shm.unlink()
for queue in [
self.event_queue,
self.event_processed_queue,
self.video_output_queue,
self.detected_frames_queue,
self.recordings_info_queue,
]:
while not queue.empty():
queue.get_nowait()
queue.close()
queue.join_thread()

View File

@ -27,6 +27,11 @@ class Communicator(ABC):
"""Pass receiver so communicators can pass commands.""" """Pass receiver so communicators can pass commands."""
pass pass
@abstractmethod
def stop(self) -> None:
"""Stop the communicator."""
pass
class Dispatcher: class Dispatcher:
"""Handle communication between Frigate and communicators.""" """Handle communication between Frigate and communicators."""
@ -72,6 +77,10 @@ class Dispatcher:
for comm in self.comms: for comm in self.comms:
comm.publish(topic, payload, retain) comm.publish(topic, payload, retain)
def stop(self) -> None:
for comm in self.comms:
comm.stop()
def _on_detect_command(self, camera_name: str, payload: str) -> None: def _on_detect_command(self, camera_name: str, payload: str) -> None:
"""Callback for detect topic.""" """Callback for detect topic."""
detect_settings = self.config.cameras[camera_name].detect detect_settings = self.config.cameras[camera_name].detect

View File

@ -35,6 +35,9 @@ class MqttClient(Communicator): # type: ignore[misc]
f"{self.mqtt_config.topic_prefix}/{topic}", payload, retain=retain f"{self.mqtt_config.topic_prefix}/{topic}", payload, retain=retain
) )
def stop(self) -> None:
self.client.disconnect()
def _set_initial_topics(self) -> None: def _set_initial_topics(self) -> None:
"""Set initial state topics.""" """Set initial state topics."""
for camera_name, camera in self.config.cameras.items(): for camera_name, camera in self.config.cameras.items():

View File

@ -95,3 +95,4 @@ class WebSocketClient(Communicator): # type: ignore[misc]
self.websocket_server.manager.join() self.websocket_server.manager.join()
self.websocket_server.shutdown() self.websocket_server.shutdown()
self.websocket_thread.join() self.websocket_thread.join()
logger.info("Exiting websocket client...")

View File

@ -67,7 +67,7 @@ class EventProcessor(threading.Thread):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
event_type, camera, event_data = self.event_queue.get(timeout=10) event_type, camera, event_data = self.event_queue.get(timeout=1)
except queue.Empty: except queue.Empty:
continue continue

View File

@ -2,11 +2,16 @@
import logging import logging
import threading import threading
import os import os
import signal
import queue import queue
import multiprocessing as mp
from multiprocessing.queues import Queue from multiprocessing.queues import Queue
from logging import handlers from logging import handlers
from typing import Optional
from types import FrameType
from setproctitle import setproctitle from setproctitle import setproctitle
from typing import Deque from typing import Deque, Optional
from types import FrameType
from collections import deque from collections import deque
from frigate.util import clean_camera_user_pass from frigate.util import clean_camera_user_pass
@ -34,10 +39,21 @@ def log_process(log_queue: Queue) -> None:
threading.current_thread().name = f"logger" threading.current_thread().name = f"logger"
setproctitle("frigate.logger") setproctitle("frigate.logger")
listener_configurer() listener_configurer()
stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
while True: while True:
try: try:
record = log_queue.get(timeout=5) record = log_queue.get(timeout=1)
except (queue.Empty, KeyboardInterrupt): except (queue.Empty, KeyboardInterrupt):
if stop_event.is_set():
break
continue continue
logger = logging.getLogger(record.name) logger = logging.getLogger(record.name)
logger.handle(record) logger.handle(record)

View File

@ -88,6 +88,7 @@ def run_detector(
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
logger.info("Signal to exit detection process...")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
@ -104,7 +105,7 @@ def run_detector(
while not stop_event.is_set(): while not stop_event.is_set():
try: try:
connection_id = detection_queue.get(timeout=5) connection_id = detection_queue.get(timeout=1)
except queue.Empty: except queue.Empty:
continue continue
input_frame = frame_manager.get( input_frame = frame_manager.get(
@ -125,6 +126,8 @@ def run_detector(
avg_speed.value = (avg_speed.value * 9 + duration) / 10 avg_speed.value = (avg_speed.value * 9 + duration) / 10
logger.info("Exited detection process...")
class ObjectDetectProcess: class ObjectDetectProcess:
def __init__( def __init__(
@ -144,6 +147,9 @@ class ObjectDetectProcess:
self.start_or_restart() self.start_or_restart()
def stop(self): def stop(self):
# if the process has already exited on its own, just return
if self.detect_process and self.detect_process.exitcode:
return
self.detect_process.terminate() self.detect_process.terminate()
logging.info("Waiting for detection process to exit gracefully...") logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30) self.detect_process.join(timeout=30)
@ -151,6 +157,7 @@ class ObjectDetectProcess:
logging.info("Detection process didnt exit. Force killing...") logging.info("Detection process didnt exit. Force killing...")
self.detect_process.kill() self.detect_process.kill()
self.detect_process.join() self.detect_process.join()
logging.info("Detection process has exited...")
def start_or_restart(self): def start_or_restart(self):
self.detection_start.value = 0.0 self.detection_start.value = 0.0

View File

@ -901,7 +901,7 @@ class TrackedObjectProcessor(threading.Thread):
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
regions, regions,
) = self.tracked_objects_queue.get(True, 10) ) = self.tracked_objects_queue.get(True, 1)
except queue.Empty: except queue.Empty:
continue continue

View File

@ -109,14 +109,15 @@ class FFMpegConverter:
class BroadcastThread(threading.Thread): class BroadcastThread(threading.Thread):
def __init__(self, camera, converter, websocket_server): def __init__(self, camera, converter, websocket_server, stop_event):
super(BroadcastThread, self).__init__() super(BroadcastThread, self).__init__()
self.camera = camera self.camera = camera
self.converter = converter self.converter = converter
self.websocket_server = websocket_server self.websocket_server = websocket_server
self.stop_event = stop_event
def run(self): def run(self):
while True: while not self.stop_event.is_set():
buf = self.converter.read(65536) buf = self.converter.read(65536)
if buf: if buf:
manager = self.websocket_server.manager manager = self.websocket_server.manager
@ -426,7 +427,7 @@ def output_frames(config: FrigateConfig, video_output_queue):
cam_config.live.quality, cam_config.live.quality,
) )
broadcasters[camera] = BroadcastThread( broadcasters[camera] = BroadcastThread(
camera, converters[camera], websocket_server camera, converters[camera], websocket_server, stop_event
) )
if config.birdseye.enabled: if config.birdseye.enabled:
@ -439,7 +440,7 @@ def output_frames(config: FrigateConfig, video_output_queue):
config.birdseye.restream, config.birdseye.restream,
) )
broadcasters["birdseye"] = BroadcastThread( broadcasters["birdseye"] = BroadcastThread(
"birdseye", converters["birdseye"], websocket_server "birdseye", converters["birdseye"], websocket_server, stop_event
) )
websocket_thread.start() websocket_thread.start()
@ -463,7 +464,7 @@ def output_frames(config: FrigateConfig, video_output_queue):
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
regions, regions,
) = video_output_queue.get(True, 10) ) = video_output_queue.get(True, 1)
except queue.Empty: except queue.Empty:
continue continue

View File

@ -283,8 +283,10 @@ class StatsEmitter(threading.Thread):
def run(self) -> None: def run(self) -> None:
time.sleep(10) time.sleep(10)
while not self.stop_event.wait(self.config.mqtt.stats_interval): while not self.stop_event.wait(self.config.mqtt.stats_interval):
logger.debug("Starting stats collection")
stats = stats_snapshot( stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors self.config, self.stats_tracking, self.hwaccel_errors
) )
self.dispatcher.publish("stats", json.dumps(stats), retain=False) self.dispatcher.publish("stats", json.dumps(stats), retain=False)
logger.info(f"Exiting watchdog...") logger.debug("Finished stats collection")
logger.info(f"Exiting stats emitter...")

View File

@ -601,7 +601,7 @@ def process_frames(
break break
try: try:
frame_time = frame_queue.get(True, 10) frame_time = frame_queue.get(True, 1)
except queue.Empty: except queue.Empty:
continue continue
@ -723,6 +723,9 @@ def process_frames(
object_filters, object_filters,
) )
) )
# if frigate is exiting
if stop_event.is_set():
return
######### #########
# merge objects, check for clipped objects and look again up to 4 times # merge objects, check for clipped objects and look again up to 4 times
@ -787,6 +790,9 @@ def process_frames(
refining = True refining = True
else: else:
selected_objects.append(obj) selected_objects.append(obj)
# if frigate is exiting
if stop_event.is_set():
return
# set the detections list to only include top, complete objects # set the detections list to only include top, complete objects
# and new detections # and new detections
detections = selected_objects detections = selected_objects