diff --git a/frigate/app.py b/frigate/app.py index 9d85f461e..79e785a13 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -117,6 +117,11 @@ class FrigateApp: "improve_contrast_enabled": mp.Value( "i", self.config.cameras[camera_name].motion.improve_contrast ), + "ptz_autotracker_enabled": mp.Value( + "i", + self.config.cameras[camera_name].onvif.autotracking.enabled, + ), + "ptz_moving": mp.Value("i", 0), "motion_threshold": mp.Value( "i", self.config.cameras[camera_name].motion.threshold ), @@ -268,7 +273,7 @@ class FrigateApp: ) def init_onvif(self) -> None: - self.onvif_controller = OnvifController(self.config) + self.onvif_controller = OnvifController(self.config, self.camera_metrics) def init_dispatcher(self) -> None: comms: list[Communicator] = [] diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index b7e9e8858..d6b242937 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -54,6 +54,7 @@ class Dispatcher: self._camera_settings_handlers: dict[str, Callable] = { "detect": self._on_detect_command, "improve_contrast": self._on_motion_improve_contrast_command, + "ptz_autotracker": self._on_ptz_autotracker_command, "motion": self._on_motion_command, "motion_contour_area": self._on_motion_contour_area_command, "motion_threshold": self._on_motion_threshold_command, @@ -158,6 +159,25 @@ class Dispatcher: self.publish(f"{camera_name}/improve_contrast/state", payload, retain=True) + def _on_ptz_autotracker_command(self, camera_name: str, payload: str) -> None: + """Callback for ptz_autotracker topic.""" + ptz_autotracker_settings = self.config.cameras[camera_name].onvif.autotracking + + if payload == "ON": + if not self.camera_metrics[camera_name]["ptz_autotracker_enabled"].value: + logger.info(f"Turning on ptz autotracker for {camera_name}") + self.camera_metrics[camera_name]["ptz_autotracker_enabled"].value = True + ptz_autotracker_settings.enabled = True + elif payload == "OFF": + if self.camera_metrics[camera_name]["ptz_autotracker_enabled"].value: + logger.info(f"Turning off ptz autotracker for {camera_name}") + self.camera_metrics[camera_name][ + "ptz_autotracker_enabled" + ].value = False + ptz_autotracker_settings.enabled = False + + self.publish(f"{camera_name}/ptz_autotracker/state", payload, retain=True) + def _on_motion_contour_area_command(self, camera_name: str, payload: int) -> None: """Callback for motion contour topic.""" try: diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 07799f9da..8a941d7fa 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -64,6 +64,11 @@ class MqttClient(Communicator): # type: ignore[misc] "ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr] retain=True, ) + self.publish( + f"{camera_name}/ptz_autotracker/state", + "ON" if camera.onvif.autotracking.enabled else "OFF", # type: ignore[union-attr] + retain=True, + ) self.publish( f"{camera_name}/motion_threshold/state", camera.motion.threshold, # type: ignore[union-attr] diff --git a/frigate/config.py b/frigate/config.py index b71ba1907..14b15e07b 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -127,11 +127,37 @@ class MqttConfig(FrigateBaseModel): return v +class PtzAutotrackConfig(FrigateBaseModel): + enabled: bool = Field(default=False, title="Enable PTZ auto tracking.") + motion_estimator: bool = Field(default=False, title="Use Norfair motion estimator.") + track: List[str] = Field(default=DEFAULT_TRACKED_OBJECTS, title="Objects to track.") + required_zones: List[str] = Field( + default_factory=list, + title="List of required zones to be entered in order to begin autotracking.", + ) + size_ratio: float = Field( + default=0.5, + title="Target ratio of tracked object to field of view (0.2-0.9).", + ge=0.2, + le=0.9, + ) + return_preset: Optional[str] = Field( + title="Name of camera preset to return to when object tracking is over." + ) + timeout: int = Field( + default=5, title="Seconds to delay before returning to preset." + ) + + class OnvifConfig(FrigateBaseModel): host: str = Field(default="", title="Onvif Host") port: int = Field(default=8000, title="Onvif Port") user: Optional[str] = Field(title="Onvif Username") password: Optional[str] = Field(title="Onvif Password") + autotracking: PtzAutotrackConfig = Field( + default_factory=PtzAutotrackConfig, + title="PTZ auto tracking config.", + ) class RetainModeEnum(str, Enum): diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 6d31c3cdd..3ddb08e97 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -22,6 +22,7 @@ from frigate.config import ( ) from frigate.const import CLIPS_DIR from frigate.events.maintainer import EventTypeEnum +from frigate.ptz_autotrack import PtzAutoTrackerThread from frigate.util import ( SharedMemoryFrameManager, area, @@ -143,6 +144,7 @@ class TrackedObject: def update(self, current_frame_time, obj_data): thumb_update = False significant_change = False + autotracker_update = False # if the object is not in the current frame, add a 0.0 to the score history if obj_data["frame_time"] != current_frame_time: self.score_history.append(0.0) @@ -237,9 +239,13 @@ class TrackedObject: if self.obj_data["frame_time"] - self.previous["frame_time"] > 60: significant_change = True + # update autotrack every second? or fps? + if self.obj_data["frame_time"] - self.previous["frame_time"] > 1: + autotracker_update = True + self.obj_data.update(obj_data) self.current_zones = current_zones - return (thumb_update, significant_change) + return (thumb_update, significant_change, autotracker_update) def to_dict(self, include_thumbnail: bool = False): (self.thumbnail_data["frame_time"] if self.thumbnail_data is not None else 0.0) @@ -438,7 +444,11 @@ def zone_filtered(obj: TrackedObject, object_config): # Maintains the state of a camera class CameraState: def __init__( - self, name, config: FrigateConfig, frame_manager: SharedMemoryFrameManager + self, + name, + config: FrigateConfig, + frame_manager: SharedMemoryFrameManager, + ptz_autotracker_thread: PtzAutoTrackerThread, ): self.name = name self.config = config @@ -456,6 +466,7 @@ class CameraState: self.regions = [] self.previous_frame_id = None self.callbacks = defaultdict(list) + self.ptz_autotracker_thread = ptz_autotracker_thread def get_current_frame(self, draw_options={}): with self.current_frame_lock: @@ -477,6 +488,20 @@ class CameraState: thickness = 1 color = (255, 0, 0) + # draw thicker box around ptz autotracked object + if ( + self.ptz_autotracker_thread.ptz_autotracker.tracked_object[ + self.name + ] + is not None + and obj["id"] + == self.ptz_autotracker_thread.ptz_autotracker.tracked_object[ + self.name + ].obj_data["id"] + ): + thickness = 5 + color = self.config.model.colormap[obj["label"]] + # draw the bounding boxes on the frame box = obj["box"] draw_box_with_label( @@ -590,10 +615,14 @@ class CameraState: for id in updated_ids: updated_obj = tracked_objects[id] - thumb_update, significant_update = updated_obj.update( + thumb_update, significant_update, autotracker_update = updated_obj.update( frame_time, current_detections[id] ) + if autotracker_update: + for c in self.callbacks["autotrack"]: + c(self.name, updated_obj, frame_time) + if thumb_update: # ensure this frame is stored in the cache if ( @@ -749,6 +778,9 @@ class TrackedObjectProcessor(threading.Thread): self.camera_states: dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() self.last_motion_detected: dict[str, float] = {} + self.ptz_autotracker_thread = PtzAutoTrackerThread( + config, dispatcher.onvif, dispatcher.camera_metrics, self.stop_event + ) def start(camera, obj: TrackedObject, current_frame_time): self.event_queue.put( @@ -775,6 +807,9 @@ class TrackedObjectProcessor(threading.Thread): ) ) + def autotrack(camera, obj: TrackedObject, current_frame_time): + self.ptz_autotracker_thread.ptz_autotracker.autotrack_object(camera, obj) + def end(camera, obj: TrackedObject, current_frame_time): # populate has_snapshot obj.has_snapshot = self.should_save_snapshot(camera, obj) @@ -823,6 +858,7 @@ class TrackedObjectProcessor(threading.Thread): "type": "end", } self.dispatcher.publish("events", json.dumps(message), retain=False) + self.ptz_autotracker_thread.ptz_autotracker.end_object(camera, obj) self.event_queue.put( ( @@ -859,8 +895,11 @@ class TrackedObjectProcessor(threading.Thread): self.dispatcher.publish(f"{camera}/{object_name}", status, retain=False) for camera in self.config.cameras.keys(): - camera_state = CameraState(camera, self.config, self.frame_manager) + camera_state = CameraState( + camera, self.config, self.frame_manager, self.ptz_autotracker_thread + ) camera_state.on("start", start) + camera_state.on("autotrack", autotrack) camera_state.on("update", update) camera_state.on("end", end) camera_state.on("snapshot", snapshot) @@ -1002,6 +1041,7 @@ class TrackedObjectProcessor(threading.Thread): return self.camera_states[camera].current_frame_time def run(self): + self.ptz_autotracker_thread.start() while not self.stop_event.is_set(): try: ( @@ -1122,4 +1162,5 @@ class TrackedObjectProcessor(threading.Thread): event_id, camera = self.event_processed_queue.get() self.camera_states[camera].finished(event_id) + self.ptz_autotracker_thread.join() logger.info("Exiting object processor...") diff --git a/frigate/ptz.py b/frigate/ptz.py index 385a230bc..f7971a455 100644 --- a/frigate/ptz.py +++ b/frigate/ptz.py @@ -4,9 +4,11 @@ import logging import site from enum import Enum +import numpy from onvif import ONVIFCamera, ONVIFError from frigate.config import FrigateConfig +from frigate.types import CameraMetricsTypes logger = logging.getLogger(__name__) @@ -26,8 +28,11 @@ class OnvifCommandEnum(str, Enum): class OnvifController: - def __init__(self, config: FrigateConfig) -> None: + def __init__( + self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes] + ) -> None: self.cams: dict[str, ONVIFCamera] = {} + self.camera_metrics = camera_metrics for cam_name, cam in config.cameras.items(): if not cam.enabled: @@ -68,12 +73,51 @@ class OnvifController: ptz = onvif.create_ptz_service() request = ptz.create_type("GetConfigurationOptions") request.ConfigurationToken = profile.PTZConfiguration.token + ptz_config = ptz.GetConfigurationOptions(request) - # setup moving request + fov_space_id = next( + ( + i + for i, space in enumerate( + ptz_config.Spaces.RelativePanTiltTranslationSpace + ) + if "TranslationSpaceFov" in space["URI"] + ), + None, + ) + + # setup continuous moving request move_request = ptz.create_type("ContinuousMove") move_request.ProfileToken = profile.token self.cams[camera_name]["move_request"] = move_request + # setup relative moving request for autotracking + move_request = ptz.create_type("RelativeMove") + move_request.ProfileToken = profile.token + if move_request.Translation is None and fov_space_id is not None: + move_request.Translation = ptz.GetStatus( + {"ProfileToken": profile.token} + ).Position + move_request.Translation.PanTilt.space = ptz_config["Spaces"][ + "RelativePanTiltTranslationSpace" + ][fov_space_id]["URI"] + move_request.Translation.Zoom.space = ptz_config["Spaces"][ + "RelativeZoomTranslationSpace" + ][0]["URI"] + if move_request.Speed is None: + move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position + self.cams[camera_name]["relative_move_request"] = move_request + + # setup relative moving request for autotracking + move_request = ptz.create_type("AbsoluteMove") + move_request.ProfileToken = profile.token + self.cams[camera_name]["absolute_move_request"] = move_request + + # status request for autotracking + status_request = ptz.create_type("GetStatus") + status_request.ProfileToken = profile.token + self.cams[camera_name]["status_request"] = status_request + # setup existing presets try: presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token}) @@ -94,6 +138,20 @@ class OnvifController: if ptz_config.Spaces and ptz_config.Spaces.ContinuousZoomVelocitySpace: supported_features.append("zoom") + if ptz_config.Spaces and ptz_config.Spaces.RelativePanTiltTranslationSpace: + supported_features.append("pt-r") + + if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace: + supported_features.append("zoom-r") + + if fov_space_id is not None: + supported_features.append("pt-r-fov") + self.cams[camera_name][ + "relative_fov_range" + ] = ptz_config.Spaces.RelativePanTiltTranslationSpace[fov_space_id] + + self.cams[camera_name]["relative_fov_supported"] = fov_space_id is not None + self.cams[camera_name]["features"] = supported_features self.cams[camera_name]["init"] = True @@ -143,12 +201,74 @@ class OnvifController: onvif.get_service("ptz").ContinuousMove(move_request) + def _move_relative(self, camera_name: str, pan, tilt, speed) -> None: + if not self.cams[camera_name]["relative_fov_supported"]: + logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).") + return + + logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}") + self.get_camera_status(camera_name) + + if self.cams[camera_name]["active"]: + logger.warning( + f"{camera_name} is already performing an action, not moving..." + ) + return + + self.cams[camera_name]["active"] = True + self.camera_metrics[camera_name]["ptz_moving"].value = True + onvif: ONVIFCamera = self.cams[camera_name]["onvif"] + move_request = self.cams[camera_name]["relative_move_request"] + + # function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera. + # The onvif spec says this can report as +INF and -INF, so this may need to be modified + pan = numpy.interp( + pan, + [-1, 1], + [ + self.cams[camera_name]["relative_fov_range"]["XRange"]["Min"], + self.cams[camera_name]["relative_fov_range"]["XRange"]["Max"], + ], + ) + tilt = numpy.interp( + tilt, + [-1, 1], + [ + self.cams[camera_name]["relative_fov_range"]["YRange"]["Min"], + self.cams[camera_name]["relative_fov_range"]["YRange"]["Max"], + ], + ) + + move_request.Speed = { + "PanTilt": { + "x": speed, + "y": speed, + }, + "Zoom": 0, + } + + # move pan and tilt separately + move_request.Translation.PanTilt.x = pan + move_request.Translation.PanTilt.y = 0 + move_request.Translation.Zoom.x = 0 + + onvif.get_service("ptz").RelativeMove(move_request) + + move_request.Translation.PanTilt.x = 0 + move_request.Translation.PanTilt.y = tilt + move_request.Translation.Zoom.x = 0 + + onvif.get_service("ptz").RelativeMove(move_request) + + self.cams[camera_name]["active"] = False + def _move_to_preset(self, camera_name: str, preset: str) -> None: if preset not in self.cams[camera_name]["presets"]: logger.error(f"{preset} is not a valid preset for {camera_name}") return self.cams[camera_name]["active"] = True + self.camera_metrics[camera_name]["ptz_moving"].value = True move_request = self.cams[camera_name]["move_request"] onvif: ONVIFCamera = self.cams[camera_name]["onvif"] preset_token = self.cams[camera_name]["presets"][preset] @@ -158,6 +278,7 @@ class OnvifController: "PresetToken": preset_token, } ) + self.camera_metrics[camera_name]["ptz_moving"].value = False self.cams[camera_name]["active"] = False def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: @@ -216,3 +337,28 @@ class OnvifController: "features": self.cams[camera_name]["features"], "presets": list(self.cams[camera_name]["presets"].keys()), } + + def get_camera_status(self, camera_name: str) -> dict[str, any]: + if camera_name not in self.cams.keys(): + logger.error(f"Onvif is not setup for {camera_name}") + return {} + + if not self.cams[camera_name]["init"]: + self._init_onvif(camera_name) + + onvif: ONVIFCamera = self.cams[camera_name]["onvif"] + status_request = self.cams[camera_name]["status_request"] + status = onvif.get_service("ptz").GetStatus(status_request) + + self.cams[camera_name]["active"] = status.MoveStatus.PanTilt != "IDLE" + self.camera_metrics[camera_name]["ptz_moving"].value = ( + status.MoveStatus.PanTilt != "IDLE" + ) + + return { + "pan": status.Position.PanTilt.x, + "tilt": status.Position.PanTilt.y, + "zoom": status.Position.Zoom.x, + "pantilt_moving": status.MoveStatus.PanTilt, + "zoom_moving": status.MoveStatus.Zoom, + } diff --git a/frigate/ptz_autotrack.py b/frigate/ptz_autotrack.py new file mode 100644 index 000000000..5761aa475 --- /dev/null +++ b/frigate/ptz_autotrack.py @@ -0,0 +1,269 @@ +"""Automatically pan, tilt, and zoom on detected objects via onvif.""" + +import logging +import threading +import time +from multiprocessing.synchronize import Event as MpEvent + +import cv2 +import numpy as np +from norfair.camera_motion import MotionEstimator, TranslationTransformationGetter + +from frigate.config import CameraConfig, FrigateConfig +from frigate.ptz import OnvifController +from frigate.types import CameraMetricsTypes +from frigate.util import SharedMemoryFrameManager, intersection_over_union + +logger = logging.getLogger(__name__) + + +class PtzMotionEstimator: + def __init__(self, config: CameraConfig, ptz_moving) -> None: + self.frame_manager = SharedMemoryFrameManager() + # homography is nice (zooming) but slow, translation is pan/tilt only but fast. + self.norfair_motion_estimator = MotionEstimator( + transformations_getter=TranslationTransformationGetter() + ) + self.camera_config = config + self.coord_transformations = None + self.ptz_moving = ptz_moving + logger.debug(f"Motion estimator init for cam: {config.name}") + + def motion_estimator(self, detections, frame_time, camera_name): + if self.camera_config.onvif.autotracking.enabled and self.ptz_moving.value: + logger.debug(f"Motion estimator running for {camera_name}") + + frame_id = f"{camera_name}{frame_time}" + frame = self.frame_manager.get(frame_id, self.camera_config.frame_shape) + + # mask out detections for better motion estimation + mask = np.ones(frame.shape[:2], frame.dtype) + + detection_boxes = [x[2] for x in detections] + for detection in detection_boxes: + x1, y1, x2, y2 = detection + mask[y1:y2, x1:x2] = 0 + + # merge camera config motion mask with detections. Norfair function needs 0,1 mask + mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1) + + # Norfair estimator function needs color + frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA) + + self.coord_transformations = self.norfair_motion_estimator.update( + frame, mask + ) + + self.frame_manager.close(frame_id) + + return self.coord_transformations + + return None + + +class PtzAutoTrackerThread(threading.Thread): + def __init__( + self, + config: FrigateConfig, + onvif: OnvifController, + camera_metrics: CameraMetricsTypes, + stop_event: MpEvent, + ) -> None: + threading.Thread.__init__(self) + self.name = "frigate_ptz_autotracker" + self.ptz_autotracker = PtzAutoTracker(config, onvif, camera_metrics) + self.stop_event = stop_event + self.config = config + + def run(self): + while not self.stop_event.is_set(): + for camera_name, cam in self.config.cameras.items(): + if cam.onvif.autotracking.enabled: + self.ptz_autotracker.camera_maintenance(camera_name) + time.sleep(1) + logger.info("Exiting autotracker...") + + +class PtzAutoTracker: + def __init__( + self, + config: FrigateConfig, + onvif: OnvifController, + camera_metrics: CameraMetricsTypes, + ) -> None: + self.config = config + self.onvif = onvif + self.camera_metrics = camera_metrics + self.tracked_object: dict[str, object] = {} + self.tracked_object_previous: dict[str, object] = {} + self.object_types = {} + self.required_zones = {} + + # if cam is set to autotrack, onvif should be set up + for camera_name, cam in self.config.cameras.items(): + if cam.onvif.autotracking.enabled: + logger.debug(f"Autotracker init for cam: {camera_name}") + + self.object_types[camera_name] = cam.onvif.autotracking.track + self.required_zones[camera_name] = cam.onvif.autotracking.required_zones + + self.tracked_object[camera_name] = None + self.tracked_object_previous[camera_name] = None + + if not onvif.cams[camera_name]["init"]: + if not self.onvif._init_onvif(camera_name): + return + if not onvif.cams[camera_name]["relative_fov_supported"]: + cam.onvif.autotracking.enabled = False + self.camera_metrics[camera_name][ + "ptz_autotracker_enabled" + ].value = False + logger.warning( + f"Disabling autotracking for {camera_name}: FOV relative movement not supported" + ) + + def _autotrack_move_ptz(self, camera, obj): + camera_config = self.config.cameras[camera] + + # # frame width and height + camera_width = camera_config.frame_shape[1] + camera_height = camera_config.frame_shape[0] + + # Normalize coordinates. top right of the fov is (1,1). + pan = 0.5 - (obj.obj_data["centroid"][0] / camera_width) + tilt = 0.5 - (obj.obj_data["centroid"][1] / camera_height) + + # Calculate zoom amount + size_ratio = camera_config.onvif.autotracking.size_ratio + int(size_ratio * camera_width) + int(size_ratio * camera_height) + + # ideas: check object velocity for camera speed? + self.onvif._move_relative(camera, -pan, tilt, 1) + + def autotrack_object(self, camera, obj): + camera_config = self.config.cameras[camera] + + # check if ptz is moving + self.onvif.get_camera_status(camera) + + if camera_config.onvif.autotracking.enabled: + # either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive, and is not initially motionless + # or one we're already tracking, which assumes all those things are already true + if ( + # new object + self.tracked_object[camera] is None + and obj.camera == camera + and obj.obj_data["label"] in self.object_types[camera] + and set(obj.entered_zones) & set(self.required_zones[camera]) + and not obj.previous["false_positive"] + and not obj.false_positive + and self.tracked_object_previous[camera] is None + ): + logger.debug(f"Autotrack: New object: {obj.to_dict()}") + self.tracked_object[camera] = obj + self.tracked_object_previous[camera] = obj + self._autotrack_move_ptz(camera, obj) + + return + + if ( + # already tracking an object + self.tracked_object[camera] is not None + and self.tracked_object_previous[camera] is not None + and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"] + and obj.obj_data["frame_time"] + != self.tracked_object_previous[camera].obj_data["frame_time"] + ): + # don't move the ptz if we're relatively close to the existing box + # should we use iou or euclidean distance or both? + # distance = math.sqrt((obj.obj_data["centroid"][0] - camera_width/2)**2 + (obj.obj_data["centroid"][1] - obj.camera_height/2)**2) + # if distance <= (self.camera_width * .15) or distance <= (self.camera_height * .15) + if ( + intersection_over_union( + self.tracked_object_previous[camera].obj_data["box"], + obj.obj_data["box"], + ) + < 0.05 + ): + logger.debug( + f"Autotrack: Existing object (do NOT move ptz): {obj.to_dict()}" + ) + return + + logger.debug(f"Autotrack: Existing object (move ptz): {obj.to_dict()}") + self.tracked_object_previous[camera] = obj + self._autotrack_move_ptz(camera, obj) + + return + + if ( + # The tracker lost an object, so let's check the previous object's region and compare it with the incoming object + # If it's within bounds, start tracking that object. + # Should we check region (maybe too broad) or expand the previous object's box a bit and check that? + self.tracked_object[camera] is None + and obj.camera == camera + and obj.obj_data["label"] in self.object_types[camera] + and not obj.previous["false_positive"] + and not obj.false_positive + and obj.obj_data["motionless_count"] == 0 + and self.tracked_object_previous[camera] is not None + ): + if ( + intersection_over_union( + self.tracked_object_previous[camera].obj_data["region"], + obj.obj_data["box"], + ) + < 0.2 + ): + logger.debug(f"Autotrack: Reacquired object: {obj.to_dict()}") + self.tracked_object[camera] = obj + self.tracked_object_previous[camera] = obj + self._autotrack_move_ptz(camera, obj) + + return + + def end_object(self, camera, obj): + if self.config.cameras[camera].onvif.autotracking.enabled: + if ( + self.tracked_object[camera] is not None + and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"] + ): + logger.debug(f"Autotrack: End object: {obj.to_dict()}") + self.tracked_object[camera] = None + self.onvif.get_camera_status(camera) + + def camera_maintenance(self, camera): + # calls get_camera_status to check/update ptz movement + # returns camera to preset after timeout when tracking is over + autotracker_config = self.config.cameras[camera].onvif.autotracking + + if autotracker_config.enabled: + # regularly update camera status + if self.camera_metrics[camera]["ptz_moving"].value: + self.onvif.get_camera_status(camera) + + # return to preset if tracking is over + if ( + self.tracked_object[camera] is None + and self.tracked_object_previous[camera] is not None + and ( + # might want to use a different timestamp here? + time.time() - self.tracked_object_previous[camera].last_published + > autotracker_config.timeout + ) + and autotracker_config.return_preset + and not self.camera_metrics[camera]["ptz_moving"].value + ): + logger.debug( + f"Autotrack: Time is {time.time()}, returning to preset: {autotracker_config.return_preset}" + ) + self.onvif._move_to_preset( + camera, + autotracker_config.return_preset.lower(), + ) + self.tracked_object_previous[camera] = None + + def disable_autotracking(self, camera): + # need to call this if autotracking is disabled by mqtt?? + self.tracked_object[camera] = None diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index b0c4621b4..48cc029ee 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -5,7 +5,8 @@ import numpy as np from norfair import Detection, Drawable, Tracker, draw_boxes from norfair.drawing.drawer import Drawer -from frigate.config import DetectConfig +from frigate.config import CameraConfig +from frigate.ptz_autotrack import PtzMotionEstimator from frigate.track import ObjectTracker from frigate.util import intersection_over_union @@ -54,12 +55,16 @@ def frigate_distance(detection: Detection, tracked_object) -> float: class NorfairTracker(ObjectTracker): - def __init__(self, config: DetectConfig): + def __init__(self, config: CameraConfig, ptz_autotracker_enabled, ptz_moving): self.tracked_objects = {} self.disappeared = {} self.positions = {} - self.max_disappeared = config.max_disappeared - self.detect_config = config + self.max_disappeared = config.detect.max_disappeared + self.camera_config = config + self.detect_config = config.detect + self.ptz_autotracker_enabled = ptz_autotracker_enabled.value + self.ptz_moving = ptz_moving + self.camera_name = config.name self.track_id_map = {} # TODO: could also initialize a tracker per object class if there # was a good reason to have different distance calculations @@ -69,6 +74,8 @@ class NorfairTracker(ObjectTracker): initialization_delay=0, hit_counter_max=self.max_disappeared, ) + if self.ptz_autotracker_enabled: + self.ptz_motion_estimator = PtzMotionEstimator(config, self.ptz_moving) def register(self, track_id, obj): rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) @@ -230,7 +237,19 @@ class NorfairTracker(ObjectTracker): ) ) - tracked_objects = self.tracker.update(detections=norfair_detections) + coord_transformations = None + + if ( + self.ptz_autotracker_enabled + and self.camera_config.onvif.autotracking.motion_estimator + ): + coord_transformations = self.ptz_motion_estimator.motion_estimator( + detections, frame_time, self.camera_name + ) + + tracked_objects = self.tracker.update( + detections=norfair_detections, coord_transformations=coord_transformations + ) # update or create new tracks active_ids = [] diff --git a/frigate/types.py b/frigate/types.py index 8c3e54654..29991552f 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -16,6 +16,8 @@ class CameraMetricsTypes(TypedDict): frame_queue: Queue motion_enabled: Synchronized improve_contrast_enabled: Synchronized + ptz_autotracker_enabled: Synchronized + ptz_moving: Synchronized motion_threshold: Synchronized motion_contour_area: Synchronized process: Optional[Process] diff --git a/frigate/video.py b/frigate/video.py index c02ad15c4..4dfcbedab 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -457,6 +457,8 @@ def track_camera( detection_enabled = process_info["detection_enabled"] motion_enabled = process_info["motion_enabled"] improve_contrast_enabled = process_info["improve_contrast_enabled"] + ptz_autotracker_enabled = process_info["ptz_autotracker_enabled"] + ptz_moving = process_info["ptz_moving"] motion_threshold = process_info["motion_threshold"] motion_contour_area = process_info["motion_contour_area"] @@ -476,7 +478,7 @@ def track_camera( name, labelmap, detection_queue, result_connection, model_config, stop_event ) - object_tracker = NorfairTracker(config.detect) + object_tracker = NorfairTracker(config, ptz_autotracker_enabled, ptz_moving) frame_manager = SharedMemoryFrameManager() @@ -497,6 +499,7 @@ def track_camera( detection_enabled, motion_enabled, stop_event, + ptz_moving, ) logger.info(f"{name}: exiting subprocess") @@ -721,6 +724,7 @@ def process_frames( detection_enabled: mp.Value, motion_enabled: mp.Value, stop_event, + ptz_moving: mp.Value, exit_on_empty: bool = False, ): # attribute labels are not tracked and are not assigned regions