mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-07 00:06:57 +01:00
84a0827aee
Use config data classes to eliminate some of the boilerplate associated with setting up the configuration. In particular, using dataclasses removes a lot of the boilerplate around assigning properties to the object and allows these to be easily immutable by freezing them. In the case of simple, non-nested dataclasses, this also provides more convenient `asdict` helpers. To set this up, where previously the objects would be parsed from the config via the `__init__` method, create a `build` classmethod that does this and calls the dataclass initializer. Some of the objects are mutated at runtime, in particular some of the zones are mutated to set the color (this might be able to be refactored out) and some of the camera functionality can be enabled/disabled. Some of the configs with `enabled` properties don't seem to have mqtt hooks to be able to toggle this, in particular, the clips, snapshots, and detect can be toggled but rtmp and record configs do not, but all of these configs are still not frozen in case there is some other functionality I am missing. There are a couple other minor fixes here, one that was introduced by me recently where `max_seconds` was not defined, the other to properly `get()` the message payload when handling publishing mqtt messages sent via websocket.
414 lines
15 KiB
Python
414 lines
15 KiB
Python
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import queue
|
|
import subprocess as sp
|
|
import threading
|
|
import time
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
import psutil
|
|
import shutil
|
|
|
|
from frigate.config import FrigateConfig
|
|
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
|
|
from frigate.models import Event
|
|
|
|
from peewee import fn
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EventProcessor(threading.Thread):
|
|
def __init__(
|
|
self, config, camera_processes, event_queue, event_processed_queue, stop_event
|
|
):
|
|
threading.Thread.__init__(self)
|
|
self.name = "event_processor"
|
|
self.config = config
|
|
self.camera_processes = camera_processes
|
|
self.cached_clips = {}
|
|
self.event_queue = event_queue
|
|
self.event_processed_queue = event_processed_queue
|
|
self.events_in_process = {}
|
|
self.stop_event = stop_event
|
|
|
|
def should_create_clip(self, camera, event_data):
|
|
if event_data["false_positive"]:
|
|
return False
|
|
|
|
# if there are required zones and there is no overlap
|
|
required_zones = self.config.cameras[camera].clips.required_zones
|
|
if len(required_zones) > 0 and not set(event_data["entered_zones"]) & set(
|
|
required_zones
|
|
):
|
|
logger.debug(
|
|
f"Not creating clip for {event_data['id']} because it did not enter required zones"
|
|
)
|
|
return False
|
|
|
|
return True
|
|
|
|
def refresh_cache(self):
|
|
cached_files = os.listdir(CACHE_DIR)
|
|
|
|
files_in_use = []
|
|
for process in psutil.process_iter():
|
|
try:
|
|
if process.name() != "ffmpeg":
|
|
continue
|
|
|
|
flist = process.open_files()
|
|
if flist:
|
|
for nt in flist:
|
|
if nt.path.startswith(CACHE_DIR):
|
|
files_in_use.append(nt.path.split("/")[-1])
|
|
except:
|
|
continue
|
|
|
|
for f in cached_files:
|
|
if f in files_in_use or f in self.cached_clips:
|
|
continue
|
|
|
|
basename = os.path.splitext(f)[0]
|
|
camera, date = basename.rsplit("-", maxsplit=1)
|
|
start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
|
|
|
|
ffprobe_cmd = [
|
|
"ffprobe",
|
|
"-v",
|
|
"error",
|
|
"-show_entries",
|
|
"format=duration",
|
|
"-of",
|
|
"default=noprint_wrappers=1:nokey=1",
|
|
f"{os.path.join(CACHE_DIR, f)}",
|
|
]
|
|
p = sp.run(ffprobe_cmd, capture_output=True)
|
|
if p.returncode == 0:
|
|
duration = float(p.stdout.decode().strip())
|
|
else:
|
|
logger.info(f"bad file: {f}")
|
|
os.remove(os.path.join(CACHE_DIR, f))
|
|
continue
|
|
|
|
self.cached_clips[f] = {
|
|
"path": f,
|
|
"camera": camera,
|
|
"start_time": start_time.timestamp(),
|
|
"duration": duration,
|
|
}
|
|
|
|
if len(self.events_in_process) > 0:
|
|
earliest_event = min(
|
|
self.events_in_process.values(), key=lambda x: x["start_time"]
|
|
)["start_time"]
|
|
else:
|
|
earliest_event = datetime.datetime.now().timestamp()
|
|
|
|
# if the earliest event is more tha max seconds ago, cap it
|
|
max_seconds = self.config.clips.max_seconds
|
|
earliest_event = max(
|
|
earliest_event,
|
|
datetime.datetime.now().timestamp() - self.config.clips.max_seconds,
|
|
)
|
|
|
|
for f, data in list(self.cached_clips.items()):
|
|
if earliest_event - 90 > data["start_time"] + data["duration"]:
|
|
del self.cached_clips[f]
|
|
logger.debug(f"Cleaning up cached file {f}")
|
|
os.remove(os.path.join(CACHE_DIR, f))
|
|
|
|
# if we are still using more than 90% of the cache, proactively cleanup
|
|
cache_usage = shutil.disk_usage("/tmp/cache")
|
|
if (
|
|
cache_usage.used / cache_usage.total > 0.9
|
|
and cache_usage.free < 200000000
|
|
and len(self.cached_clips) > 0
|
|
):
|
|
logger.warning("More than 90% of the cache is used.")
|
|
logger.warning(
|
|
"Consider increasing space available at /tmp/cache or reducing max_seconds in your clips config."
|
|
)
|
|
logger.warning("Proactively cleaning up the cache...")
|
|
while cache_usage.used / cache_usage.total > 0.9:
|
|
oldest_clip = min(
|
|
self.cached_clips.values(), key=lambda x: x["start_time"]
|
|
)
|
|
del self.cached_clips[oldest_clip["path"]]
|
|
os.remove(os.path.join(CACHE_DIR, oldest_clip["path"]))
|
|
cache_usage = shutil.disk_usage("/tmp/cache")
|
|
|
|
def create_clip(self, camera, event_data, pre_capture, post_capture):
|
|
# get all clips from the camera with the event sorted
|
|
sorted_clips = sorted(
|
|
[c for c in self.cached_clips.values() if c["camera"] == camera],
|
|
key=lambda i: i["start_time"],
|
|
)
|
|
|
|
# if there are no clips in the cache or we are still waiting on a needed file check every 5 seconds
|
|
wait_count = 0
|
|
while (
|
|
len(sorted_clips) == 0
|
|
or sorted_clips[-1]["start_time"] + sorted_clips[-1]["duration"]
|
|
< event_data["end_time"] + post_capture
|
|
):
|
|
if wait_count > 4:
|
|
logger.warning(
|
|
f"Unable to create clip for {camera} and event {event_data['id']}. There were no cache files for this event."
|
|
)
|
|
return False
|
|
logger.debug(f"No cache clips for {camera}. Waiting...")
|
|
time.sleep(5)
|
|
self.refresh_cache()
|
|
# get all clips from the camera with the event sorted
|
|
sorted_clips = sorted(
|
|
[c for c in self.cached_clips.values() if c["camera"] == camera],
|
|
key=lambda i: i["start_time"],
|
|
)
|
|
wait_count += 1
|
|
|
|
playlist_start = event_data["start_time"] - pre_capture
|
|
playlist_end = event_data["end_time"] + post_capture
|
|
playlist_lines = []
|
|
for clip in sorted_clips:
|
|
# clip ends before playlist start time, skip
|
|
if clip["start_time"] + clip["duration"] < playlist_start:
|
|
continue
|
|
# clip starts after playlist ends, finish
|
|
if clip["start_time"] > playlist_end:
|
|
break
|
|
playlist_lines.append(f"file '{os.path.join(CACHE_DIR,clip['path'])}'")
|
|
# if this is the starting clip, add an inpoint
|
|
if clip["start_time"] < playlist_start:
|
|
playlist_lines.append(
|
|
f"inpoint {int(playlist_start-clip['start_time'])}"
|
|
)
|
|
# if this is the ending clip, add an outpoint
|
|
if clip["start_time"] + clip["duration"] > playlist_end:
|
|
playlist_lines.append(
|
|
f"outpoint {int(playlist_end-clip['start_time'])}"
|
|
)
|
|
|
|
clip_name = f"{camera}-{event_data['id']}"
|
|
ffmpeg_cmd = [
|
|
"ffmpeg",
|
|
"-y",
|
|
"-protocol_whitelist",
|
|
"pipe,file",
|
|
"-f",
|
|
"concat",
|
|
"-safe",
|
|
"0",
|
|
"-i",
|
|
"-",
|
|
"-c",
|
|
"copy",
|
|
"-movflags",
|
|
"+faststart",
|
|
f"{os.path.join(CLIPS_DIR, clip_name)}.mp4",
|
|
]
|
|
|
|
p = sp.run(
|
|
ffmpeg_cmd,
|
|
input="\n".join(playlist_lines),
|
|
encoding="ascii",
|
|
capture_output=True,
|
|
)
|
|
if p.returncode != 0:
|
|
logger.error(p.stderr)
|
|
return False
|
|
return True
|
|
|
|
def run(self):
|
|
while not self.stop_event.is_set():
|
|
try:
|
|
event_type, camera, event_data = self.event_queue.get(timeout=10)
|
|
except queue.Empty:
|
|
if not self.stop_event.is_set():
|
|
self.refresh_cache()
|
|
continue
|
|
|
|
logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
|
|
self.refresh_cache()
|
|
|
|
if event_type == "start":
|
|
self.events_in_process[event_data["id"]] = event_data
|
|
|
|
if event_type == "end":
|
|
clips_config = self.config.cameras[camera].clips
|
|
|
|
clip_created = False
|
|
if self.should_create_clip(camera, event_data):
|
|
if clips_config.enabled and (
|
|
clips_config.objects is None
|
|
or event_data["label"] in clips_config.objects
|
|
):
|
|
clip_created = self.create_clip(
|
|
camera,
|
|
event_data,
|
|
clips_config.pre_capture,
|
|
clips_config.post_capture,
|
|
)
|
|
|
|
if clip_created or event_data["has_snapshot"]:
|
|
Event.create(
|
|
id=event_data["id"],
|
|
label=event_data["label"],
|
|
camera=camera,
|
|
start_time=event_data["start_time"],
|
|
end_time=event_data["end_time"],
|
|
top_score=event_data["top_score"],
|
|
false_positive=event_data["false_positive"],
|
|
zones=list(event_data["entered_zones"]),
|
|
thumbnail=event_data["thumbnail"],
|
|
has_clip=clip_created,
|
|
has_snapshot=event_data["has_snapshot"],
|
|
)
|
|
del self.events_in_process[event_data["id"]]
|
|
self.event_processed_queue.put((event_data["id"], camera))
|
|
|
|
logger.info(f"Exiting event processor...")
|
|
|
|
|
|
class EventCleanup(threading.Thread):
|
|
def __init__(self, config: FrigateConfig, stop_event):
|
|
threading.Thread.__init__(self)
|
|
self.name = "event_cleanup"
|
|
self.config = config
|
|
self.stop_event = stop_event
|
|
self.camera_keys = list(self.config.cameras.keys())
|
|
|
|
def expire(self, media):
|
|
## Expire events from unlisted cameras based on the global config
|
|
if media == "clips":
|
|
retain_config = self.config.clips.retain
|
|
file_extension = "mp4"
|
|
update_params = {"has_clip": False}
|
|
else:
|
|
retain_config = self.config.snapshots.retain
|
|
file_extension = "jpg"
|
|
update_params = {"has_snapshot": False}
|
|
|
|
distinct_labels = (
|
|
Event.select(Event.label)
|
|
.where(Event.camera.not_in(self.camera_keys))
|
|
.distinct()
|
|
)
|
|
|
|
# loop over object types in db
|
|
for l in distinct_labels:
|
|
# get expiration time for this label
|
|
expire_days = retain_config.objects.get(l.label, retain_config.default)
|
|
expire_after = (
|
|
datetime.datetime.now() - datetime.timedelta(days=expire_days)
|
|
).timestamp()
|
|
# grab all events after specific time
|
|
expired_events = Event.select().where(
|
|
Event.camera.not_in(self.camera_keys),
|
|
Event.start_time < expire_after,
|
|
Event.label == l.label,
|
|
)
|
|
# delete the media from disk
|
|
for event in expired_events:
|
|
media_name = f"{event.camera}-{event.id}"
|
|
media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}")
|
|
media.unlink(missing_ok=True)
|
|
# update the clips attribute for the db entry
|
|
update_query = Event.update(update_params).where(
|
|
Event.camera.not_in(self.camera_keys),
|
|
Event.start_time < expire_after,
|
|
Event.label == l.label,
|
|
)
|
|
update_query.execute()
|
|
|
|
## Expire events from cameras based on the camera config
|
|
for name, camera in self.config.cameras.items():
|
|
if media == "clips":
|
|
retain_config = camera.clips.retain
|
|
else:
|
|
retain_config = camera.snapshots.retain
|
|
# get distinct objects in database for this camera
|
|
distinct_labels = (
|
|
Event.select(Event.label).where(Event.camera == name).distinct()
|
|
)
|
|
|
|
# loop over object types in db
|
|
for l in distinct_labels:
|
|
# get expiration time for this label
|
|
expire_days = retain_config.objects.get(l.label, retain_config.default)
|
|
expire_after = (
|
|
datetime.datetime.now() - datetime.timedelta(days=expire_days)
|
|
).timestamp()
|
|
# grab all events after specific time
|
|
expired_events = Event.select().where(
|
|
Event.camera == name,
|
|
Event.start_time < expire_after,
|
|
Event.label == l.label,
|
|
)
|
|
# delete the grabbed clips from disk
|
|
for event in expired_events:
|
|
media_name = f"{event.camera}-{event.id}"
|
|
media = Path(
|
|
f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
|
|
)
|
|
media.unlink(missing_ok=True)
|
|
# update the clips attribute for the db entry
|
|
update_query = Event.update(update_params).where(
|
|
Event.camera == name,
|
|
Event.start_time < expire_after,
|
|
Event.label == l.label,
|
|
)
|
|
update_query.execute()
|
|
|
|
def purge_duplicates(self):
|
|
duplicate_query = """with grouped_events as (
|
|
select id,
|
|
label,
|
|
camera,
|
|
has_snapshot,
|
|
has_clip,
|
|
row_number() over (
|
|
partition by label, camera, round(start_time/5,0)*5
|
|
order by end_time-start_time desc
|
|
) as copy_number
|
|
from event
|
|
)
|
|
|
|
select distinct id, camera, has_snapshot, has_clip from grouped_events
|
|
where copy_number > 1;"""
|
|
|
|
duplicate_events = Event.raw(duplicate_query)
|
|
for event in duplicate_events:
|
|
logger.debug(f"Removing duplicate: {event.id}")
|
|
media_name = f"{event.camera}-{event.id}"
|
|
if event.has_snapshot:
|
|
media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
|
|
media.unlink(missing_ok=True)
|
|
if event.has_clip:
|
|
media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
|
|
media.unlink(missing_ok=True)
|
|
|
|
(
|
|
Event.delete()
|
|
.where(Event.id << [event.id for event in duplicate_events])
|
|
.execute()
|
|
)
|
|
|
|
def run(self):
|
|
# only expire events every 5 minutes
|
|
while not self.stop_event.wait(300):
|
|
self.expire("clips")
|
|
self.expire("snapshots")
|
|
self.purge_duplicates()
|
|
|
|
# drop events from db where has_clip and has_snapshot are false
|
|
delete_query = Event.delete().where(
|
|
Event.has_clip == False, Event.has_snapshot == False
|
|
)
|
|
delete_query.execute()
|
|
|
|
logger.info(f"Exiting event cleanup...")
|