Refactor events to be more generic (#6320)

* Organize event table to be more generalized

* Add appropriate fields to data

* Move tracked object logic to own function

* Add source type to event queue

* rename enum

* Fix types that are used in webUI

* remove redundant

* Formatting

* fix typing

* Rename enum
This commit is contained in:
Nicolas Mowen 2023-04-30 11:07:14 -06:00 committed by GitHub
parent ca7790ff65
commit ad52e238ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 221 additions and 120 deletions

View File

@ -3,6 +3,8 @@ import logging
import os import os
import queue import queue
import threading import threading
from enum import Enum
from pathlib import Path from pathlib import Path
from peewee import fn from peewee import fn
@ -10,7 +12,6 @@ from peewee import fn
from frigate.config import EventsConfig, FrigateConfig from frigate.config import EventsConfig, FrigateConfig
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.models import Event from frigate.models import Event
from frigate.timeline import TimelineSourceEnum
from frigate.types import CameraMetricsTypes from frigate.types import CameraMetricsTypes
from frigate.util import to_relative_box from frigate.util import to_relative_box
@ -21,6 +22,12 @@ from typing import Dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class EventTypeEnum(str, Enum):
# api = "api"
# audio = "audio"
tracked_object = "tracked_object"
def should_update_db(prev_event: Event, current_event: Event) -> bool: def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot).""" """If current_event has updated fields and (clip or snapshot)."""
if current_event["has_clip"] or current_event["has_snapshot"]: if current_event["has_clip"] or current_event["has_snapshot"]:
@ -66,7 +73,9 @@ class EventProcessor(threading.Thread):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
event_type, camera, event_data = self.event_queue.get(timeout=1) source_type, event_type, camera, event_data = self.event_queue.get(
timeout=1
)
except queue.Empty: except queue.Empty:
continue continue
@ -75,18 +84,34 @@ class EventProcessor(threading.Thread):
self.timeline_queue.put( self.timeline_queue.put(
( (
camera, camera,
TimelineSourceEnum.tracked_object, source_type,
event_type, event_type,
self.events_in_process.get(event_data["id"]), self.events_in_process.get(event_data["id"]),
event_data, event_data,
) )
) )
# if this is the first message, just store it and continue, its not time to insert it in the db if source_type == EventTypeEnum.tracked_object:
if event_type == "start": if event_type == "start":
self.events_in_process[event_data["id"]] = event_data self.events_in_process[event_data["id"]] = event_data
continue continue
self.handle_object_detection(event_type, camera, event_data)
# set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None
).execute()
logger.info(f"Exiting event processor...")
def handle_object_detection(
self,
event_type: str,
camera: str,
event_data: Event,
) -> None:
"""handle tracked object event updates."""
# if this is the first message, just store it and continue, its not time to insert it in the db
if should_update_db(self.events_in_process[event_data["id"]], event_data): if should_update_db(self.events_in_process[event_data["id"]], event_data):
camera_config = self.config.cameras[camera] camera_config = self.config.cameras[camera]
event_config: EventsConfig = camera_config.record.events event_config: EventsConfig = camera_config.record.events
@ -141,17 +166,19 @@ class EventProcessor(threading.Thread):
Event.camera: camera, Event.camera: camera,
Event.start_time: start_time, Event.start_time: start_time,
Event.end_time: end_time, Event.end_time: end_time,
Event.top_score: event_data["top_score"],
Event.score: score,
Event.zones: list(event_data["entered_zones"]), Event.zones: list(event_data["entered_zones"]),
Event.thumbnail: event_data["thumbnail"], Event.thumbnail: event_data["thumbnail"],
Event.region: region,
Event.box: box,
Event.has_clip: event_data["has_clip"], Event.has_clip: event_data["has_clip"],
Event.has_snapshot: event_data["has_snapshot"], Event.has_snapshot: event_data["has_snapshot"],
Event.model_hash: first_detector.model.model_hash, Event.model_hash: first_detector.model.model_hash,
Event.model_type: first_detector.model.model_type, Event.model_type: first_detector.model.model_type,
Event.detector_type: first_detector.type, Event.detector_type: first_detector.type,
Event.data: {
"box": box,
"region": region,
"score": score,
"top_score": event_data["top_score"],
},
} }
( (
@ -170,12 +197,6 @@ class EventProcessor(threading.Thread):
del self.events_in_process[event_data["id"]] del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera)) self.event_processed_queue.put((event_data["id"], camera))
# set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None
).execute()
logger.info(f"Exiting event processor...")
class EventCleanup(threading.Thread): class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event: MpEvent): def __init__(self, config: FrigateConfig, stop_event: MpEvent):

View File

@ -44,7 +44,6 @@ from frigate.util import (
restart_frigate, restart_frigate,
vainfo_hwaccel, vainfo_hwaccel,
get_tz_modifiers, get_tz_modifiers,
to_relative_box,
) )
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.version import VERSION from frigate.version import VERSION
@ -196,7 +195,7 @@ def send_to_plus(id):
return make_response(jsonify({"success": False, "message": message}), 404) return make_response(jsonify({"success": False, "message": message}), 404)
# events from before the conversion to relative dimensions cant include annotations # events from before the conversion to relative dimensions cant include annotations
if any(d > 1 for d in event.box): if any(d > 1 for d in event.data["box"]):
include_annotation = None include_annotation = None
if event.end_time is None: if event.end_time is None:
@ -252,8 +251,8 @@ def send_to_plus(id):
event.save() event.save()
if not include_annotation is None: if not include_annotation is None:
region = event.region region = event.data["region"]
box = event.box box = event.data["box"]
try: try:
current_app.plus_api.add_annotation( current_app.plus_api.add_annotation(
@ -294,7 +293,7 @@ def false_positive(id):
return make_response(jsonify({"success": False, "message": message}), 404) return make_response(jsonify({"success": False, "message": message}), 404)
# events from before the conversion to relative dimensions cant include annotations # events from before the conversion to relative dimensions cant include annotations
if any(d > 1 for d in event.box): if any(d > 1 for d in event.data["box"]):
message = f"Events prior to 0.13 cannot be submitted as false positives" message = f"Events prior to 0.13 cannot be submitted as false positives"
logger.error(message) logger.error(message)
return make_response(jsonify({"success": False, "message": message}), 400) return make_response(jsonify({"success": False, "message": message}), 400)
@ -311,11 +310,15 @@ def false_positive(id):
# need to refetch the event now that it has a plus_id # need to refetch the event now that it has a plus_id
event = Event.get(Event.id == id) event = Event.get(Event.id == id)
region = event.region region = event.data["region"]
box = event.box box = event.data["box"]
# provide top score if score is unavailable # provide top score if score is unavailable
score = event.top_score if event.score is None else event.score score = (
(event.data["top_score"] if event.data["top_score"] else event.top_score)
if event.data["score"] is None
else event.data["score"]
)
try: try:
current_app.plus_api.add_false_positive( current_app.plus_api.add_false_positive(
@ -756,6 +759,7 @@ def events():
Event.top_score, Event.top_score,
Event.false_positive, Event.false_positive,
Event.box, Event.box,
Event.data,
] ]
if camera != "all": if camera != "all":

View File

@ -18,22 +18,33 @@ class Event(Model): # type: ignore[misc]
camera = CharField(index=True, max_length=20) camera = CharField(index=True, max_length=20)
start_time = DateTimeField() start_time = DateTimeField()
end_time = DateTimeField() end_time = DateTimeField()
top_score = FloatField() top_score = (
score = FloatField() FloatField()
) # TODO remove when columns can be dropped without rebuilding table
score = (
FloatField()
) # TODO remove when columns can be dropped without rebuilding table
false_positive = BooleanField() false_positive = BooleanField()
zones = JSONField() zones = JSONField()
thumbnail = TextField() thumbnail = TextField()
has_clip = BooleanField(default=True) has_clip = BooleanField(default=True)
has_snapshot = BooleanField(default=True) has_snapshot = BooleanField(default=True)
region = JSONField() region = (
box = JSONField() JSONField()
area = IntegerField() ) # TODO remove when columns can be dropped without rebuilding table
box = (
JSONField()
) # TODO remove when columns can be dropped without rebuilding table
area = (
IntegerField()
) # TODO remove when columns can be dropped without rebuilding table
retain_indefinitely = BooleanField(default=False) retain_indefinitely = BooleanField(default=False)
ratio = FloatField(default=1.0) ratio = FloatField(default=1.0)
plus_id = CharField(max_length=30) plus_id = CharField(max_length=30)
model_hash = CharField(max_length=32) model_hash = CharField(max_length=32)
detector_type = CharField(max_length=32) detector_type = CharField(max_length=32)
model_type = CharField(max_length=32) model_type = CharField(max_length=32)
data = JSONField() # ex: tracked object box, region, etc.
class Timeline(Model): # type: ignore[misc] class Timeline(Model): # type: ignore[misc]

View File

@ -21,6 +21,7 @@ from frigate.config import (
FrigateConfig, FrigateConfig,
) )
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.events import EventTypeEnum
from frigate.util import ( from frigate.util import (
SharedMemoryFrameManager, SharedMemoryFrameManager,
calculate_region, calculate_region,
@ -656,7 +657,9 @@ class TrackedObjectProcessor(threading.Thread):
self.last_motion_detected: dict[str, float] = {} self.last_motion_detected: dict[str, float] = {}
def start(camera, obj: TrackedObject, current_frame_time): def start(camera, obj: TrackedObject, current_frame_time):
self.event_queue.put(("start", camera, obj.to_dict())) self.event_queue.put(
(EventTypeEnum.tracked_object, "start", camera, obj.to_dict())
)
def update(camera, obj: TrackedObject, current_frame_time): def update(camera, obj: TrackedObject, current_frame_time):
obj.has_snapshot = self.should_save_snapshot(camera, obj) obj.has_snapshot = self.should_save_snapshot(camera, obj)
@ -670,7 +673,12 @@ class TrackedObjectProcessor(threading.Thread):
self.dispatcher.publish("events", json.dumps(message), retain=False) self.dispatcher.publish("events", json.dumps(message), retain=False)
obj.previous = after obj.previous = after
self.event_queue.put( self.event_queue.put(
("update", camera, obj.to_dict(include_thumbnail=True)) (
EventTypeEnum.tracked_object,
"update",
camera,
obj.to_dict(include_thumbnail=True),
)
) )
def end(camera, obj: TrackedObject, current_frame_time): def end(camera, obj: TrackedObject, current_frame_time):
@ -722,7 +730,14 @@ class TrackedObjectProcessor(threading.Thread):
} }
self.dispatcher.publish("events", json.dumps(message), retain=False) self.dispatcher.publish("events", json.dumps(message), retain=False)
self.event_queue.put(("end", camera, obj.to_dict(include_thumbnail=True))) self.event_queue.put(
(
EventTypeEnum.tracked_object,
"end",
camera,
obj.to_dict(include_thumbnail=True),
)
)
def snapshot(camera, obj: TrackedObject, current_frame_time): def snapshot(camera, obj: TrackedObject, current_frame_time):
mqtt_config: MqttConfig = self.config.cameras[camera].mqtt mqtt_config: MqttConfig = self.config.cameras[camera].mqtt

View File

@ -4,9 +4,8 @@ import logging
import threading import threading
import queue import queue
from enum import Enum
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.events import EventTypeEnum
from frigate.models import Timeline from frigate.models import Timeline
from multiprocessing.queues import Queue from multiprocessing.queues import Queue
@ -17,12 +16,6 @@ from frigate.util import to_relative_box
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TimelineSourceEnum(str, Enum):
# api = "api"
# audio = "audio"
tracked_object = "tracked_object"
class TimelineProcessor(threading.Thread): class TimelineProcessor(threading.Thread):
"""Handle timeline queue and update DB.""" """Handle timeline queue and update DB."""
@ -51,7 +44,7 @@ class TimelineProcessor(threading.Thread):
except queue.Empty: except queue.Empty:
continue continue
if input_type == TimelineSourceEnum.tracked_object: if input_type == EventTypeEnum.tracked_object:
self.handle_object_detection( self.handle_object_detection(
camera, event_type, prev_event_data, event_data camera, event_type, prev_event_data, event_data
) )

View File

@ -0,0 +1,49 @@
"""Peewee migrations
Some examples (model - class or model name)::
> Model = migrator.orm['model_name'] # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.python(func, *args, **kwargs) # Run python code
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.drop_index(model, *col_names)
> migrator.add_not_null(model, *field_names)
> migrator.drop_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
"""
import datetime as dt
import peewee as pw
from playhouse.sqlite_ext import *
from decimal import ROUND_HALF_EVEN
from frigate.models import Event
try:
import playhouse.postgres_ext as pw_pext
except ImportError:
pass
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.drop_not_null(
Event, "top_score", "score", "region", "box", "area", "ratio"
)
migrator.add_fields(
Event,
data=JSONField(default={}),
)
def rollback(migrator, database, fake=False, **kwargs):
pass

View File

@ -163,7 +163,9 @@ export function EventCard({ camera, event }) {
<div className="text-xs md:text-normal text-gray-300">Start: {format(start, 'HH:mm:ss')}</div> <div className="text-xs md:text-normal text-gray-300">Start: {format(start, 'HH:mm:ss')}</div>
<div className="text-xs md:text-normal text-gray-300">Duration: {duration}</div> <div className="text-xs md:text-normal text-gray-300">Duration: {duration}</div>
</div> </div>
<div className="text-lg text-white text-right leading-tight">{(event.top_score * 100).toFixed(1)}%</div> <div className="text-lg text-white text-right leading-tight">
{((event?.data?.top_score || event.top_score) * 100).toFixed(1)}%
</div>
</div> </div>
</div> </div>
</div> </div>

View File

@ -206,7 +206,7 @@ export default function Events({ path, ...props }) {
e.stopPropagation(); e.stopPropagation();
setDownloadEvent((_prev) => ({ setDownloadEvent((_prev) => ({
id: event.id, id: event.id,
box: event.box, box: event?.data?.box || event.box,
label: event.label, label: event.label,
has_clip: event.has_clip, has_clip: event.has_clip,
has_snapshot: event.has_snapshot, has_snapshot: event.has_snapshot,
@ -599,7 +599,7 @@ export default function Events({ path, ...props }) {
{event.sub_label {event.sub_label
? `${event.label.replaceAll('_', ' ')}: ${event.sub_label.replaceAll('_', ' ')}` ? `${event.label.replaceAll('_', ' ')}: ${event.sub_label.replaceAll('_', ' ')}`
: event.label.replaceAll('_', ' ')} : event.label.replaceAll('_', ' ')}
({(event.top_score * 100).toFixed(0)}%) ({((event?.data?.top_score || event.top_score) * 100).toFixed(0)}%)
</div> </div>
<div className="text-sm flex"> <div className="text-sm flex">
<Clock className="h-5 w-5 mr-2 inline" /> <Clock className="h-5 w-5 mr-2 inline" />
@ -638,7 +638,9 @@ export default function Events({ path, ...props }) {
<Button <Button
color="gray" color="gray"
disabled={uploading.includes(event.id)} disabled={uploading.includes(event.id)}
onClick={(e) => showSubmitToPlus(event.id, event.label, event.box, e)} onClick={(e) =>
showSubmitToPlus(event.id, event.label, event?.data?.box || event.box, e)
}
> >
{uploading.includes(event.id) ? 'Uploading...' : 'Send to Frigate+'} {uploading.includes(event.id) ? 'Uploading...' : 'Send to Frigate+'}
</Button> </Button>
@ -680,7 +682,9 @@ export default function Events({ path, ...props }) {
<div> <div>
<TimelineSummary <TimelineSummary
event={event} event={event}
onFrameSelected={(frame, seekSeconds) => onEventFrameSelected(event, frame, seekSeconds)} onFrameSelected={(frame, seekSeconds) =>
onEventFrameSelected(event, frame, seekSeconds)
}
/> />
<div> <div>
<VideoPlayer <VideoPlayer
@ -738,7 +742,9 @@ export default function Events({ path, ...props }) {
? `${apiHost}/api/events/${event.id}/snapshot.jpg` ? `${apiHost}/api/events/${event.id}/snapshot.jpg`
: `${apiHost}/api/events/${event.id}/thumbnail.jpg` : `${apiHost}/api/events/${event.id}/thumbnail.jpg`
} }
alt={`${event.label} at ${(event.top_score * 100).toFixed(0)}% confidence`} alt={`${event.label} at ${((event?.data?.top_score || event.top_score) * 100).toFixed(
0
)}% confidence`}
/> />
</div> </div>
) : null} ) : null}