diff --git a/frigate/events.py b/frigate/events.py index 86fd2082b..2b7110dc7 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -30,6 +30,11 @@ class EventProcessor(threading.Thread): self.stop_event = stop_event def run(self): + # set an end_time on events without an end_time on startup + Event.update(end_time=Event.start_time + 30).where( + Event.end_time == None + ).execute() + while not self.stop_event.is_set(): try: event_type, camera, event_data = self.event_queue.get(timeout=10) @@ -38,14 +43,35 @@ class EventProcessor(threading.Thread): logger.debug(f"Event received: {event_type} {camera} {event_data['id']}") + event_config: EventsConfig = self.config.cameras[camera].record.events + if event_type == "start": self.events_in_process[event_data["id"]] = event_data - if event_type == "end": - event_config: EventsConfig = self.config.cameras[camera].record.events - + elif event_type == "update": + self.events_in_process[event_data["id"]] = event_data + # TODO: this will generate a lot of db activity possibly if event_data["has_clip"] or event_data["has_snapshot"]: - Event.create( + Event.replace( + id=event_data["id"], + label=event_data["label"], + camera=camera, + start_time=event_data["start_time"] - event_config.pre_capture, + end_time=None, + top_score=event_data["top_score"], + false_positive=event_data["false_positive"], + zones=list(event_data["entered_zones"]), + thumbnail=event_data["thumbnail"], + region=event_data["region"], + box=event_data["box"], + area=event_data["area"], + has_clip=event_data["has_clip"], + has_snapshot=event_data["has_snapshot"], + ).execute() + + elif event_type == "end": + if event_data["has_clip"] or event_data["has_snapshot"]: + Event.replace( id=event_data["id"], label=event_data["label"], camera=camera, @@ -60,11 +86,15 @@ class EventProcessor(threading.Thread): area=event_data["area"], has_clip=event_data["has_clip"], has_snapshot=event_data["has_snapshot"], - ) + ).execute() del self.events_in_process[event_data["id"]] self.event_processed_queue.put((event_data["id"], camera)) + # set an end_time on events without an end_time before exiting + Event.update(end_time=datetime.datetime.now().timestamp()).where( + Event.end_time == None + ).execute() logger.info(f"Exiting event processor...") diff --git a/frigate/http.py b/frigate/http.py index 1ef35dd92..32dcdb130 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -190,7 +190,7 @@ def event_snapshot(id): download = request.args.get("download", type=bool) jpg_bytes = None try: - event = Event.get(Event.id == id) + event = Event.get(Event.id == id, Event.end_time != None) if not event.has_snapshot: return "Snapshot not available", 404 # read snapshot from disk @@ -697,7 +697,10 @@ def vod_event(id): clip_path = os.path.join(CLIPS_DIR, f"{event.camera}-{id}.mp4") if not os.path.isfile(clip_path): - return vod_ts(event.camera, event.start_time, event.end_time) + end_ts = ( + datetime.now().timestamp() if event.end_time is None else event.end_time + ) + return vod_ts(event.camera, event.start_time, end_ts) duration = int((event.end_time - event.start_time) * 1000) return jsonify( diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 2597893d7..9a8ad8cc6 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -603,6 +603,8 @@ class TrackedObjectProcessor(threading.Thread): self.event_queue.put(("start", camera, obj.to_dict())) def update(camera, obj: TrackedObject, current_frame_time): + obj.has_snapshot = self.should_save_snapshot(camera, obj) + obj.has_clip = self.should_retain_recording(camera, obj) after = obj.to_dict() message = { "before": obj.previous, @@ -613,6 +615,9 @@ class TrackedObjectProcessor(threading.Thread): f"{self.topic_prefix}/events", json.dumps(message), retain=False ) obj.previous = after + self.event_queue.put( + ("update", camera, obj.to_dict(include_thumbnail=True)) + ) def end(camera, obj: TrackedObject, current_frame_time): # populate has_snapshot diff --git a/frigate/record.py b/frigate/record.py index b0fd64798..e2427e729 100644 --- a/frigate/record.py +++ b/frigate/record.py @@ -7,6 +7,7 @@ import shutil import string import subprocess as sp import threading +from collections import defaultdict from pathlib import Path import psutil @@ -45,7 +46,7 @@ class RecordingMaintainer(threading.Thread): self.stop_event = stop_event def move_files(self): - recordings = [ + cache_files = [ d for d in os.listdir(CACHE_DIR) if os.path.isfile(os.path.join(CACHE_DIR, d)) @@ -66,7 +67,9 @@ class RecordingMaintainer(threading.Thread): except: continue - for f in recordings: + # group recordings by camera + grouped_recordings = defaultdict(list) + for f in cache_files: # Skip files currently in use if f in files_in_use: continue @@ -76,58 +79,124 @@ class RecordingMaintainer(threading.Thread): camera, date = basename.rsplit("-", maxsplit=1) start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S") - # Just delete files if recordings are turned off - if ( - not camera in self.config.cameras - or not self.config.cameras[camera].record.enabled - ): - Path(cache_path).unlink(missing_ok=True) - continue - - ffprobe_cmd = [ - "ffprobe", - "-v", - "error", - "-show_entries", - "format=duration", - "-of", - "default=noprint_wrappers=1:nokey=1", - f"{cache_path}", - ] - p = sp.run(ffprobe_cmd, capture_output=True) - if p.returncode == 0: - duration = float(p.stdout.decode().strip()) - end_time = start_time + datetime.timedelta(seconds=duration) - else: - logger.warning(f"Discarding a corrupt recording segment: {f}") - Path(cache_path).unlink(missing_ok=True) - continue - - directory = os.path.join( - RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera + grouped_recordings[camera].append( + { + "cache_path": cache_path, + "start_time": start_time, + } ) - if not os.path.exists(directory): - os.makedirs(directory) - - file_name = f"{start_time.strftime('%M.%S.mp4')}" - file_path = os.path.join(directory, file_name) - - # copy then delete is required when recordings are stored on some network drives - shutil.copyfile(cache_path, file_path) - os.remove(cache_path) - - rand_id = "".join( - random.choices(string.ascii_lowercase + string.digits, k=6) - ) - Recordings.create( - id=f"{start_time.timestamp()}-{rand_id}", - camera=camera, - path=file_path, - start_time=start_time.timestamp(), - end_time=end_time.timestamp(), - duration=duration, + for camera, recordings in grouped_recordings.items(): + # get all events with the end time after the start of the oldest cache file + # or with end_time None + events: Event = ( + Event.select() + .where( + Event.camera == camera, + (Event.end_time == None) + | (Event.end_time >= recordings[0]["start_time"]), + Event.has_clip, + ) + .order_by(Event.start_time) ) + for r in recordings: + cache_path = r["cache_path"] + start_time = r["start_time"] + + # Just delete files if recordings are turned off + if ( + not camera in self.config.cameras + or not self.config.cameras[camera].record.enabled + ): + Path(cache_path).unlink(missing_ok=True) + continue + + ffprobe_cmd = [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + f"{cache_path}", + ] + p = sp.run(ffprobe_cmd, capture_output=True) + if p.returncode == 0: + duration = float(p.stdout.decode().strip()) + end_time = start_time + datetime.timedelta(seconds=duration) + else: + logger.warning(f"Discarding a corrupt recording segment: {f}") + Path(cache_path).unlink(missing_ok=True) + continue + + # if cached file's start_time is earlier than the retain_days for the camera + if start_time <= ( + ( + datetime.datetime.now() + - datetime.timedelta( + days=self.config.cameras[camera].record.retain_days + ) + ) + ): + # if the cached segment overlaps with the events: + overlaps = False + for event in events: + # if the event starts in the future, stop checking events + # and let this recording segment expire + if event.start_time > end_time.timestamp(): + overlaps = False + break + + # if the event is in progress or ends after the recording starts, keep it + # and stop looking at events + if event.end_time is None or event.end_time >= start_time: + overlaps = True + break + + if overlaps: + # move from cache to recordings immediately + self.store_segment( + camera, + start_time, + end_time, + duration, + cache_path, + ) + # else retain_days includes this segment + else: + self.store_segment( + camera, start_time, end_time, duration, cache_path + ) + + if len(recordings) > 2: + # delete all cached files past the most recent 2 + to_remove = sorted(recordings, key=lambda i: i["start_time"])[:-2] + for f in to_remove: + Path(cache_path).unlink(missing_ok=True) + + def store_segment(self, camera, start_time, end_time, duration, cache_path): + directory = os.path.join(RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera) + + if not os.path.exists(directory): + os.makedirs(directory) + + file_name = f"{start_time.strftime('%M.%S.mp4')}" + file_path = os.path.join(directory, file_name) + + # copy then delete is required when recordings are stored on some network drives + shutil.copyfile(cache_path, file_path) + os.remove(cache_path) + + rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) + Recordings.create( + id=f"{start_time.timestamp()}-{rand_id}", + camera=camera, + path=file_path, + start_time=start_time.timestamp(), + end_time=end_time.timestamp(), + duration=duration, + ) def run(self): # Check for new files every 5 seconds @@ -231,9 +300,9 @@ class RecordingCleanup(threading.Thread): keep = False break - # if the event ends after the recording starts, keep it + # if the event is in progress or ends after the recording starts, keep it # and stop looking at events - if event.end_time >= recording.start_time: + if event.end_time is None or event.end_time >= recording.start_time: keep = True break diff --git a/migrations/005_make_end_time_nullable.py b/migrations/005_make_end_time_nullable.py new file mode 100644 index 000000000..5c8cf6505 --- /dev/null +++ b/migrations/005_make_end_time_nullable.py @@ -0,0 +1,43 @@ +"""Peewee migrations -- 004_add_bbox_region_area.py. + +Some examples (model - class or model name):: + + > Model = migrator.orm['model_name'] # Return model in current state by name + + > migrator.sql(sql) # Run custom SQL + > migrator.python(func, *args, **kwargs) # Run python code + > migrator.create_model(Model) # Create a model (could be used as decorator) + > migrator.remove_model(model, cascade=True) # Remove a model + > migrator.add_fields(model, **fields) # Add fields to a model + > migrator.change_fields(model, **fields) # Change fields + > migrator.remove_fields(model, *field_names, cascade=True) + > migrator.rename_field(model, old_field_name, new_field_name) + > migrator.rename_table(model, new_table_name) + > migrator.add_index(model, *col_names, unique=False) + > migrator.drop_index(model, *col_names) + > migrator.add_not_null(model, *field_names) + > migrator.drop_not_null(model, *field_names) + > migrator.add_default(model, field_name, default) + +""" + +import datetime as dt +import peewee as pw +from playhouse.sqlite_ext import * +from decimal import ROUND_HALF_EVEN +from frigate.models import Event + +try: + import playhouse.postgres_ext as pw_pext +except ImportError: + pass + +SQL = pw.SQL + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.drop_not_null(Event, "end_time") + + +def rollback(migrator, database, fake=False, **kwargs): + pass diff --git a/web/src/routes/Event.jsx b/web/src/routes/Event.jsx index d69a882e0..4a371a9d0 100644 --- a/web/src/routes/Event.jsx +++ b/web/src/routes/Event.jsx @@ -99,7 +99,7 @@ export default function Event({ eventId, close, scrollRef }) { } const startime = new Date(data.start_time * 1000); - const endtime = new Date(data.end_time * 1000); + const endtime = data.end_time ? new Date(data.end_time * 1000) : null; return (
@@ -155,7 +155,7 @@ export default function Event({ eventId, close, scrollRef }) { Timeframe - {startime.toLocaleString()} – {endtime.toLocaleString()} + {startime.toLocaleString()}{endtime === null ? ` – ${endtime.toLocaleString()}`:''} @@ -186,7 +186,7 @@ export default function Event({ eventId, close, scrollRef }) { }, ], poster: data.has_snapshot - ? `${apiHost}/clips/${data.camera}-${eventId}.jpg` + ? `${apiHost}/api/events/${eventId}/snapshot.jpg` : `data:image/jpeg;base64,${data.thumbnail}`, }} seekOptions={{ forward: 10, back: 5 }} diff --git a/web/src/routes/Events/components/tableRow.jsx b/web/src/routes/Events/components/tableRow.jsx index 262f3408a..f358153b2 100644 --- a/web/src/routes/Events/components/tableRow.jsx +++ b/web/src/routes/Events/components/tableRow.jsx @@ -42,7 +42,7 @@ const EventsRow = memo( ); const start = new Date(parseInt(startTime * 1000, 10)); - const end = new Date(parseInt(endTime * 1000, 10)); + const end = endTime ? new Date(parseInt(endTime * 1000, 10)) : null; return ( @@ -102,7 +102,7 @@ const EventsRow = memo( {start.toLocaleDateString()} {start.toLocaleTimeString()} - {end.toLocaleTimeString()} + {end === null ? 'In progress' : end.toLocaleTimeString()} {viewEvent === id ? (