From 57864f2be6842bc2396b14a2105bfaf36a600270 Mon Sep 17 00:00:00 2001 From: Sean Vig Date: Fri, 21 May 2021 11:39:14 -0400 Subject: [PATCH] Wait on stop event when possible Generally eliminate the `while True` loops while waiting for a stop event and prefer to condition the loops on if the stop event is set, blocking on that where it makes sense. This generally comes in 3 flavors. First and simplest, when there is a sleep and the stop event is the only thing the loop blocks on, instead do a check using `stop_event.wait(timeout)` to instead block on the stop event for the designated amount of time. Second, when there is a different event that is blocking in the loop, condition the loop on `stop_event.is_set()` rather than breaking when it is set. Finally, when there is a separate internal condition that requires a counter, have the loop iterate over the counter and use `if stop_event.wait(timeout)` internal to the loop. --- frigate/edgetpu.py | 5 +---- frigate/events.py | 25 +++++++------------------ frigate/object_processing.py | 8 +++----- frigate/record.py | 12 ++++-------- frigate/stats.py | 7 ++----- frigate/video.py | 22 +++++++--------------- frigate/watchdog.py | 11 +++-------- 7 files changed, 27 insertions(+), 63 deletions(-) diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py index 73916bd19..2ffd8c198 100644 --- a/frigate/edgetpu.py +++ b/frigate/edgetpu.py @@ -154,10 +154,7 @@ def run_detector( out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) outputs[name] = {"shm": out_shm, "np": out_np} - while True: - if stop_event.is_set(): - break - + while not stop_event.is_set(): try: connection_id = detection_queue.get(timeout=5) except queue.Empty: diff --git a/frigate/events.py b/frigate/events.py index b1e666d7c..95ef002cd 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -221,11 +221,7 @@ class EventProcessor(threading.Thread): return True def run(self): - while True: - if self.stop_event.is_set(): - logger.info(f"Exiting event processor...") - break - + while not self.stop_event.is_set(): try: event_type, camera, event_data = self.event_queue.get(timeout=10) except queue.Empty: @@ -272,6 +268,8 @@ class EventProcessor(threading.Thread): del self.events_in_process[event_data["id"]] self.event_processed_queue.put((event_data["id"], camera)) + logger.info(f"Exiting event processor...") + class EventCleanup(threading.Thread): def __init__(self, config: FrigateConfig, stop_event): @@ -398,19 +396,8 @@ class EventCleanup(threading.Thread): ) def run(self): - counter = 0 - while True: - if self.stop_event.is_set(): - logger.info(f"Exiting event cleanup...") - break - - # only expire events every 5 minutes, but check for stop events every 10 seconds - time.sleep(10) - counter = counter + 1 - if counter < 30: - continue - counter = 0 - + # only expire events every 5 minutes + while not self.stop_event.wait(300): self.expire("clips") self.expire("snapshots") self.purge_duplicates() @@ -420,3 +407,5 @@ class EventCleanup(threading.Thread): Event.has_clip == False, Event.has_snapshot == False ) delete_query.execute() + + logger.info(f"Exiting event cleanup...") diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 1037a5fb3..03ec3bcfd 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -706,11 +706,7 @@ class TrackedObjectProcessor(threading.Thread): return self.camera_states[camera].get_current_frame(draw_options) def run(self): - while True: - if self.stop_event.is_set(): - logger.info(f"Exiting object processor...") - break - + while not self.stop_event.is_set(): try: ( camera, @@ -769,3 +765,5 @@ class TrackedObjectProcessor(threading.Thread): while not self.event_processed_queue.empty(): event_id, camera = self.event_processed_queue.get() self.camera_states[camera].finished(event_id) + + logger.info(f"Exiting object processor...") diff --git a/frigate/record.py b/frigate/record.py index ea2593ca0..d4e22437d 100644 --- a/frigate/record.py +++ b/frigate/record.py @@ -1,4 +1,5 @@ import datetime +import itertools import json import logging import os @@ -114,19 +115,14 @@ class RecordingMaintainer(threading.Thread): p.unlink(missing_ok=True) def run(self): - counter = 0 - self.expire_files() - while True: - if self.stop_event.is_set(): + for counter in itertools.cycle(range(60)): + if self.stop_event.wait(10): logger.info(f"Exiting recording maintenance...") break # only expire events every 10 minutes, but check for new files every 10 seconds - time.sleep(10) - counter = counter + 1 - if counter > 60: + if counter == 0: self.expire_files() remove_empty_directories(RECORD_DIR) - counter = 0 self.move_files() diff --git a/frigate/stats.py b/frigate/stats.py index f8c1eccd3..7370d7a53 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -94,12 +94,9 @@ class StatsEmitter(threading.Thread): def run(self): time.sleep(10) - while True: - if self.stop_event.is_set(): - logger.info(f"Exiting watchdog...") - break + while not self.stop_event.wait(self.config.mqtt.stats_interval): stats = stats_snapshot(self.stats_tracking) self.mqtt_client.publish( f"{self.topic_prefix}/stats", json.dumps(stats), retain=False ) - time.sleep(self.config.mqtt.stats_interval) + logger.info(f"Exiting watchdog...") diff --git a/frigate/video.py b/frigate/video.py index 6a28cfcd1..c2d377a47 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -212,15 +212,7 @@ class CameraWatchdog(threading.Thread): ) time.sleep(10) - while True: - if self.stop_event.is_set(): - stop_ffmpeg(self.ffmpeg_detect_process, self.logger) - for p in self.ffmpeg_other_processes: - stop_ffmpeg(p["process"], self.logger) - p["logpipe"].close() - self.logpipe.close() - break - + while not self.stop_event.wait(10): now = datetime.datetime.now().timestamp() if not self.capture_thread.is_alive(): @@ -248,8 +240,11 @@ class CameraWatchdog(threading.Thread): p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] ) - # wait a bit before checking again - time.sleep(10) + stop_ffmpeg(self.ffmpeg_detect_process, self.logger) + for p in self.ffmpeg_other_processes: + stop_ffmpeg(p["process"], self.logger) + p["logpipe"].close() + self.logpipe.close() def start_ffmpeg_detect(self): ffmpeg_cmd = [ @@ -451,10 +446,7 @@ def process_frames( fps_tracker = EventsPerSecond() fps_tracker.start() - while True: - if stop_event.is_set(): - break - + while not stop_event.is_set(): if exit_on_empty and frame_queue.empty(): logger.info(f"Exiting track_objects...") break diff --git a/frigate/watchdog.py b/frigate/watchdog.py index 8be3646b7..979ffa3a9 100644 --- a/frigate/watchdog.py +++ b/frigate/watchdog.py @@ -17,14 +17,7 @@ class FrigateWatchdog(threading.Thread): def run(self): time.sleep(10) - while True: - # wait a bit before checking - time.sleep(10) - - if self.stop_event.is_set(): - logger.info(f"Exiting watchdog...") - break - + while not self.stop_event.wait(10): now = datetime.datetime.now().timestamp() # check the detection processes @@ -38,3 +31,5 @@ class FrigateWatchdog(threading.Thread): elif not detector.detect_process.is_alive(): logger.info("Detection appears to have stopped. Exiting frigate...") os.kill(os.getpid(), signal.SIGTERM) + + logger.info(f"Exiting watchdog...")