Refactor manual event api to use ZMQ (#17105)

* Don't wait for topic

* Refactor object processing and camera state

* Move manual event handling to camera state / tracked object

* Cleanup

* Refactor audio to use internal zmq

* Cleanup

* Clenaup

* Cleanup

* Quick label fix

* Fix tests

* Cleanup
This commit is contained in:
Nicolas Mowen 2025-03-11 21:31:05 -06:00 committed by GitHub
parent c6bed1e108
commit b3d5cd9e4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 648 additions and 674 deletions

View File

@ -3,6 +3,8 @@
import datetime
import logging
import os
import random
import string
from functools import reduce
from pathlib import Path
from urllib.parse import unquote
@ -43,9 +45,8 @@ from frigate.api.defs.tags import Tags
from frigate.comms.event_metadata_updater import EventMetadataTypeEnum
from frigate.const import CLIPS_DIR
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.models import Event, ReviewSegment, Timeline
from frigate.object_processing import TrackedObject, TrackedObjectProcessor
from frigate.track.object_processing import TrackedObject
from frigate.util.builtin import get_tz_modifiers
logger = logging.getLogger(__name__)
@ -1202,28 +1203,25 @@ def create_event(
status_code=404,
)
try:
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
external_processor: ExternalEventProcessor = request.app.external_processor
now = datetime.datetime.now().timestamp()
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
frame = frame_processor.get_current_frame(camera_name)
event_id = external_processor.create_manual_event(
request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
camera_name,
label,
body.source_type,
body.sub_label,
body.score,
body.duration,
event_id,
body.include_recording,
body.score,
body.sub_label,
body.duration,
body.source_type,
body.draw,
frame,
)
except Exception as e:
logger.error(e)
return JSONResponse(
content=({"success": False, "message": "An unknown error occurred"}),
status_code=500,
)
),
)
return JSONResponse(
content=(
@ -1245,7 +1243,9 @@ def create_event(
def end_event(request: Request, event_id: str, body: EventsEndBody):
try:
end_time = body.end_time or datetime.datetime.now().timestamp()
request.app.external_processor.finish_manual_event(event_id, end_time)
request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_end, (event_id, end_time)
)
except Exception:
return JSONResponse(
content=(

View File

@ -27,7 +27,6 @@ from frigate.comms.event_metadata_updater import (
)
from frigate.config import FrigateConfig
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.ptz.onvif import OnvifController
from frigate.stats.emitter import StatsEmitter
from frigate.storage import StorageMaintainer
@ -56,7 +55,6 @@ def create_fastapi_app(
detected_frames_processor,
storage_maintainer: StorageMaintainer,
onvif: OnvifController,
external_processor: ExternalEventProcessor,
stats_emitter: StatsEmitter,
event_metadata_updater: EventMetadataPublisher,
):
@ -129,7 +127,6 @@ def create_fastapi_app(
app.onvif = onvif
app.stats_emitter = stats_emitter
app.event_metadata_updater = event_metadata_updater
app.external_processor = external_processor
app.jwt_token = get_jwt_secret() if frigate_config.auth.enabled else None
return app

View File

@ -37,7 +37,7 @@ from frigate.const import (
RECORD_DIR,
)
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.object_processing import TrackedObjectProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import get_tz_modifiers
from frigate.util.image import get_image_from_recording
from frigate.util.path import get_event_thumbnail_bytes

View File

@ -43,7 +43,6 @@ from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.models import (
Event,
@ -57,7 +56,6 @@ from frigate.models import (
User,
)
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
@ -69,6 +67,7 @@ from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
@ -318,9 +317,6 @@ class FrigateApp:
# Create a client for other processes to use
self.embeddings = EmbeddingsContext(self.db)
def init_external_event_processor(self) -> None:
self.external_event_processor = ExternalEventProcessor(self.config)
def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator()
self.inter_config_updater = ConfigPublisher()
@ -657,7 +653,6 @@ class FrigateApp:
self.start_camera_capture_processes()
self.start_audio_processor()
self.start_storage_maintainer()
self.init_external_event_processor()
self.start_stats_emitter()
self.start_timeline_processor()
self.start_event_processor()
@ -676,7 +671,6 @@ class FrigateApp:
self.detected_frames_processor,
self.storage_maintainer,
self.onvif_controller,
self.external_event_processor,
self.stats_emitter,
self.event_metadata_updater,
),
@ -748,7 +742,6 @@ class FrigateApp:
self.review_segment_process.terminate()
self.review_segment_process.join()
self.external_event_processor.stop()
self.dispatcher.stop()
self.ptz_autotracker_thread.join()

464
frigate/camera/state.py Normal file
View File

@ -0,0 +1,464 @@
"""Maintains state of camera."""
import datetime
import logging
import os
import threading
from collections import defaultdict
from typing import Callable
import cv2
import numpy as np
from frigate.config import (
FrigateConfig,
ZoomingModeEnum,
)
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
SharedMemoryFrameManager,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail,
is_label_printable,
)
logger = logging.getLogger(__name__)
class CameraState:
def __init__(
self,
name,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.mask == [0])
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
# draw the bounding boxes on the frame
for obj in tracked_objects.values():
if obj["frame_time"] == frame_time:
if obj["stationary"]:
color = (220, 220, 220)
thickness = 1
else:
thickness = 2
color = self.config.model.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw thicker box around ptz autotracked object
if (
self.camera_config.onvif.autotracking.enabled
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
self.name
]
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
]
is not None
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
and obj["frame_time"] == frame_time
):
thickness = 5
color = self.config.model.colormap[obj["label"]]
# debug autotracking zooming - show the zoom factor box
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
self.camera_config.detect.height,
)
)
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
top_left = (
int(centroid_x - side_length // 2),
int(centroid_y - side_length // 2),
)
bottom_right = (
int(centroid_x + side_length // 2),
int(centroid_y + side_length // 2),
)
cv2.rectangle(
frame_copy,
top_left,
bottom_right,
(255, 255, 0),
2,
)
# draw the bounding boxes on the frame
box = obj["box"]
text = (
obj["label"]
if (
not obj.get("sub_label")
or not is_label_printable(obj["sub_label"][0])
)
else obj["sub_label"][0]
)
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
text,
f"{obj['score']:.0%} {int(obj['area'])}"
+ (
f" {float(obj['current_estimated_speed']):.1f}"
if obj["current_estimated_speed"] != 0
else ""
),
thickness=thickness,
color=color,
)
# draw any attributes
for attribute in obj["current_attributes"]:
box = attribute["box"]
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%}",
thickness=thickness,
color=color,
)
if draw_options.get("regions"):
for region in regions:
cv2.rectangle(
frame_copy,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
if draw_options.get("zones"):
for name, zone in self.camera_config.zones.items():
thickness = (
8
if any(
name in obj["current_zones"] for obj in tracked_objects.values()
)
else 2
)
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
if draw_options.get("motion_boxes"):
for m_box in motion_boxes:
cv2.rectangle(
frame_copy,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
position=self.camera_config.timestamp_style.position,
)
return frame_copy
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback)
def update(
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
removed_ids = previous_ids.difference(current_ids)
new_ids = current_ids.difference(previous_ids)
updated_ids = current_ids.intersection(previous_ids)
for id in new_ids:
new_obj = tracked_objects[id] = TrackedObject(
self.config.model,
self.camera_config,
self.config.ui,
self.frame_cache,
current_detections[id],
)
# call event handlers
for c in self.callbacks["start"]:
c(self.name, new_obj, frame_name)
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
self.frame_cache[frame_time] = np.copy(current_frame)
updated_obj.last_updated = frame_time
# if it has been more than 5 seconds since the last thumb update
# and the last update is greater than the last publish or
# the object has changed significantly
if (
frame_time - updated_obj.last_published > 5
and updated_obj.last_updated > updated_obj.last_published
) or significant_update:
# call event handlers
for c in self.callbacks["update"]:
c(self.name, updated_obj, frame_name)
updated_obj.last_published = frame_time
for id in removed_ids:
# publish events to mqtt
removed_obj = tracked_objects[id]
if "end_time" not in removed_obj.obj_data:
removed_obj.obj_data["end_time"] = frame_time
for c in self.callbacks["end"]:
c(self.name, removed_obj, frame_name)
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
"enabled": True,
"motion": len(motion_boxes) > 0,
"objects": [],
}
for obj in tracked_objects.values():
object_type = obj.obj_data["label"]
active = obj.is_active()
if not obj.false_positive:
label = object_type
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
sub_label = obj.obj_data["sub_label"][0]
camera_activity["objects"].append(
{
"id": obj.obj_data["id"],
"label": label,
"stationary": not active,
"area": obj.obj_data["area"],
"ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"],
"sub_label": sub_label,
"current_zones": obj.current_zones,
}
)
# if we don't have access to the current frame or
# if the object's thumbnail is not from the current frame, skip
if (
current_frame is None
or obj.thumbnail_data is None
or obj.false_positive
or obj.thumbnail_data["frame_time"] != frame_time
):
continue
if object_type in self.best_objects:
current_best = self.best_objects[object_type]
now = datetime.datetime.now().timestamp()
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
):
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
else:
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity)
# cleanup thumbnail frame cache
current_thumb_frames = {
obj.thumbnail_data["frame_time"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
}
thumb_frames_to_delete = [
t
for t in self.frame_cache.keys()
if t not in current_thumb_frames and t not in current_best_frames
]
for t in thumb_frames_to_delete:
del self.frame_cache[t]
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.motion_boxes = motion_boxes
self.regions = regions
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = np.copy(current_frame)
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_name
def save_manual_event_image(
self, event_id: str, label: str, draw: dict[str, list[dict]]
) -> None:
img_frame = self.get_current_frame()
# write clean snapshot if enabled
if self.camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{self.camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = int(box["box"][0] * self.camera_config.detect.width)
y = int(box["box"][1] * self.camera_config.detect.height)
width = int(box["box"][2] * self.camera_config.detect.width)
height = int(box["box"][3] * self.camera_config.detect.height)
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{self.camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
cv2.imwrite(
os.path.join(THUMB_DIR, self.camera_config.name, f"{event_id}.webp"), thumb
)
def shutdown(self) -> None:
for obj in self.tracked_objects.values():
if not obj.obj_data.get("end_time"):
obj.write_thumbnail_to_disk()

View File

@ -10,6 +10,8 @@ logger = logging.getLogger(__name__)
class EventMetadataTypeEnum(str, Enum):
all = ""
manual_event_create = "manual_event_create"
manual_event_end = "manual_event_end"
regenerate_description = "regenerate_description"
sub_label = "sub_label"

View File

@ -2,17 +2,22 @@
import datetime
import logging
import random
import string
import threading
import time
from typing import Tuple
import numpy as np
import requests
import frigate.util as util
from frigate.camera import CameraMetrics
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig
from frigate.const import (
@ -21,7 +26,6 @@ from frigate.const import (
AUDIO_MAX_BIT_RANGE,
AUDIO_MIN_CONFIDENCE,
AUDIO_SAMPLE_RATE,
FRIGATE_LOCALHOST,
)
from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe
@ -139,6 +143,7 @@ class AudioEventMaintainer(threading.Thread):
f"config/enabled/{camera.name}", True
)
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio)
self.event_metadata_publisher = EventMetadataPublisher()
self.was_enabled = camera.enabled
@ -207,24 +212,33 @@ class AudioEventMaintainer(threading.Thread):
datetime.datetime.now().timestamp()
)
else:
now = datetime.datetime.now().timestamp()
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
event_id = f"{now}-{rand_id}"
self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON")
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{self.config.name}/{label}/create",
json={"duration": None, "score": score, "source_type": "audio"},
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
self.config.name,
label,
event_id,
True,
score,
None,
None,
"audio",
{},
),
)
if resp.status_code == 200:
event_id = resp.json()["event_id"]
self.detections[label] = {
"id": event_id,
"label": label,
"last_detection": datetime.datetime.now().timestamp(),
}
else:
self.logger.warning(
f"Failed to create audio event with status code {resp.status_code}"
)
self.detections[label] = {
"id": event_id,
"label": label,
"last_detection": now,
}
def expire_detections(self) -> None:
now = datetime.datetime.now().timestamp()
@ -241,17 +255,11 @@ class AudioEventMaintainer(threading.Thread):
f"{self.config.name}/audio/{detection['label']}", "OFF"
)
resp = requests.put(
f"{FRIGATE_LOCALHOST}/api/events/{detection['id']}/end",
json={"end_time": detection["last_detection"]},
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], detection["last_detection"]),
)
if resp.status_code == 200:
self.detections[detection["label"]] = None
else:
self.logger.warning(
f"Failed to end audio event {detection['id']} with status code {resp.status_code}"
)
self.detections[detection["label"]] = None
def expire_all_detections(self) -> None:
"""Immediately end all current detections"""
@ -259,16 +267,11 @@ class AudioEventMaintainer(threading.Thread):
for label, detection in list(self.detections.items()):
if detection:
self.requestor.send_data(f"{self.config.name}/audio/{label}", "OFF")
resp = requests.put(
f"{FRIGATE_LOCALHOST}/api/events/{detection['id']}/end",
json={"end_time": now},
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], now),
)
if resp.status_code == 200:
self.detections[label] = None
else:
self.logger.warning(
f"Failed to end audio event {detection['id']} with status code {resp.status_code}"
)
self.detections[label] = None
def start_or_restart_ffmpeg(self) -> None:
self.audio_listener = start_or_restart_ffmpeg(

View File

@ -1,187 +0,0 @@
"""Handle external events created by the user."""
import datetime
import logging
import os
import random
import string
from enum import Enum
from typing import Optional
import cv2
from numpy import ndarray
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.events_updater import EventUpdatePublisher
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.util.image import draw_box_with_label
logger = logging.getLogger(__name__)
class ManualEventState(str, Enum):
complete = "complete"
start = "start"
end = "end"
class ExternalEventProcessor:
def __init__(self, config: FrigateConfig) -> None:
self.config = config
self.default_thumbnail = None
self.event_sender = EventUpdatePublisher()
self.detection_updater = DetectionPublisher(DetectionTypeEnum.api)
self.event_camera = {}
def create_manual_event(
self,
camera: str,
label: str,
source_type: str,
sub_label: Optional[str],
score: int,
duration: Optional[int],
include_recording: bool,
draw: dict[str, any],
snapshot_frame: Optional[ndarray],
) -> str:
now = datetime.datetime.now().timestamp()
camera_config = self.config.cameras.get(camera)
# create event id and start frame time
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
self._write_images(camera_config, label, event_id, draw, snapshot_frame)
end = now + duration if duration is not None else None
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera,
"start_time": now - camera_config.record.event_pre_capture,
"end_time": end,
"has_clip": camera_config.record.enabled and include_recording,
"has_snapshot": True,
"type": source_type,
},
)
)
if source_type == "api":
self.event_camera[event_id] = camera
self.detection_updater.publish(
(
camera,
now,
{
"state": (
ManualEventState.complete if end else ManualEventState.start
),
"label": f"{label}: {sub_label}" if sub_label else label,
"event_id": event_id,
"end_time": end,
},
)
)
return event_id
def finish_manual_event(self, event_id: str, end_time: float) -> None:
"""Finish external event with indeterminate duration."""
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.end,
None,
"",
{"id": event_id, "end_time": end_time},
)
)
if event_id in self.event_camera:
self.detection_updater.publish(
(
self.event_camera[event_id],
end_time,
{
"state": ManualEventState.end,
"event_id": event_id,
"end_time": end_time,
},
)
)
self.event_camera.pop(event_id)
def _write_images(
self,
camera_config: CameraConfig,
label: str,
event_id: str,
draw: dict[str, any],
img_frame: Optional[ndarray],
) -> None:
if img_frame is None:
return
# write clean snapshot if enabled
if camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = int(box["box"][0] * camera_config.detect.width)
y = int(box["box"][1] * camera_config.detect.height)
width = int(box["box"][2] * camera_config.detect.width)
height = int(box["box"][3] * camera_config.detect.height)
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
cv2.imwrite(
os.path.join(THUMB_DIR, camera_config.name, f"{event_id}.webp"), thumb
)
def stop(self):
self.event_sender.stop()
self.detection_updater.stop()

View File

@ -22,7 +22,7 @@ from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_encode,
)
from frigate.models import Previews
from frigate.object_processing import TrackedObject
from frigate.track.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop
logger = logging.getLogger(__name__)

View File

@ -23,10 +23,9 @@ from frigate.const import (
CLIPS_DIR,
UPSERT_REVIEW_SEGMENT,
)
from frigate.events.external import ManualEventState
from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
from frigate.review.types import SeverityEnum
from frigate.track.object_processing import ManualEventState, TrackedObject
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop
logger = logging.getLogger(__name__)

View File

@ -117,7 +117,6 @@ class BaseTestHttp(unittest.TestCase):
None,
None,
None,
None,
stats,
None,
)

View File

@ -122,7 +122,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
id = "123456.random"
@ -144,7 +143,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
id = "123456.random"
bad_id = "654321.other"
@ -165,7 +163,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
id = "123456.random"
@ -188,7 +185,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
id = "123456.random"
@ -215,7 +211,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
morning_id = "123456.random"
evening_id = "654321.random"
@ -254,7 +249,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
mock_event_updater,
)
id = "123456.random"
@ -300,7 +294,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
mock_event_updater,
)
id = "123456.random"
@ -334,7 +327,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
with TestClient(app) as client:
@ -352,7 +344,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
id = "123456.random"

View File

@ -4,13 +4,13 @@ import logging
import queue
import threading
from collections import defaultdict
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable, Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.camera.state import CameraState
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher
@ -25,406 +25,20 @@ from frigate.config import (
FrigateConfig,
RecordConfig,
SnapshotsConfig,
ZoomingModeEnum,
)
from frigate.const import UPDATE_CAMERA_ACTIVITY
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event, Timeline
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
SharedMemoryFrameManager,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail,
is_label_printable,
)
from frigate.util.image import SharedMemoryFrameManager
logger = logging.getLogger(__name__)
# Maintains the state of a camera
class CameraState:
def __init__(
self,
name,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.mask == [0])
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
# draw the bounding boxes on the frame
for obj in tracked_objects.values():
if obj["frame_time"] == frame_time:
if obj["stationary"]:
color = (220, 220, 220)
thickness = 1
else:
thickness = 2
color = self.config.model.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw thicker box around ptz autotracked object
if (
self.camera_config.onvif.autotracking.enabled
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
self.name
]
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
]
is not None
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
and obj["frame_time"] == frame_time
):
thickness = 5
color = self.config.model.colormap[obj["label"]]
# debug autotracking zooming - show the zoom factor box
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
self.camera_config.detect.height,
)
)
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
top_left = (
int(centroid_x - side_length // 2),
int(centroid_y - side_length // 2),
)
bottom_right = (
int(centroid_x + side_length // 2),
int(centroid_y + side_length // 2),
)
cv2.rectangle(
frame_copy,
top_left,
bottom_right,
(255, 255, 0),
2,
)
# draw the bounding boxes on the frame
box = obj["box"]
text = (
obj["label"]
if (
not obj.get("sub_label")
or not is_label_printable(obj["sub_label"][0])
)
else obj["sub_label"][0]
)
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
text,
f"{obj['score']:.0%} {int(obj['area'])}"
+ (
f" {float(obj['current_estimated_speed']):.1f}"
if obj["current_estimated_speed"] != 0
else ""
),
thickness=thickness,
color=color,
)
# draw any attributes
for attribute in obj["current_attributes"]:
box = attribute["box"]
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%}",
thickness=thickness,
color=color,
)
if draw_options.get("regions"):
for region in regions:
cv2.rectangle(
frame_copy,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
if draw_options.get("zones"):
for name, zone in self.camera_config.zones.items():
thickness = (
8
if any(
name in obj["current_zones"] for obj in tracked_objects.values()
)
else 2
)
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
if draw_options.get("motion_boxes"):
for m_box in motion_boxes:
cv2.rectangle(
frame_copy,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
position=self.camera_config.timestamp_style.position,
)
return frame_copy
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback)
def update(
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
removed_ids = previous_ids.difference(current_ids)
new_ids = current_ids.difference(previous_ids)
updated_ids = current_ids.intersection(previous_ids)
for id in new_ids:
new_obj = tracked_objects[id] = TrackedObject(
self.config.model,
self.camera_config,
self.config.ui,
self.frame_cache,
current_detections[id],
)
# call event handlers
for c in self.callbacks["start"]:
c(self.name, new_obj, frame_name)
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
self.frame_cache[frame_time] = np.copy(current_frame)
updated_obj.last_updated = frame_time
# if it has been more than 5 seconds since the last thumb update
# and the last update is greater than the last publish or
# the object has changed significantly
if (
frame_time - updated_obj.last_published > 5
and updated_obj.last_updated > updated_obj.last_published
) or significant_update:
# call event handlers
for c in self.callbacks["update"]:
c(self.name, updated_obj, frame_name)
updated_obj.last_published = frame_time
for id in removed_ids:
# publish events to mqtt
removed_obj = tracked_objects[id]
if "end_time" not in removed_obj.obj_data:
removed_obj.obj_data["end_time"] = frame_time
for c in self.callbacks["end"]:
c(self.name, removed_obj, frame_name)
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
"enabled": True,
"motion": len(motion_boxes) > 0,
"objects": [],
}
for obj in tracked_objects.values():
object_type = obj.obj_data["label"]
active = obj.is_active()
if not obj.false_positive:
label = object_type
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
sub_label = obj.obj_data["sub_label"][0]
camera_activity["objects"].append(
{
"id": obj.obj_data["id"],
"label": label,
"stationary": not active,
"area": obj.obj_data["area"],
"ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"],
"sub_label": sub_label,
"current_zones": obj.current_zones,
}
)
# if we don't have access to the current frame or
# if the object's thumbnail is not from the current frame, skip
if (
current_frame is None
or obj.thumbnail_data is None
or obj.false_positive
or obj.thumbnail_data["frame_time"] != frame_time
):
continue
if object_type in self.best_objects:
current_best = self.best_objects[object_type]
now = datetime.datetime.now().timestamp()
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
):
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
else:
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity)
# cleanup thumbnail frame cache
current_thumb_frames = {
obj.thumbnail_data["frame_time"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
}
thumb_frames_to_delete = [
t
for t in self.frame_cache.keys()
if t not in current_thumb_frames and t not in current_best_frames
]
for t in thumb_frames_to_delete:
del self.frame_cache[t]
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.motion_boxes = motion_boxes
self.regions = regions
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = np.copy(current_frame)
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_name
def shutdown(self) -> None:
for obj in self.tracked_objects.values():
if not obj.obj_data.get("end_time"):
obj.write_thumbnail_to_disk()
class ManualEventState(str, Enum):
complete = "complete"
start = "start"
end = "end"
class TrackedObjectProcessor(threading.Thread):
@ -449,14 +63,13 @@ class TrackedObjectProcessor(threading.Thread):
self.config_enabled_subscriber = ConfigSubscriber("config/enabled/")
self.requestor = InterProcessRequestor()
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all)
self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber()
self.sub_label_subscriber = EventMetadataSubscriber(
EventMetadataTypeEnum.sub_label
)
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)
self.camera_activity: dict[str, dict[str, any]] = {}
self.ongoing_manual_events: dict[str, str] = {}
# {
# 'zone_name': {
@ -677,7 +290,7 @@ class TrackedObjectProcessor(threading.Thread):
def get_current_frame(
self, camera: str, draw_options: dict[str, any] = {}
) -> Optional[np.ndarray]:
) -> np.ndarray | None:
if camera == "birdseye":
return self.frame_manager.get(
"birdseye",
@ -733,6 +346,96 @@ class TrackedObjectProcessor(threading.Thread):
return True
def create_manual_event(self, payload: tuple) -> None:
(
frame_time,
camera_name,
label,
event_id,
include_recording,
score,
sub_label,
duration,
source_type,
draw,
) = payload
# save the snapshot image
self.camera_states[camera_name].save_manual_event_image(event_id, label, draw)
end_time = frame_time + duration if duration is not None else None
# send event to event maintainer
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera_name,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera_name,
"start_time": frame_time
- self.config.cameras[camera_name].record.event_pre_capture,
"end_time": end_time,
"has_clip": self.config.cameras[camera_name].record.enabled
and include_recording,
"has_snapshot": True,
"type": source_type,
},
)
)
if source_type == "api":
self.ongoing_manual_events[event_id] = camera_name
self.detection_publisher.publish(
(
camera_name,
frame_time,
{
"state": (
ManualEventState.complete
if end_time
else ManualEventState.start
),
"label": f"{label}: {sub_label}" if sub_label else label,
"event_id": event_id,
"end_time": end_time,
},
),
DetectionTypeEnum.api.value,
)
def end_manual_event(self, payload: tuple) -> None:
(event_id, end_time) = payload
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.end,
None,
"",
{"id": event_id, "end_time": end_time},
)
)
if event_id in self.ongoing_manual_events:
self.detection_publisher.publish(
(
self.ongoing_manual_events[event_id],
end_time,
{
"state": ManualEventState.end,
"event_id": event_id,
"end_time": end_time,
},
),
DetectionTypeEnum.api.value,
)
self.ongoing_manual_events.pop(event_id)
def force_end_all_events(self, camera: str, camera_state: CameraState):
"""Ends all active events on camera when disabling."""
last_frame_name = camera_state.previous_frame_id
@ -792,15 +495,22 @@ class TrackedObjectProcessor(threading.Thread):
# check for sub label updates
while True:
(topic, payload) = self.sub_label_subscriber.check_for_update(
timeout=0.1
(raw_topic, payload) = self.sub_label_subscriber.check_for_update(
timeout=0
)
if not topic:
if not raw_topic:
break
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
topic = str(raw_topic)
if topic.endswith(EventMetadataTypeEnum.sub_label.value):
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
elif topic.endswith(EventMetadataTypeEnum.manual_event_create.value):
self.create_manual_event(payload)
elif topic.endswith(EventMetadataTypeEnum.manual_event_end.value):
self.end_manual_event(payload)
try:
(
@ -839,7 +549,8 @@ class TrackedObjectProcessor(threading.Thread):
tracked_objects,
motion_boxes,
regions,
)
),
DetectionTypeEnum.video.value,
)
# cleanup event finished queue

View File

@ -15,8 +15,8 @@ sys.path.append("/workspace/frigate")
from frigate.config import FrigateConfig # noqa: E402
from frigate.motion import MotionDetector # noqa: E402
from frigate.object_detection import LocalObjectDetector # noqa: E402
from frigate.object_processing import CameraState # noqa: E402
from frigate.track.centroid_tracker import CentroidTracker # noqa: E402
from frigate.track.object_processing import CameraState # noqa: E402
from frigate.util import ( # noqa: E402
EventsPerSecond,
SharedMemoryFrameManager,

View File

@ -378,7 +378,9 @@ export default function LivePlayer({
{[
...new Set([
...(objects || []).map(({ label, sub_label }) =>
label.endsWith("verified") ? sub_label : label,
label.endsWith("verified")
? sub_label
: label.replaceAll("_", " "),
),
]),
]