mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-21 00:06:44 +01:00
Autotracking optimizations (#7109)
* much improved motion estimation and tracking * docs updates * move ptz specific mp values to ptz_metrics dict * only check if moving at frame time * pass full dict instead of individual values
This commit is contained in:
parent
a94346e17e
commit
7a2d09dc35
@ -3,13 +3,13 @@ id: autotracking
|
||||
title: Autotracking
|
||||
---
|
||||
|
||||
An ONVIF-capable camera that supports relative movement within the field of view (FOV) can also be configured to automatically track moving objects and keep them in the center of the frame.
|
||||
An ONVIF-capable, PTZ (pan-tilt-zoom) camera that supports relative movement within the field of view (FOV) can be configured to automatically track moving objects and keep them in the center of the frame.
|
||||
|
||||
## Autotracking behavior
|
||||
|
||||
Once Frigate determines that an object is not a false positive and has entered one of the required zones, the autotracker will move the PTZ camera to keep the object centered in the frame until the object either moves out of the frame, the PTZ is not capable of any more movement, or Frigate loses track of it.
|
||||
|
||||
Upon loss of tracking, Frigate will scan the region of the lost object for `timeout` seconds. If an object of the same type is found in that region, Frigate will track that new object.
|
||||
Upon loss of tracking, Frigate will scan the region of the lost object for `timeout` seconds. If an object of the same type is found in that region, Frigate will autotrack that new object.
|
||||
|
||||
When tracking has ended, Frigate will return to the camera preset specified by the `return_preset` configuration entry.
|
||||
|
||||
@ -17,13 +17,15 @@ When tracking has ended, Frigate will return to the camera preset specified by t
|
||||
|
||||
Frigate autotracking functions with PTZ cameras capable of relative movement within the field of view (as specified in the [ONVIF spec](https://www.onvif.org/specs/srv/ptz/ONVIF-PTZ-Service-Spec-v1712.pdf) as `RelativePanTiltTranslationSpace` having a `TranslationSpaceFov` entry).
|
||||
|
||||
Many cheaper PTZs likely don't support this standard. Frigate will report an error message in the log and disable autotracking if your PTZ is unsupported.
|
||||
Many cheaper or older PTZs may not support this standard. Frigate will report an error message in the log and disable autotracking if your PTZ is unsupported.
|
||||
|
||||
Alternatively, you can download and run [this simple Python script](https://gist.github.com/hawkeye217/152a1d4ba80760dac95d46e143d37112), replacing the details on line 4 with your camera's IP address, ONVIF port, username, and password to check your camera.
|
||||
|
||||
## Configuration
|
||||
|
||||
First, configure the ONVIF parameters for your camera, then specify the object types to track, a required zone the object must enter, and a camera preset name to return to when tracking has ended. Optionally, specify a delay in seconds before Frigate returns the camera to the preset.
|
||||
First, set up a PTZ preset in your camera's firmware and give it a name.
|
||||
|
||||
Edit your Frigate configuration file and enter the ONVIF parameters for your camera. Specify the object types to track, a required zone the object must enter to begin autotracking, and the camera preset name you configured in your camera's firmware to return to when tracking has ended. Optionally, specify a delay in seconds before Frigate returns the camera to the preset.
|
||||
|
||||
An [ONVIF connection](cameras.md) is required for autotracking to function.
|
||||
|
||||
@ -69,3 +71,7 @@ The object tracker in Frigate estimates the motion of the PTZ so that tracked ob
|
||||
A fast [detector](object_detectors.md) is recommended. CPU detectors will not perform well or won't work at all. If Frigate already has trouble keeping track of your object, the autotracker will struggle as well.
|
||||
|
||||
The autotracker will add PTZ motion requests to a queue while the motor is moving. Once the motor stops, the events in the queue will be executed together as one large move (rather than incremental moves). If your PTZ's motor is slow, you may not be able to reliably autotrack fast moving objects.
|
||||
|
||||
## Usage applications
|
||||
|
||||
In security and surveillance, it's common to use "spotter" cameras in combination with your PTZ. When your fixed spotter camera detects an object, you could use an automation platform like Home Assistant to move the PTZ to a specific preset so that Frigate can begin automatically tracking the object. For example: a residence may have fixed cameras on the east and west side of the property, capturing views up and down a street. When the spotter camera on the west side detects a person, a Home Assistant automation could move the PTZ to a camera preset aimed toward the west. When the object enters the specified zone, Frigate's autotracker could then continue to track the person as it moves out of view of any of the fixed cameras.
|
||||
|
@ -48,7 +48,7 @@ from frigate.record.record import manage_recordings
|
||||
from frigate.stats import StatsEmitter, stats_init
|
||||
from frigate.storage import StorageMaintainer
|
||||
from frigate.timeline import TimelineProcessor
|
||||
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
|
||||
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes
|
||||
from frigate.version import VERSION
|
||||
from frigate.video import capture_camera, track_camera
|
||||
from frigate.watchdog import FrigateWatchdog
|
||||
@ -67,6 +67,7 @@ class FrigateApp:
|
||||
self.plus_api = PlusApi()
|
||||
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
|
||||
self.feature_metrics: dict[str, FeatureMetricsTypes] = {}
|
||||
self.ptz_metrics: dict[str, PTZMetricsTypes] = {}
|
||||
self.processes: dict[str, int] = {}
|
||||
|
||||
def set_environment_vars(self) -> None:
|
||||
@ -135,13 +136,6 @@ class FrigateApp:
|
||||
"i",
|
||||
self.config.cameras[camera_name].motion.improve_contrast,
|
||||
),
|
||||
"ptz_autotracker_enabled": mp.Value( # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"i",
|
||||
self.config.cameras[camera_name].onvif.autotracking.enabled,
|
||||
),
|
||||
"ptz_stopped": mp.Event(),
|
||||
"motion_threshold": mp.Value( # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
@ -170,7 +164,22 @@ class FrigateApp:
|
||||
"capture_process": None,
|
||||
"process": None,
|
||||
}
|
||||
self.camera_metrics[camera_name]["ptz_stopped"].set()
|
||||
self.ptz_metrics[camera_name] = {
|
||||
"ptz_autotracker_enabled": mp.Value( # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"i",
|
||||
self.config.cameras[camera_name].onvif.autotracking.enabled,
|
||||
),
|
||||
"ptz_stopped": mp.Event(),
|
||||
"ptz_start_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
"ptz_stop_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
# from mypy 0.981 onwards
|
||||
}
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
||||
self.feature_metrics[camera_name] = {
|
||||
"audio_enabled": mp.Value( # type: ignore[typeddict-item]
|
||||
# issue https://github.com/python/typeshed/issues/8799
|
||||
@ -317,7 +326,7 @@ class FrigateApp:
|
||||
)
|
||||
|
||||
def init_onvif(self) -> None:
|
||||
self.onvif_controller = OnvifController(self.config, self.camera_metrics)
|
||||
self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
|
||||
|
||||
def init_dispatcher(self) -> None:
|
||||
comms: list[Communicator] = []
|
||||
@ -331,6 +340,7 @@ class FrigateApp:
|
||||
self.onvif_controller,
|
||||
self.camera_metrics,
|
||||
self.feature_metrics,
|
||||
self.ptz_metrics,
|
||||
comms,
|
||||
)
|
||||
|
||||
@ -375,7 +385,7 @@ class FrigateApp:
|
||||
self.ptz_autotracker_thread = PtzAutoTrackerThread(
|
||||
self.config,
|
||||
self.onvif_controller,
|
||||
self.camera_metrics,
|
||||
self.ptz_metrics,
|
||||
self.stop_event,
|
||||
)
|
||||
self.ptz_autotracker_thread.start()
|
||||
@ -426,6 +436,7 @@ class FrigateApp:
|
||||
self.detection_out_events[name],
|
||||
self.detected_frames_queue,
|
||||
self.camera_metrics[name],
|
||||
self.ptz_metrics[name],
|
||||
),
|
||||
)
|
||||
camera_process.daemon = True
|
||||
|
@ -6,7 +6,7 @@ from typing import Any, Callable
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.ptz.onvif import OnvifCommandEnum, OnvifController
|
||||
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
|
||||
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes
|
||||
from frigate.util.services import restart_frigate
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -40,12 +40,14 @@ class Dispatcher:
|
||||
onvif: OnvifController,
|
||||
camera_metrics: dict[str, CameraMetricsTypes],
|
||||
feature_metrics: dict[str, FeatureMetricsTypes],
|
||||
ptz_metrics: dict[str, PTZMetricsTypes],
|
||||
communicators: list[Communicator],
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.onvif = onvif
|
||||
self.camera_metrics = camera_metrics
|
||||
self.feature_metrics = feature_metrics
|
||||
self.ptz_metrics = ptz_metrics
|
||||
self.comms = communicators
|
||||
|
||||
for comm in self.comms:
|
||||
@ -165,16 +167,14 @@ class Dispatcher:
|
||||
ptz_autotracker_settings = self.config.cameras[camera_name].onvif.autotracking
|
||||
|
||||
if payload == "ON":
|
||||
if not self.camera_metrics[camera_name]["ptz_autotracker_enabled"].value:
|
||||
if not self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value:
|
||||
logger.info(f"Turning on ptz autotracker for {camera_name}")
|
||||
self.camera_metrics[camera_name]["ptz_autotracker_enabled"].value = True
|
||||
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = True
|
||||
ptz_autotracker_settings.enabled = True
|
||||
elif payload == "OFF":
|
||||
if self.camera_metrics[camera_name]["ptz_autotracker_enabled"].value:
|
||||
if self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value:
|
||||
logger.info(f"Turning off ptz autotracker for {camera_name}")
|
||||
self.camera_metrics[camera_name][
|
||||
"ptz_autotracker_enabled"
|
||||
].value = False
|
||||
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
||||
ptz_autotracker_settings.enabled = False
|
||||
|
||||
self.publish(f"{camera_name}/ptz_autotracker/state", payload, retain=True)
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import math
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
@ -14,33 +15,48 @@ from norfair.camera_motion import MotionEstimator, TranslationTransformationGett
|
||||
|
||||
from frigate.config import CameraConfig, FrigateConfig
|
||||
from frigate.ptz.onvif import OnvifController
|
||||
from frigate.types import CameraMetricsTypes
|
||||
from frigate.types import PTZMetricsTypes
|
||||
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
|
||||
# Determine if the PTZ was in motion at the set frame time
|
||||
# for non ptz/autotracking cameras, this will always return False
|
||||
# ptz_start_time is initialized to 0 on startup and only changes
|
||||
# when autotracking movements are made
|
||||
|
||||
# the offset "primes" the motion estimator with a few frames before movement
|
||||
offset = 0.5
|
||||
|
||||
return (ptz_start_time != 0.0 and frame_time >= ptz_start_time - offset) and (
|
||||
ptz_stop_time == 0.0 or (ptz_start_time - offset <= frame_time <= ptz_stop_time)
|
||||
)
|
||||
|
||||
|
||||
class PtzMotionEstimator:
|
||||
def __init__(self, config: CameraConfig, ptz_stopped) -> None:
|
||||
def __init__(self, config: CameraConfig, ptz_metrics: PTZMetricsTypes) -> None:
|
||||
self.frame_manager = SharedMemoryFrameManager()
|
||||
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
|
||||
self.norfair_motion_estimator = MotionEstimator(
|
||||
transformations_getter=TranslationTransformationGetter(),
|
||||
min_distance=30,
|
||||
max_points=500,
|
||||
max_points=900,
|
||||
)
|
||||
self.camera_config = config
|
||||
self.coord_transformations = None
|
||||
self.ptz_stopped = ptz_stopped
|
||||
self.ptz_metrics = ptz_metrics
|
||||
self.ptz_start_time = self.ptz_metrics["ptz_start_time"]
|
||||
self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"]
|
||||
logger.debug(f"Motion estimator init for cam: {config.name}")
|
||||
|
||||
def motion_estimator(self, detections, frame_time, camera_name):
|
||||
if (
|
||||
self.camera_config.onvif.autotracking.enabled
|
||||
and not self.ptz_stopped.is_set()
|
||||
if ptz_moving_at_frame_time(
|
||||
frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
|
||||
):
|
||||
logger.debug(
|
||||
f"Motion estimator running for {camera_name} - frame time: {frame_time}"
|
||||
f"Motion estimator running for {camera_name} - frame time: {frame_time}, {self.ptz_start_time.value}, {self.ptz_stop_time.value}"
|
||||
)
|
||||
|
||||
frame_id = f"{camera_name}{frame_time}"
|
||||
@ -74,9 +90,7 @@ class PtzMotionEstimator:
|
||||
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs((0,0))}"
|
||||
)
|
||||
|
||||
return self.coord_transformations
|
||||
|
||||
return None
|
||||
return self.coord_transformations
|
||||
|
||||
|
||||
class PtzAutoTrackerThread(threading.Thread):
|
||||
@ -84,12 +98,12 @@ class PtzAutoTrackerThread(threading.Thread):
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
onvif: OnvifController,
|
||||
camera_metrics: dict[str, CameraMetricsTypes],
|
||||
ptz_metrics: dict[str, PTZMetricsTypes],
|
||||
stop_event: MpEvent,
|
||||
) -> None:
|
||||
threading.Thread.__init__(self)
|
||||
self.name = "ptz_autotracker"
|
||||
self.ptz_autotracker = PtzAutoTracker(config, onvif, camera_metrics)
|
||||
self.ptz_autotracker = PtzAutoTracker(config, onvif, ptz_metrics)
|
||||
self.stop_event = stop_event
|
||||
self.config = config
|
||||
|
||||
@ -112,11 +126,11 @@ class PtzAutoTracker:
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
onvif: OnvifController,
|
||||
camera_metrics: CameraMetricsTypes,
|
||||
ptz_metrics: PTZMetricsTypes,
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.onvif = onvif
|
||||
self.camera_metrics = camera_metrics
|
||||
self.ptz_metrics = ptz_metrics
|
||||
self.tracked_object: dict[str, object] = {}
|
||||
self.tracked_object_previous: dict[str, object] = {}
|
||||
self.object_types = {}
|
||||
@ -146,17 +160,13 @@ class PtzAutoTracker:
|
||||
if not self.onvif._init_onvif(camera_name):
|
||||
logger.warning(f"Unable to initialize onvif for {camera_name}")
|
||||
cam.onvif.autotracking.enabled = False
|
||||
self.camera_metrics[camera_name][
|
||||
"ptz_autotracker_enabled"
|
||||
].value = False
|
||||
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
||||
|
||||
return
|
||||
|
||||
if not self.onvif.cams[camera_name]["relative_fov_supported"]:
|
||||
cam.onvif.autotracking.enabled = False
|
||||
self.camera_metrics[camera_name][
|
||||
"ptz_autotracker_enabled"
|
||||
].value = False
|
||||
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
||||
logger.warning(
|
||||
f"Disabling autotracking for {camera_name}: FOV relative movement not supported"
|
||||
)
|
||||
@ -198,16 +208,10 @@ class PtzAutoTracker:
|
||||
move_data = self.move_queues[camera].get()
|
||||
pan, tilt = move_data
|
||||
|
||||
# check if ptz is moving
|
||||
self.onvif.get_camera_status(camera)
|
||||
|
||||
# Wait until the camera finishes moving
|
||||
self.camera_metrics[camera]["ptz_stopped"].wait()
|
||||
|
||||
self.onvif._move_relative(camera, pan, tilt, 1)
|
||||
|
||||
# Wait until the camera finishes moving
|
||||
while not self.camera_metrics[camera]["ptz_stopped"].is_set():
|
||||
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
# check if ptz is moving
|
||||
self.onvif.get_camera_status(camera)
|
||||
time.sleep(1 / (self.config.cameras[camera].detect.fps / 2))
|
||||
@ -237,10 +241,7 @@ class PtzAutoTracker:
|
||||
def autotrack_object(self, camera, obj):
|
||||
camera_config = self.config.cameras[camera]
|
||||
|
||||
if (
|
||||
camera_config.onvif.autotracking.enabled
|
||||
and self.camera_metrics[camera]["ptz_stopped"].is_set()
|
||||
):
|
||||
if camera_config.onvif.autotracking.enabled:
|
||||
# either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive,
|
||||
# and is not initially motionless - or one we're already tracking, which assumes all those things are already true
|
||||
if (
|
||||
@ -259,7 +260,11 @@ class PtzAutoTracker:
|
||||
)
|
||||
self.tracked_object[camera] = obj
|
||||
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
# only enqueue another move if the camera isn't moving
|
||||
if self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera]["ptz_stopped"].clear()
|
||||
logger.debug("Autotrack: New object, moving ptz")
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
|
||||
return
|
||||
|
||||
@ -271,28 +276,57 @@ class PtzAutoTracker:
|
||||
and obj.obj_data["frame_time"]
|
||||
!= self.tracked_object_previous[camera].obj_data["frame_time"]
|
||||
):
|
||||
# don't move the ptz if we're relatively close to the existing box
|
||||
# should we use iou or euclidean distance or both?
|
||||
# distance = math.sqrt((obj.obj_data["centroid"][0] - camera_width/2)**2 + (obj.obj_data["centroid"][1] - obj.camera_height/2)**2)
|
||||
# if distance <= (self.camera_width * .15) or distance <= (self.camera_height * .15)
|
||||
if (
|
||||
intersection_over_union(
|
||||
self.tracked_object_previous[camera].obj_data["box"],
|
||||
obj.obj_data["box"],
|
||||
)
|
||||
> 0.5
|
||||
):
|
||||
# Don't move ptz if Euclidean distance from object to center of frame is
|
||||
# less than 15% of the of the larger dimension (width or height) of the frame,
|
||||
# multiplied by a scaling factor for object size.
|
||||
# Adjusting this percentage slightly lower will effectively cause the camera to move
|
||||
# more often to keep the object in the center. Raising the percentage will cause less
|
||||
# movement and will be more flexible with objects not quite being centered.
|
||||
# TODO: there's probably a better way to approach this
|
||||
distance = math.sqrt(
|
||||
(obj.obj_data["centroid"][0] - camera_config.detect.width / 2) ** 2
|
||||
+ (obj.obj_data["centroid"][1] - camera_config.detect.height / 2)
|
||||
** 2
|
||||
)
|
||||
|
||||
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
|
||||
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
|
||||
|
||||
max_obj = max(obj_width, obj_height)
|
||||
max_frame = max(camera_config.detect.width, camera_config.detect.height)
|
||||
|
||||
# larger objects should lower the threshold, smaller objects should raise it
|
||||
scaling_factor = 1 - (max_obj / max_frame)
|
||||
|
||||
distance_threshold = 0.15 * (max_frame) * scaling_factor
|
||||
|
||||
iou = intersection_over_union(
|
||||
self.tracked_object_previous[camera].obj_data["box"],
|
||||
obj.obj_data["box"],
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Distance: {distance}, threshold: {distance_threshold}, iou: {iou}"
|
||||
)
|
||||
|
||||
if (distance < distance_threshold or iou > 0.5) and self.ptz_metrics[
|
||||
camera
|
||||
]["ptz_stopped"].is_set():
|
||||
logger.debug(
|
||||
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
||||
)
|
||||
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
f"Autotrack: Existing object (move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
||||
f"Autotrack: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
||||
)
|
||||
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
|
||||
# only enqueue another move if the camera isn't moving
|
||||
if self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera]["ptz_stopped"].clear()
|
||||
logger.debug("Autotrack: Existing object, moving ptz")
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
|
||||
return
|
||||
|
||||
@ -320,7 +354,11 @@ class PtzAutoTracker:
|
||||
)
|
||||
self.tracked_object[camera] = obj
|
||||
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
# only enqueue another move if the camera isn't moving
|
||||
if self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera]["ptz_stopped"].clear()
|
||||
logger.debug("Autotrack: Reacquired object, moving ptz")
|
||||
self._autotrack_move_ptz(camera, obj)
|
||||
|
||||
return
|
||||
|
||||
@ -334,7 +372,6 @@ class PtzAutoTracker:
|
||||
f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
|
||||
)
|
||||
self.tracked_object[camera] = None
|
||||
self.onvif.get_camera_status(camera)
|
||||
|
||||
def camera_maintenance(self, camera):
|
||||
# calls get_camera_status to check/update ptz movement
|
||||
@ -344,7 +381,7 @@ class PtzAutoTracker:
|
||||
if not self.autotracker_init[camera]:
|
||||
self._autotracker_setup(self.config.cameras[camera], camera)
|
||||
# regularly update camera status
|
||||
if not self.camera_metrics[camera]["ptz_stopped"].is_set():
|
||||
if not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
||||
self.onvif.get_camera_status(camera)
|
||||
|
||||
# return to preset if tracking is over
|
||||
@ -359,7 +396,7 @@ class PtzAutoTracker:
|
||||
)
|
||||
and autotracker_config.return_preset
|
||||
):
|
||||
self.camera_metrics[camera]["ptz_stopped"].wait()
|
||||
self.ptz_metrics[camera]["ptz_stopped"].wait()
|
||||
logger.debug(
|
||||
f"Autotrack: Time is {time.time()}, returning to preset: {autotracker_config.return_preset}"
|
||||
)
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""Configure and control camera via onvif."""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import site
|
||||
from enum import Enum
|
||||
@ -8,7 +9,7 @@ import numpy
|
||||
from onvif import ONVIFCamera, ONVIFError
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.types import CameraMetricsTypes
|
||||
from frigate.types import PTZMetricsTypes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -29,10 +30,10 @@ class OnvifCommandEnum(str, Enum):
|
||||
|
||||
class OnvifController:
|
||||
def __init__(
|
||||
self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes]
|
||||
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetricsTypes]
|
||||
) -> None:
|
||||
self.cams: dict[str, ONVIFCamera] = {}
|
||||
self.camera_metrics = camera_metrics
|
||||
self.ptz_metrics = ptz_metrics
|
||||
|
||||
for cam_name, cam in config.cameras.items():
|
||||
if not cam.enabled:
|
||||
@ -207,7 +208,6 @@ class OnvifController:
|
||||
return
|
||||
|
||||
logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}")
|
||||
self.get_camera_status(camera_name)
|
||||
|
||||
if self.cams[camera_name]["active"]:
|
||||
logger.warning(
|
||||
@ -216,7 +216,12 @@ class OnvifController:
|
||||
return
|
||||
|
||||
self.cams[camera_name]["active"] = True
|
||||
self.camera_metrics[camera_name]["ptz_stopped"].clear()
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
||||
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
|
||||
self.ptz_metrics[camera_name][
|
||||
"ptz_start_time"
|
||||
].value = datetime.datetime.now().timestamp()
|
||||
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
||||
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
||||
move_request = self.cams[camera_name]["relative_move_request"]
|
||||
|
||||
@ -261,7 +266,7 @@ class OnvifController:
|
||||
return
|
||||
|
||||
self.cams[camera_name]["active"] = True
|
||||
self.camera_metrics[camera_name]["ptz_stopped"].clear()
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
||||
move_request = self.cams[camera_name]["move_request"]
|
||||
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
||||
preset_token = self.cams[camera_name]["presets"][preset]
|
||||
@ -271,7 +276,7 @@ class OnvifController:
|
||||
"PresetToken": preset_token,
|
||||
}
|
||||
)
|
||||
self.camera_metrics[camera_name]["ptz_stopped"].set()
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
||||
self.cams[camera_name]["active"] = False
|
||||
|
||||
def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
|
||||
@ -345,10 +350,25 @@ class OnvifController:
|
||||
|
||||
if status.MoveStatus.PanTilt == "IDLE" and status.MoveStatus.Zoom == "IDLE":
|
||||
self.cams[camera_name]["active"] = False
|
||||
self.camera_metrics[camera_name]["ptz_stopped"].set()
|
||||
if not self.ptz_metrics[camera_name]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].set()
|
||||
|
||||
logger.debug(f"PTZ stop time: {datetime.datetime.now().timestamp()}")
|
||||
|
||||
self.ptz_metrics[camera_name][
|
||||
"ptz_stop_time"
|
||||
].value = datetime.datetime.now().timestamp()
|
||||
else:
|
||||
self.cams[camera_name]["active"] = True
|
||||
self.camera_metrics[camera_name]["ptz_stopped"].clear()
|
||||
if self.ptz_metrics[camera_name]["ptz_stopped"].is_set():
|
||||
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
|
||||
|
||||
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
|
||||
|
||||
self.ptz_metrics[camera_name][
|
||||
"ptz_start_time"
|
||||
].value = datetime.datetime.now().timestamp()
|
||||
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
|
||||
|
||||
return {
|
||||
"pan": status.Position.PanTilt.x,
|
||||
|
@ -8,6 +8,7 @@ from norfair.drawing.drawer import Drawer
|
||||
from frigate.config import CameraConfig
|
||||
from frigate.ptz.autotrack import PtzMotionEstimator
|
||||
from frigate.track import ObjectTracker
|
||||
from frigate.types import PTZMetricsTypes
|
||||
from frigate.util.image import intersection_over_union
|
||||
|
||||
|
||||
@ -55,15 +56,18 @@ def frigate_distance(detection: Detection, tracked_object) -> float:
|
||||
|
||||
|
||||
class NorfairTracker(ObjectTracker):
|
||||
def __init__(self, config: CameraConfig, ptz_autotracker_enabled, ptz_stopped):
|
||||
def __init__(
|
||||
self,
|
||||
config: CameraConfig,
|
||||
ptz_metrics: PTZMetricsTypes,
|
||||
):
|
||||
self.tracked_objects = {}
|
||||
self.disappeared = {}
|
||||
self.positions = {}
|
||||
self.max_disappeared = config.detect.max_disappeared
|
||||
self.camera_config = config
|
||||
self.detect_config = config.detect
|
||||
self.ptz_autotracker_enabled = ptz_autotracker_enabled.value
|
||||
self.ptz_stopped = ptz_stopped
|
||||
self.ptz_autotracker_enabled = ptz_metrics["ptz_autotracker_enabled"]
|
||||
self.camera_name = config.name
|
||||
self.track_id_map = {}
|
||||
# TODO: could also initialize a tracker per object class if there
|
||||
@ -74,8 +78,8 @@ class NorfairTracker(ObjectTracker):
|
||||
initialization_delay=0,
|
||||
hit_counter_max=self.max_disappeared,
|
||||
)
|
||||
if self.ptz_autotracker_enabled:
|
||||
self.ptz_motion_estimator = PtzMotionEstimator(config, self.ptz_stopped)
|
||||
if self.ptz_autotracker_enabled.value:
|
||||
self.ptz_motion_estimator = PtzMotionEstimator(config, ptz_metrics)
|
||||
|
||||
def register(self, track_id, obj):
|
||||
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
|
||||
@ -239,7 +243,7 @@ class NorfairTracker(ObjectTracker):
|
||||
|
||||
coord_transformations = None
|
||||
|
||||
if self.ptz_autotracker_enabled:
|
||||
if self.ptz_autotracker_enabled.value:
|
||||
coord_transformations = self.ptz_motion_estimator.motion_estimator(
|
||||
detections, frame_time, self.camera_name
|
||||
)
|
||||
|
@ -18,8 +18,6 @@ class CameraMetricsTypes(TypedDict):
|
||||
frame_queue: Queue
|
||||
motion_enabled: Synchronized
|
||||
improve_contrast_enabled: Synchronized
|
||||
ptz_autotracker_enabled: Synchronized
|
||||
ptz_stopped: Event
|
||||
motion_threshold: Synchronized
|
||||
motion_contour_area: Synchronized
|
||||
process: Optional[Process]
|
||||
@ -28,6 +26,13 @@ class CameraMetricsTypes(TypedDict):
|
||||
skipped_fps: Synchronized
|
||||
|
||||
|
||||
class PTZMetricsTypes(TypedDict):
|
||||
ptz_autotracker_enabled: Synchronized
|
||||
ptz_stopped: Event
|
||||
ptz_start_time: Synchronized
|
||||
ptz_stop_time: Synchronized
|
||||
|
||||
|
||||
class FeatureMetricsTypes(TypedDict):
|
||||
audio_enabled: Synchronized
|
||||
record_enabled: Synchronized
|
||||
|
@ -22,8 +22,10 @@ from frigate.log import LogPipe
|
||||
from frigate.motion import MotionDetector
|
||||
from frigate.motion.improved_motion import ImprovedMotionDetector
|
||||
from frigate.object_detection import RemoteObjectDetector
|
||||
from frigate.ptz.autotrack import ptz_moving_at_frame_time
|
||||
from frigate.track import ObjectTracker
|
||||
from frigate.track.norfair_tracker import NorfairTracker
|
||||
from frigate.types import PTZMetricsTypes
|
||||
from frigate.util.builtin import EventsPerSecond
|
||||
from frigate.util.image import (
|
||||
FrameManager,
|
||||
@ -461,6 +463,7 @@ def track_camera(
|
||||
result_connection,
|
||||
detected_objects_queue,
|
||||
process_info,
|
||||
ptz_metrics,
|
||||
):
|
||||
stop_event = mp.Event()
|
||||
|
||||
@ -478,8 +481,6 @@ def track_camera(
|
||||
detection_enabled = process_info["detection_enabled"]
|
||||
motion_enabled = process_info["motion_enabled"]
|
||||
improve_contrast_enabled = process_info["improve_contrast_enabled"]
|
||||
ptz_autotracker_enabled = process_info["ptz_autotracker_enabled"]
|
||||
ptz_stopped = process_info["ptz_stopped"]
|
||||
motion_threshold = process_info["motion_threshold"]
|
||||
motion_contour_area = process_info["motion_contour_area"]
|
||||
|
||||
@ -499,7 +500,7 @@ def track_camera(
|
||||
name, labelmap, detection_queue, result_connection, model_config, stop_event
|
||||
)
|
||||
|
||||
object_tracker = NorfairTracker(config, ptz_autotracker_enabled, ptz_stopped)
|
||||
object_tracker = NorfairTracker(config, ptz_metrics)
|
||||
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
|
||||
@ -520,7 +521,7 @@ def track_camera(
|
||||
detection_enabled,
|
||||
motion_enabled,
|
||||
stop_event,
|
||||
ptz_stopped,
|
||||
ptz_metrics,
|
||||
)
|
||||
|
||||
logger.info(f"{name}: exiting subprocess")
|
||||
@ -745,7 +746,7 @@ def process_frames(
|
||||
detection_enabled: mp.Value,
|
||||
motion_enabled: mp.Value,
|
||||
stop_event,
|
||||
ptz_stopped: mp.Event,
|
||||
ptz_metrics: PTZMetricsTypes,
|
||||
exit_on_empty: bool = False,
|
||||
):
|
||||
fps = process_info["process_fps"]
|
||||
@ -781,10 +782,17 @@ def process_frames(
|
||||
logger.info(f"{camera_name}: frame {frame_time} is not in memory store.")
|
||||
continue
|
||||
|
||||
# look for motion if enabled
|
||||
# look for motion if enabled and ptz is not moving
|
||||
# ptz_moving_at_frame_time() always returns False for
|
||||
# non ptz/autotracking cameras
|
||||
motion_boxes = (
|
||||
motion_detector.detect(frame)
|
||||
if motion_enabled.value and ptz_stopped.is_set()
|
||||
if motion_enabled.value
|
||||
and not ptz_moving_at_frame_time(
|
||||
frame_time,
|
||||
ptz_metrics["ptz_start_time"].value,
|
||||
ptz_metrics["ptz_stop_time"].value,
|
||||
)
|
||||
else []
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user