"""Maintain embeddings in SQLite-vec."""

import base64
import logging
import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional

import cv2
import numpy as np
import requests
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase

from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
from frigate.comms.event_metadata_updater import (
    EventMetadataSubscriber,
    EventMetadataTypeEnum,
)
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
from frigate.embeddings.alpr.alpr import LicensePlateRecognition
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region

from .embeddings import Embeddings

logger = logging.getLogger(__name__)

# minimum number of nearest face matches required to assign a sub label
REQUIRED_FACES = 2
# maximum number of thumbnails kept per tracked object for genai descriptions
MAX_THUMBNAILS = 10


class EmbeddingMaintainer(threading.Thread):
    """Handle embedding queue and post event updates."""

    def __init__(
        self,
        db: SqliteQueueDatabase,
        config: FrigateConfig,
        stop_event: MpEvent,
    ) -> None:
        super().__init__(name="embeddings_maintainer")
        self.config = config
        self.embeddings = Embeddings(config, db)

        # Check if we need to re-index events
        if config.semantic_search.reindex:
            self.embeddings.reindex()

        self.event_subscriber = EventUpdateSubscriber()
        self.event_end_subscriber = EventEndSubscriber()
        self.event_metadata_subscriber = EventMetadataSubscriber(
            EventMetadataTypeEnum.regenerate_description
        )
        self.embeddings_responder = EmbeddingsResponder()
        self.frame_manager = SharedMemoryFrameManager()

        # set face recognition conditions
        self.face_recognition_enabled = self.config.face_recognition.enabled
        self.requires_face_detection = "face" not in self.config.objects.all_objects
        self.detected_faces: dict[str, float] = {}

        # create communication for updating event descriptions
        self.requestor = InterProcessRequestor()
        self.stop_event = stop_event
        self.tracked_events: dict[str, list[any]] = {}
        self.genai_client = get_genai_client(config)

        # set license plate recognition conditions
        self.lpr_config = self.config.lpr
        self.requires_license_plate_detection = (
            "license_plate" not in self.config.objects.all_objects
        )
        self.detected_license_plates: dict[str, dict[str, any]] = {}

        if self.lpr_config.enabled:
            self.license_plate_recognition = LicensePlateRecognition(
                self.lpr_config, self.requestor, self.embeddings
            )

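    # The YuNet face detector below is created lazily so the ONNX model is only
    # loaded the first time face detection is needed; _detect_face() re-targets
    # it to each input's size via setInputSize() before running detection.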
    @property
    def face_detector(self) -> cv2.FaceDetectorYN:
        # Lazily create the classifier.
        if "face_detector" not in self.__dict__:
            self.__dict__["face_detector"] = cv2.FaceDetectorYN.create(
                "/config/model_cache/facenet/facedet.onnx",
                config="",
                input_size=(320, 320),
                score_threshold=0.8,
                nms_threshold=0.3,
            )
        return self.__dict__["face_detector"]

    def run(self) -> None:
        """Maintain a SQLite-vec database for semantic search."""
        while not self.stop_event.is_set():
            self._process_requests()
            self._process_updates()
            self._process_finalized()
            self._process_event_metadata()

        self.event_subscriber.stop()
        self.event_end_subscriber.stop()
        self.event_metadata_subscriber.stop()
        self.embeddings_responder.stop()
        self.requestor.stop()
        logger.info("Exiting embeddings maintenance...")

    def _process_requests(self) -> None:
        """Process embeddings requests"""

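        # Each request topic maps to a different embedding operation; the nested
        # handler returns a serialized embedding (or a bool for face
        # registration) so EmbeddingsResponder can hand it back to the caller.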
        def _handle_request(topic: str, data: dict[str, any]) -> str:
            try:
                if topic == EmbeddingsRequestEnum.embed_description.value:
                    return serialize(
                        self.embeddings.embed_description(
                            data["id"], data["description"]
                        ),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.embed_thumbnail.value:
                    thumbnail = base64.b64decode(data["thumbnail"])
                    return serialize(
                        self.embeddings.embed_thumbnail(data["id"], thumbnail),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.generate_search.value:
                    return serialize(
                        self.embeddings.text_embedding([data])[0], pack=False
                    )
                elif topic == EmbeddingsRequestEnum.register_face.value:
                    if data.get("cropped"):
                        self.embeddings.embed_face(
                            data["face_name"],
                            base64.b64decode(data["image"]),
                            upsert=True,
                        )
                        return True
                    else:
                        img = cv2.imdecode(
                            np.frombuffer(
                                base64.b64decode(data["image"]), dtype=np.uint8
                            ),
                            cv2.IMREAD_COLOR,
                        )
                        face_box = self._detect_face(img)

                        if not face_box:
                            return False

                        face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
                        ret, webp = cv2.imencode(
                            ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
                        )

                        if not ret:
                            return False

                        self.embeddings.embed_face(
                            data["face_name"], webp.tobytes(), upsert=True
                        )
                        return True
            except Exception as e:
                logger.error(f"Unable to handle embeddings request: {e}")

        self.embeddings_responder.check_for_request(_handle_request)

    def _process_updates(self) -> None:
        """Process event updates"""
        update = self.event_subscriber.check_for_update(timeout=0.01)

        if update is None:
            return

        source_type, _, camera, frame_name, data = update

        if not camera or source_type != EventTypeEnum.tracked_object:
            return

        camera_config = self.config.cameras[camera]

        # no need to process updated objects if face recognition, lpr, genai are disabled
        if (
            not camera_config.genai.enabled
            and not self.face_recognition_enabled
            and not self.lpr_config.enabled
        ):
            return

        # Create our own thumbnail based on the bounding box and the frame time
        yuv_frame = None

        try:
            yuv_frame = self.frame_manager.get(
                frame_name, camera_config.frame_shape_yuv
            )
        except FileNotFoundError:
            pass

        if yuv_frame is None:
            logger.debug(
                "Unable to process object update because frame is unavailable."
            )
            return

        if self.face_recognition_enabled:
            self._process_face(data, yuv_frame)

        if self.lpr_config.enabled:
            self._process_license_plate(data, yuv_frame)

        # no need to save our own thumbnails if genai is not enabled
        # or if the object has become stationary
        if self.genai_client is not None and not data["stationary"]:
            if data["id"] not in self.tracked_events:
                self.tracked_events[data["id"]] = []

            data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])

            # Limit the number of thumbnails saved
            if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
                # Always keep the first thumbnail for the event
                self.tracked_events[data["id"]].pop(1)

            self.tracked_events[data["id"]].append(data)

        self.frame_manager.close(frame_name)

    def _process_finalized(self) -> None:
        """Process the end of an event."""
        while True:
            ended = self.event_end_subscriber.check_for_update(timeout=0.01)

            if ended is None:
                break

            event_id, camera, updated_db = ended
            camera_config = self.config.cameras[camera]

            if event_id in self.detected_faces:
                self.detected_faces.pop(event_id)

            if event_id in self.detected_license_plates:
                self.detected_license_plates.pop(event_id)

            if updated_db:
                try:
                    event: Event = Event.get(Event.id == event_id)
                except DoesNotExist:
                    continue

                # Skip the event if not an object
                if event.data.get("type") != "object":
                    continue

                # Extract valid thumbnail
                thumbnail = base64.b64decode(event.thumbnail)

                # Embed the thumbnail
                self._embed_thumbnail(event_id, thumbnail)

                if (
                    camera_config.genai.enabled
                    and self.genai_client is not None
                    and event.data.get("description") is None
                    and (
                        not camera_config.genai.objects
                        or event.label in camera_config.genai.objects
                    )
                    and (
                        not camera_config.genai.required_zones
                        or set(event.zones) & set(camera_config.genai.required_zones)
                    )
                ):
                    if event.has_snapshot and camera_config.genai.use_snapshot:
                        with open(
                            os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"),
                            "rb",
                        ) as image_file:
                            snapshot_image = image_file.read()

                        img = cv2.imdecode(
                            np.frombuffer(snapshot_image, dtype=np.uint8),
                            cv2.IMREAD_COLOR,
                        )

                        # crop snapshot based on region before sending off to genai
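                        # event.data["region"] holds the detection region as
                        # relative [x, y, width, height] fractions of the full
                        # frame, so it is scaled by the snapshot's pixel
                        # dimensions before cropping.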
                        height, width = img.shape[:2]
                        x1_rel, y1_rel, width_rel, height_rel = event.data["region"]
                        x1, y1 = int(x1_rel * width), int(y1_rel * height)

                        cropped_image = img[
                            y1 : y1 + int(height_rel * height),
                            x1 : x1 + int(width_rel * width),
                        ]

                        _, buffer = cv2.imencode(".jpg", cropped_image)
                        snapshot_image = buffer.tobytes()

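                    # Prefer the cropped snapshot when configured; otherwise fall
                    # back to the thumbnails collected while the object was
                    # tracked, or to the event's stored thumbnail if none were
                    # collected.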
                    embed_image = (
                        [snapshot_image]
                        if event.has_snapshot and camera_config.genai.use_snapshot
                        else (
                            [data["thumbnail"] for data in self.tracked_events[event_id]]
                            if len(self.tracked_events.get(event_id, [])) > 0
                            else [thumbnail]
                        )
                    )

                    # Generate the description. Call happens in a thread since it is network bound.
                    threading.Thread(
                        target=self._embed_description,
                        name=f"_embed_description_{event.id}",
                        daemon=True,
                        args=(
                            event,
                            embed_image,
                        ),
                    ).start()

            # Delete tracked events based on the event_id
            if event_id in self.tracked_events:
                del self.tracked_events[event_id]

    def _process_event_metadata(self):
        # Check for regenerate description requests
        (topic, event_id, source) = self.event_metadata_subscriber.check_for_update(
            timeout=0.01
        )

        if topic is None:
            return

        if event_id:
            self.handle_regenerate_description(event_id, source)

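    # Face matching relies on the sqlite-vec KNN syntax: "face_embedding MATCH ?"
    # with "k = N" returns the N nearest stored embeddings ordered by distance.
    # Row ids are expected to be prefixed with the registered face name, which
    # _process_face() recovers by splitting on "-".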
    def _search_face(self, query_embedding: bytes) -> list[tuple[str, float]]:
        """Search for the face most closely matching the embedding."""
        sql_query = f"""
            SELECT
                id,
                distance
            FROM vec_faces
            WHERE face_embedding MATCH ?
                AND k = {REQUIRED_FACES} ORDER BY distance
        """
        return self.embeddings.db.execute_sql(sql_query, [query_embedding]).fetchall()

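    # cv2.FaceDetectorYN.detect() returns a (retval, faces) tuple; each row of
    # "faces" starts with the bounding box as [x, y, w, h], followed by landmark
    # coordinates and a confidence score. Only the largest box is kept here.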
    def _detect_face(self, input: np.ndarray) -> Optional[tuple[int, int, int, int]]:
        """Detect faces in input image."""
        self.face_detector.setInputSize((input.shape[1], input.shape[0]))
        faces = self.face_detector.detect(input)

        if faces[1] is None:
            return None

        face = None

        for _, potential_face in enumerate(faces[1]):
            raw_bbox = potential_face[0:4].astype(np.int32)
            x: int = max(raw_bbox[0], 0)
            y: int = max(raw_bbox[1], 0)
            w: int = raw_bbox[2]
            h: int = raw_bbox[3]
            bbox = (x, y, x + w, y + h)

            if face is None or area(bbox) > area(face):
                face = bbox

        return face

    def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
        """Look for faces in image."""
        id = obj_data["id"]

        # don't run for non person objects
        if obj_data.get("label") != "person":
            logger.debug("Not processing face for non person object.")
            return

        # don't overwrite sub label for objects that have a sub label
        # that is not a face
        if obj_data.get("sub_label") and id not in self.detected_faces:
            logger.debug(
                f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
            )
            return

        face: Optional[dict[str, any]] = None

        if self.requires_face_detection:
            logger.debug("Running manual face detection.")
            person_box = obj_data.get("box")

            if not person_box:
                return None

            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
            left, top, right, bottom = person_box
            person = rgb[top:bottom, left:right]
            face = self._detect_face(person)

            if not face:
                logger.debug("Detected no faces for person object.")
                return

            face_frame = person[face[1] : face[3], face[0] : face[2]]
            face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
        else:
            # don't run for object without attributes
            if not obj_data.get("current_attributes"):
                logger.debug("No attributes to parse.")
                return

            attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
            for attr in attributes:
                if attr.get("label") != "face":
                    continue

                if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
                    face = attr

            # no faces detected in this frame
            if not face:
                return

            face_box = face.get("box")

            # check that face is valid
            if not face_box or area(face_box) < self.config.face_recognition.min_area:
                logger.debug(f"Invalid face box {face}")
                return

            face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
            face_frame = face_frame[
                face_box[1] : face_box[3], face_box[0] : face_box[2]
            ]

        ret, webp = cv2.imencode(
            ".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
        )

        if not ret:
            logger.debug("Not processing face due to error creating cropped image.")
            return

        embedding = self.embeddings.embed_face("unknown", webp.tobytes(), upsert=False)
        query_embedding = serialize(embedding)
        best_faces = self._search_face(query_embedding)
        logger.debug(f"Detected best faces for person as: {best_faces}")

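        # Each match's distance is converted to a score as (1.0 - distance); all
        # of the nearest matches must agree on the same face name, and their
        # scores are averaged before being compared against the recognition
        # threshold and any previously recognized score for this object.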
        if not best_faces or len(best_faces) < REQUIRED_FACES:
            logger.debug(f"{len(best_faces)} < {REQUIRED_FACES} min required faces.")
            return

        sub_label = str(best_faces[0][0]).split("-")[0]
        avg_score = 0

        for face in best_faces:
            score = 1.0 - face[1]

            if face[0].split("-")[0] != sub_label:
                logger.debug("Detected multiple faces, result is not valid.")
                return

            avg_score += score

        avg_score = round(avg_score / REQUIRED_FACES, 2)

        if avg_score < self.config.face_recognition.threshold or (
            id in self.detected_faces and avg_score <= self.detected_faces[id]
        ):
            logger.debug(
                f"Recognized face score {avg_score} is less than threshold ({self.config.face_recognition.threshold}) / previous face score ({self.detected_faces.get(id)})."
            )
            return

        resp = requests.post(
            f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
            json={
                "camera": obj_data.get("camera"),
                "subLabel": sub_label,
                "subLabelScore": avg_score,
            },
        )

        if resp.status_code == 200:
            self.detected_faces[id] = avg_score

    def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
        """Return a box covering the full input image as (x1, y1, x2, y2)."""
        height, width = input.shape[:2]
        return (0, 0, width, height)

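    # License plate flow: find (or assume) a plate box on the car, run OCR via
    # LicensePlateRecognition.process_license_plate(), keep the highest
    # confidence reading, compare it against any previously accepted plate for
    # this object, then publish the result as the event's sub label.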
    def _process_license_plate(
        self, obj_data: dict[str, any], frame: np.ndarray
    ) -> None:
        """Look for license plates in image."""
        id = obj_data["id"]

        # don't run for non car objects
        if obj_data.get("label") != "car":
            logger.debug("Not processing license plate for non car object.")
            return

        # don't run for stationary car objects
        if obj_data.get("stationary"):
            logger.debug("Not processing license plate for a stationary car object.")
            return

        # don't overwrite sub label for objects that have a sub label
        # that is not a license plate
        if obj_data.get("sub_label") and id not in self.detected_license_plates:
            logger.debug(
                f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
            )
            return

        license_plate: Optional[dict[str, any]] = None

        if self.requires_license_plate_detection:
            logger.debug("Running manual license_plate detection.")
            car_box = obj_data.get("box")

            if not car_box:
                return None

            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
            left, top, right, bottom = car_box
            car = rgb[top:bottom, left:right]
            license_plate = self._detect_license_plate(car)

            if not license_plate:
                logger.debug("Detected no license plates for car object.")
                return

            license_plate_frame = car[
                license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
            ]
            license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR)
        else:
            # don't run for object without attributes
            if not obj_data.get("current_attributes"):
                logger.debug("No attributes to parse.")
                return

            attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
            for attr in attributes:
                if attr.get("label") != "license_plate":
                    continue

                if license_plate is None or attr.get("score", 0.0) > license_plate.get(
                    "score", 0.0
                ):
                    license_plate = attr

            # no license plates detected in this frame
            if not license_plate:
                return

            license_plate_box = license_plate.get("box")

            # check that license plate is valid
            if (
                not license_plate_box
                or area(license_plate_box) < self.config.lpr.min_area
            ):
                logger.debug(f"Invalid license plate box {license_plate}")
                return

            license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
            license_plate_frame = license_plate_frame[
                license_plate_box[1] : license_plate_box[3],
                license_plate_box[0] : license_plate_box[2],
            ]

        # run detection, returns results sorted by confidence, best first
        license_plates, confidences, areas = (
            self.license_plate_recognition.process_license_plate(license_plate_frame)
        )

        logger.debug(f"Text boxes: {license_plates}")
        logger.debug(f"Confidences: {confidences}")
        logger.debug(f"Areas: {areas}")

        if license_plates:
            for plate, confidence, text_area in zip(license_plates, confidences, areas):
                avg_confidence = (
                    (sum(confidence) / len(confidence)) if confidence else 0
                )
                logger.debug(
                    f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
                )
        else:
            # no plates found
            logger.debug("No text detected")
            return

        top_plate, top_char_confidences = license_plates[0], confidences[0]
        avg_confidence = (
            (sum(top_char_confidences) / len(top_char_confidences))
            if top_char_confidences
            else 0
        )

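        # A new reading only replaces a previously accepted plate when it is at
        # least as long and not clearly less confident. For example (hypothetical
        # values), "AB123" would not replace "ABC123", but "ABC123" read with a
        # higher average confidence would replace an earlier, lower-confidence
        # reading of the same length.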
        # Check if we have a previously detected plate for this ID
        if id in self.detected_license_plates:
            prev_plate = self.detected_license_plates[id]["plate"]
            prev_char_confidences = self.detected_license_plates[id]["char_confidences"]
            prev_avg_confidence = (
                (sum(prev_char_confidences) / len(prev_char_confidences))
                if prev_char_confidences
                else 0
            )

            # Define conditions for keeping the previous plate
            shorter_than_previous = len(top_plate) < len(prev_plate)
            lower_avg_confidence = avg_confidence <= prev_avg_confidence

            # Compare character-by-character confidence where possible
            min_length = min(len(top_plate), len(prev_plate))
            char_confidence_comparison = sum(
                1
                for i in range(min_length)
                if top_char_confidences[i] <= prev_char_confidences[i]
            )
            worse_char_confidences = char_confidence_comparison >= min_length / 2

            if shorter_than_previous or (
                lower_avg_confidence and worse_char_confidences
            ):
                logger.debug(
                    f"Keeping previous plate. New plate stats: "
                    f"length={len(top_plate)}, avg_conf={avg_confidence:.2f} "
                    f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}"
                )
                return

        # Check against minimum confidence threshold
        if avg_confidence < self.lpr_config.threshold:
            logger.debug(
                f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})"
            )
            return

        # Determine subLabel based on known plates
        # Default to the detected plate, use label name if there's a match
        sub_label = top_plate
        for label, plates in self.lpr_config.known_plates.items():
            if top_plate in plates:
                sub_label = label
                break

        # Send the result to the API
        resp = requests.post(
            f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
            json={
                "camera": obj_data.get("camera"),
                "subLabel": sub_label,
                "subLabelScore": avg_confidence,
            },
        )

        if resp.status_code == 200:
            self.detected_license_plates[id] = {
                "plate": top_plate,
                "char_confidences": top_char_confidences,
            }

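    # calculate_region() expands the object's box (multiplier=1.4) before the
    # crop, so the thumbnail keeps some context around the object; the crop is
    # then resized to the requested height while preserving aspect ratio.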
    def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
        """Return jpg thumbnail of a region of the frame."""
        frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
        region = calculate_region(
            frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4
        )
        frame = frame[region[1] : region[3], region[0] : region[2]]
        width = int(height * frame.shape[1] / frame.shape[0])
        frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)

        ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
        if ret:
            return jpg.tobytes()

        return None

    def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None:
        """Embed the thumbnail for an event."""
        self.embeddings.embed_thumbnail(event_id, thumbnail)

    def _embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
        """Embed the description for an event."""
        camera_config = self.config.cameras[event.camera]

        description = self.genai_client.generate_description(
            camera_config, thumbnails, event
        )

        if not description:
            logger.debug("Failed to generate description for %s", event.id)
            return

        # fire and forget description update
        self.requestor.send_data(
            UPDATE_EVENT_DESCRIPTION,
            {
                "type": TrackedObjectUpdateTypesEnum.description,
                "id": event.id,
                "description": description,
            },
        )

        # Embed the description
        self.embeddings.embed_description(event.id, description)

        logger.debug(
            "Generated description for %s (%d images): %s",
            event.id,
            len(thumbnails),
            description,
        )

    def handle_regenerate_description(self, event_id: str, source: str) -> None:
        try:
            event: Event = Event.get(Event.id == event_id)
        except DoesNotExist:
            logger.error(f"Event {event_id} not found for description regeneration")
            return

        camera_config = self.config.cameras[event.camera]
        if not camera_config.genai.enabled or self.genai_client is None:
            logger.error(f"GenAI not enabled for camera {event.camera}")
            return

        thumbnail = base64.b64decode(event.thumbnail)

        logger.debug(
            f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}"
        )

        if event.has_snapshot and source == "snapshot":
            with open(
                os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"),
                "rb",
            ) as image_file:
                snapshot_image = image_file.read()

            img = cv2.imdecode(
                np.frombuffer(snapshot_image, dtype=np.uint8), cv2.IMREAD_COLOR
            )

            # crop snapshot based on region before sending off to genai
            height, width = img.shape[:2]
            x1_rel, y1_rel, width_rel, height_rel = event.data["region"]
            x1, y1 = int(x1_rel * width), int(y1_rel * height)

            cropped_image = img[
                y1 : y1 + int(height_rel * height), x1 : x1 + int(width_rel * width)
            ]

            _, buffer = cv2.imencode(".jpg", cropped_image)
            snapshot_image = buffer.tobytes()

        embed_image = (
            [snapshot_image]
            if event.has_snapshot and source == "snapshot"
            else (
                [data["thumbnail"] for data in self.tracked_events[event_id]]
                if len(self.tracked_events.get(event_id, [])) > 0
                else [thumbnail]
            )
        )

        self._embed_description(event, embed_image)