From b77a65d44644ff733a7b746e87763bffc2c30f51 Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Sun, 1 Nov 2020 14:58:51 -0600 Subject: [PATCH] add event processor --- frigate/__main__.py | 45 ++++++++++++++++++++++++++++----------------- frigate/events.py | 17 +++++++++-------- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/frigate/__main__.py b/frigate/__main__.py index 79ac19eda..989025b96 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -9,6 +9,7 @@ from typing import Dict, List from frigate.config import FRIGATE_CONFIG_SCHEMA from frigate.edgetpu import EdgeTPUProcess +from frigate.events import EventProcessor from frigate.http import create_app from frigate.models import Event from frigate.mqtt import create_mqtt_client @@ -23,6 +24,7 @@ class FrigateApp(): self.detectors: Dict[str: EdgeTPUProcess] = {} self.detection_out_events: Dict[str: mp.Event] = {} self.detection_shms: List[mp.shared_memory.SharedMemory] = [] + self.camera_metrics = {} def init_config(self): config_file = os.environ.get('CONFIG_FILE', '/config/config.yml') @@ -37,6 +39,7 @@ class FrigateApp(): self.config = FRIGATE_CONFIG_SCHEMA(config) for camera_name, camera_config in self.config['cameras'].items(): + # set shape if 'width' in camera_config and 'height' in camera_config: frame_shape = (camera_config['height'], camera_config['width'], 3) else: @@ -44,6 +47,7 @@ class FrigateApp(): camera_config['frame_shape'] = frame_shape + # build ffmpeg command ffmpeg = camera_config['ffmpeg'] ffmpeg_input = ffmpeg['input'] ffmpeg_global_args = ffmpeg.get('global_args', self.config['ffmpeg']['global_args']) @@ -81,6 +85,18 @@ class FrigateApp(): camera_config['ffmpeg_cmd'] = ffmpeg_cmd + # create camera_metrics + self.camera_metrics[camera_name] = { + 'camera_fps': mp.Value('d', 0.0), + 'skipped_fps': mp.Value('d', 0.0), + 'process_fps': mp.Value('d', 0.0), + 'detection_fps': mp.Value('d', 0.0), + 'detection_frame': mp.Value('d', 0.0), + 'read_start': mp.Value('d', 0.0), + 'ffmpeg_pid': mp.Value('i', 0), + 'frame_queue': mp.Queue(maxsize=2) + } + # TODO: sub in FRIGATE_ENV vars def init_queues(self): @@ -131,34 +147,27 @@ class FrigateApp(): self.detected_frames_processor.start() def start_camera_processors(self): - self.camera_process_info = {} for name, config in self.config['cameras'].items(): - self.camera_process_info[name] = { - 'camera_fps': mp.Value('d', 0.0), - 'skipped_fps': mp.Value('d', 0.0), - 'process_fps': mp.Value('d', 0.0), - 'detection_fps': mp.Value('d', 0.0), - 'detection_frame': mp.Value('d', 0.0), - 'read_start': mp.Value('d', 0.0), - 'ffmpeg_pid': mp.Value('i', 0), - 'frame_queue': mp.Queue(maxsize=2) - } camera_process = mp.Process(target=track_camera, args=(name, config, self.detection_queue, self.detection_out_events[name], self.detected_frames_queue, - self.camera_process_info[name])) + self.camera_metrics[name])) camera_process.daemon = True - self.camera_process_info[name]['process'] = camera_process + self.camera_metrics[name]['process'] = camera_process camera_process.start() - print(f"Camera process started for {name}: {camera_process.pid}") + print(f"Camera processor started for {name}: {camera_process.pid}") def start_camera_capture_processes(self): for name, config in self.config['cameras'].items(): capture_process = mp.Process(target=capture_camera, args=(name, config, - self.camera_process_info[name])) + self.camera_metrics[name])) capture_process.daemon = True - self.camera_process_info[name]['capture_process'] = capture_process + self.camera_metrics[name]['capture_process'] = capture_process capture_process.start() - print(f"Camera process started for {name}: {capture_process.pid}") + print(f"Capture process started for {name}: {capture_process.pid}") + + def start_event_processor(self): + self.event_processor = EventProcessor(self.config, self.camera_metrics, self.event_queue, self.stop_event) + self.event_processor.start() def start_watchdog(self): pass @@ -173,6 +182,7 @@ class FrigateApp(): self.start_detected_frames_processor() self.start_camera_processors() self.start_camera_capture_processes() + self.start_event_processor() self.start_watchdog() self.flask_app.run(host='0.0.0.0', port=self.config['web_port'], debug=False) self.stop() @@ -182,6 +192,7 @@ class FrigateApp(): self.stop_event.set() self.detected_frames_processor.join() + self.event_processor.join() for detector in self.detectors.values(): detector.stop() diff --git a/frigate/events.py b/frigate/events.py index 94bc59408..7dd4e4436 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -8,18 +8,19 @@ import datetime import subprocess as sp import queue +from frigate.models import Event + class EventProcessor(threading.Thread): - def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue, stop_event, Event): + def __init__(self, config, camera_processes, event_queue, stop_event): threading.Thread.__init__(self) self.config = config + self.cache_dir = self.config['save_clips']['cache_dir'] + self.clips_dir = self.config['save_clips']['clips_dir'] self.camera_processes = camera_processes - self.cache_dir = cache_dir - self.clip_dir = clip_dir self.cached_clips = {} self.event_queue = event_queue self.events_in_process = {} self.stop_event = stop_event - self.Event = Event def refresh_cache(self): cached_files = os.listdir(self.cache_dir) @@ -76,7 +77,7 @@ class EventProcessor(threading.Thread): earliest_event = datetime.datetime.now().timestamp() # if the earliest event exceeds the max seconds, cap it - max_seconds = self.config.get('save_clips', {}).get('max_seconds', 300) + max_seconds = self.config['save_clips']['max_seconds'] if datetime.datetime.now().timestamp()-earliest_event > max_seconds: earliest_event = datetime.datetime.now().timestamp()-max_seconds @@ -127,7 +128,7 @@ class EventProcessor(threading.Thread): '-', '-c', 'copy', - f"{os.path.join(self.clip_dir, clip_name)}.mp4" + f"{os.path.join(self.clips_dir, clip_name)}.mp4" ] p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True) @@ -135,7 +136,7 @@ class EventProcessor(threading.Thread): print(p.stderr) return - with open(f"{os.path.join(self.clip_dir, clip_name)}.json", 'w') as outfile: + with open(f"{os.path.join(self.clips_dir, clip_name)}.json", 'w') as outfile: json.dump({ 'id': event_data['id'], 'label': event_data['label'], @@ -177,7 +178,7 @@ class EventProcessor(threading.Thread): self.events_in_process[event_data['id']] = event_data if event_type == 'end': - self.Event.create( + Event.create( id=event_data['id'], label=event_data['label'], camera=camera,