blakeblackshear.frigate/frigate/comms/dispatcher.py
Josh Hawkins 9df5927ac5
Autotracking bugfixes and zooming updates (#8103)
* zoom in/out in search for lost objects

* predicted box should not be empty

* clean up and update zoom logic

* only zoom if enabled

* more cleanup

* check for valid velocity when zooming

* only try absolute zoom in if obj area has changed

* zoom logic

* don't enqueue lost object zoom if already at limit

* don't disable motion boxes during ptz moves

* velocity threshold based on move coefficients

* fix area zoom logic

* disable debug zoom

* don't process objects if ptz moving

* recalc with exponent

* change exponent

* remove lost object zooming

* increase distance threshold for stationary object

* increase distance threshold constant

* only zoom out if nonzero

* camera name in all debug logging

* add camera name to debug logging

* camera variable name consistency

* update calibration behavior and docs

* docs and better zooming

* more sensible target values

* docs wording

* fix velocity threshold variable

* zooming tweaks and remove iou for current objects

* debug and docs

* get valid velocity

* include zero

* additional debug statements

* add zoom hysteresis

* zoom on initial move if relative

* only update target box if we actually zoom

* merge dev

* use getattr instead of get

* increase distance threshold

* reverse logic

* get_camera_status after preset move to store zoom

* final tweaks and docs

* use constants and catch possible debug exception

* adjust zoom factor exponent

* don't run motion estimation when calling preset

* adjust dimension threshold

* use numpy for velocity estimate calcs

* more numpy conversion

* fix numpy shapes

* numpy zeros dimension

* more zoom out conditions

* fix velocity bug

* ensure init has been called in debug view

* ensure onvif init if enabling by mqtt

* change default hysteresis values

* recalc relative zoom value

* zoom out value

* try to zoom when object isn't moving

* try zoom when tracked object is not moving

* don't try to zoom every time

* negate zoom out condition when needed

* hysteresis constants for absolute zooming

* update zoom conditions

* don't recalc target box on zoom only

* zoom out if above area threshold

* don't print zooming debug for stationary obj

* revamp zooming to use area moving average

* zooming tweaks and expose property

* limit zoom with max target box

* use calibration to determine zoom levels

* zoom logic fix

* docs

* add tapo c200 camera

* fix initial absolute zoom

* small zoom logic fix

* better invalid velocity checks

* fix test

* really fix test this time
2023-10-22 12:59:13 -04:00

299 lines
13 KiB
Python

"""Handle communication between Frigate and other applications."""
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable
from frigate.config import FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS, REQUEST_REGION_GRID
from frigate.models import Recordings
from frigate.ptz.onvif import OnvifCommandEnum, OnvifController
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes
from frigate.util.object import get_camera_regions_grid
from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__)
class Communicator(ABC):
"""pub/sub model via specific protocol."""
@abstractmethod
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Send data via specific protocol."""
pass
@abstractmethod
def subscribe(self, receiver: Callable) -> None:
"""Pass receiver so communicators can pass commands."""
pass
@abstractmethod
def stop(self) -> None:
"""Stop the communicator."""
pass
class Dispatcher:
"""Handle communication between Frigate and communicators."""
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
camera_metrics: dict[str, CameraMetricsTypes],
feature_metrics: dict[str, FeatureMetricsTypes],
ptz_metrics: dict[str, PTZMetricsTypes],
communicators: list[Communicator],
) -> None:
self.config = config
self.onvif = onvif
self.camera_metrics = camera_metrics
self.feature_metrics = feature_metrics
self.ptz_metrics = ptz_metrics
self.comms = communicators
self._camera_settings_handlers: dict[str, Callable] = {
"audio": self._on_audio_command,
"detect": self._on_detect_command,
"improve_contrast": self._on_motion_improve_contrast_command,
"ptz_autotracker": self._on_ptz_autotracker_command,
"motion": self._on_motion_command,
"motion_contour_area": self._on_motion_contour_area_command,
"motion_threshold": self._on_motion_threshold_command,
"recordings": self._on_recordings_command,
"snapshots": self._on_snapshots_command,
}
for comm in self.comms:
comm.subscribe(self._receive)
def _receive(self, topic: str, payload: str) -> None:
"""Handle receiving of payload from communicators."""
if topic.endswith("set"):
try:
# example /cam_name/detect/set payload=ON|OFF
camera_name = topic.split("/")[-3]
command = topic.split("/")[-2]
self._camera_settings_handlers[command](camera_name, payload)
except IndexError:
logger.error(f"Received invalid set command: {topic}")
return
elif topic.endswith("ptz"):
try:
# example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP...
camera_name = topic.split("/")[-2]
self._on_ptz_command(camera_name, payload)
except IndexError:
logger.error(f"Received invalid ptz command: {topic}")
return
elif topic == "restart":
restart_frigate()
elif topic == INSERT_MANY_RECORDINGS:
Recordings.insert_many(payload).execute()
elif topic == REQUEST_REGION_GRID:
camera = payload
self.camera_metrics[camera]["region_grid_queue"].put(
get_camera_regions_grid(camera, self.config.cameras[camera].detect)
)
else:
self.publish(topic, payload, retain=False)
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Handle publishing to communicators."""
for comm in self.comms:
comm.publish(topic, payload, retain)
def stop(self) -> None:
for comm in self.comms:
comm.stop()
def _on_detect_command(self, camera_name: str, payload: str) -> None:
"""Callback for detect topic."""
detect_settings = self.config.cameras[camera_name].detect
if payload == "ON":
if not self.camera_metrics[camera_name]["detection_enabled"].value:
logger.info(f"Turning on detection for {camera_name}")
self.camera_metrics[camera_name]["detection_enabled"].value = True
detect_settings.enabled = True
if not self.camera_metrics[camera_name]["motion_enabled"].value:
logger.info(
f"Turning on motion for {camera_name} due to detection being enabled."
)
self.camera_metrics[camera_name]["motion_enabled"].value = True
self.publish(f"{camera_name}/motion/state", payload, retain=True)
elif payload == "OFF":
if self.camera_metrics[camera_name]["detection_enabled"].value:
logger.info(f"Turning off detection for {camera_name}")
self.camera_metrics[camera_name]["detection_enabled"].value = False
detect_settings.enabled = False
self.publish(f"{camera_name}/detect/state", payload, retain=True)
def _on_motion_command(self, camera_name: str, payload: str) -> None:
"""Callback for motion topic."""
if payload == "ON":
if not self.camera_metrics[camera_name]["motion_enabled"].value:
logger.info(f"Turning on motion for {camera_name}")
self.camera_metrics[camera_name]["motion_enabled"].value = True
elif payload == "OFF":
if self.camera_metrics[camera_name]["detection_enabled"].value:
logger.error(
"Turning off motion is not allowed when detection is enabled."
)
return
if self.camera_metrics[camera_name]["motion_enabled"].value:
logger.info(f"Turning off motion for {camera_name}")
self.camera_metrics[camera_name]["motion_enabled"].value = False
self.publish(f"{camera_name}/motion/state", payload, retain=True)
def _on_motion_improve_contrast_command(
self, camera_name: str, payload: str
) -> None:
"""Callback for improve_contrast topic."""
motion_settings = self.config.cameras[camera_name].motion
if payload == "ON":
if not self.camera_metrics[camera_name]["improve_contrast_enabled"].value:
logger.info(f"Turning on improve contrast for {camera_name}")
self.camera_metrics[camera_name][
"improve_contrast_enabled"
].value = True
motion_settings.improve_contrast = True # type: ignore[union-attr]
elif payload == "OFF":
if self.camera_metrics[camera_name]["improve_contrast_enabled"].value:
logger.info(f"Turning off improve contrast for {camera_name}")
self.camera_metrics[camera_name][
"improve_contrast_enabled"
].value = False
motion_settings.improve_contrast = False # type: ignore[union-attr]
self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True)
def _on_ptz_autotracker_command(self, camera_name: str, payload: str) -> None:
"""Callback for ptz_autotracker topic."""
ptz_autotracker_settings = self.config.cameras[camera_name].onvif.autotracking
if payload == "ON":
if not self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value:
logger.info(f"Turning on ptz autotracker for {camera_name}")
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = True
self.ptz_metrics[camera_name]["ptz_start_time"].value = 0
ptz_autotracker_settings.enabled = True
elif payload == "OFF":
if self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value:
logger.info(f"Turning off ptz autotracker for {camera_name}")
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
self.ptz_metrics[camera_name]["ptz_start_time"].value = 0
ptz_autotracker_settings.enabled = False
self.publish(f"{camera_name}/ptz_autotracker/state", payload, retain=True)
def _on_motion_contour_area_command(self, camera_name: str, payload: int) -> None:
"""Callback for motion contour topic."""
try:
payload = int(payload)
except ValueError:
f"Received unsupported value for motion contour area: {payload}"
return
motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion contour area for {camera_name}: {payload}")
self.camera_metrics[camera_name]["motion_contour_area"].value = payload
motion_settings.contour_area = payload # type: ignore[union-attr]
self.publish(f"{camera_name}/motion_contour_area/state", payload, retain=True)
def _on_motion_threshold_command(self, camera_name: str, payload: int) -> None:
"""Callback for motion threshold topic."""
try:
payload = int(payload)
except ValueError:
f"Received unsupported value for motion threshold: {payload}"
return
motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion threshold for {camera_name}: {payload}")
self.camera_metrics[camera_name]["motion_threshold"].value = payload
motion_settings.threshold = payload # type: ignore[union-attr]
self.publish(f"{camera_name}/motion_threshold/state", payload, retain=True)
def _on_audio_command(self, camera_name: str, payload: str) -> None:
"""Callback for audio topic."""
audio_settings = self.config.cameras[camera_name].audio
if payload == "ON":
if not self.config.cameras[camera_name].audio.enabled_in_config:
logger.error(
"Audio detection must be enabled in the config to be turned on via MQTT."
)
return
if not audio_settings.enabled:
logger.info(f"Turning on audio detection for {camera_name}")
audio_settings.enabled = True
self.feature_metrics[camera_name]["audio_enabled"].value = True
elif payload == "OFF":
if self.feature_metrics[camera_name]["audio_enabled"].value:
logger.info(f"Turning off audio detection for {camera_name}")
audio_settings.enabled = False
self.feature_metrics[camera_name]["audio_enabled"].value = False
self.publish(f"{camera_name}/audio/state", payload, retain=True)
def _on_recordings_command(self, camera_name: str, payload: str) -> None:
"""Callback for recordings topic."""
record_settings = self.config.cameras[camera_name].record
if payload == "ON":
if not self.config.cameras[camera_name].record.enabled_in_config:
logger.error(
"Recordings must be enabled in the config to be turned on via MQTT."
)
return
if not record_settings.enabled:
logger.info(f"Turning on recordings for {camera_name}")
record_settings.enabled = True
self.feature_metrics[camera_name]["record_enabled"].value = True
elif payload == "OFF":
if self.feature_metrics[camera_name]["record_enabled"].value:
logger.info(f"Turning off recordings for {camera_name}")
record_settings.enabled = False
self.feature_metrics[camera_name]["record_enabled"].value = False
self.publish(f"{camera_name}/recordings/state", payload, retain=True)
def _on_snapshots_command(self, camera_name: str, payload: str) -> None:
"""Callback for snapshots topic."""
snapshots_settings = self.config.cameras[camera_name].snapshots
if payload == "ON":
if not snapshots_settings.enabled:
logger.info(f"Turning on snapshots for {camera_name}")
snapshots_settings.enabled = True
elif payload == "OFF":
if snapshots_settings.enabled:
logger.info(f"Turning off snapshots for {camera_name}")
snapshots_settings.enabled = False
self.publish(f"{camera_name}/snapshots/state", payload, retain=True)
def _on_ptz_command(self, camera_name: str, payload: str) -> None:
"""Callback for ptz topic."""
try:
if "preset" in payload.lower():
command = OnvifCommandEnum.preset
param = payload.lower()[payload.index("_") + 1 :]
else:
command = OnvifCommandEnum[payload.lower()]
param = ""
self.onvif.handle_command(camera_name, command, param)
logger.info(f"Setting ptz command to {command} for {camera_name}")
except KeyError as k:
logger.error(f"Invalid PTZ command {payload}: {k}")