Wait on stop event when possible

Generally eliminate the `while True` loops while waiting for a stop
event and prefer to condition the loops on if the stop event is set,
blocking on that where it makes sense.  This generally comes in 3
flavors.  First and simplest, when there is a sleep and the stop event
is the only thing the loop blocks on, instead do a check using
`stop_event.wait(timeout)` to instead block on the stop event for the
designated amount of time. Second, when there is a different event that
is blocking in the loop, condition the loop on `stop_event.is_set()`
rather than breaking when it is set. Finally, when there is a separate
internal condition that requires a counter, have the loop iterate over
the counter and use `if stop_event.wait(timeout)` internal to the loop.
This commit is contained in:
Sean Vig 2021-05-21 11:39:14 -04:00 committed by Blake Blackshear
parent f4bc68d396
commit 57864f2be6
7 changed files with 27 additions and 63 deletions

View File

@ -154,10 +154,7 @@ def run_detector(
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np} outputs[name] = {"shm": out_shm, "np": out_np}
while True: while not stop_event.is_set():
if stop_event.is_set():
break
try: try:
connection_id = detection_queue.get(timeout=5) connection_id = detection_queue.get(timeout=5)
except queue.Empty: except queue.Empty:

View File

@ -221,11 +221,7 @@ class EventProcessor(threading.Thread):
return True return True
def run(self): def run(self):
while True: while not self.stop_event.is_set():
if self.stop_event.is_set():
logger.info(f"Exiting event processor...")
break
try: try:
event_type, camera, event_data = self.event_queue.get(timeout=10) event_type, camera, event_data = self.event_queue.get(timeout=10)
except queue.Empty: except queue.Empty:
@ -272,6 +268,8 @@ class EventProcessor(threading.Thread):
del self.events_in_process[event_data["id"]] del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera)) self.event_processed_queue.put((event_data["id"], camera))
logger.info(f"Exiting event processor...")
class EventCleanup(threading.Thread): class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event): def __init__(self, config: FrigateConfig, stop_event):
@ -398,19 +396,8 @@ class EventCleanup(threading.Thread):
) )
def run(self): def run(self):
counter = 0 # only expire events every 5 minutes
while True: while not self.stop_event.wait(300):
if self.stop_event.is_set():
logger.info(f"Exiting event cleanup...")
break
# only expire events every 5 minutes, but check for stop events every 10 seconds
time.sleep(10)
counter = counter + 1
if counter < 30:
continue
counter = 0
self.expire("clips") self.expire("clips")
self.expire("snapshots") self.expire("snapshots")
self.purge_duplicates() self.purge_duplicates()
@ -420,3 +407,5 @@ class EventCleanup(threading.Thread):
Event.has_clip == False, Event.has_snapshot == False Event.has_clip == False, Event.has_snapshot == False
) )
delete_query.execute() delete_query.execute()
logger.info(f"Exiting event cleanup...")

View File

@ -706,11 +706,7 @@ class TrackedObjectProcessor(threading.Thread):
return self.camera_states[camera].get_current_frame(draw_options) return self.camera_states[camera].get_current_frame(draw_options)
def run(self): def run(self):
while True: while not self.stop_event.is_set():
if self.stop_event.is_set():
logger.info(f"Exiting object processor...")
break
try: try:
( (
camera, camera,
@ -769,3 +765,5 @@ class TrackedObjectProcessor(threading.Thread):
while not self.event_processed_queue.empty(): while not self.event_processed_queue.empty():
event_id, camera = self.event_processed_queue.get() event_id, camera = self.event_processed_queue.get()
self.camera_states[camera].finished(event_id) self.camera_states[camera].finished(event_id)
logger.info(f"Exiting object processor...")

View File

@ -1,4 +1,5 @@
import datetime import datetime
import itertools
import json import json
import logging import logging
import os import os
@ -114,19 +115,14 @@ class RecordingMaintainer(threading.Thread):
p.unlink(missing_ok=True) p.unlink(missing_ok=True)
def run(self): def run(self):
counter = 0 for counter in itertools.cycle(range(60)):
self.expire_files() if self.stop_event.wait(10):
while True:
if self.stop_event.is_set():
logger.info(f"Exiting recording maintenance...") logger.info(f"Exiting recording maintenance...")
break break
# only expire events every 10 minutes, but check for new files every 10 seconds # only expire events every 10 minutes, but check for new files every 10 seconds
time.sleep(10) if counter == 0:
counter = counter + 1
if counter > 60:
self.expire_files() self.expire_files()
remove_empty_directories(RECORD_DIR) remove_empty_directories(RECORD_DIR)
counter = 0
self.move_files() self.move_files()

View File

@ -94,12 +94,9 @@ class StatsEmitter(threading.Thread):
def run(self): def run(self):
time.sleep(10) time.sleep(10)
while True: while not self.stop_event.wait(self.config.mqtt.stats_interval):
if self.stop_event.is_set():
logger.info(f"Exiting watchdog...")
break
stats = stats_snapshot(self.stats_tracking) stats = stats_snapshot(self.stats_tracking)
self.mqtt_client.publish( self.mqtt_client.publish(
f"{self.topic_prefix}/stats", json.dumps(stats), retain=False f"{self.topic_prefix}/stats", json.dumps(stats), retain=False
) )
time.sleep(self.config.mqtt.stats_interval) logger.info(f"Exiting watchdog...")

View File

@ -212,15 +212,7 @@ class CameraWatchdog(threading.Thread):
) )
time.sleep(10) time.sleep(10)
while True: while not self.stop_event.wait(10):
if self.stop_event.is_set():
stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
for p in self.ffmpeg_other_processes:
stop_ffmpeg(p["process"], self.logger)
p["logpipe"].close()
self.logpipe.close()
break
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
if not self.capture_thread.is_alive(): if not self.capture_thread.is_alive():
@ -248,8 +240,11 @@ class CameraWatchdog(threading.Thread):
p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
) )
# wait a bit before checking again stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
time.sleep(10) for p in self.ffmpeg_other_processes:
stop_ffmpeg(p["process"], self.logger)
p["logpipe"].close()
self.logpipe.close()
def start_ffmpeg_detect(self): def start_ffmpeg_detect(self):
ffmpeg_cmd = [ ffmpeg_cmd = [
@ -451,10 +446,7 @@ def process_frames(
fps_tracker = EventsPerSecond() fps_tracker = EventsPerSecond()
fps_tracker.start() fps_tracker.start()
while True: while not stop_event.is_set():
if stop_event.is_set():
break
if exit_on_empty and frame_queue.empty(): if exit_on_empty and frame_queue.empty():
logger.info(f"Exiting track_objects...") logger.info(f"Exiting track_objects...")
break break

View File

@ -17,14 +17,7 @@ class FrigateWatchdog(threading.Thread):
def run(self): def run(self):
time.sleep(10) time.sleep(10)
while True: while not self.stop_event.wait(10):
# wait a bit before checking
time.sleep(10)
if self.stop_event.is_set():
logger.info(f"Exiting watchdog...")
break
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
# check the detection processes # check the detection processes
@ -38,3 +31,5 @@ class FrigateWatchdog(threading.Thread):
elif not detector.detect_process.is_alive(): elif not detector.detect_process.is_alive():
logger.info("Detection appears to have stopped. Exiting frigate...") logger.info("Detection appears to have stopped. Exiting frigate...")
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGTERM)
logger.info(f"Exiting watchdog...")