move ffmpeg capture to a separate thread and use a queue

This commit is contained in:
Blake Blackshear 2020-03-14 15:32:51 -05:00
parent e37eba49ff
commit 4ee200a81c
4 changed files with 170 additions and 131 deletions

View File

@ -15,7 +15,7 @@ import logging
from flask import Flask, Response, make_response, jsonify, request
import paho.mqtt.client as mqtt
from frigate.video import track_camera
from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
from frigate.object_processing import TrackedObjectProcessor
from frigate.util import EventsPerSecond
from frigate.edgetpu import EdgeTPUProcess
@ -83,60 +83,50 @@ class CameraWatchdog(threading.Thread):
time.sleep(10)
while True:
# wait a bit before checking
time.sleep(30)
time.sleep(10)
# check the plasma process
rc = self.plasma_process.poll()
if rc != None:
print(f"plasma_process exited unexpectedly with {rc}")
self.plasma_process = start_plasma_store()
time.sleep(10)
# check the detection process
if (self.tflite_process.detection_start.value > 0.0 and
datetime.datetime.now().timestamp() - self.tflite_process.detection_start.value > 10):
print("Detection appears to be stuck. Restarting detection process")
self.tflite_process.start_or_restart()
time.sleep(30)
elif not self.tflite_process.detect_process.is_alive():
print("Detection appears to have stopped. Restarting detection process")
self.tflite_process.start_or_restart()
time.sleep(30)
# check the camera processes
for name, camera_process in self.camera_processes.items():
process = camera_process['process']
if not process.is_alive():
print(f"Process for {name} is not alive. Starting again...")
print(f"Track process for {name} is not alive. Starting again...")
camera_process['fps'].value = float(self.config[name]['fps'])
camera_process['skipped_fps'].value = 0.0
camera_process['detection_fps'].value = 0.0
camera_process['read_start'].value = 0.0
camera_process['ffmpeg_pid'].value = 0
process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
self.tflite_process.detection_queue, self.tracked_objects_queue,
process = mp.Process(target=track_camera, args=(name, self.config[name], GLOBAL_OBJECT_CONFIG, camera_process['frame_queue'],
camera_process['frame_shape'], self.tflite_process.detection_queue, self.tracked_objects_queue,
camera_process['fps'], camera_process['skipped_fps'], camera_process['detection_fps'],
camera_process['read_start'], camera_process['ffmpeg_pid']))
camera_process['read_start']))
process.daemon = True
camera_process['process'] = process
process.start()
print(f"Camera_process started for {name}: {process.pid}")
print(f"Track process started for {name}: {process.pid}")
if (camera_process['read_start'].value > 0.0 and
datetime.datetime.now().timestamp() - camera_process['read_start'].value > 10):
print(f"Process for {name} has been reading from ffmpeg for over 10 seconds long. Killing ffmpeg...")
ffmpeg_pid = camera_process['ffmpeg_pid'].value
if ffmpeg_pid != 0:
try:
os.kill(ffmpeg_pid, signal.SIGTERM)
except OSError:
print(f"Unable to terminate ffmpeg with pid {ffmpeg_pid}")
time.sleep(10)
try:
os.kill(ffmpeg_pid, signal.SIGKILL)
print(f"Unable to kill ffmpeg with pid {ffmpeg_pid}")
except OSError:
pass
if not camera_process['capture_thread'].is_alive():
frame_shape = camera_process['frame_shape']
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'],
camera_process['take_frame'], camera_process['camera_fps'])
camera_capture.start()
camera_process['ffmpeg_process'] = ffmpeg_process
camera_process['capture_thread'] = camera_capture
def main():
# connect to mqtt and setup last will
@ -180,17 +170,54 @@ def main():
# start the camera processes
camera_processes = {}
for name, config in CONFIG['cameras'].items():
# Merge the ffmpeg config with the global config
ffmpeg = config.get('ffmpeg', {})
ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
ffmpeg_global_args = ffmpeg.get('global_args', FFMPEG_DEFAULT_CONFIG['global_args'])
ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
ffmpeg_cmd = (['ffmpeg'] +
ffmpeg_global_args +
ffmpeg_hwaccel_args +
ffmpeg_input_args +
['-i', ffmpeg_input] +
ffmpeg_output_args +
['pipe:'])
if 'width' in config and 'height' in config:
frame_shape = (config['height'], config['width'], 3)
else:
frame_shape = get_frame_shape(ffmpeg_input)
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
take_frame = config.get('take_frame', 1)
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
frame_queue = mp.SimpleQueue()
camera_fps = EventsPerSecond()
camera_fps.start()
camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps)
camera_capture.start()
camera_processes[name] = {
'camera_fps': camera_fps,
'take_frame': take_frame,
'fps': mp.Value('d', float(config['fps'])),
'skipped_fps': mp.Value('d', 0.0),
'detection_fps': mp.Value('d', 0.0),
'read_start': mp.Value('d', 0.0),
'ffmpeg_pid': mp.Value('i', 0)
'ffmpeg_process': ffmpeg_process,
'ffmpeg_cmd': ffmpeg_cmd,
'frame_queue': frame_queue,
'frame_shape': frame_shape,
'capture_thread': camera_capture
}
camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
camera_process = mp.Process(target=track_camera, args=(name, config, GLOBAL_OBJECT_CONFIG, frame_queue, frame_shape,
tflite_process.detection_queue, tracked_objects_queue, camera_processes[name]['fps'],
camera_processes[name]['skipped_fps'], camera_processes[name]['detection_fps'],
camera_processes[name]['read_start'], camera_processes[name]['ffmpeg_pid']))
camera_processes[name]['read_start']))
camera_process.daemon = True
camera_processes[name]['process'] = camera_process
@ -245,7 +272,7 @@ def main():
'detection_fps': round(camera_stats['detection_fps'].value, 2),
'read_start': camera_stats['read_start'].value,
'pid': camera_stats['process'].pid,
'ffmpeg_pid': camera_stats['ffmpeg_pid'].value
'ffmpeg_pid': camera_stats['ffmpeg_process'].pid
}
stats['coral'] = {
@ -302,7 +329,7 @@ def main():
app.run(host='0.0.0.0', port=WEB_PORT, debug=False)
camera_watchdog.join()
object_processor.join()
plasma_process.terminate()

View File

@ -10,7 +10,7 @@ from collections import Counter, defaultdict
import itertools
import pyarrow.plasma as plasma
import matplotlib.pyplot as plt
from frigate.util import draw_box_with_label
from frigate.util import draw_box_with_label, PlasmaManager
from frigate.edgetpu import load_labels
PATH_TO_LABELS = '/labelmap.txt'
@ -36,6 +36,7 @@ class TrackedObjectProcessor(threading.Thread):
'current_frame': np.zeros((720,1280,3), np.uint8),
'object_id': None
})
self.plasma_client = PlasmaManager()
def get_best(self, camera, label):
if label in self.camera_data[camera]['best_objects']:
@ -46,34 +47,7 @@ class TrackedObjectProcessor(threading.Thread):
def get_current_frame(self, camera):
return self.camera_data[camera]['current_frame']
def connect_plasma_client(self):
while True:
try:
self.plasma_client = plasma.connect("/tmp/plasma")
return
except:
print(f"TrackedObjectProcessor: unable to connect plasma client")
time.sleep(10)
def get_from_plasma(self, object_id):
while True:
try:
return self.plasma_client.get(object_id, timeout_ms=0)
except:
self.connect_plasma_client()
time.sleep(1)
def delete_from_plasma(self, object_ids):
while True:
try:
self.plasma_client.delete(object_ids)
return
except:
self.connect_plasma_client()
time.sleep(1)
def run(self):
self.connect_plasma_client()
while True:
camera, frame_time, tracked_objects = self.tracked_objects_queue.get()
@ -85,10 +59,7 @@ class TrackedObjectProcessor(threading.Thread):
###
# Draw tracked objects on the frame
###
object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}"))
object_id_bytes = object_id_hash.digest()
object_id = plasma.ObjectID(object_id_bytes)
current_frame = self.get_from_plasma(object_id)
current_frame = self.plasma_client.get(f"{camera}{frame_time}")
if not current_frame is plasma.ObjectNotAvailable:
# draw the bounding boxes on the frame
@ -117,10 +88,10 @@ class TrackedObjectProcessor(threading.Thread):
self.camera_data[camera]['current_frame'] = current_frame
# store the object id, so you can delete it at the next loop
previous_object_id = self.camera_data[camera]['object_id']
previous_object_id = f"{camera}{frame_time}"
if not previous_object_id is None:
self.delete_from_plasma([previous_object_id])
self.camera_data[camera]['object_id'] = object_id
self.plasma_client.delete(f"{camera}{frame_time}")
self.camera_data[camera]['object_id'] = f"{camera}{frame_time}"
###
# Maintain the highest scoring recent object and frame for each label

View File

@ -1,4 +1,5 @@
import datetime
import time
import signal
import traceback
import collections
@ -6,6 +7,8 @@ import numpy as np
import cv2
import threading
import matplotlib.pyplot as plt
import hashlib
import pyarrow.plasma as plasma
def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
if color is None:
@ -135,3 +138,46 @@ def print_stack(sig, frame):
def listen():
signal.signal(signal.SIGUSR1, print_stack)
class PlasmaManager:
def __init__(self):
self.connect()
def connect(self):
while True:
try:
self.plasma_client = plasma.connect("/tmp/plasma")
return
except:
print(f"TrackedObjectProcessor: unable to connect plasma client")
time.sleep(10)
def get(self, name, timeout_ms=0):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
try:
return self.plasma_client.get(object_id, timeout_ms=timeout_ms)
except:
self.connect()
time.sleep(1)
def put(self, name, obj):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
try:
self.plasma_client.put(obj, object_id)
return
except Exception as e:
print(f"Failed to put in plasma: {e}")
self.connect()
time.sleep(1)
def delete(self, name):
object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
while True:
try:
self.plasma_client.delete([object_id])
return
except:
self.connect()
time.sleep(1)

View File

@ -5,16 +5,15 @@ import cv2
import queue
import threading
import ctypes
import pyarrow.plasma as plasma
import multiprocessing as mp
import subprocess as sp
import numpy as np
import hashlib
import pyarrow.plasma as plasma
import copy
import itertools
import json
from collections import defaultdict
from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen
from frigate.util import draw_box_with_label, area, calculate_region, clipped, intersection_over_union, intersection, EventsPerSecond, listen, PlasmaManager
from frigate.objects import ObjectTracker
from frigate.edgetpu import RemoteObjectDetector
from frigate.motion import MotionDetector
@ -97,7 +96,7 @@ def create_tensor_input(frame, region):
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
return np.expand_dims(cropped_frame, axis=0)
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, pid, ffmpeg_process=None):
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
if not ffmpeg_process is None:
print("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate()
@ -112,30 +111,54 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, pid, ffmpeg_process=None):
print("Creating ffmpeg process...")
print(" ".join(ffmpeg_cmd))
process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size*10)
pid.value = process.pid
process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
return process
def track_camera(name, config, ffmpeg_global_config, global_objects_config, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps, read_start, ffmpeg_pid):
class CameraCapture(threading.Thread):
def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps):
threading.Thread.__init__(self)
self.name = name
self.frame_shape = frame_shape
self.frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
self.frame_queue = frame_queue
self.take_frame = take_frame
self.fps = fps
self.plasma_client = PlasmaManager()
self.ffmpeg_process = ffmpeg_process
def run(self):
frame_num = 0
while True:
if self.ffmpeg_process.poll() != None:
print(f"{self.name}: ffmpeg process is not running. exiting capture thread...")
break
frame_bytes = self.ffmpeg_process.stdout.read(self.frame_size)
frame_time = datetime.datetime.now().timestamp()
if len(frame_bytes) == 0:
print(f"{self.name}: ffmpeg didnt return a frame. something is wrong.")
continue
frame_num += 1
if (frame_num % self.take_frame) != 0:
continue
# put the frame in the plasma store
self.plasma_client.put(f"{self.name}{frame_time}",
np
.frombuffer(frame_bytes, np.uint8)
.reshape(self.frame_shape)
)
# add to the queue
self.frame_queue.put(frame_time)
self.fps.update()
def track_camera(name, config, global_objects_config, frame_queue, frame_shape, detection_queue, detected_objects_queue, fps, skipped_fps, detection_fps, read_start):
print(f"Starting process for {name}: {os.getpid()}")
listen()
# Merge the ffmpeg config with the global config
ffmpeg = config.get('ffmpeg', {})
ffmpeg_input = get_ffmpeg_input(ffmpeg['input'])
ffmpeg_restart_delay = ffmpeg.get('restart_delay', 0)
ffmpeg_global_args = ffmpeg.get('global_args', ffmpeg_global_config['global_args'])
ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', ffmpeg_global_config['hwaccel_args'])
ffmpeg_input_args = ffmpeg.get('input_args', ffmpeg_global_config['input_args'])
ffmpeg_output_args = ffmpeg.get('output_args', ffmpeg_global_config['output_args'])
ffmpeg_cmd = (['ffmpeg'] +
ffmpeg_global_args +
ffmpeg_hwaccel_args +
ffmpeg_input_args +
['-i', ffmpeg_input] +
ffmpeg_output_args +
['pipe:'])
# Merge the tracked object config with the global config
camera_objects_config = config.get('objects', {})
# combine tracked objects lists
@ -149,14 +172,6 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}
expected_fps = config['fps']
take_frame = config.get('take_frame', 1)
if 'width' in config and 'height' in config:
frame_shape = (config['height'], config['width'], 3)
else:
frame_shape = get_frame_shape(ffmpeg_input)
frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
frame = np.zeros(frame_shape, np.uint8)
@ -175,9 +190,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
object_tracker = ObjectTracker(10)
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_pid)
plasma_client = plasma.connect("/tmp/plasma")
plasma_client = PlasmaManager()
frame_num = 0
avg_wait = 0.0
fps_tracker = EventsPerSecond()
@ -186,39 +199,23 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
skipped_fps_tracker.start()
object_detector.fps.start()
while True:
rc = ffmpeg_process.poll()
if rc != None:
print(f"{name}: ffmpeg_process exited unexpectedly with {rc}")
print(f"Letting {name} rest for {ffmpeg_restart_delay} seconds before restarting...")
time.sleep(ffmpeg_restart_delay)
ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_pid, ffmpeg_process)
time.sleep(10)
read_start.value = datetime.datetime.now().timestamp()
frame_bytes = ffmpeg_process.stdout.read(frame_size)
frame_time = frame_queue.get()
duration = datetime.datetime.now().timestamp()-read_start.value
read_start.value = 0.0
avg_wait = (avg_wait*99+duration)/100
if len(frame_bytes) == 0:
print(f"{name}: ffmpeg_process didnt return any bytes")
continue
# limit frame rate
frame_num += 1
if (frame_num % take_frame) != 0:
continue
fps_tracker.update()
fps.value = fps_tracker.eps()
detection_fps.value = object_detector.fps.eps()
frame_time = datetime.datetime.now().timestamp()
# Get frame from plasma store
frame = plasma_client.get(f"{name}{frame_time}")
# Store frame in numpy array
frame[:] = (np
.frombuffer(frame_bytes, np.uint8)
.reshape(frame_shape))
if frame is plasma.ObjectNotAvailable:
skipped_fps_tracker.update()
skipped_fps.value = skipped_fps_tracker.eps()
continue
# look for motion
motion_boxes = motion_detector.detect(frame)
@ -227,6 +224,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
if frame_num > 100 and fps.value < expected_fps-1 and duration < 0.5*avg_wait:
skipped_fps_tracker.update()
skipped_fps.value = skipped_fps_tracker.eps()
plasma_client.delete(f"{name}{frame_time}")
continue
skipped_fps.value = skipped_fps_tracker.eps()
@ -330,7 +328,7 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
for index in idxs:
obj = group[index[0]]
if clipped(obj, frame_shape): #obj['clipped']:
if clipped(obj, frame_shape):
box = obj[2]
# calculate a new region that will hopefully get the entire object
region = calculate_region(frame_shape,
@ -370,9 +368,6 @@ def track_camera(name, config, ffmpeg_global_config, global_objects_config, dete
# now that we have refined our detections, we need to track objects
object_tracker.match_and_update(frame_time, detections)
# put the frame in the plasma store
object_id = hashlib.sha1(str.encode(f"{name}{frame_time}")).digest()
plasma_client.put(frame, plasma.ObjectID(object_id))
# add to the queue
detected_objects_queue.put((name, frame_time, object_tracker.tracked_objects))