only save recordings when an event is in progress

This commit is contained in:
Blake Blackshear 2021-10-23 16:18:13 -05:00
parent 61c62d4685
commit b63c56d810
7 changed files with 215 additions and 65 deletions

View File

@ -30,6 +30,11 @@ class EventProcessor(threading.Thread):
self.stop_event = stop_event self.stop_event = stop_event
def run(self): def run(self):
# set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where(
Event.end_time == None
).execute()
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
event_type, camera, event_data = self.event_queue.get(timeout=10) event_type, camera, event_data = self.event_queue.get(timeout=10)
@ -38,14 +43,35 @@ class EventProcessor(threading.Thread):
logger.debug(f"Event received: {event_type} {camera} {event_data['id']}") logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
event_config: EventsConfig = self.config.cameras[camera].record.events
if event_type == "start": if event_type == "start":
self.events_in_process[event_data["id"]] = event_data self.events_in_process[event_data["id"]] = event_data
if event_type == "end": elif event_type == "update":
event_config: EventsConfig = self.config.cameras[camera].record.events self.events_in_process[event_data["id"]] = event_data
# TODO: this will generate a lot of db activity possibly
if event_data["has_clip"] or event_data["has_snapshot"]: if event_data["has_clip"] or event_data["has_snapshot"]:
Event.create( Event.replace(
id=event_data["id"],
label=event_data["label"],
camera=camera,
start_time=event_data["start_time"] - event_config.pre_capture,
end_time=None,
top_score=event_data["top_score"],
false_positive=event_data["false_positive"],
zones=list(event_data["entered_zones"]),
thumbnail=event_data["thumbnail"],
region=event_data["region"],
box=event_data["box"],
area=event_data["area"],
has_clip=event_data["has_clip"],
has_snapshot=event_data["has_snapshot"],
).execute()
elif event_type == "end":
if event_data["has_clip"] or event_data["has_snapshot"]:
Event.replace(
id=event_data["id"], id=event_data["id"],
label=event_data["label"], label=event_data["label"],
camera=camera, camera=camera,
@ -60,11 +86,15 @@ class EventProcessor(threading.Thread):
area=event_data["area"], area=event_data["area"],
has_clip=event_data["has_clip"], has_clip=event_data["has_clip"],
has_snapshot=event_data["has_snapshot"], has_snapshot=event_data["has_snapshot"],
) ).execute()
del self.events_in_process[event_data["id"]] del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera)) self.event_processed_queue.put((event_data["id"], camera))
# set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None
).execute()
logger.info(f"Exiting event processor...") logger.info(f"Exiting event processor...")

View File

@ -190,7 +190,7 @@ def event_snapshot(id):
download = request.args.get("download", type=bool) download = request.args.get("download", type=bool)
jpg_bytes = None jpg_bytes = None
try: try:
event = Event.get(Event.id == id) event = Event.get(Event.id == id, Event.end_time != None)
if not event.has_snapshot: if not event.has_snapshot:
return "Snapshot not available", 404 return "Snapshot not available", 404
# read snapshot from disk # read snapshot from disk
@ -697,7 +697,10 @@ def vod_event(id):
clip_path = os.path.join(CLIPS_DIR, f"{event.camera}-{id}.mp4") clip_path = os.path.join(CLIPS_DIR, f"{event.camera}-{id}.mp4")
if not os.path.isfile(clip_path): if not os.path.isfile(clip_path):
return vod_ts(event.camera, event.start_time, event.end_time) end_ts = (
datetime.now().timestamp() if event.end_time is None else event.end_time
)
return vod_ts(event.camera, event.start_time, end_ts)
duration = int((event.end_time - event.start_time) * 1000) duration = int((event.end_time - event.start_time) * 1000)
return jsonify( return jsonify(

View File

@ -603,6 +603,8 @@ class TrackedObjectProcessor(threading.Thread):
self.event_queue.put(("start", camera, obj.to_dict())) self.event_queue.put(("start", camera, obj.to_dict()))
def update(camera, obj: TrackedObject, current_frame_time): def update(camera, obj: TrackedObject, current_frame_time):
obj.has_snapshot = self.should_save_snapshot(camera, obj)
obj.has_clip = self.should_retain_recording(camera, obj)
after = obj.to_dict() after = obj.to_dict()
message = { message = {
"before": obj.previous, "before": obj.previous,
@ -613,6 +615,9 @@ class TrackedObjectProcessor(threading.Thread):
f"{self.topic_prefix}/events", json.dumps(message), retain=False f"{self.topic_prefix}/events", json.dumps(message), retain=False
) )
obj.previous = after obj.previous = after
self.event_queue.put(
("update", camera, obj.to_dict(include_thumbnail=True))
)
def end(camera, obj: TrackedObject, current_frame_time): def end(camera, obj: TrackedObject, current_frame_time):
# populate has_snapshot # populate has_snapshot

View File

@ -7,6 +7,7 @@ import shutil
import string import string
import subprocess as sp import subprocess as sp
import threading import threading
from collections import defaultdict
from pathlib import Path from pathlib import Path
import psutil import psutil
@ -45,7 +46,7 @@ class RecordingMaintainer(threading.Thread):
self.stop_event = stop_event self.stop_event = stop_event
def move_files(self): def move_files(self):
recordings = [ cache_files = [
d d
for d in os.listdir(CACHE_DIR) for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d)) if os.path.isfile(os.path.join(CACHE_DIR, d))
@ -66,7 +67,9 @@ class RecordingMaintainer(threading.Thread):
except: except:
continue continue
for f in recordings: # group recordings by camera
grouped_recordings = defaultdict(list)
for f in cache_files:
# Skip files currently in use # Skip files currently in use
if f in files_in_use: if f in files_in_use:
continue continue
@ -76,58 +79,124 @@ class RecordingMaintainer(threading.Thread):
camera, date = basename.rsplit("-", maxsplit=1) camera, date = basename.rsplit("-", maxsplit=1)
start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S") start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
# Just delete files if recordings are turned off grouped_recordings[camera].append(
if ( {
not camera in self.config.cameras "cache_path": cache_path,
or not self.config.cameras[camera].record.enabled "start_time": start_time,
): }
Path(cache_path).unlink(missing_ok=True)
continue
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{cache_path}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0:
duration = float(p.stdout.decode().strip())
end_time = start_time + datetime.timedelta(seconds=duration)
else:
logger.warning(f"Discarding a corrupt recording segment: {f}")
Path(cache_path).unlink(missing_ok=True)
continue
directory = os.path.join(
RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera
) )
if not os.path.exists(directory): for camera, recordings in grouped_recordings.items():
os.makedirs(directory) # get all events with the end time after the start of the oldest cache file
# or with end_time None
file_name = f"{start_time.strftime('%M.%S.mp4')}" events: Event = (
file_path = os.path.join(directory, file_name) Event.select()
.where(
# copy then delete is required when recordings are stored on some network drives Event.camera == camera,
shutil.copyfile(cache_path, file_path) (Event.end_time == None)
os.remove(cache_path) | (Event.end_time >= recordings[0]["start_time"]),
Event.has_clip,
rand_id = "".join( )
random.choices(string.ascii_lowercase + string.digits, k=6) .order_by(Event.start_time)
)
Recordings.create(
id=f"{start_time.timestamp()}-{rand_id}",
camera=camera,
path=file_path,
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
) )
for r in recordings:
cache_path = r["cache_path"]
start_time = r["start_time"]
# Just delete files if recordings are turned off
if (
not camera in self.config.cameras
or not self.config.cameras[camera].record.enabled
):
Path(cache_path).unlink(missing_ok=True)
continue
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{cache_path}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0:
duration = float(p.stdout.decode().strip())
end_time = start_time + datetime.timedelta(seconds=duration)
else:
logger.warning(f"Discarding a corrupt recording segment: {f}")
Path(cache_path).unlink(missing_ok=True)
continue
# if cached file's start_time is earlier than the retain_days for the camera
if start_time <= (
(
datetime.datetime.now()
- datetime.timedelta(
days=self.config.cameras[camera].record.retain_days
)
)
):
# if the cached segment overlaps with the events:
overlaps = False
for event in events:
# if the event starts in the future, stop checking events
# and let this recording segment expire
if event.start_time > end_time.timestamp():
overlaps = False
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= start_time:
overlaps = True
break
if overlaps:
# move from cache to recordings immediately
self.store_segment(
camera,
start_time,
end_time,
duration,
cache_path,
)
# else retain_days includes this segment
else:
self.store_segment(
camera, start_time, end_time, duration, cache_path
)
if len(recordings) > 2:
# delete all cached files past the most recent 2
to_remove = sorted(recordings, key=lambda i: i["start_time"])[:-2]
for f in to_remove:
Path(f["cache_path"]).unlink(missing_ok=True)
def store_segment(self, camera, start_time, end_time, duration, cache_path):
directory = os.path.join(RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera)
if not os.path.exists(directory):
os.makedirs(directory)
file_name = f"{start_time.strftime('%M.%S.mp4')}"
file_path = os.path.join(directory, file_name)
# copy then delete is required when recordings are stored on some network drives
shutil.copyfile(cache_path, file_path)
os.remove(cache_path)
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
Recordings.create(
id=f"{start_time.timestamp()}-{rand_id}",
camera=camera,
path=file_path,
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
)
def run(self): def run(self):
# Check for new files every 5 seconds # Check for new files every 5 seconds
@ -231,9 +300,9 @@ class RecordingCleanup(threading.Thread):
keep = False keep = False
break break
# if the event ends after the recording starts, keep it # if the event is in progress or ends after the recording starts, keep it
# and stop looking at events # and stop looking at events
if event.end_time >= recording.start_time: if event.end_time is None or event.end_time >= recording.start_time:
keep = True keep = True
break break

View File

@ -0,0 +1,43 @@
"""Peewee migrations -- 004_add_bbox_region_area.py.
Some examples (model - class or model name)::
> Model = migrator.orm['model_name'] # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.python(func, *args, **kwargs) # Run python code
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.drop_index(model, *col_names)
> migrator.add_not_null(model, *field_names)
> migrator.drop_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
"""
import datetime as dt
import peewee as pw
from playhouse.sqlite_ext import *
from decimal import ROUND_HALF_EVEN
from frigate.models import Event
try:
import playhouse.postgres_ext as pw_pext
except ImportError:
pass
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
    """Allow ``Event.end_time`` to be NULL.

    Dropping the NOT NULL constraint lets an event row be written as soon
    as the event starts (end_time=None) and updated with the real end time
    when the event finishes.
    """
    migrator.drop_not_null(Event, "end_time")
def rollback(migrator, database, fake=False, **kwargs):
    """Intentional no-op rollback.

    NOTE(review): re-adding the NOT NULL constraint is not safely
    reversible once rows with a NULL end_time exist — presumably why this
    is left empty; confirm this matches the project's other migrations.
    """
    pass

View File

@ -99,7 +99,7 @@ export default function Event({ eventId, close, scrollRef }) {
} }
const startime = new Date(data.start_time * 1000); const startime = new Date(data.start_time * 1000);
const endtime = new Date(data.end_time * 1000); const endtime = data.end_time ? new Date(data.end_time * 1000) : null;
return ( return (
<div className="space-y-4"> <div className="space-y-4">
<div className="flex md:flex-row justify-between flex-wrap flex-col"> <div className="flex md:flex-row justify-between flex-wrap flex-col">
@ -155,7 +155,7 @@ export default function Event({ eventId, close, scrollRef }) {
<Tr index={1}> <Tr index={1}>
<Td>Timeframe</Td> <Td>Timeframe</Td>
<Td> <Td>
{startime.toLocaleString()} {endtime.toLocaleString()} {startime.toLocaleString()}{endtime !== null ? ` ${endtime.toLocaleString()}`:''}
</Td> </Td>
</Tr> </Tr>
<Tr> <Tr>
@ -186,7 +186,7 @@ export default function Event({ eventId, close, scrollRef }) {
}, },
], ],
poster: data.has_snapshot poster: data.has_snapshot
? `${apiHost}/clips/${data.camera}-${eventId}.jpg` ? `${apiHost}/api/events/${eventId}/snapshot.jpg`
: `data:image/jpeg;base64,${data.thumbnail}`, : `data:image/jpeg;base64,${data.thumbnail}`,
}} }}
seekOptions={{ forward: 10, back: 5 }} seekOptions={{ forward: 10, back: 5 }}

View File

@ -42,7 +42,7 @@ const EventsRow = memo(
); );
const start = new Date(parseInt(startTime * 1000, 10)); const start = new Date(parseInt(startTime * 1000, 10));
const end = new Date(parseInt(endTime * 1000, 10)); const end = endTime ? new Date(parseInt(endTime * 1000, 10)) : null;
return ( return (
<Tbody reference={innerRef}> <Tbody reference={innerRef}>
@ -102,7 +102,7 @@ const EventsRow = memo(
</Td> </Td>
<Td>{start.toLocaleDateString()}</Td> <Td>{start.toLocaleDateString()}</Td>
<Td>{start.toLocaleTimeString()}</Td> <Td>{start.toLocaleTimeString()}</Td>
<Td>{end.toLocaleTimeString()}</Td> <Td>{end === null ? 'In progress' : end.toLocaleTimeString()}</Td>
</Tr> </Tr>
{viewEvent === id ? ( {viewEvent === id ? (
<Tr className="border-b-1"> <Tr className="border-b-1">