create camera type, add manual event and save snapshot

This commit is contained in:
Josh Hawkins 2025-03-21 07:38:02 -05:00
parent ed1cffd4ae
commit 61efdadd63
8 changed files with 107 additions and 78 deletions

View File

@ -410,9 +410,9 @@ class CameraState:
self.previous_frame_id = frame_name
def save_manual_event_image(
self, event_id: str, label: str, draw: dict[str, list[dict]]
self, frame: np.ndarray, event_id: str, label: str, draw: dict[str, list[dict]]
) -> None:
img_frame = self.get_current_frame()
img_frame = frame if frame is not None else self.get_current_frame()
# write clean snapshot if enabled
if self.camera_config.snapshots.clean_copy:

View File

@ -15,6 +15,8 @@ class EventMetadataTypeEnum(str, Enum):
regenerate_description = "regenerate_description"
sub_label = "sub_label"
recognized_license_plate = "recognized_license_plate"
lpr_event_create = "lpr_event_create"
save_lpr_snapshot = "save_lpr_snapshot"
class EventMetadataPublisher(Publisher):

View File

@ -1,4 +1,5 @@
import os
from enum import Enum
from typing import Optional
from pydantic import Field, PrivateAttr
@ -42,6 +43,11 @@ from .zone import ZoneConfig
__all__ = ["CameraConfig"]
class CameraTypeEnum(str, Enum):
generic = "generic"
lpr = "lpr"
class CameraConfig(FrigateBaseModel):
name: Optional[str] = Field(None, title="Camera name.", pattern=REGEX_CAMERA_NAME)
enabled: bool = Field(default=True, title="Enable camera.")
@ -102,6 +108,7 @@ class CameraConfig(FrigateBaseModel):
onvif: OnvifConfig = Field(
default_factory=OnvifConfig, title="Camera Onvif Configuration."
)
type: str = Field(default=CameraTypeEnum.generic, title="Camera Type")
ui: CameraUiConfig = Field(
default_factory=CameraUiConfig, title="Camera UI Modifications."
)

View File

@ -19,11 +19,6 @@ class SemanticSearchModelEnum(str, Enum):
jinav2 = "jinav2"
class CameraTypeEnum(str, Enum):
standard = "standard"
lpr = "lpr"
class BirdClassificationConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable bird classification.")
threshold: float = Field(
@ -132,7 +127,6 @@ class LicensePlateRecognitionConfig(FrigateBaseModel):
class CameraLicensePlateRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable license plate recognition.")
camera_type: str = Field(default=CameraTypeEnum.standard, title="Camera Type")
expire_time: int = Field(
default=3,
title="Expire plates not seen after number of seconds (for dedicated LPR cameras only).",

View File

@ -1,5 +1,6 @@
"""Handle processing images for face detection and recognition."""
import base64
import datetime
import logging
import math
@ -18,6 +19,7 @@ from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.config.camera.camera import CameraTypeEnum
from frigate.embeddings.onnx.lpr_embedding import LPR_EMBEDDING_SIZE
from frigate.util.image import area
@ -46,6 +48,9 @@ class LicensePlateProcessingMixin:
self.box_thresh = 0.6
self.mask_thresh = 0.6
# matching
self.similarity_threshold = 0.8
def _detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
Detect possible license plates in the input image by first resizing and normalizing it,
@ -870,14 +875,14 @@ class LicensePlateProcessingMixin:
"""
self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10
def _generate_plate_event(self, camera: str, plate_score: float) -> str:
def _generate_plate_event(self, camera: str, plate: str, plate_score: float) -> str:
"""Generate a unique ID for a plate event based on camera and text."""
now = datetime.datetime.now().timestamp()
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_create,
EventMetadataTypeEnum.lpr_event_create,
(
now,
camera,
@ -886,9 +891,7 @@ class LicensePlateProcessingMixin:
True,
plate_score,
None,
None,
"api",
{},
plate,
),
)
return event_id
@ -903,6 +906,9 @@ class LicensePlateProcessingMixin:
if not self.config.cameras[camera].lpr.enabled:
return
if not dedicated_lpr and self.config.cameras[camera].type == CameraTypeEnum.lpr:
return
if dedicated_lpr:
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
@ -941,27 +947,6 @@ class LicensePlateProcessingMixin:
license_plate[0] : license_plate[2],
]
if WRITE_DEBUG_IMAGES:
if license_plate:
frame_with_rect = rgb.copy()
cv2.rectangle(
frame_with_rect,
(
int(license_plate[0]),
int(license_plate[1]),
),
(
int(license_plate[2]),
int(license_plate[3]),
),
(0, 255, 0),
2,
)
cv2.imwrite(
f"debug/frames/dedicated_lpr_with_rect_{current_time}.jpg",
frame_with_rect,
)
# Double the size for better OCR
license_plate_frame = cv2.resize(
license_plate_frame,
@ -971,12 +956,6 @@ class LicensePlateProcessingMixin:
),
)
if WRITE_DEBUG_IMAGES:
cv2.imwrite(
f"debug/frames/dedicated_lpr_doubled_{current_time}.jpg",
license_plate_frame,
)
else:
id = obj_data["id"]
@ -1036,26 +1015,6 @@ class LicensePlateProcessingMixin:
logger.debug("Detected no license plates for car object.")
return
if WRITE_DEBUG_IMAGES and license_plate:
frame_with_rect = car.copy()
cv2.rectangle(
frame_with_rect,
(
int(license_plate[0]),
int(license_plate[1]),
),
(
int(license_plate[2]),
int(license_plate[3]),
),
(0, 255, 0),
2,
)
cv2.imwrite(
f"debug/frames/car_frame_with_rect_{current_time}.jpg",
frame_with_rect,
)
license_plate_area = max(
0,
(license_plate[2] - license_plate[0])
@ -1145,19 +1104,13 @@ class LicensePlateProcessingMixin:
license_plate_frame,
)
start = datetime.datetime.now().timestamp()
# run detection, returns results sorted by confidence, best first
start = datetime.datetime.now().timestamp()
license_plates, confidences, areas = self._process_license_plate(
license_plate_frame
)
self.__update_lpr_metrics(datetime.datetime.now().timestamp() - start)
logger.debug(f"Text boxes: {license_plates}")
logger.debug(f"Confidences: {confidences}")
logger.debug(f"Areas: {areas}")
if license_plates:
for plate, confidence, text_area in zip(license_plates, confidences, areas):
avg_confidence = (
@ -1168,7 +1121,6 @@ class LicensePlateProcessingMixin:
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
# no plates found
logger.debug("No text detected")
return
@ -1193,8 +1145,6 @@ class LicensePlateProcessingMixin:
# For LPR cameras, match or assign plate ID using Jaro-Winkler distance
if dedicated_lpr:
plate_id = None
# Similarity threshold for matching plates
jaro_winkler_threshold = 0.8
for existing_id, data in self.detected_license_plates.items():
if (
@ -1204,15 +1154,16 @@ class LicensePlateProcessingMixin:
<= self.config.cameras[camera].lpr.expire_time
):
similarity = jaro_winkler(data["plate"], top_plate)
if similarity >= jaro_winkler_threshold:
if similarity >= self.similarity_threshold:
plate_id = existing_id
logger.debug(
f"Matched plate {top_plate} to {data['plate']} (similarity: {similarity:.3f})"
)
break
if plate_id is None:
# start new manual lpr event
plate_id = self._generate_plate_event(obj_data, avg_confidence)
plate_id = self._generate_plate_event(
obj_data, top_plate, avg_confidence
)
logger.debug(
f"New plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
@ -1253,12 +1204,21 @@ class LicensePlateProcessingMixin:
EventMetadataTypeEnum.sub_label, (id, sub_label, avg_confidence)
)
logger.debug("publishing plate for", id, top_plate)
self.sub_label_publisher.publish(
EventMetadataTypeEnum.recognized_license_plate,
(id, top_plate, avg_confidence),
)
if dedicated_lpr:
# save the best snapshot
logger.debug(f"Writing snapshot for {id}, {top_plate}, {current_time}")
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, buffer = cv2.imencode(".jpg", frame_bgr)
self.sub_label_publisher.publish(
EventMetadataTypeEnum.save_lpr_snapshot,
(base64.b64encode(buffer).decode("ASCII"), id, camera),
)
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,

View File

@ -28,7 +28,7 @@ from frigate.comms.recordings_updater import (
RecordingsDataTypeEnum,
)
from frigate.config import FrigateConfig
from frigate.config.classification import CameraTypeEnum
from frigate.config.camera.camera import CameraTypeEnum
from frigate.const import (
CLIPS_DIR,
UPDATE_EVENT_DESCRIPTION,
@ -324,6 +324,7 @@ class EmbeddingMaintainer(threading.Thread):
if (
recordings_available is not None
and event_id in self.detected_license_plates
and self.config.cameras[camera].type != "lpr"
):
processor.process_data(
{
@ -395,7 +396,6 @@ class EmbeddingMaintainer(threading.Thread):
if now - last_seen > self.config.cameras[data["camera"]].lpr.expire_time:
to_remove.append(id)
for id in to_remove:
print(f"Expiring plate event {id}")
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(id, now),
@ -448,7 +448,7 @@ class EmbeddingMaintainer(threading.Thread):
camera_config = self.config.cameras[camera]
if not camera_config.lpr.camera_type == CameraTypeEnum.lpr:
if not camera_config.type == CameraTypeEnum.lpr:
return
try:

View File

@ -278,6 +278,13 @@ class EventProcessor(threading.Thread):
"top_score": event_data["score"],
},
}
if event_data.get("recognized_license_plate") is not None:
event[Event.data]["recognized_license_plate"] = event_data[
"recognized_license_plate"
]
event[Event.data]["recognized_license_plate_score"] = event_data[
"score"
]
Event.insert(event).execute()
elif event_type == EventStateEnum.end:
event = {

View File

@ -1,3 +1,4 @@
import base64
import datetime
import json
import logging
@ -7,6 +8,7 @@ from collections import defaultdict
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
import cv2
import numpy as np
from peewee import DoesNotExist
@ -384,6 +386,19 @@ class TrackedObjectProcessor(threading.Thread):
return True
def save_lpr_snapshot(self, payload: tuple) -> None:
# save the snapshot image
(frame, event_id, camera) = payload
img = cv2.imdecode(
np.frombuffer(base64.b64decode(frame), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
self.camera_states[camera].save_manual_event_image(
img, event_id, "license_plate", {}
)
def create_manual_event(self, payload: tuple) -> None:
(
frame_time,
@ -399,7 +414,9 @@ class TrackedObjectProcessor(threading.Thread):
) = payload
# save the snapshot image
self.camera_states[camera_name].save_manual_event_image(event_id, label, draw)
self.camera_states[camera_name].save_manual_event_image(
None, event_id, label, draw
)
end_time = frame_time + duration if duration is not None else None
# send event to event maintainer
@ -446,6 +463,44 @@ class TrackedObjectProcessor(threading.Thread):
DetectionTypeEnum.api.value,
)
def create_lpr_event(self, payload: tuple) -> None:
(
frame_time,
camera_name,
label,
event_id,
include_recording,
score,
sub_label,
plate,
) = payload
# send event to event maintainer
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera_name,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera_name,
"start_time": frame_time
- self.config.cameras[camera_name].record.event_pre_capture,
"end_time": None,
"has_clip": self.config.cameras[camera_name].record.enabled
and include_recording,
"has_snapshot": True,
"type": "api",
"recognized_license_plate": plate,
"recognized_license_plate_score": score,
},
)
)
def end_manual_event(self, payload: tuple) -> None:
(event_id, end_time) = payload
@ -550,6 +605,10 @@ class TrackedObjectProcessor(threading.Thread):
self.set_recognized_license_plate(
event_id, recognized_license_plate, score
)
elif topic.endswith(EventMetadataTypeEnum.lpr_event_create.value):
self.create_lpr_event(payload)
elif topic.endswith(EventMetadataTypeEnum.save_lpr_snapshot.value):
self.save_lpr_snapshot(payload)
elif topic.endswith(EventMetadataTypeEnum.manual_event_create.value):
self.create_manual_event(payload)
elif topic.endswith(EventMetadataTypeEnum.manual_event_end.value):