Refactor enabled camera listeners (#16979)

* Monitor if camera is disabled for review items

* Simplify multi camera disabled check

* Cleanup birdseye config handling

* Cleanup

* Remove old listeners
This commit is contained in:
Nicolas Mowen 2025-03-06 06:59:35 -07:00 committed by GitHub
parent 73c2c34127
commit 66d5f4f3b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 71 additions and 54 deletions

View File

@ -440,10 +440,7 @@ class TrackedObjectProcessor(threading.Thread):
self.last_motion_detected: dict[str, float] = {} self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread self.ptz_autotracker_thread = ptz_autotracker_thread
self.enabled_subscribers = { self.config_enabled_subscriber = ConfigSubscriber("config/enabled/")
camera: ConfigSubscriber(f"config/enabled/{camera}", True)
for camera in config.cameras.keys()
}
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
@ -705,24 +702,34 @@ class TrackedObjectProcessor(threading.Thread):
{"enabled": False, "motion": 0, "objects": []}, {"enabled": False, "motion": 0, "objects": []},
) )
def _get_enabled_state(self, camera: str) -> bool:
_, config_data = self.enabled_subscribers[camera].check_for_update()
if config_data:
self.config.cameras[camera].enabled = config_data.enabled
if self.camera_states[camera].prev_enabled is None:
self.camera_states[camera].prev_enabled = config_data.enabled
return self.config.cameras[camera].enabled
def run(self): def run(self):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
# check for config updates
while True:
(
updated_enabled_topic,
updated_enabled_config,
) = self.config_enabled_subscriber.check_for_update()
if not updated_enabled_topic:
break
camera_name = updated_enabled_topic.rpartition("/")[-1]
self.config.cameras[
camera_name
].enabled = updated_enabled_config.enabled
if self.camera_states[camera_name].prev_enabled is None:
self.camera_states[
camera_name
].prev_enabled = updated_enabled_config.enabled
# manage camera disabled state
for camera, config in self.config.cameras.items(): for camera, config in self.config.cameras.items():
if not config.enabled_in_config: if not config.enabled_in_config:
continue continue
current_enabled = self._get_enabled_state(camera) current_enabled = config.enabled
camera_state = self.camera_states[camera] camera_state = self.camera_states[camera]
if camera_state.prev_enabled and not current_enabled: if camera_state.prev_enabled and not current_enabled:
@ -746,7 +753,7 @@ class TrackedObjectProcessor(threading.Thread):
except queue.Empty: except queue.Empty:
continue continue
if not self._get_enabled_state(camera): if not self.config.cameras[camera].enabled:
logger.debug(f"Camera {camera} disabled, skipping update") logger.debug(f"Camera {camera} disabled, skipping update")
continue continue
@ -792,7 +799,6 @@ class TrackedObjectProcessor(threading.Thread):
self.detection_publisher.stop() self.detection_publisher.stop()
self.event_sender.stop() self.event_sender.stop()
self.event_end_subscriber.stop() self.event_end_subscriber.stop()
for subscriber in self.enabled_subscribers.values(): self.config_enabled_subscriber.stop()
subscriber.stop()
logger.info("Exiting object processor...") logger.info("Exiting object processor...")

View File

@ -281,12 +281,6 @@ class BirdsEyeFrameManager:
self.stop_event = stop_event self.stop_event = stop_event
self.inactivity_threshold = config.birdseye.inactivity_threshold self.inactivity_threshold = config.birdseye.inactivity_threshold
self.enabled_subscribers = {
cam: ConfigSubscriber(f"config/enabled/{cam}", True)
for cam in config.cameras.keys()
if config.cameras[cam].enabled_in_config
}
if config.birdseye.layout.max_cameras: if config.birdseye.layout.max_cameras:
self.last_refresh_time = 0 self.last_refresh_time = 0
@ -387,16 +381,6 @@ class BirdsEyeFrameManager:
if mode == BirdseyeModeEnum.objects and object_box_count > 0: if mode == BirdseyeModeEnum.objects and object_box_count > 0:
return True return True
def _get_enabled_state(self, camera: str) -> bool:
"""Fetch the latest enabled state for a camera from ZMQ."""
_, config_data = self.enabled_subscribers[camera].check_for_update()
if config_data:
self.config.cameras[camera].enabled = config_data.enabled
return config_data.enabled
return self.config.cameras[camera].enabled
def update_frame(self, frame: Optional[np.ndarray] = None) -> bool: def update_frame(self, frame: Optional[np.ndarray] = None) -> bool:
""" """
Update birdseye, optionally with a new frame. Update birdseye, optionally with a new frame.
@ -410,7 +394,7 @@ class BirdsEyeFrameManager:
for cam, cam_data in self.cameras.items() for cam, cam_data in self.cameras.items()
if self.config.cameras[cam].birdseye.enabled if self.config.cameras[cam].birdseye.enabled
and self.config.cameras[cam].enabled_in_config and self.config.cameras[cam].enabled_in_config
and self._get_enabled_state(cam) and self.config.cameras[cam].enabled
and cam_data["last_active_frame"] > 0 and cam_data["last_active_frame"] > 0
and cam_data["current_frame_time"] - cam_data["last_active_frame"] and cam_data["current_frame_time"] - cam_data["last_active_frame"]
< self.inactivity_threshold < self.inactivity_threshold
@ -706,11 +690,11 @@ class BirdsEyeFrameManager:
frame: np.ndarray, frame: np.ndarray,
) -> bool: ) -> bool:
# don't process if birdseye is disabled for this camera # don't process if birdseye is disabled for this camera
camera_config = self.config.cameras[camera].birdseye camera_config = self.config.cameras[camera]
force_update = False force_update = False
# disabling birdseye is a little tricky # disabling birdseye is a little tricky
if not self._get_enabled_state(camera): if not camera_config.birdseye.enabled or not camera_config.enabled:
# if we've rendered a frame (we have a value for last_active_frame) # if we've rendered a frame (we have a value for last_active_frame)
# then we need to set it to zero # then we need to set it to zero
if self.cameras[camera]["last_active_frame"] > 0: if self.cameras[camera]["last_active_frame"] > 0:
@ -722,7 +706,7 @@ class BirdsEyeFrameManager:
# update the last active frame for the camera # update the last active frame for the camera
self.cameras[camera]["current_frame"] = frame.copy() self.cameras[camera]["current_frame"] = frame.copy()
self.cameras[camera]["current_frame_time"] = frame_time self.cameras[camera]["current_frame_time"] = frame_time
if self.camera_active(camera_config.mode, object_count, motion_count): if self.camera_active(camera_config.birdseye.mode, object_count, motion_count):
self.cameras[camera]["last_active_frame"] = frame_time self.cameras[camera]["last_active_frame"] = frame_time
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
@ -745,11 +729,6 @@ class BirdsEyeFrameManager:
return True return True
return False return False
def stop(self):
"""Clean up subscribers when stopping."""
for subscriber in self.enabled_subscribers.values():
subscriber.stop()
class Birdseye: class Birdseye:
def __init__( def __init__(
@ -775,7 +754,8 @@ class Birdseye:
"birdseye", self.converter, websocket_server, stop_event "birdseye", self.converter, websocket_server, stop_event
) )
self.birdseye_manager = BirdsEyeFrameManager(config, stop_event) self.birdseye_manager = BirdsEyeFrameManager(config, stop_event)
self.config_subscriber = ConfigSubscriber("config/birdseye/") self.config_enabled_subscriber = ConfigSubscriber("config/enabled/")
self.birdseye_subscriber = ConfigSubscriber("config/birdseye/")
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
self.stop_event = stop_event self.stop_event = stop_event
@ -815,15 +795,27 @@ class Birdseye:
# check if there is an updated config # check if there is an updated config
while True: while True:
( (
updated_topic, updated_birdseye_topic,
updated_birdseye_config, updated_birdseye_config,
) = self.config_subscriber.check_for_update() ) = self.birdseye_subscriber.check_for_update()
if not updated_topic: (
updated_enabled_topic,
updated_enabled_config,
) = self.config_enabled_subscriber.check_for_update()
if not updated_birdseye_topic and not updated_enabled_topic:
break break
camera_name = updated_topic.rpartition("/")[-1] if updated_birdseye_config:
self.config.cameras[camera_name].birdseye = updated_birdseye_config camera_name = updated_birdseye_topic.rpartition("/")[-1]
self.config.cameras[camera_name].birdseye = updated_birdseye_config
if updated_enabled_config:
camera_name = updated_enabled_topic.rpartition("/")[-1]
self.config.cameras[
camera_name
].enabled = updated_enabled_config.enabled
if self.birdseye_manager.update( if self.birdseye_manager.update(
camera, camera,
@ -835,7 +827,7 @@ class Birdseye:
self.__send_new_frame() self.__send_new_frame()
def stop(self) -> None: def stop(self) -> None:
self.config_subscriber.stop() self.birdseye_subscriber.stop()
self.birdseye_manager.stop() self.config_enabled_subscriber.stop()
self.converter.join() self.converter.join()
self.broadcaster.join() self.broadcaster.join()

View File

@ -150,6 +150,7 @@ class ReviewSegmentMaintainer(threading.Thread):
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.record_config_subscriber = ConfigSubscriber("config/record/") self.record_config_subscriber = ConfigSubscriber("config/record/")
self.review_config_subscriber = ConfigSubscriber("config/review/") self.review_config_subscriber = ConfigSubscriber("config/review/")
self.enabled_config_subscriber = ConfigSubscriber("config/enabled/")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
# manual events # manual events
@ -450,7 +451,16 @@ class ReviewSegmentMaintainer(threading.Thread):
updated_review_config, updated_review_config,
) = self.review_config_subscriber.check_for_update() ) = self.review_config_subscriber.check_for_update()
if not updated_record_topic and not updated_review_topic: (
updated_enabled_topic,
updated_enabled_config,
) = self.enabled_config_subscriber.check_for_update()
if (
not updated_record_topic
and not updated_review_topic
and not updated_enabled_topic
):
break break
if updated_record_topic: if updated_record_topic:
@ -461,6 +471,12 @@ class ReviewSegmentMaintainer(threading.Thread):
camera_name = updated_review_topic.rpartition("/")[-1] camera_name = updated_review_topic.rpartition("/")[-1]
self.config.cameras[camera_name].review = updated_review_config self.config.cameras[camera_name].review = updated_review_config
if updated_enabled_config:
camera_name = updated_enabled_topic.rpartition("/")[-1]
self.config.cameras[
camera_name
].enabled = updated_enabled_config.enabled
(topic, data) = self.detection_subscriber.check_for_update(timeout=1) (topic, data) = self.detection_subscriber.check_for_update(timeout=1)
if not topic: if not topic:
@ -494,7 +510,10 @@ class ReviewSegmentMaintainer(threading.Thread):
current_segment = self.active_review_segments.get(camera) current_segment = self.active_review_segments.get(camera)
if not self.config.cameras[camera].record.enabled: if (
not self.config.cameras[camera].enabled
or not self.config.cameras[camera].record.enabled
):
if current_segment: if current_segment:
self.end_segment(camera) self.end_segment(camera)
continue continue