Cleanup detections updater

This commit is contained in:
Nicolas Mowen 2025-06-06 08:56:04 -06:00
parent 107007a636
commit 0250db70d0
9 changed files with 20 additions and 22 deletions

View File

@ -3,7 +3,7 @@
import multiprocessing as mp import multiprocessing as mp
from _pickle import UnpicklingError from _pickle import UnpicklingError
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional from typing import Any
import zmq import zmq

View File

@ -1,7 +1,7 @@
"""Facilitates communication between processes.""" """Facilitates communication between processes."""
from enum import Enum from enum import Enum
from typing import Any, Optional from typing import Any
from .zmq_proxy import Publisher, Subscriber from .zmq_proxy import Publisher, Subscriber
@ -19,8 +19,7 @@ class DetectionPublisher(Publisher):
topic_base = "detection/" topic_base = "detection/"
def __init__(self, topic: DetectionTypeEnum) -> None: def __init__(self, topic: str) -> None:
topic = topic.value
super().__init__(topic) super().__init__(topic)
@ -29,16 +28,15 @@ class DetectionSubscriber(Subscriber):
topic_base = "detection/" topic_base = "detection/"
def __init__(self, topic: DetectionTypeEnum) -> None: def __init__(self, topic: str) -> None:
topic = topic.value
super().__init__(topic) super().__init__(topic)
def check_for_update( def check_for_update(
self, timeout: float = None self, timeout: float | None = None
) -> Optional[tuple[DetectionTypeEnum, Any]]: ) -> tuple[str, Any] | tuple[None, None]:
return super().check_for_update(timeout) return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: Any) -> Any: def _return_object(self, topic: str, payload: Any) -> Any:
if payload is None: if payload is None:
return (None, None) return (None, None)
return (DetectionTypeEnum[topic[len(self.topic_base) :]], payload) return (topic[len(self.topic_base) :], payload)

View File

@ -83,7 +83,7 @@ class Subscriber:
self.socket.connect(SOCKET_SUB) self.socket.connect(SOCKET_SUB)
def check_for_update( def check_for_update(
self, timeout: float = FAST_QUEUE_TIMEOUT self, timeout: float | None = FAST_QUEUE_TIMEOUT
) -> tuple[str, Any] | tuple[None, None]: ) -> tuple[str, Any] | tuple[None, None]:
"""Returns message or None if no update.""" """Returns message or None if no update."""
try: try:

View File

@ -143,7 +143,7 @@ class EmbeddingMaintainer(threading.Thread):
self.recordings_subscriber = RecordingsDataSubscriber( self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through RecordingsDataTypeEnum.recordings_available_through
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
self.embeddings_responder = EmbeddingsResponder() self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()

View File

@ -183,7 +183,7 @@ class AudioEventMaintainer(threading.Thread):
CameraConfigUpdateEnum.audio_transcription, CameraConfigUpdateEnum.audio_transcription,
], ],
) )
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio.value)
self.event_metadata_publisher = EventMetadataPublisher() self.event_metadata_publisher = EventMetadataPublisher()
if self.camera_config.audio_transcription.enabled_in_config: if self.camera_config.audio_transcription.enabled_in_config:

View File

@ -96,7 +96,7 @@ class OutputProcess(FrigateProcess):
websocket_server.initialize_websockets_manager() websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever) websocket_thread = threading.Thread(target=websocket_server.serve_forever)
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
config_subscriber = CameraConfigUpdateSubscriber( config_subscriber = CameraConfigUpdateSubscriber(
self.config, self.config,
self.config.cameras, self.config.cameras,

View File

@ -79,7 +79,7 @@ class RecordingMaintainer(threading.Thread):
self.config.cameras, self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record], [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
self.recordings_publisher = RecordingsDataPublisher( self.recordings_publisher = RecordingsDataPublisher(
RecordingsDataTypeEnum.recordings_available_through RecordingsDataTypeEnum.recordings_available_through
) )
@ -545,7 +545,7 @@ class RecordingMaintainer(threading.Thread):
if not topic: if not topic:
break break
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video.value:
( (
camera, camera,
_, _,
@ -564,7 +564,7 @@ class RecordingMaintainer(threading.Thread):
regions, regions,
) )
) )
elif topic == DetectionTypeEnum.audio: elif topic == DetectionTypeEnum.audio.value:
( (
camera, camera,
frame_time, frame_time,
@ -580,7 +580,7 @@ class RecordingMaintainer(threading.Thread):
audio_detections, audio_detections,
) )
) )
elif topic == DetectionTypeEnum.api or DetectionTypeEnum.lpr: elif topic == DetectionTypeEnum.api.value or DetectionTypeEnum.lpr.value:
continue continue
if frame_time < run_start - stale_frame_count_threshold: if frame_time < run_start - stale_frame_count_threshold:

View File

@ -164,7 +164,7 @@ class ReviewSegmentMaintainer(threading.Thread):
CameraConfigUpdateEnum.review, CameraConfigUpdateEnum.review,
], ],
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
# manual events # manual events
self.indefinite_events: dict[str, dict[str, Any]] = {} self.indefinite_events: dict[str, dict[str, Any]] = {}
@ -484,7 +484,7 @@ class ReviewSegmentMaintainer(threading.Thread):
if not topic: if not topic:
continue continue
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video.value:
( (
camera, camera,
frame_name, frame_name,
@ -493,14 +493,14 @@ class ReviewSegmentMaintainer(threading.Thread):
_, _,
_, _,
) = data ) = data
elif topic == DetectionTypeEnum.audio: elif topic == DetectionTypeEnum.audio.value:
( (
camera, camera,
frame_time, frame_time,
_, _,
audio_detections, audio_detections,
) = data ) = data
elif topic == DetectionTypeEnum.api or DetectionTypeEnum.lpr: elif topic == DetectionTypeEnum.api.value or DetectionTypeEnum.lpr.value:
( (
camera, camera,
frame_time, frame_time,

View File

@ -78,7 +78,7 @@ class TrackedObjectProcessor(threading.Thread):
) )
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all.value)
self.event_sender = EventUpdatePublisher() self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber() self.event_end_subscriber = EventEndSubscriber()
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all) self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)