mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-02 00:07:11 +01:00
188 lines
6.6 KiB
Python
188 lines
6.6 KiB
Python
"""Record events for object, audio, etc. detections."""
|
|
|
|
import logging
import queue
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Any

from frigate.config import FrigateConfig
from frigate.events.maintainer import EventStateEnum, EventTypeEnum
from frigate.models import Timeline
from frigate.util.builtin import to_relative_box
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TimelineProcessor(threading.Thread):
    """Handle timeline queue and update DB.

    Tracked-object entries whose event reports neither a clip nor a snapshot
    are kept in ``pre_event_cache`` (keyed by event id). They are written to
    the database, ahead of the current entry, as soon as a later entry for the
    same event reports a clip or a snapshot.
    """

    def __init__(
        self,
        config: FrigateConfig,
        queue: Queue,
        stop_event: MpEvent,
    ) -> None:
        """Store collaborators and start with an empty pre-event cache.

        Args:
            config: Frigate configuration; ``config.cameras[camera].detect``
                provides the width/height used to make boxes relative.
            queue: Queue yielding ``(camera, input_type, event_type,
                prev_event_data, event_data)`` tuples.
            stop_event: Event that, once set, makes ``run`` leave its loop.
        """
        super().__init__(name="timeline_processor")
        self.config = config
        self.queue = queue
        self.stop_event = stop_event
        # event id -> timeline entries waiting for their event to be saved
        self.pre_event_cache: dict[str, list[dict[str, Any]]] = {}

    def run(self) -> None:
        """Dispatch queued items to their handler until ``stop_event`` is set."""
        while not self.stop_event.is_set():
            try:
                (
                    camera,
                    input_type,
                    event_type,
                    prev_event_data,
                    event_data,
                ) = self.queue.get(timeout=1)
            except queue.Empty:
                # the 1s timeout lets the loop re-check stop_event regularly
                continue

            if input_type == EventTypeEnum.tracked_object:
                # None prev_event_data is only allowed for the start of an event
                if event_type != EventStateEnum.start and prev_event_data is None:
                    continue

                self.handle_object_detection(
                    camera, event_type, prev_event_data, event_data
                )
            elif input_type == EventTypeEnum.api:
                self.handle_api_entry(camera, event_type, event_data)

    def insert_or_save(
        self,
        entry: dict[str, Any],
        prev_event_data: dict[Any, Any],
        event_data: dict[Any, Any],
    ) -> None:
        """Insert into db or cache.

        Args:
            entry: Timeline entry to store; ``entry[Timeline.source_id]`` is
                used as the cache key.
            prev_event_data: Unused here; kept for interface compatibility.
            event_data: Current event data; its ``has_clip`` and
                ``has_snapshot`` flags decide between cache and database.
        """
        source_id = entry[Timeline.source_id]

        if not event_data["has_clip"] and not event_data["has_snapshot"]:
            # the related event has not been saved yet, should be added to cache
            self.pre_event_cache.setdefault(source_id, []).append(entry)
            return

        # the event is saved: insert the cached entries first, then this one.
        # The cache is only dropped after every cached insert succeeded.
        for cached_entry in self.pre_event_cache.get(source_id, []):
            Timeline.insert(cached_entry).execute()

        self.pre_event_cache.pop(source_id, None)

        Timeline.insert(entry).execute()

    def handle_object_detection(
        self,
        camera: str,
        event_type: str,
        prev_event_data: dict[Any, Any],
        event_data: dict[Any, Any],
    ) -> bool:
        """Handle object detection.

        Builds a timeline entry for the tracked object and stores it (via
        ``insert_or_save``) when the state change is one of: start
        ("visible"), more zones entered while not stationary
        ("entered_zone"), stationary flag flipped ("stationary"/"active"),
        first attributes appeared ("attribute"), or end ("gone").

        Args:
            camera: Camera name, used to look up ``self.config.cameras``.
            event_type: Event state, compared against ``EventStateEnum``.
            prev_event_data: Previous event data; may be ``None`` for the
                start of an event.
            event_data: Current event data.

        Returns:
            True when an entry was handed to ``insert_or_save``, else False.
        """
        save = False
        camera_config = self.config.cameras[camera]
        event_id = event_data["id"]
        sub_label = event_data.get("sub_label")
        detect_width = camera_config.detect.width
        detect_height = camera_config.detect.height

        entry_data = {
            "box": to_relative_box(detect_width, detect_height, event_data["box"]),
            "label": event_data["label"],
            "sub_label": sub_label,
            "region": to_relative_box(
                detect_width, detect_height, event_data["region"]
            ),
            "attribute": "",
        }
        timeline_entry = {
            Timeline.timestamp: event_data["frame_time"],
            Timeline.camera: camera,
            Timeline.source: "tracked_object",
            Timeline.source_id: event_id,
            Timeline.data: entry_data,
        }

        # update sub labels for existing entries that haven't been added yet;
        # .get() is used on both sides so a missing key cannot raise here
        if (
            prev_event_data is not None
            and prev_event_data.get("sub_label") != sub_label
            and event_id in self.pre_event_cache
        ):
            for cached_entry in self.pre_event_cache[event_id]:
                cached_entry[Timeline.data]["sub_label"] = sub_label

        if event_type == EventStateEnum.start:
            timeline_entry[Timeline.class_type] = "visible"
            save = True
        elif event_type == EventStateEnum.update:
            prev_zone_count = len(prev_event_data["current_zones"])
            zone_count = len(event_data["current_zones"])

            if prev_zone_count < zone_count and not event_data["stationary"]:
                timeline_entry[Timeline.class_type] = "entered_zone"
                entry_data["zones"] = event_data["current_zones"]
                save = True
            elif prev_event_data["stationary"] != event_data["stationary"]:
                timeline_entry[Timeline.class_type] = (
                    "stationary" if event_data["stationary"] else "active"
                )
                save = True
            elif prev_event_data["attributes"] == {} and event_data["attributes"] != {}:
                timeline_entry[Timeline.class_type] = "attribute"
                # first attribute key, without building a throwaway list
                entry_data["attribute"] = next(iter(event_data["attributes"]))
                save = True
        elif event_type == EventStateEnum.end:
            timeline_entry[Timeline.class_type] = "gone"
            save = True

        if save:
            self.insert_or_save(timeline_entry, prev_event_data, event_data)

        return save

    def handle_api_entry(
        self,
        camera: str,
        event_type: str,
        event_data: dict[Any, Any],
    ) -> bool:
        """Insert a timeline entry for a new API-created event.

        Events whose ``type`` is ``"audio"`` are stored with class type
        ``"heard"`` and source ``"audio"``; every other event (``type``
        defaults to ``"api"``) is stored as ``"external"`` / ``"api"``.

        Args:
            camera: Camera name stored on the entry.
            event_type: Only ``"new"`` results in an insert.
            event_data: Event data providing ``start_time``, ``id``,
                ``label`` and optionally ``type`` and ``sub_label``.

        Returns:
            False when ``event_type`` is not ``"new"``; True after inserting.
        """
        if event_type != "new":
            return False

        is_audio = event_data.get("type", "api") == "audio"

        timeline_entry = {
            Timeline.class_type: "heard" if is_audio else "external",
            Timeline.timestamp: event_data["start_time"],
            Timeline.camera: camera,
            Timeline.source: "audio" if is_audio else "api",
            Timeline.source_id: event_data["id"],
            Timeline.data: {
                "label": event_data["label"],
                "sub_label": event_data.get("sub_label"),
            },
        }

        Timeline.insert(timeline_entry).execute()
        return True
|