blakeblackshear.frigate/frigate/events.py

268 lines
11 KiB
Python

import datetime
import logging
import os
import queue
import threading
import time
from pathlib import Path
from peewee import fn
from frigate.config import EventsConfig, FrigateConfig, RecordConfig
from frigate.const import CLIPS_DIR
from frigate.models import Event
logger = logging.getLogger(__name__)
def should_update_db(prev_event, current_event):
return (
prev_event["top_score"] != current_event["top_score"]
or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["has_clip"] != current_event["has_clip"]
or prev_event["has_snapshot"] != current_event["has_snapshot"]
)
class EventProcessor(threading.Thread):
def __init__(
self, config, camera_processes, event_queue, event_processed_queue, stop_event
):
threading.Thread.__init__(self)
self.name = "event_processor"
self.config = config
self.camera_processes = camera_processes
self.cached_clips = {}
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue
self.events_in_process = {}
self.stop_event = stop_event
def run(self):
# set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where(
Event.end_time == None
).execute()
while not self.stop_event.is_set():
try:
event_type, camera, event_data = self.event_queue.get(timeout=10)
except queue.Empty:
continue
logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
event_config: EventsConfig = self.config.cameras[camera].record.events
if event_type == "start":
self.events_in_process[event_data["id"]] = event_data
elif event_type == "update" and should_update_db(
self.events_in_process[event_data["id"]], event_data
):
self.events_in_process[event_data["id"]] = event_data
# TODO: this will generate a lot of db activity possibly
if event_data["has_clip"] or event_data["has_snapshot"]:
Event.replace(
id=event_data["id"],
label=event_data["label"],
camera=camera,
start_time=event_data["start_time"] - event_config.pre_capture,
end_time=None,
top_score=event_data["top_score"],
false_positive=event_data["false_positive"],
zones=list(event_data["entered_zones"]),
thumbnail=event_data["thumbnail"],
region=event_data["region"],
box=event_data["box"],
area=event_data["area"],
has_clip=event_data["has_clip"],
has_snapshot=event_data["has_snapshot"],
).execute()
elif event_type == "end":
if event_data["has_clip"] or event_data["has_snapshot"]:
Event.replace(
id=event_data["id"],
label=event_data["label"],
camera=camera,
start_time=event_data["start_time"] - event_config.pre_capture,
end_time=event_data["end_time"] + event_config.post_capture,
top_score=event_data["top_score"],
false_positive=event_data["false_positive"],
zones=list(event_data["entered_zones"]),
thumbnail=event_data["thumbnail"],
region=event_data["region"],
box=event_data["box"],
area=event_data["area"],
has_clip=event_data["has_clip"],
has_snapshot=event_data["has_snapshot"],
).execute()
del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera))
# set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None
).execute()
logger.info(f"Exiting event processor...")
class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event):
threading.Thread.__init__(self)
self.name = "event_cleanup"
self.config = config
self.stop_event = stop_event
self.camera_keys = list(self.config.cameras.keys())
def expire(self, media_type):
## Expire events from unlisted cameras based on the global config
if media_type == "clips":
retain_config = self.config.record.events.retain
file_extension = "mp4"
update_params = {"has_clip": False}
else:
retain_config = self.config.snapshots.retain
file_extension = "jpg"
update_params = {"has_snapshot": False}
distinct_labels = (
Event.select(Event.label)
.where(Event.camera.not_in(self.camera_keys))
.distinct()
)
# loop over object types in db
for l in distinct_labels:
# get expiration time for this label
expire_days = retain_config.objects.get(l.label, retain_config.default)
expire_after = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# grab all events after specific time
expired_events = Event.select().where(
Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
# delete the media from disk
for event in expired_events:
media_name = f"{event.camera}-{event.id}"
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
)
media_path.unlink(missing_ok=True)
if file_extension == "jpg":
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
)
media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry
update_query = Event.update(update_params).where(
Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
update_query.execute()
## Expire events from cameras based on the camera config
for name, camera in self.config.cameras.items():
if media_type == "clips":
retain_config = camera.record.events.retain
else:
retain_config = camera.snapshots.retain
# get distinct objects in database for this camera
distinct_labels = (
Event.select(Event.label).where(Event.camera == name).distinct()
)
# loop over object types in db
for l in distinct_labels:
# get expiration time for this label
expire_days = retain_config.objects.get(l.label, retain_config.default)
expire_after = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp()
# grab all events after specific time
expired_events = Event.select().where(
Event.camera == name,
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
# delete the grabbed clips from disk
for event in expired_events:
media_name = f"{event.camera}-{event.id}"
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
)
media_path.unlink(missing_ok=True)
if file_extension == "jpg":
media_path = Path(
f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
)
media_path.unlink(missing_ok=True)
# update the clips attribute for the db entry
update_query = Event.update(update_params).where(
Event.camera == name,
Event.start_time < expire_after,
Event.label == l.label,
Event.retain_indefinitely == False,
)
update_query.execute()
def purge_duplicates(self):
duplicate_query = """with grouped_events as (
select id,
label,
camera,
has_snapshot,
has_clip,
row_number() over (
partition by label, camera, round(start_time/5,0)*5
order by end_time-start_time desc
) as copy_number
from event
)
select distinct id, camera, has_snapshot, has_clip from grouped_events
where copy_number > 1;"""
duplicate_events = Event.raw(duplicate_query)
for event in duplicate_events:
logger.debug(f"Removing duplicate: {event.id}")
media_name = f"{event.camera}-{event.id}"
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
media_path.unlink(missing_ok=True)
media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
media_path.unlink(missing_ok=True)
(
Event.delete()
.where(Event.id << [event.id for event in duplicate_events])
.execute()
)
def run(self):
# only expire events every 5 minutes
while not self.stop_event.wait(300):
self.expire("clips")
self.expire("snapshots")
self.purge_duplicates()
# drop events from db where has_clip and has_snapshot are false
delete_query = Event.delete().where(
Event.has_clip == False, Event.has_snapshot == False
)
delete_query.execute()
logger.info(f"Exiting event cleanup...")