Cleanup dispatcher

This commit is contained in:
Nicolas Mowen 2025-06-06 09:17:26 -06:00
parent d077fe38f3
commit b3b40f58f4
5 changed files with 36 additions and 34 deletions

View File

@ -33,7 +33,7 @@ class DetectionSubscriber(Subscriber):
def check_for_update(
self, timeout: float | None = None
) -> tuple[str, Any] | tuple[None, None]:
) -> tuple[str, Any] | tuple[None, None] | None:
return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: Any) -> Any:

View File

@ -54,10 +54,9 @@ class Dispatcher:
self.ptz_metrics = ptz_metrics
self.comms = communicators
self.camera_activity = CameraActivityManager(config, self.publish)
self.model_state = {}
self.embeddings_reindex = {}
self.birdseye_layout = {}
self.model_state: dict[str, ModelStatusTypesEnum] = {}
self.embeddings_reindex: dict[str, Any] = {}
self.birdseye_layout: dict[str, Any] = {}
self._camera_settings_handlers: dict[str, Callable] = {
"audio": self._on_audio_command,
"audio_transcription": self._on_audio_transcription_command,
@ -88,10 +87,12 @@ class Dispatcher:
(comm for comm in communicators if isinstance(comm, WebPushClient)), None
)
def _receive(self, topic: str, payload: str) -> Optional[Any]:
def _receive(self, topic: str, payload: Any) -> Optional[Any]:
"""Handle receiving of payload from communicators."""
def handle_camera_command(command_type, camera_name, command, payload):
def handle_camera_command(
command_type: str, camera_name: str, command: str, payload: str
) -> None:
try:
if command_type == "set":
self._camera_settings_handlers[command](camera_name, payload)
@ -100,13 +101,13 @@ class Dispatcher:
except KeyError:
logger.error(f"Invalid command type or handler: {command_type}")
def handle_restart():
def handle_restart() -> None:
restart_frigate()
def handle_insert_many_recordings():
def handle_insert_many_recordings() -> None:
Recordings.insert_many(payload).execute()
def handle_request_region_grid():
def handle_request_region_grid() -> Any:
camera = payload
grid = get_camera_regions_grid(
camera,
@ -115,24 +116,24 @@ class Dispatcher:
)
return grid
def handle_insert_preview():
def handle_insert_preview() -> None:
Previews.insert(payload).execute()
def handle_upsert_review_segment():
def handle_upsert_review_segment() -> None:
ReviewSegment.insert(payload).on_conflict(
conflict_target=[ReviewSegment.id],
update=payload,
).execute()
def handle_clear_ongoing_review_segments():
def handle_clear_ongoing_review_segments() -> None:
ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where(
ReviewSegment.end_time.is_null(True)
).execute()
def handle_update_camera_activity():
def handle_update_camera_activity() -> None:
self.camera_activity.update_activity(payload)
def handle_update_event_description():
def handle_update_event_description() -> None:
event: Event = Event.get(Event.id == payload["id"])
event.data["description"] = payload["description"]
event.save()
@ -148,24 +149,24 @@ class Dispatcher:
),
)
def handle_update_model_state():
def handle_update_model_state() -> None:
if payload:
model = payload["model"]
state = payload["state"]
self.model_state[model] = ModelStatusTypesEnum[state]
self.publish("model_state", json.dumps(self.model_state))
def handle_model_state():
def handle_model_state() -> None:
self.publish("model_state", json.dumps(self.model_state.copy()))
def handle_update_embeddings_reindex_progress():
def handle_update_embeddings_reindex_progress() -> None:
self.embeddings_reindex = payload
self.publish(
"embeddings_reindex_progress",
json.dumps(payload),
)
def handle_embeddings_reindex_progress():
def handle_embeddings_reindex_progress() -> None:
self.publish(
"embeddings_reindex_progress",
json.dumps(self.embeddings_reindex.copy()),
@ -179,7 +180,7 @@ class Dispatcher:
def handle_birdseye_layout():
self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy()))
def handle_on_connect():
def handle_on_connect() -> None:
camera_status = self.camera_activity.last_camera_activity.copy()
cameras_with_status = camera_status.keys()
@ -219,7 +220,7 @@ class Dispatcher:
)
self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy()))
def handle_notification_test():
def handle_notification_test() -> None:
self.publish("notification_test", "Test notification")
# Dictionary mapping topic to handlers
@ -266,11 +267,12 @@ class Dispatcher:
logger.error(
f"Received invalid {topic.split('/')[-1]} command: {topic}"
)
return
return None
elif topic in topic_handlers:
return topic_handlers[topic]()
else:
self.publish(topic, payload, retain=False)
return None
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Handle publishing to communicators."""
@ -373,11 +375,11 @@ class Dispatcher:
if payload == "ON":
if not motion_settings.improve_contrast:
logger.info(f"Turning on improve contrast for {camera_name}")
motion_settings.improve_contrast = True # type: ignore[union-attr]
motion_settings.improve_contrast = True
elif payload == "OFF":
if motion_settings.improve_contrast:
logger.info(f"Turning off improve contrast for {camera_name}")
motion_settings.improve_contrast = False # type: ignore[union-attr]
motion_settings.improve_contrast = False
self.config_updater.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name),
@ -421,7 +423,7 @@ class Dispatcher:
motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion contour area for {camera_name}: {payload}")
motion_settings.contour_area = payload # type: ignore[union-attr]
motion_settings.contour_area = payload
self.config_updater.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name),
motion_settings,
@ -438,7 +440,7 @@ class Dispatcher:
motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion threshold for {camera_name}: {payload}")
motion_settings.threshold = payload # type: ignore[union-attr]
motion_settings.threshold = payload
self.config_updater.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name),
motion_settings,
@ -453,7 +455,7 @@ class Dispatcher:
notification_settings = self.config.notifications
logger.info(f"Setting all notifications: {payload}")
notification_settings.enabled = payload == "ON" # type: ignore[union-attr]
notification_settings.enabled = payload == "ON"
self.config_updater.publisher.publish(
"config/notifications", notification_settings
)

View File

@ -75,7 +75,7 @@ class MqttClient(Communicator):
)
self.publish(
f"{camera_name}/improve_contrast/state",
"ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr]
"ON" if camera.motion.improve_contrast else "OFF",
retain=True,
)
self.publish(
@ -85,12 +85,12 @@ class MqttClient(Communicator):
)
self.publish(
f"{camera_name}/motion_threshold/state",
camera.motion.threshold, # type: ignore[union-attr]
camera.motion.threshold,
retain=True,
)
self.publish(
f"{camera_name}/motion_contour_area/state",
camera.motion.contour_area, # type: ignore[union-attr]
camera.motion.contour_area,
retain=True,
)
self.publish(

View File

@ -84,7 +84,7 @@ class Subscriber:
def check_for_update(
self, timeout: float | None = FAST_QUEUE_TIMEOUT
) -> tuple[str, Any] | tuple[None, None]:
) -> tuple[str, Any] | tuple[None, None] | None:
"""Returns message or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
@ -103,5 +103,5 @@ class Subscriber:
def _return_object(
self, topic: str, payload: Optional[tuple[str, Any]]
) -> tuple[str, Any] | tuple[None, None]:
return payload or (None, None)
) -> tuple[str, Any] | tuple[None, None] | None:
return payload

View File

@ -80,7 +80,7 @@ class CameraConfig(FrigateBaseModel):
lpr: CameraLicensePlateRecognitionConfig = Field(
default_factory=CameraLicensePlateRecognitionConfig, title="LPR config."
)
motion: Optional[MotionConfig] = Field(
motion: MotionConfig = Field(
None, title="Motion detection configuration."
)
objects: ObjectConfig = Field(