"""Handle communication between Frigate and other applications.""" import logging from abc import ABC, abstractmethod from typing import Any, Callable from frigate.config import FrigateConfig from frigate.const import INSERT_MANY_RECORDINGS from frigate.models import Recordings from frigate.ptz.onvif import OnvifCommandEnum, OnvifController from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes from frigate.util.services import restart_frigate logger = logging.getLogger(__name__) class Communicator(ABC): """pub/sub model via specific protocol.""" @abstractmethod def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Send data via specific protocol.""" pass @abstractmethod def subscribe(self, receiver: Callable) -> None: """Pass receiver so communicators can pass commands.""" pass @abstractmethod def stop(self) -> None: """Stop the communicator.""" pass class Dispatcher: """Handle communication between Frigate and communicators.""" def __init__( self, config: FrigateConfig, onvif: OnvifController, camera_metrics: dict[str, CameraMetricsTypes], feature_metrics: dict[str, FeatureMetricsTypes], ptz_metrics: dict[str, PTZMetricsTypes], communicators: list[Communicator], ) -> None: self.config = config self.onvif = onvif self.camera_metrics = camera_metrics self.feature_metrics = feature_metrics self.ptz_metrics = ptz_metrics self.comms = communicators for comm in self.comms: comm.subscribe(self._receive) self._camera_settings_handlers: dict[str, Callable] = { "audio": self._on_audio_command, "detect": self._on_detect_command, "improve_contrast": self._on_motion_improve_contrast_command, "ptz_autotracker": self._on_ptz_autotracker_command, "motion": self._on_motion_command, "motion_contour_area": self._on_motion_contour_area_command, "motion_threshold": self._on_motion_threshold_command, "recordings": self._on_recordings_command, "snapshots": self._on_snapshots_command, } def _receive(self, topic: str, payload: str) -> None: """Handle receiving of payload from communicators.""" if topic.endswith("set"): try: # example /cam_name/detect/set payload=ON|OFF camera_name = topic.split("/")[-3] command = topic.split("/")[-2] self._camera_settings_handlers[command](camera_name, payload) except IndexError: logger.error(f"Received invalid set command: {topic}") return elif topic.endswith("ptz"): try: # example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP... camera_name = topic.split("/")[-2] self._on_ptz_command(camera_name, payload) except IndexError: logger.error(f"Received invalid ptz command: {topic}") return elif topic == "restart": restart_frigate() elif topic == INSERT_MANY_RECORDINGS: Recordings.insert_many(payload).execute() else: self.publish(topic, payload, retain=False) def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Handle publishing to communicators.""" for comm in self.comms: comm.publish(topic, payload, retain) def stop(self) -> None: for comm in self.comms: comm.stop() def _on_detect_command(self, camera_name: str, payload: str) -> None: """Callback for detect topic.""" detect_settings = self.config.cameras[camera_name].detect if payload == "ON": if not self.camera_metrics[camera_name]["detection_enabled"].value: logger.info(f"Turning on detection for {camera_name}") self.camera_metrics[camera_name]["detection_enabled"].value = True detect_settings.enabled = True if not self.camera_metrics[camera_name]["motion_enabled"].value: logger.info( f"Turning on motion for {camera_name} due to detection being enabled." ) self.camera_metrics[camera_name]["motion_enabled"].value = True self.publish(f"{camera_name}/motion/state", payload, retain=True) elif payload == "OFF": if self.camera_metrics[camera_name]["detection_enabled"].value: logger.info(f"Turning off detection for {camera_name}") self.camera_metrics[camera_name]["detection_enabled"].value = False detect_settings.enabled = False self.publish(f"{camera_name}/detect/state", payload, retain=True) def _on_motion_command(self, camera_name: str, payload: str) -> None: """Callback for motion topic.""" if payload == "ON": if not self.camera_metrics[camera_name]["motion_enabled"].value: logger.info(f"Turning on motion for {camera_name}") self.camera_metrics[camera_name]["motion_enabled"].value = True elif payload == "OFF": if self.camera_metrics[camera_name]["detection_enabled"].value: logger.error( "Turning off motion is not allowed when detection is enabled." ) return if self.camera_metrics[camera_name]["motion_enabled"].value: logger.info(f"Turning off motion for {camera_name}") self.camera_metrics[camera_name]["motion_enabled"].value = False self.publish(f"{camera_name}/motion/state", payload, retain=True) def _on_motion_improve_contrast_command( self, camera_name: str, payload: str ) -> None: """Callback for improve_contrast topic.""" motion_settings = self.config.cameras[camera_name].motion if payload == "ON": if not self.camera_metrics[camera_name]["improve_contrast_enabled"].value: logger.info(f"Turning on improve contrast for {camera_name}") self.camera_metrics[camera_name][ "improve_contrast_enabled" ].value = True motion_settings.improve_contrast = True # type: ignore[union-attr] elif payload == "OFF": if self.camera_metrics[camera_name]["improve_contrast_enabled"].value: logger.info(f"Turning off improve contrast for {camera_name}") self.camera_metrics[camera_name][ "improve_contrast_enabled" ].value = False motion_settings.improve_contrast = False # type: ignore[union-attr] self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True) def _on_ptz_autotracker_command(self, camera_name: str, payload: str) -> None: """Callback for ptz_autotracker topic.""" ptz_autotracker_settings = self.config.cameras[camera_name].onvif.autotracking if payload == "ON": if not self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value: logger.info(f"Turning on ptz autotracker for {camera_name}") self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = True ptz_autotracker_settings.enabled = True elif payload == "OFF": if self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value: logger.info(f"Turning off ptz autotracker for {camera_name}") self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False ptz_autotracker_settings.enabled = False self.publish(f"{camera_name}/ptz_autotracker/state", payload, retain=True) def _on_motion_contour_area_command(self, camera_name: str, payload: int) -> None: """Callback for motion contour topic.""" try: payload = int(payload) except ValueError: f"Received unsupported value for motion contour area: {payload}" return motion_settings = self.config.cameras[camera_name].motion logger.info(f"Setting motion contour area for {camera_name}: {payload}") self.camera_metrics[camera_name]["motion_contour_area"].value = payload motion_settings.contour_area = payload # type: ignore[union-attr] self.publish(f"{camera_name}/motion_contour_area/state", payload, retain=True) def _on_motion_threshold_command(self, camera_name: str, payload: int) -> None: """Callback for motion threshold topic.""" try: payload = int(payload) except ValueError: f"Received unsupported value for motion threshold: {payload}" return motion_settings = self.config.cameras[camera_name].motion logger.info(f"Setting motion threshold for {camera_name}: {payload}") self.camera_metrics[camera_name]["motion_threshold"].value = payload motion_settings.threshold = payload # type: ignore[union-attr] self.publish(f"{camera_name}/motion_threshold/state", payload, retain=True) def _on_audio_command(self, camera_name: str, payload: str) -> None: """Callback for audio topic.""" audio_settings = self.config.cameras[camera_name].audio if payload == "ON": if not self.config.cameras[camera_name].audio.enabled_in_config: logger.error( "Audio detection must be enabled in the config to be turned on via MQTT." ) return if not audio_settings.enabled: logger.info(f"Turning on audio detection for {camera_name}") audio_settings.enabled = True self.feature_metrics[camera_name]["audio_enabled"].value = True elif payload == "OFF": if self.feature_metrics[camera_name]["audio_enabled"].value: logger.info(f"Turning off audio detection for {camera_name}") audio_settings.enabled = False self.feature_metrics[camera_name]["audio_enabled"].value = False self.publish(f"{camera_name}/audio/state", payload, retain=True) def _on_recordings_command(self, camera_name: str, payload: str) -> None: """Callback for recordings topic.""" record_settings = self.config.cameras[camera_name].record if payload == "ON": if not self.config.cameras[camera_name].record.enabled_in_config: logger.error( "Recordings must be enabled in the config to be turned on via MQTT." ) return if not record_settings.enabled: logger.info(f"Turning on recordings for {camera_name}") record_settings.enabled = True self.feature_metrics[camera_name]["record_enabled"].value = True elif payload == "OFF": if self.feature_metrics[camera_name]["record_enabled"].value: logger.info(f"Turning off recordings for {camera_name}") record_settings.enabled = False self.feature_metrics[camera_name]["record_enabled"].value = False self.publish(f"{camera_name}/recordings/state", payload, retain=True) def _on_snapshots_command(self, camera_name: str, payload: str) -> None: """Callback for snapshots topic.""" snapshots_settings = self.config.cameras[camera_name].snapshots if payload == "ON": if not snapshots_settings.enabled: logger.info(f"Turning on snapshots for {camera_name}") snapshots_settings.enabled = True elif payload == "OFF": if snapshots_settings.enabled: logger.info(f"Turning off snapshots for {camera_name}") snapshots_settings.enabled = False self.publish(f"{camera_name}/snapshots/state", payload, retain=True) def _on_ptz_command(self, camera_name: str, payload: str) -> None: """Callback for ptz topic.""" try: if "preset" in payload.lower(): command = OnvifCommandEnum.preset param = payload.lower()[payload.index("_") + 1 :] else: command = OnvifCommandEnum[payload.lower()] param = "" self.onvif.handle_command(camera_name, command, param) logger.info(f"Setting ptz command to {command} for {camera_name}") except KeyError as k: logger.error(f"Invalid PTZ command {payload}: {k}")