mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-07-30 13:48:07 +02:00
* i18n translated label fixes * Fix frame cache race bug Objects that were marked as false positives (that would later become true positives) would sometimes have their saved frame prematurely removed from the frame cache.
544 lines
21 KiB
Python
544 lines
21 KiB
Python
"""Maintains state of camera."""
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import threading
|
|
from collections import defaultdict
|
|
from typing import Any, Callable
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from frigate.config import (
|
|
FrigateConfig,
|
|
ZoomingModeEnum,
|
|
)
|
|
from frigate.const import CLIPS_DIR, THUMB_DIR
|
|
from frigate.ptz.autotrack import PtzAutoTrackerThread
|
|
from frigate.track.tracked_object import TrackedObject
|
|
from frigate.util.image import (
|
|
SharedMemoryFrameManager,
|
|
draw_box_with_label,
|
|
draw_timestamp,
|
|
is_better_thumbnail,
|
|
is_label_printable,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CameraState:
|
|
def __init__(
|
|
self,
|
|
name,
|
|
config: FrigateConfig,
|
|
frame_manager: SharedMemoryFrameManager,
|
|
ptz_autotracker_thread: PtzAutoTrackerThread,
|
|
):
|
|
self.name = name
|
|
self.config = config
|
|
self.camera_config = config.cameras[name]
|
|
self.frame_manager = frame_manager
|
|
self.best_objects: dict[str, TrackedObject] = {}
|
|
self.tracked_objects: dict[str, TrackedObject] = {}
|
|
self.frame_cache = {}
|
|
self.zone_objects = defaultdict(list)
|
|
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
|
|
self.current_frame_lock = threading.Lock()
|
|
self.current_frame_time = 0.0
|
|
self.motion_boxes = []
|
|
self.regions = []
|
|
self.previous_frame_id = None
|
|
self.callbacks = defaultdict(list)
|
|
self.ptz_autotracker_thread = ptz_autotracker_thread
|
|
self.prev_enabled = self.camera_config.enabled
|
|
|
|
def get_current_frame(self, draw_options: dict[str, Any] = {}):
|
|
with self.current_frame_lock:
|
|
frame_copy = np.copy(self._current_frame)
|
|
frame_time = self.current_frame_time
|
|
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
|
|
motion_boxes = self.motion_boxes.copy()
|
|
regions = self.regions.copy()
|
|
|
|
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
|
|
# draw on the frame
|
|
if draw_options.get("mask"):
|
|
mask_overlay = np.where(self.camera_config.motion.mask == [0])
|
|
frame_copy[mask_overlay] = [0, 0, 0]
|
|
|
|
if draw_options.get("bounding_boxes"):
|
|
# draw the bounding boxes on the frame
|
|
for obj in tracked_objects.values():
|
|
if obj["frame_time"] == frame_time:
|
|
if obj["stationary"]:
|
|
color = (220, 220, 220)
|
|
thickness = 1
|
|
else:
|
|
thickness = 2
|
|
color = self.config.model.colormap.get(
|
|
obj["label"], (255, 255, 255)
|
|
)
|
|
else:
|
|
thickness = 1
|
|
color = (255, 0, 0)
|
|
|
|
# draw thicker box around ptz autotracked object
|
|
if (
|
|
self.camera_config.onvif.autotracking.enabled
|
|
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
|
|
self.name
|
|
]
|
|
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
|
|
self.name
|
|
]
|
|
is not None
|
|
and obj["id"]
|
|
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
|
|
self.name
|
|
].obj_data["id"]
|
|
and obj["frame_time"] == frame_time
|
|
):
|
|
thickness = 5
|
|
color = self.config.model.colormap.get(
|
|
obj["label"], (255, 255, 255)
|
|
)
|
|
|
|
# debug autotracking zooming - show the zoom factor box
|
|
if (
|
|
self.camera_config.onvif.autotracking.zooming
|
|
!= ZoomingModeEnum.disabled
|
|
):
|
|
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
|
|
self.name
|
|
]["max_target_box"]
|
|
side_length = max_target_box * (
|
|
max(
|
|
self.camera_config.detect.width,
|
|
self.camera_config.detect.height,
|
|
)
|
|
)
|
|
|
|
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
|
|
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
|
|
top_left = (
|
|
int(centroid_x - side_length // 2),
|
|
int(centroid_y - side_length // 2),
|
|
)
|
|
bottom_right = (
|
|
int(centroid_x + side_length // 2),
|
|
int(centroid_y + side_length // 2),
|
|
)
|
|
cv2.rectangle(
|
|
frame_copy,
|
|
top_left,
|
|
bottom_right,
|
|
(255, 255, 0),
|
|
2,
|
|
)
|
|
|
|
# draw the bounding boxes on the frame
|
|
box = obj["box"]
|
|
text = (
|
|
obj["sub_label"][0]
|
|
if (
|
|
obj.get("sub_label") and is_label_printable(obj["sub_label"][0])
|
|
)
|
|
else obj.get("recognized_license_plate", [None])[0]
|
|
if (
|
|
obj.get("recognized_license_plate")
|
|
and obj["recognized_license_plate"][0]
|
|
)
|
|
else obj["label"]
|
|
)
|
|
draw_box_with_label(
|
|
frame_copy,
|
|
box[0],
|
|
box[1],
|
|
box[2],
|
|
box[3],
|
|
text,
|
|
f"{obj['score']:.0%} {int(obj['area'])}"
|
|
+ (
|
|
f" {float(obj['current_estimated_speed']):.1f}"
|
|
if obj["current_estimated_speed"] != 0
|
|
else ""
|
|
),
|
|
thickness=thickness,
|
|
color=color,
|
|
)
|
|
|
|
# draw any attributes
|
|
for attribute in obj["current_attributes"]:
|
|
box = attribute["box"]
|
|
box_area = int((box[2] - box[0]) * (box[3] - box[1]))
|
|
draw_box_with_label(
|
|
frame_copy,
|
|
box[0],
|
|
box[1],
|
|
box[2],
|
|
box[3],
|
|
attribute["label"],
|
|
f"{attribute['score']:.0%} {str(box_area)}",
|
|
thickness=thickness,
|
|
color=color,
|
|
)
|
|
|
|
if draw_options.get("regions"):
|
|
for region in regions:
|
|
cv2.rectangle(
|
|
frame_copy,
|
|
(region[0], region[1]),
|
|
(region[2], region[3]),
|
|
(0, 255, 0),
|
|
2,
|
|
)
|
|
|
|
if draw_options.get("zones"):
|
|
for name, zone in self.camera_config.zones.items():
|
|
thickness = (
|
|
8
|
|
if any(
|
|
name in obj["current_zones"] for obj in tracked_objects.values()
|
|
)
|
|
else 2
|
|
)
|
|
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
|
|
|
|
if draw_options.get("motion_boxes"):
|
|
for m_box in motion_boxes:
|
|
cv2.rectangle(
|
|
frame_copy,
|
|
(m_box[0], m_box[1]),
|
|
(m_box[2], m_box[3]),
|
|
(0, 0, 255),
|
|
2,
|
|
)
|
|
|
|
if draw_options.get("timestamp"):
|
|
color = self.camera_config.timestamp_style.color
|
|
draw_timestamp(
|
|
frame_copy,
|
|
frame_time,
|
|
self.camera_config.timestamp_style.format,
|
|
font_effect=self.camera_config.timestamp_style.effect,
|
|
font_thickness=self.camera_config.timestamp_style.thickness,
|
|
font_color=(color.blue, color.green, color.red),
|
|
position=self.camera_config.timestamp_style.position,
|
|
)
|
|
|
|
return frame_copy
|
|
|
|
def finished(self, obj_id):
|
|
del self.tracked_objects[obj_id]
|
|
|
|
def on(self, event_type: str, callback: Callable[[dict], None]):
|
|
self.callbacks[event_type].append(callback)
|
|
|
|
def update(
|
|
self,
|
|
frame_name: str,
|
|
frame_time: float,
|
|
current_detections: dict[str, dict[str, Any]],
|
|
motion_boxes: list[tuple[int, int, int, int]],
|
|
regions: list[tuple[int, int, int, int]],
|
|
):
|
|
current_frame = self.frame_manager.get(
|
|
frame_name, self.camera_config.frame_shape_yuv
|
|
)
|
|
|
|
tracked_objects = self.tracked_objects.copy()
|
|
current_ids = set(current_detections.keys())
|
|
previous_ids = set(tracked_objects.keys())
|
|
removed_ids = previous_ids.difference(current_ids)
|
|
new_ids = current_ids.difference(previous_ids)
|
|
updated_ids = current_ids.intersection(previous_ids)
|
|
|
|
for id in new_ids:
|
|
logger.debug(f"{self.name}: New tracked object ID: {id}")
|
|
new_obj = tracked_objects[id] = TrackedObject(
|
|
self.config.model,
|
|
self.camera_config,
|
|
self.config.ui,
|
|
self.frame_cache,
|
|
current_detections[id],
|
|
)
|
|
|
|
# add initial frame to frame cache
|
|
logger.debug(
|
|
f"{self.name}: New object, adding {frame_time} to frame cache for {id}"
|
|
)
|
|
self.frame_cache[frame_time] = {
|
|
"frame": np.copy(current_frame),
|
|
"object_id": id,
|
|
}
|
|
|
|
# save initial thumbnail data and best object
|
|
thumbnail_data = {
|
|
"frame_time": frame_time,
|
|
"box": new_obj.obj_data["box"],
|
|
"area": new_obj.obj_data["area"],
|
|
"region": new_obj.obj_data["region"],
|
|
"score": new_obj.obj_data["score"],
|
|
"attributes": new_obj.obj_data["attributes"],
|
|
"current_estimated_speed": 0,
|
|
"velocity_angle": 0,
|
|
"path_data": [],
|
|
"recognized_license_plate": None,
|
|
"recognized_license_plate_score": None,
|
|
}
|
|
new_obj.thumbnail_data = thumbnail_data
|
|
tracked_objects[id].thumbnail_data = thumbnail_data
|
|
object_type = new_obj.obj_data["label"]
|
|
|
|
# call event handlers
|
|
self.send_mqtt_snapshot(new_obj, object_type)
|
|
|
|
for c in self.callbacks["start"]:
|
|
c(self.name, new_obj, frame_name)
|
|
|
|
for id in updated_ids:
|
|
updated_obj = tracked_objects[id]
|
|
thumb_update, significant_update, path_update, autotracker_update = (
|
|
updated_obj.update(
|
|
frame_time, current_detections[id], current_frame is not None
|
|
)
|
|
)
|
|
|
|
if autotracker_update or significant_update:
|
|
for c in self.callbacks["autotrack"]:
|
|
c(self.name, updated_obj, frame_name)
|
|
|
|
if thumb_update and current_frame is not None:
|
|
# ensure this frame is stored in the cache
|
|
if (
|
|
updated_obj.thumbnail_data["frame_time"] == frame_time
|
|
and frame_time not in self.frame_cache
|
|
):
|
|
logger.debug(
|
|
f"{self.name}: Existing object, adding {frame_time} to frame cache for {id}"
|
|
)
|
|
self.frame_cache[frame_time] = {
|
|
"frame": np.copy(current_frame),
|
|
"object_id": id,
|
|
}
|
|
|
|
updated_obj.last_updated = frame_time
|
|
|
|
# if it has been more than 5 seconds since the last thumb update
|
|
# and the last update is greater than the last publish or
|
|
# the object has changed significantly or
|
|
# the object moved enough to update the path
|
|
if (
|
|
(
|
|
frame_time - updated_obj.last_published > 5
|
|
and updated_obj.last_updated > updated_obj.last_published
|
|
)
|
|
or significant_update
|
|
or path_update
|
|
):
|
|
# call event handlers
|
|
for c in self.callbacks["update"]:
|
|
c(self.name, updated_obj, frame_name)
|
|
updated_obj.last_published = frame_time
|
|
|
|
for id in removed_ids:
|
|
# publish events to mqtt
|
|
removed_obj = tracked_objects[id]
|
|
if "end_time" not in removed_obj.obj_data:
|
|
removed_obj.obj_data["end_time"] = frame_time
|
|
logger.debug(f"{self.name}: end callback for object {id}")
|
|
for c in self.callbacks["end"]:
|
|
c(self.name, removed_obj, frame_name)
|
|
|
|
# TODO: can i switch to looking this up and only changing when an event ends?
|
|
# maintain best objects
|
|
camera_activity: dict[str, list[Any]] = {
|
|
"motion": len(motion_boxes) > 0,
|
|
"objects": [],
|
|
}
|
|
|
|
for obj in tracked_objects.values():
|
|
object_type = obj.obj_data["label"]
|
|
active = obj.is_active()
|
|
|
|
if not obj.false_positive:
|
|
label = object_type
|
|
sub_label = None
|
|
|
|
if obj.obj_data.get("sub_label"):
|
|
if (
|
|
obj.obj_data.get("sub_label")[0]
|
|
in self.config.model.all_attributes
|
|
):
|
|
label = obj.obj_data["sub_label"][0]
|
|
else:
|
|
label = f"{object_type}-verified"
|
|
sub_label = obj.obj_data["sub_label"][0]
|
|
|
|
camera_activity["objects"].append(
|
|
{
|
|
"id": obj.obj_data["id"],
|
|
"label": label,
|
|
"stationary": not active,
|
|
"area": obj.obj_data["area"],
|
|
"ratio": obj.obj_data["ratio"],
|
|
"score": obj.obj_data["score"],
|
|
"sub_label": sub_label,
|
|
"current_zones": obj.current_zones,
|
|
}
|
|
)
|
|
|
|
# if we don't have access to the current frame or
|
|
# if the object's thumbnail is not from the current frame, skip
|
|
if (
|
|
current_frame is None
|
|
or obj.thumbnail_data is None
|
|
or obj.false_positive
|
|
or obj.thumbnail_data["frame_time"] != frame_time
|
|
):
|
|
continue
|
|
|
|
if object_type in self.best_objects:
|
|
current_best = self.best_objects[object_type]
|
|
now = datetime.datetime.now().timestamp()
|
|
# if the object is a higher score than the current best score
|
|
# or the current object is older than desired, use the new object
|
|
if (
|
|
is_better_thumbnail(
|
|
object_type,
|
|
current_best.thumbnail_data,
|
|
obj.thumbnail_data,
|
|
self.camera_config.frame_shape,
|
|
)
|
|
or (now - current_best.thumbnail_data["frame_time"])
|
|
> self.camera_config.best_image_timeout
|
|
):
|
|
self.send_mqtt_snapshot(obj, object_type)
|
|
else:
|
|
self.send_mqtt_snapshot(obj, object_type)
|
|
|
|
for c in self.callbacks["camera_activity"]:
|
|
c(self.name, camera_activity)
|
|
|
|
# cleanup thumbnail frame cache
|
|
current_thumb_frames = {
|
|
obj.thumbnail_data["frame_time"]
|
|
for obj in tracked_objects.values()
|
|
if obj.thumbnail_data is not None
|
|
}
|
|
current_best_frames = {
|
|
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
|
|
}
|
|
thumb_frames_to_delete = [
|
|
t
|
|
for t in self.frame_cache.keys()
|
|
if t not in current_thumb_frames and t not in current_best_frames
|
|
]
|
|
if len(thumb_frames_to_delete) > 0:
|
|
logger.debug(f"{self.name}: Current frame cache contents:")
|
|
for k, v in self.frame_cache.items():
|
|
logger.debug(f" frame time: {k}, object id: {v['object_id']}")
|
|
for obj_id, obj in tracked_objects.items():
|
|
thumb_time = (
|
|
obj.thumbnail_data["frame_time"] if obj.thumbnail_data else None
|
|
)
|
|
logger.debug(
|
|
f"{self.name}: Tracked object {obj_id} thumbnail frame_time: {thumb_time}, false positive: {obj.false_positive}"
|
|
)
|
|
for t in thumb_frames_to_delete:
|
|
object_id = self.frame_cache[t].get("object_id", "unknown")
|
|
logger.debug(f"{self.name}: Deleting {t} from frame cache for {object_id}")
|
|
del self.frame_cache[t]
|
|
|
|
with self.current_frame_lock:
|
|
self.tracked_objects = tracked_objects
|
|
self.motion_boxes = motion_boxes
|
|
self.regions = regions
|
|
|
|
if current_frame is not None:
|
|
self.current_frame_time = frame_time
|
|
self._current_frame = np.copy(current_frame)
|
|
|
|
if self.previous_frame_id is not None:
|
|
self.frame_manager.close(self.previous_frame_id)
|
|
|
|
self.previous_frame_id = frame_name
|
|
|
|
def send_mqtt_snapshot(self, new_obj: TrackedObject, object_type: str) -> None:
|
|
for c in self.callbacks["snapshot"]:
|
|
updated = c(self.name, new_obj)
|
|
|
|
# if the snapshot was not updated, then this object is not a best object
|
|
# but all new objects should be considered the next best object
|
|
# so we remove the label from the best objects
|
|
if updated:
|
|
self.best_objects[object_type] = new_obj
|
|
else:
|
|
if object_type in self.best_objects:
|
|
self.best_objects.pop(object_type)
|
|
break
|
|
|
|
def save_manual_event_image(
|
|
self,
|
|
frame: np.ndarray | None,
|
|
event_id: str,
|
|
label: str,
|
|
draw: dict[str, list[dict]],
|
|
) -> None:
|
|
img_frame = frame if frame is not None else self.get_current_frame()
|
|
|
|
# write clean snapshot if enabled
|
|
if self.camera_config.snapshots.clean_copy:
|
|
ret, png = cv2.imencode(".png", img_frame)
|
|
|
|
if ret:
|
|
with open(
|
|
os.path.join(
|
|
CLIPS_DIR,
|
|
f"{self.camera_config.name}-{event_id}-clean.png",
|
|
),
|
|
"wb",
|
|
) as p:
|
|
p.write(png.tobytes())
|
|
|
|
# write jpg snapshot with optional annotations
|
|
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
|
|
for box in draw.get("boxes"):
|
|
x = int(box["box"][0] * self.camera_config.detect.width)
|
|
y = int(box["box"][1] * self.camera_config.detect.height)
|
|
width = int(box["box"][2] * self.camera_config.detect.width)
|
|
height = int(box["box"][3] * self.camera_config.detect.height)
|
|
|
|
draw_box_with_label(
|
|
img_frame,
|
|
x,
|
|
y,
|
|
x + width,
|
|
y + height,
|
|
label,
|
|
f"{box.get('score', '-')}% {int(width * height)}",
|
|
thickness=2,
|
|
color=box.get("color", (255, 0, 0)),
|
|
)
|
|
|
|
ret, jpg = cv2.imencode(".jpg", img_frame)
|
|
with open(
|
|
os.path.join(CLIPS_DIR, f"{self.camera_config.name}-{event_id}.jpg"),
|
|
"wb",
|
|
) as j:
|
|
j.write(jpg.tobytes())
|
|
|
|
# create thumbnail with max height of 175 and save
|
|
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
|
|
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
|
|
thumb_path = os.path.join(THUMB_DIR, self.camera_config.name)
|
|
os.makedirs(thumb_path, exist_ok=True)
|
|
cv2.imwrite(os.path.join(thumb_path, f"{event_id}.webp"), thumb)
|
|
|
|
def shutdown(self) -> None:
|
|
for obj in self.tracked_objects.values():
|
|
if not obj.obj_data.get("end_time"):
|
|
obj.write_thumbnail_to_disk()
|