refactor some classes into new files

This commit is contained in:
Blake Blackshear 2020-02-09 07:39:24 -06:00
parent 8a572f96d5
commit c539993387
3 changed files with 203 additions and 207 deletions

124
frigate/edgetpu.py Normal file
View File

@ -0,0 +1,124 @@
import multiprocessing as mp
import numpy as np
import SharedArray as sa
import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate
def load_labels(path, encoding='utf-8'):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, 'r', encoding=encoding) as f:
lines = f.readlines()
if not lines:
return {}
if lines[0].split(' ', maxsplit=1)[0].isdigit():
pairs = [line.split(' ', maxsplit=1) for line in lines]
return {int(index): label.strip() for index, label in pairs}
else:
return {index: line.strip() for index, line in enumerate(lines)}
class ObjectDetector():
def __init__(self, model_file):
edge_tpu_delegate = None
try:
edge_tpu_delegate = load_delegate('libedgetpu.so.1.0')
except ValueError:
print("No EdgeTPU detected. Falling back to CPU.")
if edge_tpu_delegate is None:
self.interpreter = tflite.Interpreter(
model_path=model_file)
else:
self.interpreter = tflite.Interpreter(
model_path=model_file,
experimental_delegates=[edge_tpu_delegate])
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
def detect_raw(self, tensor_input):
self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
self.interpreter.invoke()
boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
detections = np.zeros((20,6), np.float32)
for i, score in enumerate(scores):
detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
return detections
class EdgeTPUProcess():
def __init__(self, model):
try:
sa.delete("frame")
except:
pass
try:
sa.delete("detections")
except:
pass
self.input_frame = sa.create("frame", shape=(1,300,300,3), dtype=np.uint8)
self.detections = sa.create("detections", shape=(20,6), dtype=np.float32)
self.detect_lock = mp.Lock()
self.detect_ready = mp.Event()
self.frame_ready = mp.Event()
def run_detector(model, detect_ready, frame_ready):
object_detector = ObjectDetector(model)
input_frame = sa.attach("frame")
detections = sa.attach("detections")
while True:
# wait until a frame is ready
frame_ready.wait()
# signal that the process is busy
frame_ready.clear()
detections[:] = object_detector.detect_raw(input_frame)
# signal that the process is ready to detect
detect_ready.set()
self.detect_process = mp.Process(target=run_detector, args=(model, self.detect_ready, self.frame_ready))
self.detect_process.daemon = True
self.detect_process.start()
class RemoteObjectDetector():
def __init__(self, labels, detect_lock, detect_ready, frame_ready):
self.labels = load_labels(labels)
self.input_frame = sa.attach("frame")
self.detections = sa.attach("detections")
self.detect_lock = detect_lock
self.detect_ready = detect_ready
self.frame_ready = frame_ready
def detect(self, tensor_input, threshold=.4):
detections = []
with self.detect_lock:
self.input_frame[:] = tensor_input
# unset detections and signal that a frame is ready
self.detect_ready.clear()
self.frame_ready.set()
# wait until the detection process is finished,
self.detect_ready.wait()
for d in self.detections:
if d[1] < threshold:
break
detections.append((
self.labels[int(d[0])],
float(d[1]),
(d[2], d[3], d[4], d[5])
))
return detections

76
frigate/motion.py Normal file
View File

@ -0,0 +1,76 @@
import cv2
import imutils
import numpy as np
class MotionDetector():
# TODO: add motion masking
def __init__(self, frame_shape, resize_factor=4):
self.resize_factor = resize_factor
self.motion_frame_size = (int(frame_shape[0]/resize_factor), int(frame_shape[1]/resize_factor))
self.avg_frame = np.zeros(self.motion_frame_size, np.float)
self.avg_delta = np.zeros(self.motion_frame_size, np.float)
self.motion_frame_count = 0
self.frame_counter = 0
def detect(self, frame):
motion_boxes = []
# resize frame
resized_frame = cv2.resize(frame, dsize=(self.motion_frame_size[1], self.motion_frame_size[0]), interpolation=cv2.INTER_LINEAR)
# convert to grayscale
gray = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)
# it takes ~30 frames to establish a baseline
# dont bother looking for motion
if self.frame_counter < 30:
self.frame_counter += 1
else:
# compare to average
frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(self.avg_frame))
# compute the average delta over the past few frames
# the alpha value can be modified to configure how sensitive the motion detection is.
# higher values mean the current frame impacts the delta a lot, and a single raindrop may
# register as motion, too low and a fast moving person wont be detected as motion
# this also assumes that a person is in the same location across more than a single frame
cv2.accumulateWeighted(frameDelta, self.avg_delta, 0.2)
# compute the threshold image for the current frame
current_thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
# black out everything in the avg_delta where there isnt motion in the current frame
avg_delta_image = cv2.convertScaleAbs(self.avg_delta)
avg_delta_image[np.where(current_thresh==[0])] = [0]
# then look for deltas above the threshold, but only in areas where there is a delta
# in the current frame. this prevents deltas from previous frames from being included
thresh = cv2.threshold(avg_delta_image, 25, 255, cv2.THRESH_BINARY)[1]
# dilate the thresholded image to fill in holes, then find contours
# on thresholded image
thresh = cv2.dilate(thresh, None, iterations=2)
cnts = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cnts = imutils.grab_contours(cnts)
# loop over the contours
for c in cnts:
# if the contour is big enough, count it as motion
contour_area = cv2.contourArea(c)
if contour_area > 100:
# cv2.drawContours(resized_frame, [c], -1, (255,255,255), 2)
x, y, w, h = cv2.boundingRect(c)
motion_boxes.append((x*self.resize_factor, y*self.resize_factor, (x+w)*self.resize_factor, (y+h)*self.resize_factor))
if len(motion_boxes) > 0:
self.motion_frame_count += 1
# TODO: this really depends on FPS
if self.motion_frame_count >= 10:
# only average in the current frame if the difference persists for at least 3 frames
cv2.accumulateWeighted(gray, self.avg_frame, 0.2)
else:
# when no motion, just keep averaging the frames together
cv2.accumulateWeighted(gray, self.avg_frame, 0.2)
self.motion_frame_count = 0
return motion_boxes

View File

@ -14,25 +14,8 @@ import SharedArray as sa
from scipy.spatial import distance as dist from scipy.spatial import distance as dist
import tflite_runtime.interpreter as tflite import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate from tflite_runtime.interpreter import load_delegate
from frigate.edgetpu import ObjectDetector, EdgeTPUProcess, RemoteObjectDetector, load_labels
def load_labels(path, encoding='utf-8'): from frigate.motion import MotionDetector
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, 'r', encoding=encoding) as f:
lines = f.readlines()
if not lines:
return {}
if lines[0].split(' ', maxsplit=1)[0].isdigit():
pairs = [line.split(' ', maxsplit=1) for line in lines]
return {int(index): label.strip() for index, label in pairs}
else:
return {index: line.strip() for index, line in enumerate(lines)}
def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'): def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
if color is None: if color is None:
@ -152,193 +135,6 @@ def create_tensor_input(frame, region):
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
return np.expand_dims(cropped_frame, axis=0) return np.expand_dims(cropped_frame, axis=0)
class MotionDetector():
# TODO: add motion masking
def __init__(self, frame_shape, resize_factor=4):
self.resize_factor = resize_factor
self.motion_frame_size = (int(frame_shape[0]/resize_factor), int(frame_shape[1]/resize_factor))
self.avg_frame = np.zeros(self.motion_frame_size, np.float)
self.avg_delta = np.zeros(self.motion_frame_size, np.float)
self.motion_frame_count = 0
self.frame_counter = 0
def detect(self, frame):
motion_boxes = []
# resize frame
resized_frame = cv2.resize(frame, dsize=(self.motion_frame_size[1], self.motion_frame_size[0]), interpolation=cv2.INTER_LINEAR)
# convert to grayscale
gray = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)
# it takes ~30 frames to establish a baseline
# dont bother looking for motion
if self.frame_counter < 30:
self.frame_counter += 1
else:
# compare to average
frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(self.avg_frame))
# compute the average delta over the past few frames
# the alpha value can be modified to configure how sensitive the motion detection is.
# higher values mean the current frame impacts the delta a lot, and a single raindrop may
# register as motion, too low and a fast moving person wont be detected as motion
# this also assumes that a person is in the same location across more than a single frame
cv2.accumulateWeighted(frameDelta, self.avg_delta, 0.2)
# compute the threshold image for the current frame
current_thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
# black out everything in the avg_delta where there isnt motion in the current frame
avg_delta_image = cv2.convertScaleAbs(self.avg_delta)
avg_delta_image[np.where(current_thresh==[0])] = [0]
# then look for deltas above the threshold, but only in areas where there is a delta
# in the current frame. this prevents deltas from previous frames from being included
thresh = cv2.threshold(avg_delta_image, 25, 255, cv2.THRESH_BINARY)[1]
# dilate the thresholded image to fill in holes, then find contours
# on thresholded image
thresh = cv2.dilate(thresh, None, iterations=2)
cnts = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cnts = imutils.grab_contours(cnts)
# loop over the contours
for c in cnts:
# if the contour is big enough, count it as motion
contour_area = cv2.contourArea(c)
if contour_area > 100:
# cv2.drawContours(resized_frame, [c], -1, (255,255,255), 2)
x, y, w, h = cv2.boundingRect(c)
motion_boxes.append((x*self.resize_factor, y*self.resize_factor, (x+w)*self.resize_factor, (y+h)*self.resize_factor))
if len(motion_boxes) > 0:
self.motion_frame_count += 1
# TODO: this really depends on FPS
if self.motion_frame_count >= 10:
# only average in the current frame if the difference persists for at least 3 frames
cv2.accumulateWeighted(gray, self.avg_frame, 0.2)
else:
# when no motion, just keep averaging the frames together
cv2.accumulateWeighted(gray, self.avg_frame, 0.2)
self.motion_frame_count = 0
return motion_boxes
class ObjectDetector():
def __init__(self, model_file, label_file):
self.labels = load_labels(label_file)
edge_tpu_delegate = None
try:
edge_tpu_delegate = load_delegate('libedgetpu.so.1.0')
except ValueError:
print("No EdgeTPU detected. Falling back to CPU.")
if edge_tpu_delegate is None:
self.interpreter = tflite.Interpreter(
model_path=model_file)
else:
self.interpreter = tflite.Interpreter(
model_path=model_file,
experimental_delegates=[edge_tpu_delegate])
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
def detect_raw(self, tensor_input):
self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
self.interpreter.invoke()
boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
detections = np.zeros((20,6), np.float32)
for i, score in enumerate(scores):
detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
return detections
def detect(self, tensor_input, threshold=.4):
self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
self.interpreter.invoke()
boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
detections = []
for i, score in enumerate(scores):
label = self.labels[int(label_codes[i])]
if score < threshold:
break
detections.append((
label,
float(score),
boxes[i]
))
return detections
class RemoteObjectDetector():
def __init__(self, model, labels):
self.labels = load_labels(labels)
try:
sa.delete("frame")
except:
pass
try:
sa.delete("detections")
except:
pass
self.input_frame = sa.create("frame", shape=(1,300,300,3), dtype=np.uint8)
self.detections = sa.create("detections", shape=(20,6), dtype=np.float32)
self.detect_lock = mp.Lock()
self.detect_ready = mp.Event()
self.frame_ready = mp.Event()
def run_detector(model, labels, detect_ready, frame_ready):
object_detector = ObjectDetector(model, labels)
input_frame = sa.attach("frame")
detections = sa.attach("detections")
while True:
# wait until a frame is ready
frame_ready.wait()
# signal that the process is busy
frame_ready.clear()
detections[:] = object_detector.detect_raw(input_frame)
# signal that the process is ready to detect
detect_ready.set()
self.detect_process = mp.Process(target=run_detector, args=(model, labels, self.detect_ready, self.frame_ready))
self.detect_process.daemon = True
self.detect_process.start()
def detect(self, tensor_input, threshold=.4):
detections = []
with self.detect_lock:
self.input_frame[:] = tensor_input
# unset detections and signal that a frame is ready
self.detect_ready.clear()
self.frame_ready.set()
# wait until the detection process is finished,
self.detect_ready.wait()
for d in self.detections:
if d[1] < threshold:
break
detections.append((
self.labels[int(d[0])],
float(d[1]),
(d[2], d[3], d[4], d[5])
))
return detections
class ObjectTracker(): class ObjectTracker():
def __init__(self, max_disappeared): def __init__(self, max_disappeared):
self.tracked_objects = {} self.tracked_objects = {}
@ -521,7 +317,7 @@ def main():
start_frame = datetime.datetime.now().timestamp() start_frame = datetime.datetime.now().timestamp()
frame_detections = 0 frame_detections = 0
frame_bytes = ffmpeg_process.stdout.read(frame_size)#f.read(frame_size) frame_bytes = ffmpeg_process.stdout.read(frame_size)
if not frame_bytes: if not frame_bytes:
break break
frame_time = datetime.datetime.now().timestamp() frame_time = datetime.datetime.now().timestamp()