import os import cv2 import time import datetime import queue import yaml import threading import multiprocessing as mp import subprocess as sp import numpy as np import logging from flask import Flask, Response, make_response, jsonify import paho.mqtt.client as mqtt from frigate.video import track_camera from frigate.object_processing import TrackedObjectProcessor from frigate.util import EventsPerSecond from frigate.edgetpu import EdgeTPUProcess FRIGATE_VARS = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')} with open('/config/config.yml') as f: CONFIG = yaml.safe_load(f) MQTT_HOST = CONFIG['mqtt']['host'] MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883) MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate') MQTT_USER = CONFIG.get('mqtt', {}).get('user') MQTT_PASS = CONFIG.get('mqtt', {}).get('password') if not MQTT_PASS is None: MQTT_PASS = MQTT_PASS.format(**FRIGATE_VARS) MQTT_CLIENT_ID = CONFIG.get('mqtt', {}).get('client_id', 'frigate') # Set the default FFmpeg config FFMPEG_CONFIG = CONFIG.get('ffmpeg', {}) FFMPEG_DEFAULT_CONFIG = { 'global_args': FFMPEG_CONFIG.get('global_args', ['-hide_banner','-loglevel','panic']), 'hwaccel_args': FFMPEG_CONFIG.get('hwaccel_args', []), 'input_args': FFMPEG_CONFIG.get('input_args', ['-avoid_negative_ts', 'make_zero', '-fflags', 'nobuffer', '-flags', 'low_delay', '-strict', 'experimental', '-fflags', '+genpts+discardcorrupt', '-vsync', 'drop', '-rtsp_transport', 'tcp', '-stimeout', '5000000', '-use_wallclock_as_timestamps', '1']), 'output_args': FFMPEG_CONFIG.get('output_args', ['-f', 'rawvideo', '-pix_fmt', 'rgb24']) } GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {}) WEB_PORT = CONFIG.get('web_port', 5000) DEBUG = (CONFIG.get('debug', '0') == '1') class CameraWatchdog(threading.Thread): def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, object_processor): threading.Thread.__init__(self) self.camera_processes = camera_processes self.config = config self.tflite_process = tflite_process self.tracked_objects_queue = tracked_objects_queue self.object_processor = object_processor def run(self): time.sleep(10) while True: # wait a bit before checking time.sleep(30) if (self.tflite_process.detection_start.value > 0.0 and datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10): print("Detection appears to be stuck. Restarting detection process") self.tflite_process.start_or_restart() time.sleep(30) for name, camera_process in self.camera_processes.items(): process = camera_process['process'] if not process.is_alive(): print(f"Process for {name} is not alive. Starting again...") camera_process['fps'].value = float(self.config[name]['fps']) camera_process['skipped_fps'].value = 0.0 camera_process['detection_fps'].value = 0.0 process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, self.tflite_process.detection_queue, self.tracked_objects_queue, camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'])) process.daemon = True camera_process['process'] = process process.start() print(f"Camera_process started for {name}: {process.pid}") def main(): # connect to mqtt and setup last will def on_connect(client, userdata, flags, rc): print("On connect called") if rc != 0: if rc == 3: print ("MQTT Server unavailable") elif rc == 4: print ("MQTT Bad username or password") elif rc == 5: print ("MQTT Not authorized") else: print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc)) # publish a message to signal that the service is running client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True) client = mqtt.Client(client_id=MQTT_CLIENT_ID) client.on_connect = on_connect client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True) if not MQTT_USER is None: client.username_pw_set(MQTT_USER, password=MQTT_PASS) client.connect(MQTT_HOST, MQTT_PORT, 60) client.loop_start() # start plasma store plasma_cmd = ['plasma_store', '-m', '400000000', '-s', '/tmp/plasma'] plasma_process = sp.Popen(plasma_cmd, stdout=sp.DEVNULL) time.sleep(1) rc = plasma_process.poll() if rc is not None: raise RuntimeError("plasma_store exited unexpectedly with " "code %d" % (rc,)) ## # Setup config defaults for cameras ## for name, config in CONFIG['cameras'].items(): config['snapshots'] = { 'show_timestamp': config.get('snapshots', {}).get('show_timestamp', True) } # Queue for cameras to push tracked objects to tracked_objects_queue = mp.Queue() # Start the shared tflite process tflite_process = EdgeTPUProcess() # start the camera processes camera_processes = {} for name, config in CONFIG['cameras'].items(): camera_processes[name] = { 'fps': mp.Value('d', float(config['fps'])), 'skipped_fps': mp.Value('d', 0.0), 'detection_fps': mp.Value('d', 0.0) } camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['fps'], camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'])) camera_process.daemon = True camera_processes[name]['process'] = camera_process for name, camera_process in camera_processes.items(): camera_process['process'].start() print(f"Camera_process started for {name}: {camera_process['process'].pid}") object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue) object_processor.start() camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, object_processor) camera_watchdog.start() # create a flask app that encodes frames a mjpeg on demand app = Flask(__name__) log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) @app.route('/') def ishealthy(): # return a healh return "Frigate is running. Alive and healthy!" @app.route('/debug/stats') def stats(): stats = {} total_detection_fps = 0 for name, camera_stats in camera_processes.items(): total_detection_fps += camera_stats['detection_fps'].value stats[name] = { 'fps': round(camera_stats['fps'].value, 2), 'skipped_fps': round(camera_stats['skipped_fps'].value, 2), 'detection_fps': round(camera_stats['detection_fps'].value, 2) } stats['coral'] = { 'fps': round(total_detection_fps, 2), 'inference_speed': round(tflite_process.avg_inference_speed.value*1000, 2), 'detection_queue': tflite_process.detection_queue.qsize(), 'detection_start': tflite_process.detection_start.value } rc = plasma_process.poll() stats['plasma_store_rc'] = rc stats['tracked_objects_queue'] = tracked_objects_queue.qsize() return jsonify(stats) @app.route('//