mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-21 00:06:44 +01:00
423 lines
18 KiB
Python
423 lines
18 KiB
Python
"""Automatically pan, tilt, and zoom on detected objects via onvif."""
|
|
|
|
import copy
|
|
import logging
|
|
import math
|
|
import queue
|
|
import threading
|
|
import time
|
|
from functools import partial
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from norfair.camera_motion import MotionEstimator, TranslationTransformationGetter
|
|
|
|
from frigate.config import CameraConfig, FrigateConfig
|
|
from frigate.ptz.onvif import OnvifController
|
|
from frigate.types import PTZMetricsTypes
|
|
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
|
|
# Determine if the PTZ was in motion at the set frame time
|
|
# for non ptz/autotracking cameras, this will always return False
|
|
# ptz_start_time is initialized to 0 on startup and only changes
|
|
# when autotracking movements are made
|
|
|
|
# the offset "primes" the motion estimator with a few frames before movement
|
|
offset = 0.5
|
|
|
|
return (ptz_start_time != 0.0 and frame_time >= ptz_start_time - offset) and (
|
|
ptz_stop_time == 0.0 or (ptz_start_time - offset <= frame_time <= ptz_stop_time)
|
|
)
|
|
|
|
|
|
class PtzMotionEstimator:
|
|
def __init__(
|
|
self, config: CameraConfig, ptz_metrics: dict[str, PTZMetricsTypes]
|
|
) -> None:
|
|
self.frame_manager = SharedMemoryFrameManager()
|
|
self.norfair_motion_estimator = None
|
|
self.camera_config = config
|
|
self.coord_transformations = None
|
|
self.ptz_metrics = ptz_metrics
|
|
self.ptz_start_time = self.ptz_metrics["ptz_start_time"]
|
|
self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"]
|
|
|
|
self.ptz_metrics["ptz_reset"].set()
|
|
logger.debug(f"Motion estimator init for cam: {config.name}")
|
|
|
|
def motion_estimator(self, detections, frame_time, camera_name):
|
|
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
|
|
if self.ptz_metrics["ptz_reset"].is_set():
|
|
self.ptz_metrics["ptz_reset"].clear()
|
|
logger.debug("Motion estimator reset")
|
|
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
|
|
self.norfair_motion_estimator = MotionEstimator(
|
|
transformations_getter=TranslationTransformationGetter(),
|
|
min_distance=30,
|
|
max_points=900,
|
|
)
|
|
self.coord_transformations = None
|
|
|
|
if ptz_moving_at_frame_time(
|
|
frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
|
|
):
|
|
logger.debug(
|
|
f"Motion estimator running for {camera_name} - frame time: {frame_time}, {self.ptz_start_time.value}, {self.ptz_stop_time.value}"
|
|
)
|
|
|
|
frame_id = f"{camera_name}{frame_time}"
|
|
yuv_frame = self.frame_manager.get(
|
|
frame_id, self.camera_config.frame_shape_yuv
|
|
)
|
|
|
|
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
|
|
|
|
# mask out detections for better motion estimation
|
|
mask = np.ones(frame.shape[:2], frame.dtype)
|
|
|
|
detection_boxes = [x[2] for x in detections]
|
|
for detection in detection_boxes:
|
|
x1, y1, x2, y2 = detection
|
|
mask[y1:y2, x1:x2] = 0
|
|
|
|
# merge camera config motion mask with detections. Norfair function needs 0,1 mask
|
|
mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1)
|
|
|
|
# Norfair estimator function needs color so it can convert it right back to gray
|
|
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA)
|
|
|
|
self.coord_transformations = self.norfair_motion_estimator.update(
|
|
frame, mask
|
|
)
|
|
|
|
self.frame_manager.close(frame_id)
|
|
|
|
logger.debug(
|
|
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs((0,0))}"
|
|
)
|
|
|
|
return self.coord_transformations
|
|
|
|
|
|
class PtzAutoTrackerThread(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: FrigateConfig,
|
|
onvif: OnvifController,
|
|
ptz_metrics: dict[str, PTZMetricsTypes],
|
|
stop_event: MpEvent,
|
|
) -> None:
|
|
threading.Thread.__init__(self)
|
|
self.name = "ptz_autotracker"
|
|
self.ptz_autotracker = PtzAutoTracker(config, onvif, ptz_metrics)
|
|
self.stop_event = stop_event
|
|
self.config = config
|
|
|
|
def run(self):
|
|
while not self.stop_event.wait(1):
|
|
for camera_name, cam in self.config.cameras.items():
|
|
if not cam.enabled:
|
|
continue
|
|
|
|
if cam.onvif.autotracking.enabled:
|
|
self.ptz_autotracker.camera_maintenance(camera_name)
|
|
else:
|
|
# disabled dynamically by mqtt
|
|
if self.ptz_autotracker.tracked_object.get(camera_name):
|
|
self.ptz_autotracker.tracked_object[camera_name] = None
|
|
self.ptz_autotracker.tracked_object_previous[camera_name] = None
|
|
|
|
logger.info("Exiting autotracker...")
|
|
|
|
|
|
class PtzAutoTracker:
|
|
def __init__(
|
|
self,
|
|
config: FrigateConfig,
|
|
onvif: OnvifController,
|
|
ptz_metrics: PTZMetricsTypes,
|
|
) -> None:
|
|
self.config = config
|
|
self.onvif = onvif
|
|
self.ptz_metrics = ptz_metrics
|
|
self.tracked_object: dict[str, object] = {}
|
|
self.tracked_object_previous: dict[str, object] = {}
|
|
self.previous_frame_time = None
|
|
self.object_types = {}
|
|
self.required_zones = {}
|
|
self.move_queues = {}
|
|
self.move_threads = {}
|
|
self.autotracker_init = {}
|
|
|
|
# if cam is set to autotrack, onvif should be set up
|
|
for camera_name, cam in self.config.cameras.items():
|
|
if not cam.enabled:
|
|
continue
|
|
|
|
self.autotracker_init[camera_name] = False
|
|
if cam.onvif.autotracking.enabled:
|
|
self._autotracker_setup(cam, camera_name)
|
|
|
|
def _autotracker_setup(self, cam, camera_name):
|
|
logger.debug(f"Autotracker init for cam: {camera_name}")
|
|
|
|
self.object_types[camera_name] = cam.onvif.autotracking.track
|
|
self.required_zones[camera_name] = cam.onvif.autotracking.required_zones
|
|
|
|
self.tracked_object[camera_name] = None
|
|
self.tracked_object_previous[camera_name] = None
|
|
|
|
self.move_queues[camera_name] = queue.Queue()
|
|
|
|
if not self.onvif.cams[camera_name]["init"]:
|
|
if not self.onvif._init_onvif(camera_name):
|
|
logger.warning(f"Unable to initialize onvif for {camera_name}")
|
|
cam.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
|
|
|
return
|
|
|
|
if not self.onvif.cams[camera_name]["relative_fov_supported"]:
|
|
cam.onvif.autotracking.enabled = False
|
|
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
|
|
logger.warning(
|
|
f"Disabling autotracking for {camera_name}: FOV relative movement not supported"
|
|
)
|
|
|
|
return
|
|
|
|
# movement thread per camera
|
|
if not self.move_threads or not self.move_threads[camera_name]:
|
|
self.move_threads[camera_name] = threading.Thread(
|
|
name=f"move_thread_{camera_name}",
|
|
target=partial(self._process_move_queue, camera_name),
|
|
)
|
|
self.move_threads[camera_name].daemon = True
|
|
self.move_threads[camera_name].start()
|
|
|
|
self.autotracker_init[camera_name] = True
|
|
|
|
def _process_move_queue(self, camera):
|
|
while True:
|
|
try:
|
|
move_data = self.move_queues[camera].get()
|
|
frame_time, pan, tilt = move_data
|
|
|
|
# if we're receiving move requests during a PTZ move, ignore them
|
|
if ptz_moving_at_frame_time(
|
|
frame_time,
|
|
self.ptz_metrics[camera]["ptz_start_time"].value,
|
|
self.ptz_metrics[camera]["ptz_stop_time"].value,
|
|
):
|
|
# instead of dequeueing this might be a good place to preemptively move based
|
|
# on an estimate - for fast moving objects, etc.
|
|
logger.debug(
|
|
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}"
|
|
)
|
|
continue
|
|
|
|
else:
|
|
# on some cameras with cheaper motors it seems like small values can cause jerky movement
|
|
# TODO: double check, might not need this
|
|
if abs(pan) > 0.02 or abs(tilt) > 0.02:
|
|
self.onvif._move_relative(camera, pan, tilt, 1)
|
|
else:
|
|
logger.debug(
|
|
f"Not moving, pan and tilt too small: {pan}, {tilt}"
|
|
)
|
|
|
|
# Wait until the camera finishes moving
|
|
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
|
# check if ptz is moving
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
except queue.Empty:
|
|
continue
|
|
|
|
def _enqueue_move(self, camera, frame_time, pan, tilt):
|
|
move_data = (frame_time, pan, tilt)
|
|
if (
|
|
frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
|
|
and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
|
|
):
|
|
logger.debug(f"enqueue pan: {pan}, enqueue tilt: {tilt}")
|
|
self.move_queues[camera].put(move_data)
|
|
|
|
def _autotrack_move_ptz(self, camera, obj):
|
|
camera_config = self.config.cameras[camera]
|
|
|
|
# # frame width and height
|
|
camera_width = camera_config.frame_shape[1]
|
|
camera_height = camera_config.frame_shape[0]
|
|
|
|
# Normalize coordinates. top right of the fov is (1,1), center is (0,0), bottom left is (-1, -1).
|
|
pan = ((obj.obj_data["centroid"][0] / camera_width) - 0.5) * 2
|
|
tilt = (0.5 - (obj.obj_data["centroid"][1] / camera_height)) * 2
|
|
|
|
# ideas: check object velocity for camera speed?
|
|
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt)
|
|
|
|
def autotrack_object(self, camera, obj):
|
|
camera_config = self.config.cameras[camera]
|
|
|
|
if camera_config.onvif.autotracking.enabled:
|
|
if not self.autotracker_init[camera]:
|
|
self._autotracker_setup(self.config.cameras[camera], camera)
|
|
|
|
# either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive,
|
|
# and is not initially motionless - or one we're already tracking, which assumes all those things are already true
|
|
if (
|
|
# new object
|
|
self.tracked_object[camera] is None
|
|
and obj.camera == camera
|
|
and obj.obj_data["label"] in self.object_types[camera]
|
|
and set(obj.entered_zones) & set(self.required_zones[camera])
|
|
and not obj.previous["false_positive"]
|
|
and not obj.false_positive
|
|
and self.tracked_object_previous[camera] is None
|
|
and obj.obj_data["motionless_count"] == 0
|
|
):
|
|
logger.debug(
|
|
f"Autotrack: New object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
self.tracked_object[camera] = obj
|
|
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
|
self.previous_frame_time = obj.obj_data["frame_time"]
|
|
self._autotrack_move_ptz(camera, obj)
|
|
|
|
return
|
|
|
|
if (
|
|
# already tracking an object
|
|
self.tracked_object[camera] is not None
|
|
and self.tracked_object_previous[camera] is not None
|
|
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
|
|
and obj.obj_data["frame_time"] != self.previous_frame_time
|
|
):
|
|
self.previous_frame_time = obj.obj_data["frame_time"]
|
|
# Don't move ptz if Euclidean distance from object to center of frame is
|
|
# less than 15% of the of the larger dimension (width or height) of the frame,
|
|
# multiplied by a scaling factor for object size.
|
|
# Adjusting this percentage slightly lower will effectively cause the camera to move
|
|
# more often to keep the object in the center. Raising the percentage will cause less
|
|
# movement and will be more flexible with objects not quite being centered.
|
|
# TODO: there's probably a better way to approach this
|
|
distance = math.sqrt(
|
|
(obj.obj_data["centroid"][0] - camera_config.detect.width / 2) ** 2
|
|
+ (obj.obj_data["centroid"][1] - camera_config.detect.height / 2)
|
|
** 2
|
|
)
|
|
|
|
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
|
|
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
|
|
|
|
max_obj = max(obj_width, obj_height)
|
|
max_frame = max(camera_config.detect.width, camera_config.detect.height)
|
|
|
|
# larger objects should lower the threshold, smaller objects should raise it
|
|
scaling_factor = 1 - (max_obj / max_frame)
|
|
|
|
distance_threshold = 0.15 * (max_frame) * scaling_factor
|
|
|
|
iou = intersection_over_union(
|
|
self.tracked_object_previous[camera].obj_data["box"],
|
|
obj.obj_data["box"],
|
|
)
|
|
|
|
logger.debug(
|
|
f"Distance: {distance}, threshold: {distance_threshold}, iou: {iou}"
|
|
)
|
|
|
|
if distance < distance_threshold and iou > 0.2:
|
|
logger.debug(
|
|
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
return
|
|
|
|
logger.debug(
|
|
f"Autotrack: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
|
self._autotrack_move_ptz(camera, obj)
|
|
|
|
return
|
|
|
|
if (
|
|
# The tracker lost an object, so let's check the previous object's region and compare it with the incoming object
|
|
# If it's within bounds, start tracking that object.
|
|
# Should we check region (maybe too broad) or expand the previous object's box a bit and check that?
|
|
self.tracked_object[camera] is None
|
|
and obj.camera == camera
|
|
and obj.obj_data["label"] in self.object_types[camera]
|
|
and not obj.previous["false_positive"]
|
|
and not obj.false_positive
|
|
and obj.obj_data["motionless_count"] == 0
|
|
and self.tracked_object_previous[camera] is not None
|
|
):
|
|
self.previous_frame_time = obj.obj_data["frame_time"]
|
|
if (
|
|
intersection_over_union(
|
|
self.tracked_object_previous[camera].obj_data["region"],
|
|
obj.obj_data["box"],
|
|
)
|
|
< 0.2
|
|
):
|
|
logger.debug(
|
|
f"Autotrack: Reacquired object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
|
|
)
|
|
self.tracked_object[camera] = obj
|
|
self.tracked_object_previous[camera] = copy.deepcopy(obj)
|
|
self._autotrack_move_ptz(camera, obj)
|
|
|
|
return
|
|
|
|
def end_object(self, camera, obj):
|
|
if self.config.cameras[camera].onvif.autotracking.enabled:
|
|
if (
|
|
self.tracked_object[camera] is not None
|
|
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
|
|
):
|
|
logger.debug(
|
|
f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
|
|
)
|
|
self.tracked_object[camera] = None
|
|
|
|
def camera_maintenance(self, camera):
|
|
# calls get_camera_status to check/update ptz movement
|
|
# returns camera to preset after timeout when tracking is over
|
|
autotracker_config = self.config.cameras[camera].onvif.autotracking
|
|
|
|
if not self.autotracker_init[camera]:
|
|
self._autotracker_setup(self.config.cameras[camera], camera)
|
|
# regularly update camera status
|
|
if not self.ptz_metrics[camera]["ptz_stopped"].is_set():
|
|
self.onvif.get_camera_status(camera)
|
|
|
|
# return to preset if tracking is over
|
|
if (
|
|
self.tracked_object[camera] is None
|
|
and self.tracked_object_previous[camera] is not None
|
|
and (
|
|
# might want to use a different timestamp here?
|
|
time.time()
|
|
- self.tracked_object_previous[camera].obj_data["frame_time"]
|
|
> autotracker_config.timeout
|
|
)
|
|
and autotracker_config.return_preset
|
|
):
|
|
self.ptz_metrics[camera]["ptz_stopped"].wait()
|
|
logger.debug(
|
|
f"Autotrack: Time is {time.time()}, returning to preset: {autotracker_config.return_preset}"
|
|
)
|
|
self.onvif._move_to_preset(
|
|
camera,
|
|
autotracker_config.return_preset.lower(),
|
|
)
|
|
self.ptz_metrics[camera]["ptz_reset"].set()
|
|
self.tracked_object_previous[camera] = None
|