label threads and implements stats endpoint

This commit is contained in:
Blake Blackshear 2019-12-23 06:01:32 -06:00
parent d8a3f8fc9d
commit 0f8f8fa3b3
7 changed files with 192 additions and 110 deletions

View File

@ -45,6 +45,7 @@ RUN apt-get -qq update && apt-get -qq install --no-install-recommends -y \
python3-pip \
python3-pil \
python3-numpy \
python3-prctl \
libc++1 \
libc++abi1 \
libunwind8 \

View File

@ -3,11 +3,12 @@ import time
import queue
import yaml
import numpy as np
from flask import Flask, Response, make_response
from flask import Flask, Response, make_response, jsonify
import paho.mqtt.client as mqtt
from frigate.video import Camera
from frigate.object_detection import PreppedQueueProcessor
from frigate.util import EventsPerSecond
with open('/config/config.yml') as f:
CONFIG = yaml.safe_load(f)
@ -70,20 +71,27 @@ def main():
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_start()
# Queue for prepped frames, max size set to (number of cameras * 5)
max_queue_size = len(CONFIG['cameras'].items())*5
prepped_frame_queue = queue.PriorityQueue(max_queue_size)
# Queue for prepped frames, max size set to number of regions * 3
max_queue_size = sum([len(camera['regions'])*3 for name, camera in CONFIG['cameras'].items()])
prepped_frame_queue = queue.Queue(max_queue_size)
cameras = {}
for name, config in CONFIG['cameras'].items():
cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config,
prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
fps_tracker = EventsPerSecond()
queue_full_tracker = EventsPerSecond()
prepped_queue_processor = PreppedQueueProcessor(
cameras,
prepped_frame_queue
prepped_frame_queue,
fps_tracker,
queue_full_tracker
)
prepped_queue_processor.start()
fps_tracker.start()
queue_full_tracker.start()
for name, camera in cameras.items():
camera.start()
@ -97,6 +105,22 @@ def main():
# return a healh
return "Frigate is running. Alive and healthy!"
@app.route('/debug/stats')
def stats():
stats = {
'coral': {
'fps': fps_tracker.eps(),
'inference_speed': prepped_queue_processor.avg_inference_speed,
'queue_length': prepped_frame_queue.qsize(),
'queue_full_events_per_min': queue_full_tracker.eps(60)
}
}
for name, camera in cameras.items():
stats[name] = camera.stats()
return jsonify(stats)
@app.route('/<camera_name>/<label>/best.jpg')
def best(camera_name, label):
if camera_name in cameras:

View File

@ -1,6 +1,7 @@
import json
import cv2
import threading
import prctl
from collections import Counter, defaultdict
class MqttObjectPublisher(threading.Thread):
@ -13,6 +14,7 @@ class MqttObjectPublisher(threading.Thread):
self.best_frames = best_frames
def run(self):
prctl.set_name("MqttObjectPublisher")
current_object_status = defaultdict(lambda: 'OFF')
while True:
# wait until objects have been parsed
@ -35,7 +37,8 @@ class MqttObjectPublisher(threading.Thread):
self.client.publish(self.topic_prefix+'/'+obj_name, new_status, retain=False)
# send the snapshot over mqtt if we have it as well
if obj_name in self.best_frames.best_frames:
ret, jpg = cv2.imencode('.jpg', self.best_frames.best_frames[obj_name])
best_frame = cv2.cvtColor(self.best_frames.best_frames[obj_name], cv2.COLOR_RGB2BGR)
ret, jpg = cv2.imencode('.jpg', best_frame)
if ret:
jpg_bytes = jpg.tobytes()
self.client.publish(self.topic_prefix+'/'+obj_name+'/snapshot', jpg_bytes, retain=True)

View File

@ -2,12 +2,13 @@ import datetime
import time
import cv2
import threading
import prctl
import numpy as np
from edgetpu.detection.engine import DetectionEngine
from . util import tonumpyarray, LABELS, PATH_TO_CKPT
class PreppedQueueProcessor(threading.Thread):
def __init__(self, cameras, prepped_frame_queue):
def __init__(self, cameras, prepped_frame_queue, fps, queue_full):
threading.Thread.__init__(self)
self.cameras = cameras
@ -16,89 +17,33 @@ class PreppedQueueProcessor(threading.Thread):
# Load the edgetpu engine and labels
self.engine = DetectionEngine(PATH_TO_CKPT)
self.labels = LABELS
self.fps = fps
self.queue_full = queue_full
self.avg_inference_speed = 10
def run(self):
prctl.set_name("PreppedQueueProcessor")
# process queue...
while True:
if self.prepped_frame_queue.full():
self.queue_full.update()
frame = self.prepped_frame_queue.get()
# Actual detection.
objects = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=5)
# print(self.engine.get_inference_time())
frame['detected_objects'] = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=5)
self.fps.update()
self.avg_inference_speed = (self.avg_inference_speed*9 + self.engine.get_inference_time())/10
# parse and pass detected objects back to the camera
# TODO: just send this back with all the same info you received and objects as a new property
parsed_objects = []
for obj in objects:
parsed_objects.append({
'region_id': frame['region_id'],
'frame_time': frame['frame_time'],
'name': str(self.labels[obj.label_id]),
'score': float(obj.score),
'box': obj.bounding_box.flatten().tolist()
})
self.cameras[frame['camera_name']].add_objects(parsed_objects)
# should this be a region class?
class FramePrepper(threading.Thread):
def __init__(self, camera_name, shared_frame, frame_time, frame_ready,
frame_lock,
region_size, region_x_offset, region_y_offset, region_id,
prepped_frame_queue):
threading.Thread.__init__(self)
self.camera_name = camera_name
self.shared_frame = shared_frame
self.frame_time = frame_time
self.frame_ready = frame_ready
self.frame_lock = frame_lock
self.region_size = region_size
self.region_x_offset = region_x_offset
self.region_y_offset = region_y_offset
self.region_id = region_id
self.prepped_frame_queue = prepped_frame_queue
def run(self):
frame_time = 0.0
while True:
now = datetime.datetime.now().timestamp()
with self.frame_ready:
# if there isnt a frame ready for processing or it is old, wait for a new frame
if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
self.frame_ready.wait()
# make a copy of the cropped frame
with self.frame_lock:
cropped_frame = self.shared_frame[self.region_y_offset:self.region_y_offset+self.region_size, self.region_x_offset:self.region_x_offset+self.region_size].copy()
frame_time = self.frame_time.value
# Resize to 300x300 if needed
if cropped_frame.shape != (300, 300, 3):
cropped_frame = cv2.resize(cropped_frame, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
frame_expanded = np.expand_dims(cropped_frame, axis=0)
# add the frame to the queue
if not self.prepped_frame_queue.full():
self.prepped_frame_queue.put({
'camera_name': self.camera_name,
'frame_time': frame_time,
'frame': frame_expanded.flatten().copy(),
'region_size': self.region_size,
'region_id': self.region_id,
'region_x_offset': self.region_x_offset,
'region_y_offset': self.region_y_offset
})
else:
print("queue full. moving on")
self.cameras[frame['camera_name']].add_objects(frame)
class RegionRequester(threading.Thread):
def __init__(self, camera):
threading.Thread.__init__(self)
self.camera = camera
def run(self):
prctl.set_name("RegionRequester")
frame_time = 0.0
while True:
now = datetime.datetime.now().timestamp()
@ -110,27 +55,27 @@ class RegionRequester(threading.Thread):
# make a copy of the frame_time
frame_time = self.camera.frame_time.value
for index, region in enumerate(self.camera.config['regions']):
# queue with priority 1
self.camera.resize_queue.put((1, {
self.camera.resize_queue.put({
'camera_name': self.camera.name,
'frame_time': frame_time,
'region_id': index,
'size': region['size'],
'x_offset': region['x_offset'],
'y_offset': region['y_offset']
}))
})
class RegionPrepper(threading.Thread):
def __init__(self, frame_cache, resize_request_queue, prepped_frame_queue):
threading.Thread.__init__(self)
self.frame_cache = frame_cache
self.resize_request_queue = resize_request_queue
self.prepped_frame_queue = prepped_frame_queue
def run(self):
prctl.set_name("RegionPrepper")
while True:
resize_request = self.resize_request_queue.get()
@ -153,7 +98,4 @@ class RegionPrepper(threading.Thread):
# add the frame to the queue
if not self.prepped_frame_queue.full():
resize_request['frame'] = frame_expanded.flatten().copy()
# add to queue with priority 1
self.prepped_frame_queue.put((1, resize_request))
else:
print("queue full. moving on")
self.prepped_frame_queue.put(resize_request)

View File

@ -2,6 +2,7 @@ import time
import datetime
import threading
import cv2
import prctl
import numpy as np
from . util import draw_box_with_label
@ -12,6 +13,7 @@ class ObjectCleaner(threading.Thread):
self._detected_objects = detected_objects
def run(self):
prctl.set_name("ObjectCleaner")
while True:
# wait a bit before checking for expired frames
@ -47,6 +49,7 @@ class BestFrames(threading.Thread):
self.best_frames = {}
def run(self):
prctl.set_name("BestFrames")
while True:
# wait until objects have been parsed
@ -80,4 +83,4 @@ class BestFrames(threading.Thread):
time_to_show = datetime.datetime.fromtimestamp(obj['frame_time']).strftime("%m/%d/%Y %H:%M:%S")
cv2.putText(best_frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
self.best_frames[name] = cv2.cvtColor(best_frame, cv2.COLOR_RGB2BGR)
self.best_frames[name] = best_frame

View File

@ -1,5 +1,8 @@
import datetime
import collections
import numpy as np
import cv2
import threading
import matplotlib.pyplot as plt
# Function to read labels from text files.
@ -12,6 +15,31 @@ def ReadLabelFile(file_path):
ret[int(pair[0])] = pair[1].strip()
return ret
def calculate_region(frame_shape, xmin, ymin, xmax, ymax):
# size is 50% larger than longest edge
size = max(xmax-xmin, ymax-ymin)
# if the size is too big to fit in the frame
if size > min(frame_shape[0], frame_shape[1]):
size = min(frame_shape[0], frame_shape[1])
# x_offset is midpoint of bounding box minus half the size
x_offset = int(((xmax-xmin)/2+xmin)-size/2)
# if outside the image
if x_offset < 0:
x_offset = 0
elif x_offset > (frame_shape[1]-size):
x_offset = (frame_shape[1]-size)
# x_offset is midpoint of bounding box minus half the size
y_offset = int(((ymax-ymin)/2+ymin)-size/2)
# if outside the image
if y_offset < 0:
y_offset = 0
elif y_offset > (frame_shape[0]-size):
y_offset = (frame_shape[0]-size)
return (size, x_offset, y_offset)
# convert shared memory array into numpy array
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj(), dtype=np.uint8)
@ -47,4 +75,45 @@ cmap = plt.cm.get_cmap('tab10', len(LABELS.keys()))
COLOR_MAP = {}
for key, val in LABELS.items():
COLOR_MAP[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
COLOR_MAP[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
class QueueMerger():
def __init__(self, from_queues, to_queue):
self.from_queues = from_queues
self.to_queue = to_queue
self.merge_threads = []
def start(self):
for from_q in self.from_queues:
self.merge_threads.append(QueueTransfer(from_q,self.to_queue))
class QueueTransfer(threading.Thread):
def __init__(self, from_queue, to_queue):
threading.Thread.__init__(self)
self.from_queue = from_queue
self.to_queue = to_queue
def run(self):
while True:
self.to_queue.put(self.from_queue.get())
class EventsPerSecond:
def __init__(self, max_events=1000):
self._start = None
self._max_events = max_events
self._timestamps = []
def start(self):
self._start = datetime.datetime.now().timestamp()
def update(self):
self._timestamps.append(datetime.datetime.now().timestamp())
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events+100:
self._timestamps = self._timestamps[(1-self._max_events):]
def eps(self, last_n_seconds=10):
# compute the (approximate) events in the last n seconds
now = datetime.datetime.now().timestamp()
seconds = min(now-self._start, last_n_seconds)
return len([t for t in self._timestamps if t > (now-last_n_seconds)]) / seconds

View File

@ -8,9 +8,10 @@ import ctypes
import multiprocessing as mp
import subprocess as sp
import numpy as np
import prctl
from collections import defaultdict
from . util import tonumpyarray, draw_box_with_label
from . object_detection import FramePrepper, RegionPrepper, RegionRequester
from . util import tonumpyarray, LABELS, draw_box_with_label, calculate_region, EventsPerSecond
from . object_detection import RegionPrepper, RegionRequester
from . objects import ObjectCleaner, BestFrames
from . mqtt import MqttObjectPublisher
@ -25,23 +26,13 @@ class FrameTracker(threading.Thread):
self.recent_frames = recent_frames
def run(self):
frame_time = 0.0
prctl.set_name("FrameTracker")
while True:
now = datetime.datetime.now().timestamp()
# wait for a frame
with self.frame_ready:
# if there isnt a frame ready for processing or it is old, wait for a signal
if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
self.frame_ready.wait()
# lock and make a copy of the frame
with self.frame_lock:
frame = self.shared_frame.copy()
frame_time = self.frame_time.value
# add the frame to recent frames
self.recent_frames[frame_time] = frame
self.frame_ready.wait()
now = datetime.datetime.now().timestamp()
# delete any old frames
stored_frame_times = list(self.recent_frames.keys())
for k in stored_frame_times:
@ -67,7 +58,7 @@ class CameraWatchdog(threading.Thread):
self.camera = camera
def run(self):
prctl.set_name("CameraWatchdog")
while True:
# wait a bit before checking
time.sleep(10)
@ -84,6 +75,7 @@ class CameraCapture(threading.Thread):
self.camera = camera
def run(self):
prctl.set_name("CameraCapture")
frame_num = 0
while True:
if self.camera.ffmpeg_process.poll() != None:
@ -108,10 +100,13 @@ class CameraCapture(threading.Thread):
.frombuffer(raw_image, np.uint8)
.reshape(self.camera.frame_shape)
)
self.camera.frame_cache[self.camera.frame_time.value] = self.camera.current_frame.copy()
# Notify with the condition that a new frame is ready
with self.camera.frame_ready:
self.camera.frame_ready.notify_all()
self.camera.fps.update()
class Camera:
def __init__(self, name, ffmpeg_config, global_objects_config, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
self.name = name
@ -148,7 +143,7 @@ class Camera:
# Queue for prepped frames, max size set to (number of regions * 5)
max_queue_size = len(self.config['regions'])*5
self.resize_queue = queue.PriorityQueue(max_queue_size)
self.resize_queue = queue.Queue(max_queue_size)
# initialize the frame cache
self.cached_frame_with_objects = {
@ -158,6 +153,7 @@ class Camera:
self.ffmpeg_process = None
self.capture_thread = None
self.fps = EventsPerSecond()
# for each region, merge the object config
self.detection_prep_threads = []
@ -173,6 +169,7 @@ class Camera:
# start a thread to queue resize requests for regions
self.region_requester = RegionRequester(self)
self.region_requester.start()
# start a thread to cache recent frames for processing
self.frame_tracker = FrameTracker(self.current_frame, self.frame_time,
@ -234,6 +231,7 @@ class Camera:
self.capture_thread = CameraCapture(self)
print("Starting a new capture thread...")
self.capture_thread.start()
self.fps.start()
def start_ffmpeg(self):
ffmpeg_cmd = (['ffmpeg'] +
@ -261,20 +259,30 @@ class Camera:
def get_capture_pid(self):
return self.ffmpeg_process.pid
def add_objects(self, objects):
def add_objects(self, frame):
objects = frame['detected_objects']
if len(objects) == 0:
return
for obj in objects:
for raw_obj in objects:
obj = {
'score': float(raw_obj.score),
'box': raw_obj.bounding_box.flatten().tolist(),
'name': str(LABELS[raw_obj.label_id]),
'frame_time': frame['frame_time'],
'region_id': frame['region_id']
}
# find the matching region
region = self.regions[obj['region_id']]
region = self.regions[frame['region_id']]
# Compute some extra properties
obj.update({
'xmin': int((obj['box'][0] * region['size']) + region['x_offset']),
'ymin': int((obj['box'][1] * region['size']) + region['y_offset']),
'xmax': int((obj['box'][2] * region['size']) + region['x_offset']),
'ymax': int((obj['box'][3] * region['size']) + region['y_offset'])
'xmin': int((obj['box'][0] * frame['size']) + frame['x_offset']),
'ymin': int((obj['box'][1] * frame['size']) + frame['y_offset']),
'xmax': int((obj['box'][2] * frame['size']) + frame['x_offset']),
'ymax': int((obj['box'][3] * frame['size']) + frame['y_offset'])
})
# Compute the area
@ -307,6 +315,29 @@ class Camera:
# if the object is in a masked location, don't add it to detected objects
if self.mask[y_location][x_location] == [0]:
continue
# look to see if the bounding box is too close to the region border and the region border is not the edge of the frame
# if ((frame['x_offset'] > 0 and obj['box'][0] < 0.01) or
# (frame['y_offset'] > 0 and obj['box'][1] < 0.01) or
# (frame['x_offset']+frame['size'] < self.frame_shape[1] and obj['box'][2] > 0.99) or
# (frame['y_offset']+frame['size'] < self.frame_shape[0] and obj['box'][3] > 0.99)):
# size, x_offset, y_offset = calculate_region(self.frame_shape, obj['xmin'], obj['ymin'], obj['xmax'], obj['ymax'])
# This triggers WAY too often with stationary objects on the edge of a region.
# Every frame triggers it and fills the queue...
# I need to create a new region and add it to the list of regions, but
# it needs to check for a duplicate region first.
# self.resize_queue.put({
# 'camera_name': self.name,
# 'frame_time': frame['frame_time'],
# 'region_id': frame['region_id'],
# 'size': size,
# 'x_offset': x_offset,
# 'y_offset': y_offset
# })
# print('object too close to region border')
#continue
self.detected_objects.append(obj)
@ -315,6 +346,12 @@ class Camera:
def get_best(self, label):
return self.best_frames.best_frames.get(label)
def stats(self):
return {
'camera_fps': self.fps.eps(60),
'resize_queue': self.resize_queue.qsize()
}
def get_current_frame_with_objects(self):
# make a copy of the current detected objects
@ -340,6 +377,9 @@ class Camera:
# print a timestamp
time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
cv2.putText(frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
# print fps
cv2.putText(frame, str(self.fps.eps())+'FPS', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
# convert to BGR
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)