mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
Autotracker: Basic zooming and moves with velocity estimation (#7713)
* don't zoom if camera doesn't support it * basic zooming * make zooming configurable * zooming docs * optional zooming in camera status * Use absolute instead of relative zooming * increase edge threshold * zoom considering object area * bugfixes * catch onvif zooming errors * relative zooming option for dahua/amcrest cams * docs * docs * don't make small movements * remove old logger statement * fix small movements * use enum in config for zooming * fix formatting * empty move queue first * clear tracked object before waiting for stop * use velocity estimation for movements * docs updates * add tests * typos * recalc every 50 moves * adjust zoom based on estimate box if calibrated * tweaks for fast objects and large movements * use real time for calibration and add info logging * docs updates * remove area scale * Add example video to docs * zooming font header size the same as the others * log an error if a ptz doesn't report a MoveStatus * debug logging for onvif service capabilities * ensure camera supports ONVIF MoveStatus
This commit is contained in:
parent
64705c065f
commit
27144eb0b9
@ -5,6 +5,8 @@ title: Camera Autotracking
|
||||
|
||||
An ONVIF-capable, PTZ (pan-tilt-zoom) camera that supports relative movement within the field of view (FOV) can be configured to automatically track moving objects and keep them in the center of the frame.
|
||||
|
||||
![Autotracking Example](/img/frigate-autotracking-example.gif)
|
||||
|
||||
## Autotracking behavior
|
||||
|
||||
Once Frigate determines that an object is not a false positive and has entered one of the required zones, the autotracker will move the PTZ camera to keep the object centered in the frame until the object either moves out of the frame, the PTZ is not capable of any more movement, or Frigate loses track of it.
|
||||
@ -50,6 +52,18 @@ cameras:
|
||||
autotracking:
|
||||
# Optional: enable/disable object autotracking. (default: shown below)
|
||||
enabled: False
|
||||
# Optional: calibrate the camera on startup (default: shown below)
|
||||
# A calibration will move the PTZ in increments and measure the time it takes to move.
|
||||
# The results are used to help estimate the position of tracked objects after a camera move.
|
||||
# Frigate will update your config file automatically after a calibration with
|
||||
# a "movement_weights" entry for the camera. You should then set calibrate_on_startup to False.
|
||||
calibrate_on_startup: False
|
||||
# Optional: the mode to use for zooming in/out on objects during autotracking. (default: shown below)
|
||||
# Available options are: disabled, absolute, and relative
|
||||
# disabled - don't zoom in/out on autotracked objects, use pan/tilt only
|
||||
# absolute - use absolute zooming (supported by most PTZ capable cameras)
|
||||
# relative - use relative zooming (not supported on all PTZs, but makes concurrent pan/tilt/zoom movements)
|
||||
zooming: disabled
|
||||
# Optional: list of objects to track from labelmap.txt (default: shown below)
|
||||
track:
|
||||
- person
|
||||
@ -60,17 +74,41 @@ cameras:
|
||||
return_preset: home
|
||||
# Optional: Seconds to delay before returning to preset. (default: shown below)
|
||||
timeout: 10
|
||||
# Optional: Values generated automatically by a camera calibration. Do not modify these manually. (default: shown below)
|
||||
movement_weights: []
|
||||
```
|
||||
|
||||
## Calibration
|
||||
|
||||
PTZ motors operate at different speeds. Performing a calibration will direct Frigate to measure this speed over a variety of movements and use those measurements to better predict the amount of movement necessary to keep autotracked objects in the center of the frame.
|
||||
|
||||
Calibration is optional, but will greatly assist Frigate in autotracking objects that move across the camera's field of view more quickly.
|
||||
|
||||
To begin calibration, set the `calibrate_on_startup` for your camera to `True` and restart Frigate. Frigate will then make a series of 30 small and large movements with your camera. Don't move the PTZ manually while calibration is in progress. Once complete, camera motion will stop and your config file will be automatically updated with a `movement_weights` parameter to be used in movement calculations. You should not modify this parameter manually.
|
||||
|
||||
After calibration has ended, your PTZ will be moved to the preset specified by `return_preset` and you should set `calibrate_on_startup` in your config file to `False`.
|
||||
|
||||
Note that Frigate will refine and update the `movement_weights` parameter in your config automatically as the PTZ moves during autotracking and more measurements are obtained.
|
||||
|
||||
You can recalibrate at any time by removing the `movement_weights` parameter, setting `calibrate_on_startup` to `True`, and then restarting Frigate. You may need to recalibrate or remove `movement_weights` from your config altogether if autotracking is erratic. If you change your `return_preset` in any way, a recalibration is also recommended.
|
||||
|
||||
## Best practices and considerations
|
||||
|
||||
Every PTZ camera is different, so autotracking may not perform ideally in every situation. This experimental feature was initially developed using an EmpireTech/Dahua SD1A404XB-GNR.
|
||||
|
||||
The object tracker in Frigate estimates the motion of the PTZ so that tracked objects are preserved when the camera moves. In most cases (especially for faster moving objects), the default 5 fps is insufficient for the motion estimator to perform accurately. 10 fps is the current recommendation. Higher frame rates will likely not be more performant and will only slow down Frigate and the motion estimator. Adjust your camera to output at least 10 frames per second and change the `fps` parameter in the [detect configuration](index.md) of your configuration file.
|
||||
|
||||
A fast [detector](object_detectors.md) is recommended. CPU detectors will not perform well or won't work at all. If Frigate already has trouble keeping track of your object, the autotracker will struggle as well.
|
||||
A fast [detector](object_detectors.md) is recommended. CPU detectors will not perform well or won't work at all. You can watch Frigate's debug viewer for your camera to see a thicker colored box around the object currently being autotracked.
|
||||
|
||||
The autotracker will add PTZ motion requests to a queue while the motor is moving. Once the motor stops, the events in the queue will be executed together as one large move (rather than incremental moves). If your PTZ's motor is slow, you may not be able to reliably autotrack fast moving objects.
|
||||
A full-frame zone in `required_zones` is not recommended, especially if you've calibrated your camera and there are `movement_weights` defined in the configuration file. Frigate will continue to autotrack an object that has entered one of the `required_zones`, even if it moves outside of that zone.
|
||||
|
||||
## Zooming
|
||||
|
||||
Zooming is an experimental feature and may use significantly more CPU when tracking objects than panning/tilting only. It may be helpful to tweak your camera's autofocus settings if you are noticing focus problems when using zooming.
|
||||
|
||||
Absolute zooming makes zoom movements separate from pan/tilt movements. Most PTZ cameras will support absolute zooming.
|
||||
|
||||
Relative zooming attempts to make a zoom movement concurrently with any pan/tilt movements. It was tested to work with some Dahua and Amcrest PTZs. But the ONVIF specification indicates that there no assumption about how the generic zoom range is mapped to magnification, field of view or other physical zoom dimension when using relative zooming. So if relative zooming behavior is erratic or just doesn't work, use absolute zooming.
|
||||
|
||||
## Usage applications
|
||||
|
||||
|
@ -584,6 +584,18 @@ cameras:
|
||||
autotracking:
|
||||
# Optional: enable/disable object autotracking. (default: shown below)
|
||||
enabled: False
|
||||
# Optional: calibrate the camera on startup (default: shown below)
|
||||
# A calibration will move the PTZ in increments and measure the time it takes to move.
|
||||
# The results are used to help estimate the position of tracked objects after a camera move.
|
||||
# Frigate will update your config file automatically after a calibration with
|
||||
# a "movement_weights" entry for the camera. You should then set calibrate_on_startup to False.
|
||||
calibrate_on_startup: False
|
||||
# Optional: the mode to use for zooming in/out on objects during autotracking. (default: shown below)
|
||||
# Available options are: disabled, absolute, and relative
|
||||
# disabled - don't zoom in/out on autotracked objects, use pan/tilt only
|
||||
# absolute - use absolute zooming (supported by most PTZ capable cameras)
|
||||
# relative - use relative zooming (not supported on all PTZs, but makes concurrent pan/tilt/zoom movements)
|
||||
zooming: disabled
|
||||
# Optional: list of objects to track from labelmap.txt (default: shown below)
|
||||
track:
|
||||
- person
|
||||
@ -591,9 +603,11 @@ cameras:
|
||||
required_zones:
|
||||
- zone_name
|
||||
# Required: Name of ONVIF preset in camera's firmware to return to when tracking is over. (default: shown below)
|
||||
return_preset: preset_name
|
||||
return_preset: home
|
||||
# Optional: Seconds to delay before returning to preset. (default: shown below)
|
||||
timeout: 10
|
||||
# Optional: Values generated automatically by a camera calibration. Do not modify these manually. (default: shown below)
|
||||
movement_weights: []
|
||||
|
||||
# Optional: Configuration for how to sort the cameras in the Birdseye view.
|
||||
birdseye:
|
||||
|
BIN
docs/static/img/frigate-autotracking-example.gif
vendored
Normal file
BIN
docs/static/img/frigate-autotracking-example.gif
vendored
Normal file
Binary file not shown.
After Width: | Height: | Size: 8.6 MiB |
@ -179,6 +179,12 @@ class FrigateApp:
|
||||
"ptz_stop_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"ptz_frame_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"ptz_zoom_level": mp.Value("d", 0.0), # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
}
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
||||
self.feature_metrics[camera_name] = {
|
||||
|
@ -138,8 +138,20 @@ class MqttConfig(FrigateBaseModel):
|
||||
return v
|
||||
|
||||
|
||||
class ZoomingModeEnum(str, Enum):
|
||||
disabled = "disabled"
|
||||
absolute = "absolute"
|
||||
relative = "relative"
|
||||
|
||||
|
||||
class PtzAutotrackConfig(FrigateBaseModel):
|
||||
enabled: bool = Field(default=False, title="Enable PTZ object autotracking.")
|
||||
calibrate_on_startup: bool = Field(
|
||||
default=False, title="Perform a camera calibration when Frigate starts."
|
||||
)
|
||||
zooming: ZoomingModeEnum = Field(
|
||||
default=ZoomingModeEnum.disabled, title="Autotracker zooming mode."
|
||||
)
|
||||
track: List[str] = Field(default=DEFAULT_TRACKED_OBJECTS, title="Objects to track.")
|
||||
required_zones: List[str] = Field(
|
||||
default_factory=list,
|
||||
@ -152,6 +164,27 @@ class PtzAutotrackConfig(FrigateBaseModel):
|
||||
timeout: int = Field(
|
||||
default=10, title="Seconds to delay before returning to preset."
|
||||
)
|
||||
movement_weights: Optional[Union[float, List[float]]] = Field(
|
||||
default=[],
|
||||
title="Internal value used for PTZ movements based on the speed of your camera's motor.",
|
||||
)
|
||||
|
||||
@validator("movement_weights", pre=True)
|
||||
def validate_weights(cls, v):
|
||||
if v is None:
|
||||
return None
|
||||
|
||||
if isinstance(v, str):
|
||||
weights = list(map(float, v.split(",")))
|
||||
elif isinstance(v, list):
|
||||
weights = [float(val) for val in v]
|
||||
else:
|
||||
raise ValueError("Invalid type for movement_weights")
|
||||
|
||||
if len(weights) != 3:
|
||||
raise ValueError("movement_weights must have exactly 3 floats")
|
||||
|
||||
return weights
|
||||
|
||||
|
||||
class OnvifConfig(FrigateBaseModel):
|
||||
|
@ -3,6 +3,7 @@
|
||||
import copy
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
@ -11,11 +12,17 @@ from multiprocessing.synchronize import Event as MpEvent
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from norfair.camera_motion import MotionEstimator, TranslationTransformationGetter
|
||||
from norfair.camera_motion import (
|
||||
HomographyTransformationGetter,
|
||||
MotionEstimator,
|
||||
TranslationTransformationGetter,
|
||||
)
|
||||
|
||||
from frigate.config import CameraConfig, FrigateConfig
|
||||
from frigate.config import CameraConfig, FrigateConfig, ZoomingModeEnum
|
||||
from frigate.const import CONFIG_DIR
|
||||
from frigate.ptz.onvif import OnvifController
|
||||
from frigate.types import PTZMetricsTypes
|
||||
from frigate.util.builtin import update_yaml_file
|
||||
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -26,12 +33,8 @@ def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
|
||||
# for non ptz/autotracking cameras, this will always return False
|
||||
# ptz_start_time is initialized to 0 on startup and only changes
|
||||
# when autotracking movements are made
|
||||
|
||||
# the offset "primes" the motion estimator with a few frames before movement
|
||||
offset = 0.5
|
||||
|
||||
return (ptz_start_time != 0.0 and frame_time >= ptz_start_time - offset) and (
|
||||
ptz_stop_time == 0.0 or (ptz_start_time - offset <= frame_time <= ptz_stop_time)
|
||||
return (ptz_start_time != 0.0 and frame_time > ptz_start_time) and (
|
||||
ptz_stop_time == 0.0 or (ptz_start_time <= frame_time <= ptz_stop_time)
|
||||
)
|
||||
|
||||
|
||||
@ -54,13 +57,24 @@ class PtzMotionEstimator:
|
||||
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
|
||||
if self.ptz_metrics["ptz_reset"].is_set():
|
||||
self.ptz_metrics["ptz_reset"].clear()
|
||||
logger.debug("Motion estimator reset")
|
||||
|
||||
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
|
||||
if (
|
||||
self.camera_config.onvif.autotracking.zooming
|
||||
!= ZoomingModeEnum.disabled
|
||||
):
|
||||
logger.debug("Motion estimator reset - homography")
|
||||
transformation_type = HomographyTransformationGetter()
|
||||
else:
|
||||
logger.debug("Motion estimator reset - translation")
|
||||
transformation_type = TranslationTransformationGetter()
|
||||
|
||||
self.norfair_motion_estimator = MotionEstimator(
|
||||
transformations_getter=TranslationTransformationGetter(),
|
||||
transformations_getter=transformation_type,
|
||||
min_distance=30,
|
||||
max_points=900,
|
||||
)
|
||||
|
||||
self.coord_transformations = None
|
||||
|
||||
if ptz_moving_at_frame_time(
|
||||
@ -98,7 +112,7 @@ class PtzMotionEstimator:
|
||||
self.frame_manager.close(frame_id)
|
||||
|
||||
logger.debug(
|
||||
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs((0,0))}"
|
||||
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs([[0,0]])}"
|
||||
)
|
||||
|
||||
return self.coord_transformations
|
||||
@ -147,12 +161,16 @@ class PtzAutoTracker:
|
||||
self.ptz_metrics = ptz_metrics
|
||||
self.tracked_object: dict[str, object] = {}
|
||||
self.tracked_object_previous: dict[str, object] = {}
|
||||
self.previous_frame_time = None
|
||||
self.object_types = {}
|
||||
self.required_zones = {}
|
||||
self.move_queues = {}
|
||||
self.move_threads = {}
|
||||
self.autotracker_init = {}
|
||||
self.previous_frame_time: dict[str, object] = {}
|
||||
self.object_types: dict[str, object] = {}
|
||||
self.required_zones: dict[str, object] = {}
|
||||
self.move_queues: dict[str, object] = {}
|
||||
self.move_threads: dict[str, object] = {}
|
||||
self.autotracker_init: dict[str, object] = {}
|
||||
self.move_metrics: dict[str, object] = {}
|
||||
self.calibrating: dict[str, object] = {}
|
||||
self.intercept: dict[str, object] = {}
|
||||
self.move_coefficients: dict[str, object] = {}
|
||||
|
||||
# if cam is set to autotrack, onvif should be set up
|
||||
for camera_name, cam in self.config.cameras.items():
|
||||
@ -172,6 +190,11 @@ class PtzAutoTracker:
|
||||
self.tracked_object[camera_name] = None
|
||||
self.tracked_object_previous[camera_name] = None
|
||||
|
||||
self.calibrating[camera_name] = False
|
||||
self.move_metrics[camera_name] = []
|
||||
self.intercept[camera_name] = None
|
||||
self.move_coefficients[camera_name] = []
|
||||
|
||||
self.move_queues[camera_name] = queue.Queue()
|
||||
|
||||
if not self.onvif.cams[camera_name]["init"]:
|
||||
@ -182,7 +205,7 @@ class PtzAutoTracker:
|
||||
|
||||
return
|
||||
|
||||
if not self.onvif.cams[camera_name]["relative_fov_supported"]:
|
||||
if "pt-r-fov" not in self.onvif.cams[camera_name]["features"]:
|
||||
cam.onvif.autotracking.enabled = False
|
||||
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
||||
logger.warning(
|
||||
@ -191,6 +214,19 @@ class PtzAutoTracker:
|
||||
|
||||
return
|
||||
|
||||
movestatus_supported = self.onvif.get_service_capabilities(camera_name)
|
||||
|
||||
if movestatus_supported is None or movestatus_supported.lower() != "true":
|
||||
cam.onvif.autotracking.enabled = False
|
||||
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
||||
logger.warning(
|
||||
f"Disabling autotracking for {camera_name}: ONVIF MoveStatus not supported"
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
self.onvif.get_camera_status(camera_name)
|
||||
|
||||
# movement thread per camera
|
||||
if not self.move_threads or not self.move_threads[camera_name]:
|
||||
self.move_threads[camera_name] = threading.Thread(
|
||||
@ -200,13 +236,144 @@ class PtzAutoTracker:
|
||||
self.move_threads[camera_name].daemon = True
|
||||
self.move_threads[camera_name].start()
|
||||
|
||||
if cam.onvif.autotracking.movement_weights:
|
||||
self.intercept[camera_name] = cam.onvif.autotracking.movement_weights[0]
|
||||
self.move_coefficients[
|
||||
camera_name
|
||||
] = cam.onvif.autotracking.movement_weights[1:]
|
||||
|
||||
if cam.onvif.autotracking.calibrate_on_startup:
|
||||
self._calibrate_camera(camera_name)
|
||||
|
||||
self.autotracker_init[camera_name] = True
|
||||
|
||||
def write_config(self, camera):
|
||||
config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")
|
||||
|
||||
logger.debug(
|
||||
f"Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
|
||||
)
|
||||
|
||||
update_yaml_file(
|
||||
config_file,
|
||||
["cameras", camera, "onvif", "autotracking", "movement_weights"],
|
||||
self.config.cameras[camera].onvif.autotracking.movement_weights,
|
||||
)
|
||||
|
||||
def _calibrate_camera(self, camera):
|
||||
# move the camera from the preset in steps and measure the time it takes to move that amount
|
||||
# this will allow us to predict movement times with a simple linear regression
|
||||
# start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
|
||||
# TODO: take zooming into account too
|
||||
num_steps = 30
|
||||
step_sizes = np.linspace(0, 1, num_steps)
|
||||
|
||||
self.calibrating[camera] = True
|
||||
|
||||
logger.info(f"Camera calibration for {camera} in progress")
|
||||
|
||||
self.onvif._move_to_preset(
|
||||
camera,
|
||||
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
|
||||
)
|
||||
self.ptz_metrics[camera]["ptz_reset"].set()
|
||||
self.ptz_metrics[camera]["ptz_stopped"].clear()
|
||||
|
||||
# Wait until the camera finishes moving
|
||||
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.onvif.get_camera_status(camera)
|
||||
|
||||
for step in range(num_steps):
|
||||
pan = step_sizes[step]
|
||||
tilt = step_sizes[step]
|
||||
|
||||
start_time = time.time()
|
||||
self.onvif._move_relative(camera, pan, tilt, 0, 1)
|
||||
|
||||
# Wait until the camera finishes moving
|
||||
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.onvif.get_camera_status(camera)
|
||||
stop_time = time.time()
|
||||
|
||||
self.move_metrics[camera].append(
|
||||
{
|
||||
"pan": pan,
|
||||
"tilt": tilt,
|
||||
"start_timestamp": start_time,
|
||||
"end_timestamp": stop_time,
|
||||
}
|
||||
)
|
||||
|
||||
self.onvif._move_to_preset(
|
||||
camera,
|
||||
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
|
||||
)
|
||||
self.ptz_metrics[camera]["ptz_reset"].set()
|
||||
self.ptz_metrics[camera]["ptz_stopped"].clear()
|
||||
|
||||
# Wait until the camera finishes moving
|
||||
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.onvif.get_camera_status(camera)
|
||||
|
||||
self.calibrating[camera] = False
|
||||
|
||||
logger.info(f"Calibration for {camera} complete")
|
||||
|
||||
# calculate and save new intercept and coefficients
|
||||
self._calculate_move_coefficients(camera, True)
|
||||
|
||||
def _calculate_move_coefficients(self, camera, calibration=False):
|
||||
# calculate new coefficients when we have 50 more new values. Save up to 500
|
||||
if calibration or (
|
||||
len(self.move_metrics[camera]) % 50 == 0
|
||||
and len(self.move_metrics[camera]) != 0
|
||||
and len(self.move_metrics[camera]) <= 500
|
||||
):
|
||||
X = np.array(
|
||||
[abs(d["pan"]) + abs(d["tilt"]) for d in self.move_metrics[camera]]
|
||||
)
|
||||
y = np.array(
|
||||
[
|
||||
d["end_timestamp"] - d["start_timestamp"]
|
||||
for d in self.move_metrics[camera]
|
||||
]
|
||||
)
|
||||
|
||||
# simple linear regression with intercept
|
||||
X_with_intercept = np.column_stack((np.ones(X.shape[0]), X))
|
||||
self.move_coefficients[camera] = np.linalg.lstsq(
|
||||
X_with_intercept, y, rcond=None
|
||||
)[0]
|
||||
|
||||
# only assign a new intercept if we're calibrating
|
||||
if calibration:
|
||||
self.intercept[camera] = y[0]
|
||||
|
||||
# write the intercept and coefficients back to the config file as a comma separated string
|
||||
movement_weights = np.concatenate(
|
||||
([self.intercept[camera]], self.move_coefficients[camera])
|
||||
)
|
||||
self.config.cameras[camera].onvif.autotracking.movement_weights = ", ".join(
|
||||
map(str, movement_weights)
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"New regression parameters - intercept: {self.intercept[camera]}, coefficients: {self.move_coefficients[camera]}"
|
||||
)
|
||||
|
||||
self.write_config(camera)
|
||||
|
||||
def _predict_movement_time(self, camera, pan, tilt):
|
||||
combined_movement = abs(pan) + abs(tilt)
|
||||
input_data = np.array([self.intercept[camera], combined_movement])
|
||||
|
||||
return np.dot(self.move_coefficients[camera], input_data)
|
||||
|
||||
def _process_move_queue(self, camera):
|
||||
while True:
|
||||
try:
|
||||
move_data = self.move_queues[camera].get()
|
||||
frame_time, pan, tilt = move_data
|
||||
frame_time, pan, tilt, zoom = move_data
|
||||
|
||||
# if we're receiving move requests during a PTZ move, ignore them
|
||||
if ptz_moving_at_frame_time(
|
||||
@ -217,36 +384,117 @@ class PtzAutoTracker:
|
||||
# instead of dequeueing this might be a good place to preemptively move based
|
||||
# on an estimate - for fast moving objects, etc.
|
||||
logger.debug(
|
||||
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}"
|
||||
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
|
||||
)
|
||||
continue
|
||||
|
||||
else:
|
||||
# on some cameras with cheaper motors it seems like small values can cause jerky movement
|
||||
# TODO: double check, might not need this
|
||||
if abs(pan) > 0.02 or abs(tilt) > 0.02:
|
||||
self.onvif._move_relative(camera, pan, tilt, 1)
|
||||
if (
|
||||
self.config.cameras[camera].onvif.autotracking.zooming
|
||||
== ZoomingModeEnum.relative
|
||||
):
|
||||
self.onvif._move_relative(camera, pan, tilt, zoom, 1)
|
||||
else:
|
||||
logger.debug(
|
||||
f"Not moving, pan and tilt too small: {pan}, {tilt}"
|
||||
)
|
||||
if zoom > 0:
|
||||
self.onvif._zoom_absolute(camera, zoom, 1)
|
||||
else:
|
||||
self.onvif._move_relative(camera, pan, tilt, 0, 1)
|
||||
|
||||
# Wait until the camera finishes moving
|
||||
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
# check if ptz is moving
|
||||
self.onvif.get_camera_status(camera)
|
||||
|
||||
if self.config.cameras[camera].onvif.autotracking.movement_weights:
|
||||
logger.debug(
|
||||
f"Predicted movement time: {self._predict_movement_time(camera, pan, tilt)}"
|
||||
)
|
||||
logger.debug(
|
||||
f'Actual movement time: {self.ptz_metrics[camera]["ptz_stop_time"].value-self.ptz_metrics[camera]["ptz_start_time"].value}'
|
||||
)
|
||||
|
||||
# save metrics for better estimate calculations
|
||||
if (
|
||||
self.intercept[camera] is not None
|
||||
and len(self.move_metrics[camera]) < 500
|
||||
):
|
||||
logger.debug("Adding new values to move metrics")
|
||||
self.move_metrics[camera].append(
|
||||
{
|
||||
"pan": pan,
|
||||
"tilt": tilt,
|
||||
"start_timestamp": self.ptz_metrics[camera][
|
||||
"ptz_start_time"
|
||||
].value,
|
||||
"end_timestamp": self.ptz_metrics[camera][
|
||||
"ptz_stop_time"
|
||||
].value,
|
||||
}
|
||||
)
|
||||
|
||||
# calculate new coefficients if we have enough data
|
||||
self._calculate_move_coefficients(camera)
|
||||
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
def _enqueue_move(self, camera, frame_time, pan, tilt):
|
||||
move_data = (frame_time, pan, tilt)
|
||||
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
|
||||
def split_value(value):
|
||||
clipped = np.clip(value, -1, 1)
|
||||
return clipped, value - clipped
|
||||
|
||||
if (
|
||||
frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
|
||||
and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
|
||||
and self.move_queues[camera].qsize() == 0
|
||||
):
|
||||
logger.debug(f"enqueue pan: {pan}, enqueue tilt: {tilt}")
|
||||
self.move_queues[camera].put(move_data)
|
||||
# don't make small movements
|
||||
if abs(pan) < 0.02:
|
||||
pan = 0
|
||||
if abs(tilt) < 0.02:
|
||||
tilt = 0
|
||||
|
||||
# split up any large moves caused by velocity estimated movements
|
||||
while pan != 0 or tilt != 0 or zoom != 0:
|
||||
pan, pan_excess = split_value(pan)
|
||||
tilt, tilt_excess = split_value(tilt)
|
||||
zoom, zoom_excess = split_value(zoom)
|
||||
|
||||
logger.debug(
|
||||
f"Enqueue movement for frame time: {frame_time} pan: {pan}, enqueue tilt: {tilt}, enqueue zoom: {zoom}"
|
||||
)
|
||||
move_data = (frame_time, pan, tilt, zoom)
|
||||
self.move_queues[camera].put(move_data)
|
||||
|
||||
pan = pan_excess
|
||||
tilt = tilt_excess
|
||||
zoom = zoom_excess
|
||||
|
||||
def _should_zoom_in(self, camera, box, area):
|
||||
camera_config = self.config.cameras[camera]
|
||||
camera_width = camera_config.frame_shape[1]
|
||||
camera_height = camera_config.frame_shape[0]
|
||||
camera_area = camera_width * camera_height
|
||||
|
||||
bb_left, bb_top, bb_right, bb_bottom = box
|
||||
|
||||
# If bounding box is not within 5% of an edge
|
||||
# If object area is less than 70% of frame
|
||||
# Then zoom in, otherwise try zooming out
|
||||
# should we make these configurable?
|
||||
#
|
||||
# TODO: Take into account the area changing when an object is moving out of frame
|
||||
edge_threshold = 0.15
|
||||
area_threshold = 0.7
|
||||
|
||||
# returns True to zoom in, False to zoom out
|
||||
return (
|
||||
bb_left > edge_threshold * camera_width
|
||||
and bb_right < (1 - edge_threshold) * camera_width
|
||||
and bb_top > edge_threshold * camera_height
|
||||
and bb_bottom < (1 - edge_threshold) * camera_height
|
||||
and area < area_threshold * camera_area
|
||||
)
|
||||
|
||||
def _autotrack_move_ptz(self, camera, obj):
|
||||
camera_config = self.config.cameras[camera]
|
||||
@ -254,13 +502,91 @@ class PtzAutoTracker:
|
||||
# # frame width and height
|
||||
camera_width = camera_config.frame_shape[1]
|
||||
camera_height = camera_config.frame_shape[0]
|
||||
camera_fps = camera_config.detect.fps
|
||||
|
||||
centroid_x = obj.obj_data["centroid"][0]
|
||||
centroid_y = obj.obj_data["centroid"][1]
|
||||
|
||||
# Normalize coordinates. top right of the fov is (1,1), center is (0,0), bottom left is (-1, -1).
|
||||
pan = ((obj.obj_data["centroid"][0] / camera_width) - 0.5) * 2
|
||||
tilt = (0.5 - (obj.obj_data["centroid"][1] / camera_height)) * 2
|
||||
pan = ((centroid_x / camera_width) - 0.5) * 2
|
||||
tilt = (0.5 - (centroid_y / camera_height)) * 2
|
||||
|
||||
# ideas: check object velocity for camera speed?
|
||||
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt)
|
||||
if (
|
||||
camera_config.onvif.autotracking.movement_weights
|
||||
): # use estimates if we have available coefficients
|
||||
predicted_movement_time = self._predict_movement_time(camera, pan, tilt)
|
||||
|
||||
# Norfair gives us two points for the velocity of an object represented as x1, y1, x2, y2
|
||||
x1, y1, x2, y2 = obj.obj_data["estimate_velocity"]
|
||||
average_velocity = (
|
||||
(x1 + x2) / 2,
|
||||
(y1 + y2) / 2,
|
||||
(x1 + x2) / 2,
|
||||
(y1 + y2) / 2,
|
||||
)
|
||||
|
||||
# this box could exceed the frame boundaries if velocity is high
|
||||
# but we'll handle that in _enqueue_move() as two separate moves
|
||||
predicted_box = [
|
||||
round(x + camera_fps * predicted_movement_time * v)
|
||||
for x, v in zip(obj.obj_data["box"], average_velocity)
|
||||
]
|
||||
|
||||
centroid_x = round((predicted_box[0] + predicted_box[2]) / 2)
|
||||
centroid_y = round((predicted_box[1] + predicted_box[3]) / 2)
|
||||
|
||||
# recalculate pan and tilt with new centroid
|
||||
pan = ((centroid_x / camera_width) - 0.5) * 2
|
||||
tilt = (0.5 - (centroid_y / camera_height)) * 2
|
||||
|
||||
logger.debug(f'Original box: {obj.obj_data["box"]}')
|
||||
logger.debug(f"Predicted box: {predicted_box}")
|
||||
logger.debug(f'Velocity: {obj.obj_data["estimate_velocity"]}')
|
||||
|
||||
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.relative:
|
||||
# relative zooming concurrently with pan/tilt
|
||||
zoom = obj.obj_data["area"] / (camera_width * camera_height)
|
||||
|
||||
# test if we need to zoom out
|
||||
if not self._should_zoom_in(
|
||||
camera,
|
||||
predicted_box
|
||||
if camera_config.onvif.autotracking.movement_weights
|
||||
else obj.obj_data["box"],
|
||||
obj.obj_data["area"],
|
||||
):
|
||||
zoom = -(1 - zoom)
|
||||
|
||||
# don't make small movements if area hasn't changed significantly
|
||||
if (
|
||||
"area" in obj.previous
|
||||
and abs(obj.obj_data["area"] - obj.previous["area"])
|
||||
/ obj.obj_data["area"]
|
||||
< 0.1
|
||||
):
|
||||
zoom = 0
|
||||
else:
|
||||
zoom = 0
|
||||
|
||||
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, zoom)
|
||||
|
||||
def _autotrack_zoom_only(self, camera, obj):
|
||||
camera_config = self.config.cameras[camera]
|
||||
|
||||
# absolute zooming separately from pan/tilt
|
||||
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.absolute:
|
||||
zoom_level = self.ptz_metrics[camera]["ptz_zoom_level"].value
|
||||
|
||||
if 0 < zoom_level <= 1:
|
||||
if self._should_zoom_in(
|
||||
camera, obj.obj_data["box"], obj.obj_data["area"]
|
||||
):
|
||||
zoom = min(1.0, zoom_level + 0.1)
|
||||
else:
|
||||
zoom = max(0.0, zoom_level - 0.1)
|
||||
|
||||
if zoom != zoom_level:
|
||||
self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)
|
||||
|
||||
def autotrack_object(self, camera, obj):
|
||||
camera_config = self.config.cameras[camera]
|
||||
@ -269,6 +595,10 @@ class PtzAutoTracker:
|
||||
if not self.autotracker_init[camera]:
|
||||
self._autotracker_setup(self.config.cameras[camera], camera)
|
||||
|
||||
if self.calibrating[camera]:
|
||||
logger.debug("Calibrating camera")
|
||||
return
|
||||
|
||||
# either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive,
|
||||
# and is not initially motionless - or one we're already tracking, which assumes all those things are already true
|
||||
if (
|
||||
@ -287,7 +617,7 @@ class PtzAutoTracker:
|
||||
)
|
||||
self.tracked_object[camera] = obj
|
||||
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
||||
self.previous_frame_time = obj.obj_data["frame_time"]
|
||||
self.previous_frame_time[camera] = obj.obj_data["frame_time"]
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
|
||||
return
|
||||
@ -299,7 +629,7 @@ class PtzAutoTracker:
|
||||
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
|
||||
and obj.obj_data["frame_time"] != self.previous_frame_time
|
||||
):
|
||||
self.previous_frame_time = obj.obj_data["frame_time"]
|
||||
self.previous_frame_time[camera] = obj.obj_data["frame_time"]
|
||||
# Don't move ptz if Euclidean distance from object to center of frame is
|
||||
# less than 15% of the of the larger dimension (width or height) of the frame,
|
||||
# multiplied by a scaling factor for object size.
|
||||
@ -337,6 +667,10 @@ class PtzAutoTracker:
|
||||
logger.debug(
|
||||
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
||||
)
|
||||
|
||||
# no need to move, but try absolute zooming
|
||||
self._autotrack_zoom_only(camera, obj)
|
||||
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
@ -345,6 +679,9 @@ class PtzAutoTracker:
|
||||
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
|
||||
# try absolute zooming too
|
||||
self._autotrack_zoom_only(camera, obj)
|
||||
|
||||
return
|
||||
|
||||
if (
|
||||
@ -356,10 +693,9 @@ class PtzAutoTracker:
|
||||
and obj.obj_data["label"] in self.object_types[camera]
|
||||
and not obj.previous["false_positive"]
|
||||
and not obj.false_positive
|
||||
and obj.obj_data["motionless_count"] == 0
|
||||
and self.tracked_object_previous[camera] is not None
|
||||
):
|
||||
self.previous_frame_time = obj.obj_data["frame_time"]
|
||||
self.previous_frame_time[camera] = obj.obj_data["frame_time"]
|
||||
if (
|
||||
intersection_over_union(
|
||||
self.tracked_object_previous[camera].obj_data["region"],
|
||||
@ -388,6 +724,12 @@ class PtzAutoTracker:
|
||||
self.tracked_object[camera] = None
|
||||
|
||||
def camera_maintenance(self, camera):
|
||||
# bail and don't check anything if we're calibrating or tracking an object
|
||||
if self.calibrating[camera] or self.tracked_object[camera] is not None:
|
||||
return
|
||||
|
||||
logger.debug("Running camera maintenance")
|
||||
|
||||
# calls get_camera_status to check/update ptz movement
|
||||
# returns camera to preset after timeout when tracking is over
|
||||
autotracker_config = self.config.cameras[camera].onvif.autotracking
|
||||
@ -404,19 +746,26 @@ class PtzAutoTracker:
|
||||
and self.tracked_object_previous[camera] is not None
|
||||
and (
|
||||
# might want to use a different timestamp here?
|
||||
time.time()
|
||||
self.ptz_metrics[camera]["ptz_frame_time"].value
|
||||
- self.tracked_object_previous[camera].obj_data["frame_time"]
|
||||
> autotracker_config.timeout
|
||||
)
|
||||
and autotracker_config.return_preset
|
||||
):
|
||||
# empty move queue
|
||||
while not self.move_queues[camera].empty():
|
||||
self.move_queues[camera].get()
|
||||
|
||||
# clear tracked object
|
||||
self.tracked_object[camera] = None
|
||||
self.tracked_object_previous[camera] = None
|
||||
|
||||
self.ptz_metrics[camera]["ptz_stopped"].wait()
|
||||
logger.debug(
|
||||
f"Autotrack: Time is {time.time()}, returning to preset: {autotracker_config.return_preset}"
|
||||
f"Autotrack: Time is {self.ptz_metrics[camera]['ptz_frame_time'].value}, returning to preset: {autotracker_config.return_preset}"
|
||||
)
|
||||
self.onvif._move_to_preset(
|
||||
camera,
|
||||
autotracker_config.return_preset.lower(),
|
||||
)
|
||||
self.ptz_metrics[camera]["ptz_reset"].set()
|
||||
self.tracked_object_previous[camera] = None
|
||||
|
@ -1,6 +1,5 @@
|
||||
"""Configure and control camera via onvif."""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import site
|
||||
from enum import Enum
|
||||
@ -8,8 +7,9 @@ from enum import Enum
|
||||
import numpy
|
||||
from onvif import ONVIFCamera, ONVIFError
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.config import FrigateConfig, ZoomingModeEnum
|
||||
from frigate.types import PTZMetricsTypes
|
||||
from frigate.util.builtin import find_by_key
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -33,6 +33,7 @@ class OnvifController:
|
||||
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetricsTypes]
|
||||
) -> None:
|
||||
self.cams: dict[str, ONVIFCamera] = {}
|
||||
self.config = config
|
||||
self.ptz_metrics = ptz_metrics
|
||||
|
||||
for cam_name, cam in config.cameras.items():
|
||||
@ -73,11 +74,20 @@ class OnvifController:
|
||||
return False
|
||||
|
||||
ptz = onvif.create_ptz_service()
|
||||
|
||||
request = ptz.create_type("GetConfigurations")
|
||||
configs = ptz.GetConfigurations(request)[0]
|
||||
|
||||
request = ptz.create_type("GetConfigurationOptions")
|
||||
request.ConfigurationToken = profile.PTZConfiguration.token
|
||||
ptz_config = ptz.GetConfigurationOptions(request)
|
||||
logger.debug(f"Onvif config for {camera_name}: {ptz_config}")
|
||||
|
||||
service_capabilities_request = ptz.create_type("GetServiceCapabilities")
|
||||
self.cams[camera_name][
|
||||
"service_capabilities_request"
|
||||
] = service_capabilities_request
|
||||
|
||||
fov_space_id = next(
|
||||
(
|
||||
i
|
||||
@ -89,6 +99,20 @@ class OnvifController:
|
||||
None,
|
||||
)
|
||||
|
||||
# autoracking relative panning/tilting needs a relative zoom value set to 0
|
||||
# if camera supports relative movement
|
||||
if self.config.cameras[camera_name].onvif.autotracking.zooming:
|
||||
zoom_space_id = next(
|
||||
(
|
||||
i
|
||||
for i, space in enumerate(
|
||||
ptz_config.Spaces.RelativeZoomTranslationSpace
|
||||
)
|
||||
if "TranslationGenericSpace" in space["URI"]
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
# setup continuous moving request
|
||||
move_request = ptz.create_type("ContinuousMove")
|
||||
move_request.ProfileToken = profile.token
|
||||
@ -105,19 +129,27 @@ class OnvifController:
|
||||
"RelativePanTiltTranslationSpace"
|
||||
][fov_space_id]["URI"]
|
||||
|
||||
# try setting relative zoom translation space
|
||||
try:
|
||||
move_request.Translation.Zoom.space = ptz_config["Spaces"][
|
||||
"RelativeZoomTranslationSpace"
|
||||
][0]["URI"]
|
||||
if self.config.cameras[camera_name].onvif.autotracking.zooming:
|
||||
if zoom_space_id is not None:
|
||||
move_request.Translation.Zoom.space = ptz_config["Spaces"][
|
||||
"RelativeZoomTranslationSpace"
|
||||
][0]["URI"]
|
||||
except Exception:
|
||||
# camera does not support relative zoom
|
||||
pass
|
||||
if self.config.cameras[camera_name].onvif.autotracking.zoom_relative:
|
||||
self.config.cameras[
|
||||
camera_name
|
||||
].onvif.autotracking.zoom_relative = False
|
||||
logger.warning(
|
||||
f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported"
|
||||
)
|
||||
|
||||
if move_request.Speed is None:
|
||||
move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position
|
||||
self.cams[camera_name]["relative_move_request"] = move_request
|
||||
|
||||
# setup relative moving request for autotracking
|
||||
# setup absolute moving request for autotracking zooming
|
||||
move_request = ptz.create_type("AbsoluteMove")
|
||||
move_request.ProfileToken = profile.token
|
||||
self.cams[camera_name]["absolute_move_request"] = move_request
|
||||
@ -126,6 +158,8 @@ class OnvifController:
|
||||
status_request = ptz.create_type("GetStatus")
|
||||
status_request.ProfileToken = profile.token
|
||||
self.cams[camera_name]["status_request"] = status_request
|
||||
status = ptz.GetStatus(status_request)
|
||||
logger.debug(f"Onvif status config for {camera_name}: {status}")
|
||||
|
||||
# setup existing presets
|
||||
try:
|
||||
@ -153,14 +187,28 @@ class OnvifController:
|
||||
if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace:
|
||||
supported_features.append("zoom-r")
|
||||
|
||||
if ptz_config.Spaces and ptz_config.Spaces.AbsoluteZoomPositionSpace:
|
||||
supported_features.append("zoom-a")
|
||||
try:
|
||||
# get camera's zoom limits from onvif config
|
||||
self.cams[camera_name][
|
||||
"absolute_zoom_range"
|
||||
] = ptz_config.Spaces.AbsoluteZoomPositionSpace[0]
|
||||
self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits
|
||||
except Exception:
|
||||
if self.config.cameras[camera_name].onvif.autotracking.zooming:
|
||||
self.config.cameras[camera_name].onvif.autotracking.zooming = False
|
||||
logger.warning(
|
||||
f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported"
|
||||
)
|
||||
|
||||
# set relative pan/tilt space for autotracker
|
||||
if fov_space_id is not None:
|
||||
supported_features.append("pt-r-fov")
|
||||
self.cams[camera_name][
|
||||
"relative_fov_range"
|
||||
] = ptz_config.Spaces.RelativePanTiltTranslationSpace[fov_space_id]
|
||||
|
||||
self.cams[camera_name]["relative_fov_supported"] = fov_space_id is not None
|
||||
|
||||
self.cams[camera_name]["features"] = supported_features
|
||||
|
||||
self.cams[camera_name]["init"] = True
|
||||
@ -210,8 +258,8 @@ class OnvifController:
|
||||
|
||||
onvif.get_service("ptz").ContinuousMove(move_request)
|
||||
|
||||
def _move_relative(self, camera_name: str, pan, tilt, speed) -> None:
|
||||
if not self.cams[camera_name]["relative_fov_supported"]:
|
||||
def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None:
|
||||
if "pt-r-fov" not in self.cams[camera_name]["features"]:
|
||||
logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
|
||||
return
|
||||
|
||||
@ -225,10 +273,12 @@ class OnvifController:
|
||||
|
||||
self.cams[camera_name]["active"] = True
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
||||
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
|
||||
self.ptz_metrics[camera_name][
|
||||
"ptz_start_time"
|
||||
].value = datetime.datetime.now().timestamp()
|
||||
logger.debug(
|
||||
f"PTZ start time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
|
||||
)
|
||||
self.ptz_metrics[camera_name]["ptz_start_time"].value = self.ptz_metrics[
|
||||
camera_name
|
||||
]["ptz_frame_time"].value
|
||||
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
||||
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
||||
move_request = self.cams[camera_name]["relative_move_request"]
|
||||
@ -257,15 +307,30 @@ class OnvifController:
|
||||
"x": speed,
|
||||
"y": speed,
|
||||
},
|
||||
"Zoom": 0,
|
||||
}
|
||||
|
||||
move_request.Translation.PanTilt.x = pan
|
||||
move_request.Translation.PanTilt.y = tilt
|
||||
move_request.Translation.Zoom.x = 0
|
||||
|
||||
if "zoom-r" in self.cams[camera_name]["features"]:
|
||||
move_request.Speed = {
|
||||
"PanTilt": {
|
||||
"x": speed,
|
||||
"y": speed,
|
||||
},
|
||||
"Zoom": {"x": speed},
|
||||
}
|
||||
move_request.Translation.Zoom.x = zoom
|
||||
|
||||
onvif.get_service("ptz").RelativeMove(move_request)
|
||||
|
||||
# reset after the move request
|
||||
move_request.Translation.PanTilt.x = 0
|
||||
move_request.Translation.PanTilt.y = 0
|
||||
|
||||
if "zoom-r" in self.cams[camera_name]["features"]:
|
||||
move_request.Translation.Zoom.x = 0
|
||||
|
||||
self.cams[camera_name]["active"] = False
|
||||
|
||||
def _move_to_preset(self, camera_name: str, preset: str) -> None:
|
||||
@ -305,6 +370,50 @@ class OnvifController:
|
||||
|
||||
onvif.get_service("ptz").ContinuousMove(move_request)
|
||||
|
||||
def _zoom_absolute(self, camera_name: str, zoom, speed) -> None:
|
||||
if "zoom-a" not in self.cams[camera_name]["features"]:
|
||||
logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.")
|
||||
return
|
||||
|
||||
logger.debug(f"{camera_name} called AbsoluteMove: zoom: {zoom}")
|
||||
|
||||
if self.cams[camera_name]["active"]:
|
||||
logger.warning(
|
||||
f"{camera_name} is already performing an action, not moving..."
|
||||
)
|
||||
return
|
||||
|
||||
self.cams[camera_name]["active"] = True
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
||||
logger.debug(
|
||||
f"PTZ start time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
|
||||
)
|
||||
self.ptz_metrics[camera_name]["ptz_start_time"].value = self.ptz_metrics[
|
||||
camera_name
|
||||
]["ptz_frame_time"].value
|
||||
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
||||
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
||||
move_request = self.cams[camera_name]["absolute_move_request"]
|
||||
|
||||
# function takes in 0 to 1 for zoom, interpolate to the values of the camera.
|
||||
zoom = numpy.interp(
|
||||
zoom,
|
||||
[0, 1],
|
||||
[
|
||||
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
|
||||
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
|
||||
],
|
||||
)
|
||||
|
||||
move_request.Speed = {"Zoom": speed}
|
||||
move_request.Position = {"Zoom": zoom}
|
||||
|
||||
logger.debug(f"Absolute zoom: {zoom}")
|
||||
|
||||
onvif.get_service("ptz").AbsoluteMove(move_request)
|
||||
|
||||
self.cams[camera_name]["active"] = False
|
||||
|
||||
def handle_command(
|
||||
self, camera_name: str, command: OnvifCommandEnum, param: str = ""
|
||||
) -> None:
|
||||
@ -344,7 +453,30 @@ class OnvifController:
|
||||
"presets": list(self.cams[camera_name]["presets"].keys()),
|
||||
}
|
||||
|
||||
def get_camera_status(self, camera_name: str) -> dict[str, any]:
|
||||
def get_service_capabilities(self, camera_name: str) -> None:
|
||||
if camera_name not in self.cams.keys():
|
||||
logger.error(f"Onvif is not setup for {camera_name}")
|
||||
return {}
|
||||
|
||||
if not self.cams[camera_name]["init"]:
|
||||
self._init_onvif(camera_name)
|
||||
|
||||
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
||||
service_capabilities_request = self.cams[camera_name][
|
||||
"service_capabilities_request"
|
||||
]
|
||||
service_capabilities = onvif.get_service("ptz").GetServiceCapabilities(
|
||||
service_capabilities_request
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Onvif service capabilities for {camera_name}: {service_capabilities}"
|
||||
)
|
||||
|
||||
# MoveStatus is required for autotracking - should return "true" if supported
|
||||
return find_by_key(vars(service_capabilities), "MoveStatus")
|
||||
|
||||
def get_camera_status(self, camera_name: str) -> None:
|
||||
if camera_name not in self.cams.keys():
|
||||
logger.error(f"Onvif is not setup for {camera_name}")
|
||||
return {}
|
||||
@ -356,32 +488,66 @@ class OnvifController:
|
||||
status_request = self.cams[camera_name]["status_request"]
|
||||
status = onvif.get_service("ptz").GetStatus(status_request)
|
||||
|
||||
if status.MoveStatus.PanTilt == "IDLE" and status.MoveStatus.Zoom == "IDLE":
|
||||
# there doesn't seem to be an onvif standard with this optional parameter
|
||||
# some cameras can report MoveStatus with or without PanTilt or Zoom attributes
|
||||
pan_tilt_status = getattr(status.MoveStatus, "PanTilt", None)
|
||||
zoom_status = getattr(status.MoveStatus, "Zoom", None)
|
||||
|
||||
# if it's not an attribute, see if MoveStatus even exists in the status result
|
||||
if pan_tilt_status is None:
|
||||
pan_tilt_status = getattr(status, "MoveStatus", None)
|
||||
|
||||
# we're unsupported
|
||||
if pan_tilt_status is None or pan_tilt_status.lower() not in [
|
||||
"idle",
|
||||
"moving",
|
||||
]:
|
||||
logger.error(
|
||||
f"Camera {camera_name} does not support the ONVIF GetStatus method. Autotracking will not function correctly and must be disabled in your config."
|
||||
)
|
||||
return
|
||||
|
||||
if pan_tilt_status.lower() == "idle" and (
|
||||
zoom_status is None or zoom_status.lower() == "idle"
|
||||
):
|
||||
self.cams[camera_name]["active"] = False
|
||||
if not self.ptz_metrics[camera_name]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
||||
|
||||
logger.debug(f"PTZ stop time: {datetime.datetime.now().timestamp()}")
|
||||
logger.debug(
|
||||
f"PTZ stop time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
|
||||
)
|
||||
|
||||
self.ptz_metrics[camera_name][
|
||||
"ptz_stop_time"
|
||||
].value = datetime.datetime.now().timestamp()
|
||||
self.ptz_metrics[camera_name]["ptz_stop_time"].value = self.ptz_metrics[
|
||||
camera_name
|
||||
]["ptz_frame_time"].value
|
||||
else:
|
||||
self.cams[camera_name]["active"] = True
|
||||
if self.ptz_metrics[camera_name]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
||||
|
||||
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
|
||||
logger.debug(
|
||||
f"PTZ start time: {self.ptz_metrics[camera_name]['ptz_frame_time'].value}"
|
||||
)
|
||||
|
||||
self.ptz_metrics[camera_name][
|
||||
"ptz_start_time"
|
||||
].value = datetime.datetime.now().timestamp()
|
||||
].value = self.ptz_metrics[camera_name]["ptz_frame_time"].value
|
||||
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
||||
|
||||
return {
|
||||
"pan": status.Position.PanTilt.x,
|
||||
"tilt": status.Position.PanTilt.y,
|
||||
"zoom": status.Position.Zoom.x,
|
||||
"pantilt_moving": status.MoveStatus.PanTilt,
|
||||
"zoom_moving": status.MoveStatus.Zoom,
|
||||
}
|
||||
if (
|
||||
self.config.cameras[camera_name].onvif.autotracking.zooming
|
||||
== ZoomingModeEnum.absolute
|
||||
):
|
||||
# store absolute zoom level as 0 to 1 interpolated from the values of the camera
|
||||
self.ptz_metrics[camera_name]["ptz_zoom_level"].value = numpy.interp(
|
||||
round(status.Position.Zoom.x, 2),
|
||||
[0, 1],
|
||||
[
|
||||
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
|
||||
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
|
||||
],
|
||||
)
|
||||
logger.debug(
|
||||
f'Camera zoom level: {self.ptz_metrics[camera_name]["ptz_zoom_level"].value}'
|
||||
)
|
||||
|
@ -1536,6 +1536,46 @@ class TestConfig(unittest.TestCase):
|
||||
assert runtime_config.cameras["back"].objects.filters["dog"].min_ratio == 0.2
|
||||
assert runtime_config.cameras["back"].objects.filters["dog"].max_ratio == 10.1
|
||||
|
||||
def test_valid_movement_weights(self):
|
||||
config = {
|
||||
"mqtt": {"host": "mqtt"},
|
||||
"cameras": {
|
||||
"back": {
|
||||
"ffmpeg": {
|
||||
"inputs": [
|
||||
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||||
]
|
||||
},
|
||||
"onvif": {"autotracking": {"movement_weights": "1.23, 2.34, 0.50"}},
|
||||
}
|
||||
},
|
||||
}
|
||||
frigate_config = FrigateConfig(**config)
|
||||
|
||||
runtime_config = frigate_config.runtime_config()
|
||||
assert runtime_config.cameras["back"].onvif.autotracking.movement_weights == [
|
||||
1.23,
|
||||
2.34,
|
||||
0.50,
|
||||
]
|
||||
|
||||
def test_fails_invalid_movement_weights(self):
|
||||
config = {
|
||||
"mqtt": {"host": "mqtt"},
|
||||
"cameras": {
|
||||
"back": {
|
||||
"ffmpeg": {
|
||||
"inputs": [
|
||||
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
|
||||
]
|
||||
},
|
||||
"onvif": {"autotracking": {"movement_weights": "1.234, 2.345a"}},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
self.assertRaises(ValueError, lambda: FrigateConfig(**config))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
|
@ -278,9 +278,11 @@ class NorfairTracker(ObjectTracker):
|
||||
min(self.detect_config.width - 1, estimate[2]),
|
||||
min(self.detect_config.height - 1, estimate[3]),
|
||||
)
|
||||
estimate_velocity = tuple(t.estimate_velocity.flatten().astype(int))
|
||||
obj = {
|
||||
**t.last_detection.data,
|
||||
"estimate": estimate,
|
||||
"estimate_velocity": estimate_velocity,
|
||||
}
|
||||
active_ids.append(t.global_id)
|
||||
if t.global_id not in self.track_id_map:
|
||||
|
@ -31,6 +31,8 @@ class PTZMetricsTypes(TypedDict):
|
||||
ptz_reset: Event
|
||||
ptz_start_time: Synchronized
|
||||
ptz_stop_time: Synchronized
|
||||
ptz_frame_time: Synchronized
|
||||
ptz_zoom_level: Synchronized
|
||||
|
||||
|
||||
class FeatureMetricsTypes(TypedDict):
|
||||
|
@ -249,3 +249,15 @@ def update_yaml(data, key_path, new_value):
|
||||
temp[last_key] = new_value
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def find_by_key(dictionary, target_key):
|
||||
if target_key in dictionary:
|
||||
return dictionary[target_key]
|
||||
else:
|
||||
for value in dictionary.values():
|
||||
if isinstance(value, dict):
|
||||
result = find_by_key(value, target_key)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
@ -767,6 +767,7 @@ def process_frames(
|
||||
continue
|
||||
|
||||
current_frame_time.value = frame_time
|
||||
ptz_metrics["ptz_frame_time"].value = frame_time
|
||||
|
||||
frame = frame_manager.get(
|
||||
f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1])
|
||||
|
Loading…
Reference in New Issue
Block a user