From 86f5d8128d82706c3a515dfda928492cf718d3eb Mon Sep 17 00:00:00 2001
From: blakeblackshear
Date: Mon, 25 Feb 2019 20:27:02 -0600
Subject: [PATCH] initial refactoring

---
 .gitignore                  |   2 +
 detect_objects.py           | 402 +++----------
 frigate/motion.py           | 114 ++++++++++
 frigate/mqtt.py             |  57 +++++
 frigate/object_detection.py | 114 ++++++++++
 frigate/objects.py          |  48 +++++
 frigate/util.py             |   5 +
 frigate/video.py            |  41 ++++
 8 files changed, 410 insertions(+), 373 deletions(-)
 create mode 100644 .gitignore
 create mode 100644 frigate/motion.py
 create mode 100644 frigate/mqtt.py
 create mode 100644 frigate/object_detection.py
 create mode 100644 frigate/objects.py
 create mode 100644 frigate/util.py
 create mode 100644 frigate/video.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..2c31dc4ca
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+debug
diff --git a/detect_objects.py b/detect_objects.py
index 440c5f80a..175d9742e 100644
--- a/detect_objects.py
+++ b/detect_objects.py
@@ -10,192 +10,31 @@ import threading
 import json
 from contextlib import closing
 import numpy as np
-import tensorflow as tf
-from object_detection.utils import label_map_util
 from object_detection.utils import visualization_utils as vis_util
 from flask import Flask, Response, make_response
 import paho.mqtt.client as mqtt
 
+from frigate.util import tonumpyarray
+from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
+from frigate.objects import ObjectParser, ObjectCleaner
+from frigate.motion import detect_motion
+from frigate.video import fetch_frames
+from frigate.object_detection import detect_objects
+
 RTSP_URL = os.getenv('RTSP_URL')
 
-# Path to frozen detection graph. This is the actual model that is used for the object detection.
-PATH_TO_CKPT = '/frozen_inference_graph.pb'
-
-# List of the strings that is used to add correct label for each box.
-PATH_TO_LABELS = '/label_map.pbtext'
-
 MQTT_HOST = os.getenv('MQTT_HOST')
 MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')
 MQTT_OBJECT_CLASSES = os.getenv('MQTT_OBJECT_CLASSES')
 
-# TODO: make dynamic?
-NUM_CLASSES = 90
-
 # REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50"
 # REGIONS = "400,350,250,50"
 REGIONS = os.getenv('REGIONS')
 
-DETECTED_OBJECTS = []
-
 DEBUG = (os.getenv('DEBUG') == '1')
 
-# Loading label map
-label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
-categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES,
-                                                            use_display_name=True)
-category_index = label_map_util.create_category_index(categories)
-
-def detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug):
-    # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
-    image_np_expanded = np.expand_dims(cropped_frame, axis=0)
-    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
-
-    # Each box represents a part of the image where a particular object was detected.
-    boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
-
-    # Each score represent how level of confidence for each of the objects.
-    # Score is shown on the result image, together with the class label.
-    scores = detection_graph.get_tensor_by_name('detection_scores:0')
-    classes = detection_graph.get_tensor_by_name('detection_classes:0')
-    num_detections = detection_graph.get_tensor_by_name('num_detections:0')
-
-    # Actual detection.
-    (boxes, scores, classes, num_detections) = sess.run(
-        [boxes, scores, classes, num_detections],
-        feed_dict={image_tensor: image_np_expanded})
-
-    if debug:
-        if len([value for index,value in enumerate(classes[0]) if str(category_index.get(value).get('name')) == 'person' and scores[0,index] > 0.5]) > 0:
-            vis_util.visualize_boxes_and_labels_on_image_array(
-                cropped_frame,
-                np.squeeze(boxes),
-                np.squeeze(classes).astype(np.int32),
-                np.squeeze(scores),
-                category_index,
-                use_normalized_coordinates=True,
-                line_thickness=4)
-            cv2.imwrite("/lab/debug/obj-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)
-
-
-    # build an array of detected objects
-    objects = []
-    for index, value in enumerate(classes[0]):
-        score = scores[0, index]
-        if score > 0.5:
-            box = boxes[0, index].tolist()
-            objects.append({
-                        'name': str(category_index.get(value).get('name')),
-                        'score': float(score),
-                        'ymin': int((box[0] * region_size) + region_y_offset),
-                        'xmin': int((box[1] * region_size) + region_x_offset),
-                        'ymax': int((box[2] * region_size) + region_y_offset),
-                        'xmax': int((box[3] * region_size) + region_x_offset)
-            })
-
-    return objects
-
-class ObjectParser(threading.Thread):
-    def __init__(self, object_queue, objects_parsed):
-        threading.Thread.__init__(self)
-        self._object_queue = object_queue
-        self._objects_parsed = objects_parsed
-
-    def run(self):
-        global DETECTED_OBJECTS
-        while True:
-            obj = self._object_queue.get()
-            DETECTED_OBJECTS.append(obj)
-
-            # notify that objects were parsed
-            with self._objects_parsed:
-                self._objects_parsed.notify_all()
-
-class ObjectCleaner(threading.Thread):
-    def __init__(self, objects_parsed):
-        threading.Thread.__init__(self)
-        self._objects_parsed = objects_parsed
-
-    def run(self):
-        global DETECTED_OBJECTS
-        while True:
-
-            # expire the objects that are more than 1 second old
-            now = datetime.datetime.now().timestamp()
-            # look for the first object found within the last second
-            # (newest objects are appended to the end)
-            detected_objects = DETECTED_OBJECTS.copy()
-            num_to_delete = 0
-            for obj in detected_objects:
-                if now-obj['frame_time']<1:
-                    break
-                num_to_delete += 1
-            if num_to_delete > 0:
-                del DETECTED_OBJECTS[:num_to_delete]
-
-            # notify that parsed objects were changed
-            with self._objects_parsed:
-                self._objects_parsed.notify_all()
-
-            # wait a bit before checking for more expired frames
-            time.sleep(0.2)
-
-class MqttMotionPublisher(threading.Thread):
-    def __init__(self, client, topic_prefix, motion_changed, motion_flags):
-        threading.Thread.__init__(self)
-        self.client = client
-        self.topic_prefix = topic_prefix
-        self.motion_changed = motion_changed
-        self.motion_flags = motion_flags
-
-    def run(self):
-        last_sent_motion = ""
-        while True:
-            with self.motion_changed:
-                self.motion_changed.wait()
-
-            # send message for motion
-            motion_status = 'OFF'
-            if any(obj.is_set() for obj in self.motion_flags):
-                motion_status = 'ON'
-
-            if last_sent_motion != motion_status:
-                last_sent_motion = motion_status
-                self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False)
-
-class MqttObjectPublisher(threading.Thread):
-    def __init__(self, client, topic_prefix, objects_parsed, object_classes):
-        threading.Thread.__init__(self)
-        self.client = client
-        self.topic_prefix = topic_prefix
-        self.objects_parsed = objects_parsed
-        self.object_classes = object_classes
-
-    def run(self):
-        global DETECTED_OBJECTS
-
-        last_sent_payload = ""
-        while True:
-
-            # initialize the payload
-            payload = {}
-
-            # wait until objects have been parsed
-            with self.objects_parsed:
-                self.objects_parsed.wait()
-
-            # add all the person scores in detected objects and
-            # average over past 1 seconds (5fps)
-            detected_objects = DETECTED_OBJECTS.copy()
-            avg_person_score = sum([obj['score'] for obj in detected_objects if obj['name'] == 'person'])/5
-            payload['person'] = int(avg_person_score*100)
-
-            # send message for objects if different
-            new_payload = json.dumps(payload, sort_keys=True)
-            if new_payload != last_sent_payload:
-                last_sent_payload = new_payload
-                self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False)
-
 def main():
+    DETECTED_OBJECTS = []
     # Parse selected regions
     regions = []
     for region_string in REGIONS.split(':'):
@@ -234,7 +73,7 @@ def main():
     shared_arr = mp.Array(ctypes.c_uint16, flat_array_length)
     # create shared value for storing the frame_time
     shared_frame_time = mp.Value('d', 0.0)
-    # Lock to control access to the frame while writing
+    # Lock to control access to the frame
     frame_lock = mp.Lock()
     # Condition for notifying that a new frame is ready
     frame_ready = mp.Condition()
@@ -244,17 +83,20 @@ def main():
     objects_parsed = mp.Condition()
     # Queue for detected objects
     object_queue = mp.Queue()
+    # shape current frame so it can be treated as an image
     frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
 
+    # start the process to capture frames from the RTSP stream and store in a shared array
     capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
-        shared_frame_time, frame_lock, frame_ready, frame_shape))
+        shared_frame_time, frame_lock, frame_ready, frame_shape, RTSP_URL))
     capture_process.daemon = True
 
+    # for each region, start a separate process for motion detection and object detection
     detection_processes = []
     motion_processes = []
     for region in regions:
-        detection_process = mp.Process(target=process_frames, args=(shared_arr,
+        detection_process = mp.Process(target=detect_objects, args=(shared_arr,
             object_queue,
             shared_frame_time,
             frame_lock, frame_ready,
@@ -278,34 +120,46 @@ def main():
         motion_process.daemon = True
         motion_processes.append(motion_process)
 
-    object_parser = ObjectParser(object_queue, objects_parsed)
+    # start a thread to parse objects from the queue
+    object_parser = ObjectParser(object_queue, objects_parsed, DETECTED_OBJECTS)
     object_parser.start()
 
-    object_cleaner = ObjectCleaner(objects_parsed)
+    # start a thread to expire objects from the detected objects list
+    object_cleaner = ObjectCleaner(objects_parsed, DETECTED_OBJECTS)
     object_cleaner.start()
 
+    # connect to mqtt and setup last will
     client = mqtt.Client()
     client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
     client.connect(MQTT_HOST, 1883, 60)
     client.loop_start()
+    # publish a message to signal that the service is running
     client.publish(MQTT_TOPIC_PREFIX+'/available', 'online', retain=True)
 
+    # start a thread to publish object scores (currently only person)
     mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed,
-        MQTT_OBJECT_CLASSES.split(','))
+        MQTT_OBJECT_CLASSES.split(','), DETECTED_OBJECTS)
     mqtt_publisher.start()
 
+    # start thread to publish motion status
     mqtt_motion_publisher = MqttMotionPublisher(client, MQTT_TOPIC_PREFIX, motion_changed,
         [region['motion_detected'] for region in regions])
     mqtt_motion_publisher.start()
 
+    # start the process of capturing frames
     capture_process.start()
     print("capture_process pid ", capture_process.pid)
+
+    # start the object detection processes
     for detection_process in detection_processes:
         detection_process.start()
         print("detection_process pid ", detection_process.pid)
+
+    # start the motion detection processes
     for motion_process in motion_processes:
         motion_process.start()
         print("motion_process pid ", motion_process.pid)
 
+    # create a flask app that encodes frames as mjpeg on demand
     app = Flask(__name__)
 
     @app.route('/')
     def index():
@@ -314,7 +168,6 @@ def main():
         return Response(imagestream(), mimetype='multipart/x-mixed-replace; boundary=frame')
 
     def imagestream():
-        global DETECTED_OBJECTS
         while True:
             # max out at 5 FPS
             time.sleep(0.2)
@@ -363,202 +216,5 @@ def main():
     object_cleaner.join()
     mqtt_publisher.join()
 
-# convert shared memory array into numpy array
-def tonumpyarray(mp_arr):
-    return np.frombuffer(mp_arr.get_obj(), dtype=np.uint16)
-
-# fetch the frames as fast a possible, only decoding the frames when the
-# detection_process has consumed the current frame
-def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape):
-    # convert shared memory array into numpy and shape into image array
-    arr = tonumpyarray(shared_arr).reshape(frame_shape)
-
-    # start the video capture
-    video = cv2.VideoCapture()
-    video.open(RTSP_URL)
-    # keep the buffer small so we minimize old data
-    video.set(cv2.CAP_PROP_BUFFERSIZE,1)
-
-    while True:
-        # check if the video stream is still open, and reopen if needed
-        if not video.isOpened():
-            success = video.open(RTSP_URL)
-            if not success:
-                time.sleep(1)
-                continue
-        # grab the frame, but dont decode it yet
-        ret = video.grab()
-        # snapshot the time the frame was grabbed
-        frame_time = datetime.datetime.now()
-        if ret:
-            # go ahead and decode the current frame
-            ret, frame = video.retrieve()
-            if ret:
-                # Lock access and update frame
-                with frame_lock:
-                    arr[:] = frame
-                    shared_frame_time.value = frame_time.timestamp()
-                # Notify with the condition that a new frame is ready
-                with frame_ready:
-                    frame_ready.notify_all()
-
-    video.release()
-
-# do the actual object detection
-def process_frames(shared_arr, object_queue, shared_frame_time, frame_lock, frame_ready,
-                   motion_detected, frame_shape, region_size, region_x_offset, region_y_offset,
-                   min_person_area, debug):
-    # shape shared input array into frame for processing
-    arr = tonumpyarray(shared_arr).reshape(frame_shape)
-
-    # Load a (frozen) Tensorflow model into memory before the processing loop
-    detection_graph = tf.Graph()
-    with detection_graph.as_default():
-        od_graph_def = tf.GraphDef()
-        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
-            serialized_graph = fid.read()
-            od_graph_def.ParseFromString(serialized_graph)
-            tf.import_graph_def(od_graph_def, name='')
-        sess = tf.Session(graph=detection_graph)
-
-    frame_time = 0.0
-    while True:
-        now = datetime.datetime.now().timestamp()
-
-        # wait until motion is detected
-        motion_detected.wait()
-
-        with frame_ready:
-            # if there isnt a frame ready for processing or it is old, wait for a signal
-            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
-                frame_ready.wait()
-
-        # make a copy of the cropped frame
-        with frame_lock:
-            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy()
-            frame_time = shared_frame_time.value
-
-        # convert to RGB
-        cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
-        # do the object detection
-        objects = detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug)
-        for obj in objects:
-            # ignore persons below the size threshold
-            if obj['name'] == 'person' and (obj['xmax']-obj['xmin'])*(obj['ymax']-obj['ymin']) < min_person_area:
-                continue
-            obj['frame_time'] = frame_time
-            object_queue.put(obj)
-
-
-# do the actual motion detection
-def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,
-                  frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, mask, debug):
-    # shape shared input array into frame for processing
-    arr = tonumpyarray(shared_arr).reshape(frame_shape)
-
-    avg_frame = None
-    avg_delta = None
-    frame_time = 0.0
-    motion_frames = 0
-    while True:
-        now = datetime.datetime.now().timestamp()
-
-        with frame_ready:
-            # if there isnt a frame ready for processing or it is old, wait for a signal
-            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
-                frame_ready.wait()
-
-        # lock and make a copy of the cropped frame
-        with frame_lock:
-            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy().astype('uint8')
-            frame_time = shared_frame_time.value
-
-        # convert to grayscale
-        gray = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2GRAY)
-
-        # apply image mask to remove areas from motion detection
-        gray[mask] = [255]
-
-        # apply gaussian blur
-        gray = cv2.GaussianBlur(gray, (21, 21), 0)
-
-        if avg_frame is None:
-            avg_frame = gray.copy().astype("float")
-            continue
-
-        # look at the delta from the avg_frame
-        frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg_frame))
-
-        if avg_delta is None:
-            avg_delta = frameDelta.copy().astype("float")
-
-        # compute the average delta over the past few frames
-        # the alpha value can be modified to configure how sensitive the motion detection is.
-        # higher values mean the current frame impacts the delta a lot, and a single raindrop may
-        # register as motion, too low and a fast moving person wont be detected as motion
-        # this also assumes that a person is in the same location across more than a single frame
-        cv2.accumulateWeighted(frameDelta, avg_delta, 0.2)
-
-        # compute the threshold image for the current frame
-        current_thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
-
-        # black out everything in the avg_delta where there isnt motion in the current frame
-        avg_delta_image = cv2.convertScaleAbs(avg_delta)
-        avg_delta_image[np.where(current_thresh==[0])] = [0]
-
-        # then look for deltas above the threshold, but only in areas where there is a delta
-        # in the current frame. this prevents deltas from previous frames from being included
-        thresh = cv2.threshold(avg_delta_image, 25, 255, cv2.THRESH_BINARY)[1]
-
-        # dilate the thresholded image to fill in holes, then find contours
-        # on thresholded image
-        thresh = cv2.dilate(thresh, None, iterations=2)
-        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
-        cnts = imutils.grab_contours(cnts)
-
-        # if there are no contours, there is no motion
-        if len(cnts) < 1:
-            motion_frames = 0
-            continue
-
-        motion_found = False
-
-        # loop over the contours
-        for c in cnts:
-            # if the contour is big enough, count it as motion
-            contour_area = cv2.contourArea(c)
-            if contour_area > min_motion_area:
-                motion_found = True
-                if debug:
-                    cv2.drawContours(cropped_frame, [c], -1, (0, 255, 0), 2)
-                    x, y, w, h = cv2.boundingRect(c)
-                    cv2.putText(cropped_frame, str(contour_area), (x, y),
-                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 0), 2)
-                else:
-                    break
-
-        if motion_found:
-            motion_frames += 1
-            # if there have been enough consecutive motion frames, report motion
-            if motion_frames >= 3:
-                # only average in the current frame if the difference persists for at least 3 frames
-                cv2.accumulateWeighted(gray, avg_frame, 0.01)
-                motion_detected.set()
-                with motion_changed:
-                    motion_changed.notify_all()
-        else:
-            # when no motion, just keep averaging the frames together
-            cv2.accumulateWeighted(gray, avg_frame, 0.01)
-            motion_frames = 0
-            if motion_detected.is_set():
-                motion_detected.clear()
-                with motion_changed:
-                    motion_changed.notify_all()
-
-        if debug and motion_frames == 3:
-            cv2.imwrite("/lab/debug/motion-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)
-            cv2.imwrite("/lab/debug/avg_delta-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), avg_delta_image)
-
 if __name__ == '__main__':
-    mp.freeze_support()
     main()
\ No newline at end of file
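
The refactored main() above still owns the shared-memory plumbing that the new modules consume: a flat ctypes array holding the current frame, a shared timestamp, a lock guarding writes, and a condition used to signal readers. A minimal standalone sketch of that pattern, using an invented 2x2x3 frame shape instead of the real camera resolution:

import ctypes
import multiprocessing as mp
import numpy as np

frame_shape = (2, 2, 3)  # hypothetical tiny frame; the real code uses the camera resolution
flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]

# shared buffer, timestamp, lock and condition, mirroring main() above
shared_arr = mp.Array(ctypes.c_uint16, flat_array_length)
shared_frame_time = mp.Value('d', 0.0)
frame_lock = mp.Lock()
frame_ready = mp.Condition()

# view the shared buffer as an image without copying (what frigate.util.tonumpyarray does)
frame_arr = np.frombuffer(shared_arr.get_obj(), dtype=np.uint16).reshape(frame_shape)

# a writer updates the buffer under the lock, then notifies any waiting readers
with frame_lock:
    frame_arr[:] = 42
    shared_frame_time.value = 1551147600.0  # made-up timestamp
with frame_ready:
    frame_ready.notify_all()

print(frame_arr.mean())  # readers see the update through their own view of shared_arr
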
diff --git a/frigate/motion.py b/frigate/motion.py
new file mode 100644
index 000000000..89a26cf66
--- /dev/null
+++ b/frigate/motion.py
@@ -0,0 +1,114 @@
+import datetime
+import numpy as np
+import cv2
+import imutils
+from . util import tonumpyarray
+
+# do the actual motion detection
+def detect_motion(shared_arr, shared_frame_time, frame_lock, frame_ready, motion_detected, motion_changed,
+                  frame_shape, region_size, region_x_offset, region_y_offset, min_motion_area, mask, debug):
+    # shape shared input array into frame for processing
+    arr = tonumpyarray(shared_arr).reshape(frame_shape)
+
+    avg_frame = None
+    avg_delta = None
+    frame_time = 0.0
+    motion_frames = 0
+    while True:
+        now = datetime.datetime.now().timestamp()
+
+        with frame_ready:
+            # if there isn't a frame ready for processing or it is old, wait for a signal
+            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
+                frame_ready.wait()
+
+        # lock and make a copy of the cropped frame
+        with frame_lock:
+            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy().astype('uint8')
+            frame_time = shared_frame_time.value
+
+        # convert to grayscale
+        gray = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2GRAY)
+
+        # apply image mask to remove areas from motion detection
+        gray[mask] = [255]
+
+        # apply gaussian blur
+        gray = cv2.GaussianBlur(gray, (21, 21), 0)
+
+        if avg_frame is None:
+            avg_frame = gray.copy().astype("float")
+            continue
+
+        # look at the delta from the avg_frame
+        frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg_frame))
+
+        if avg_delta is None:
+            avg_delta = frameDelta.copy().astype("float")
+
+        # compute the average delta over the past few frames
+        # the alpha value can be modified to configure how sensitive the motion detection is.
+        # higher values mean the current frame impacts the delta a lot, and a single raindrop may
+        # register as motion; too low and a fast moving person won't be detected as motion
+        # this also assumes that a person is in the same location across more than a single frame
+        cv2.accumulateWeighted(frameDelta, avg_delta, 0.2)
+
+        # compute the threshold image for the current frame
+        current_thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
+
+        # black out everything in the avg_delta where there isn't motion in the current frame
+        avg_delta_image = cv2.convertScaleAbs(avg_delta)
+        avg_delta_image[np.where(current_thresh==[0])] = [0]
+
+        # then look for deltas above the threshold, but only in areas where there is a delta
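
detect_motion keeps two running averages with cv2.accumulateWeighted: a slow average of the grayscale frame (alpha 0.01) that acts as the background, and a faster average of the frame deltas (alpha 0.2) that gets thresholded. A rough illustration of that averaging on synthetic frames; the 25/255 threshold mirrors the module above, while the frames and sizes are invented:

import cv2
import numpy as np

# two synthetic grayscale frames: an empty scene and one with a bright blob
background = np.zeros((120, 160), dtype=np.uint8)
frame_with_motion = background.copy()
frame_with_motion[40:80, 60:100] = 200

# slow-moving background average, kept as float so accumulateWeighted can update it in place
avg_frame = background.copy().astype("float")

# delta between the new frame and the averaged background
frame_delta = cv2.absdiff(frame_with_motion, cv2.convertScaleAbs(avg_frame))

# faster-moving average of the deltas, then threshold it the way detect_motion does
avg_delta = frame_delta.copy().astype("float")
cv2.accumulateWeighted(frame_delta, avg_delta, 0.2)
thresh = cv2.threshold(cv2.convertScaleAbs(avg_delta), 25, 255, cv2.THRESH_BINARY)[1]
print("motion pixels:", cv2.countNonZero(thresh))

# the frame is only folded into the background average slowly (alpha 0.01),
# so something passing through does not instantly become part of the background
cv2.accumulateWeighted(frame_with_motion, avg_frame, 0.01)
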
+        # in the current frame. this prevents deltas from previous frames from being included
+        thresh = cv2.threshold(avg_delta_image, 25, 255, cv2.THRESH_BINARY)[1]
+
+        # dilate the thresholded image to fill in holes, then find contours
+        # on thresholded image
+        thresh = cv2.dilate(thresh, None, iterations=2)
+        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+        cnts = imutils.grab_contours(cnts)
+
+        # if there are no contours, there is no motion
+        if len(cnts) < 1:
+            motion_frames = 0
+            continue
+
+        motion_found = False
+
+        # loop over the contours
+        for c in cnts:
+            # if the contour is big enough, count it as motion
+            contour_area = cv2.contourArea(c)
+            if contour_area > min_motion_area:
+                motion_found = True
+                if debug:
+                    cv2.drawContours(cropped_frame, [c], -1, (0, 255, 0), 2)
+                    x, y, w, h = cv2.boundingRect(c)
+                    cv2.putText(cropped_frame, str(contour_area), (x, y),
+                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 0), 2)
+                else:
+                    break
+
+        if motion_found:
+            motion_frames += 1
+            # if there have been enough consecutive motion frames, report motion
+            if motion_frames >= 3:
+                # only average in the current frame if the difference persists for at least 3 frames
+                cv2.accumulateWeighted(gray, avg_frame, 0.01)
+                motion_detected.set()
+                with motion_changed:
+                    motion_changed.notify_all()
+        else:
+            # when no motion, just keep averaging the frames together
+            cv2.accumulateWeighted(gray, avg_frame, 0.01)
+            motion_frames = 0
+            if motion_detected.is_set():
+                motion_detected.clear()
+                with motion_changed:
+                    motion_changed.notify_all()
+
+        if debug and motion_frames == 3:
+            cv2.imwrite("/lab/debug/motion-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)
+            cv2.imwrite("/lab/debug/avg_delta-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), avg_delta_image)
diff --git a/frigate/mqtt.py b/frigate/mqtt.py
new file mode 100644
index 000000000..2dfecd4ca
--- /dev/null
+++ b/frigate/mqtt.py
@@ -0,0 +1,57 @@
+import json
+import threading
+
+class MqttMotionPublisher(threading.Thread):
+    def __init__(self, client, topic_prefix, motion_changed, motion_flags):
+        threading.Thread.__init__(self)
+        self.client = client
+        self.topic_prefix = topic_prefix
+        self.motion_changed = motion_changed
+        self.motion_flags = motion_flags
+
+    def run(self):
+        last_sent_motion = ""
+        while True:
+            with self.motion_changed:
+                self.motion_changed.wait()
+
+            # send message for motion
+            motion_status = 'OFF'
+            if any(obj.is_set() for obj in self.motion_flags):
+                motion_status = 'ON'
+
+            if last_sent_motion != motion_status:
+                last_sent_motion = motion_status
+                self.client.publish(self.topic_prefix+'/motion', motion_status, retain=False)
+
+class MqttObjectPublisher(threading.Thread):
+    def __init__(self, client, topic_prefix, objects_parsed, object_classes, detected_objects):
+        threading.Thread.__init__(self)
+        self.client = client
+        self.topic_prefix = topic_prefix
+        self.objects_parsed = objects_parsed
+        self.object_classes = object_classes
+        self._detected_objects = detected_objects
+
+    def run(self):
+        last_sent_payload = ""
+        while True:
+
+            # initialize the payload
+            payload = {}
+
+            # wait until objects have been parsed
+            with self.objects_parsed:
+                self.objects_parsed.wait()
+
+            # add all the person scores in detected objects and
+            # average over the past 1 second (5fps)
+            detected_objects = self._detected_objects.copy()
+            avg_person_score = sum([obj['score'] for obj in detected_objects if obj['name'] == 'person'])/5
+            payload['person'] = int(avg_person_score*100)
+
+            # send message for objects if different
+            new_payload = json.dumps(payload, sort_keys=True)
+            if new_payload != last_sent_payload:
+                last_sent_payload = new_payload
+                self.client.publish(self.topic_prefix+'/objects', new_payload, retain=False)
\ No newline at end of file
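
On the wire, the two publishers above produce three topics under the configured prefix: <prefix>/available ('online', with a retained 'offline' last will), <prefix>/motion ('ON'/'OFF'), and <prefix>/objects (JSON such as {"person": 87}). A minimal paho-mqtt listener for those topics; the broker address and the 'frigate' prefix are assumptions, not values from this patch:

import json
import paho.mqtt.client as mqtt

TOPIC_PREFIX = 'frigate'  # assumed; use whatever MQTT_TOPIC_PREFIX is set to

def on_connect(client, userdata, flags, rc):
    client.subscribe([(TOPIC_PREFIX + '/available', 0),
                      (TOPIC_PREFIX + '/motion', 0),
                      (TOPIC_PREFIX + '/objects', 0)])

def on_message(client, userdata, msg):
    if msg.topic.endswith('/objects'):
        scores = json.loads(msg.payload.decode())
        print('person score:', scores.get('person'))
    else:
        print(msg.topic, msg.payload.decode())

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('localhost', 1883, 60)  # assumed broker address
client.loop_forever()
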
diff --git a/frigate/object_detection.py b/frigate/object_detection.py
new file mode 100644
index 000000000..2c68f2b40
--- /dev/null
+++ b/frigate/object_detection.py
@@ -0,0 +1,114 @@
+import datetime
+import cv2
+import numpy as np
+import tensorflow as tf
+from object_detection.utils import label_map_util
+from object_detection.utils import visualization_utils as vis_util
+from . util import tonumpyarray
+
+# TODO: make dynamic?
+NUM_CLASSES = 90
+# Path to frozen detection graph. This is the actual model that is used for the object detection.
+PATH_TO_CKPT = '/frozen_inference_graph.pb'
+# List of the strings that is used to add correct label for each box.
+PATH_TO_LABELS = '/label_map.pbtext'
+
+# Loading label map
+label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
+categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES,
+                                                            use_display_name=True)
+category_index = label_map_util.create_category_index(categories)
+
+# do the actual object detection
+def tf_detect_objects(cropped_frame, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug):
+    # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
+    image_np_expanded = np.expand_dims(cropped_frame, axis=0)
+    image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
+
+    # Each box represents a part of the image where a particular object was detected.
+    boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
+
+    # Each score represents the level of confidence for each of the objects.
+    # Score is shown on the result image, together with the class label.
+    scores = detection_graph.get_tensor_by_name('detection_scores:0')
+    classes = detection_graph.get_tensor_by_name('detection_classes:0')
+    num_detections = detection_graph.get_tensor_by_name('num_detections:0')
+
+    # Actual detection.
+    (boxes, scores, classes, num_detections) = sess.run(
+        [boxes, scores, classes, num_detections],
+        feed_dict={image_tensor: image_np_expanded})
+
+    if debug:
+        if len([value for index,value in enumerate(classes[0]) if str(category_index.get(value).get('name')) == 'person' and scores[0,index] > 0.5]) > 0:
+            vis_util.visualize_boxes_and_labels_on_image_array(
+                cropped_frame,
+                np.squeeze(boxes),
+                np.squeeze(classes).astype(np.int32),
+                np.squeeze(scores),
+                category_index,
+                use_normalized_coordinates=True,
+                line_thickness=4)
+            cv2.imwrite("/lab/debug/obj-{}-{}-{}.jpg".format(region_x_offset, region_y_offset, datetime.datetime.now().timestamp()), cropped_frame)
+
+
+    # build an array of detected objects
+    objects = []
+    for index, value in enumerate(classes[0]):
+        score = scores[0, index]
+        if score > 0.5:
+            box = boxes[0, index].tolist()
+            objects.append({
+                        'name': str(category_index.get(value).get('name')),
+                        'score': float(score),
+                        'ymin': int((box[0] * region_size) + region_y_offset),
+                        'xmin': int((box[1] * region_size) + region_x_offset),
+                        'ymax': int((box[2] * region_size) + region_y_offset),
+                        'xmax': int((box[3] * region_size) + region_x_offset)
+            })
+
+    return objects
+
+def detect_objects(shared_arr, object_queue, shared_frame_time, frame_lock, frame_ready,
+                   motion_detected, frame_shape, region_size, region_x_offset, region_y_offset,
+                   min_person_area, debug):
+    # shape shared input array into frame for processing
+    arr = tonumpyarray(shared_arr).reshape(frame_shape)
+
+    # Load a (frozen) Tensorflow model into memory before the processing loop
+    detection_graph = tf.Graph()
+    with detection_graph.as_default():
+        od_graph_def = tf.GraphDef()
+        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
+            serialized_graph = fid.read()
+            od_graph_def.ParseFromString(serialized_graph)
+            tf.import_graph_def(od_graph_def, name='')
+        sess = tf.Session(graph=detection_graph)
+
+    frame_time = 0.0
+    while True:
+        now = datetime.datetime.now().timestamp()
+
+        # wait until motion is detected
+        motion_detected.wait()
+
+        with frame_ready:
+            # if there isn't a frame ready for processing or it is old, wait for a new frame
+            if shared_frame_time.value == frame_time or (now - shared_frame_time.value) > 0.5:
+                frame_ready.wait()
+
+        # make a copy of the cropped frame
+        with frame_lock:
+            cropped_frame = arr[region_y_offset:region_y_offset+region_size, region_x_offset:region_x_offset+region_size].copy()
+            frame_time = shared_frame_time.value
+
+        # convert to RGB
+        cropped_frame_rgb = cv2.cvtColor(cropped_frame, cv2.COLOR_BGR2RGB)
+        # do the object detection
+        objects = tf_detect_objects(cropped_frame_rgb, sess, detection_graph, region_size, region_x_offset, region_y_offset, debug)
+        for obj in objects:
+            # ignore persons below the size threshold
+            if obj['name'] == 'person' and (obj['xmax']-obj['xmin'])*(obj['ymax']-obj['ymin']) < min_person_area:
+                continue
+            obj['frame_time'] = frame_time
+            object_queue.put(obj)
\ No newline at end of file
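
tf_detect_objects returns boxes normalized to the square region that was cropped out of the frame, so mapping back to full-frame pixels is just box * region_size plus the region offset, as in the dict built above. A small worked version of that arithmetic with made-up region numbers:

# map a normalized [ymin, xmin, ymax, xmax] box from a cropped square region back to
# full-frame pixel coordinates, mirroring the math in tf_detect_objects above
def box_to_frame_coords(box, region_size, region_x_offset, region_y_offset):
    ymin, xmin, ymax, xmax = box
    return {
        'ymin': int((ymin * region_size) + region_y_offset),
        'xmin': int((xmin * region_size) + region_x_offset),
        'ymax': int((ymax * region_size) + region_y_offset),
        'xmax': int((xmax * region_size) + region_x_offset),
    }

# hypothetical 300px region whose top-left corner sits at x=400, y=350 in the full frame
print(box_to_frame_coords([0.1, 0.2, 0.5, 0.6], 300, 400, 350))
# {'ymin': 380, 'xmin': 460, 'ymax': 500, 'xmax': 580}
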
diff --git a/frigate/objects.py b/frigate/objects.py
new file mode 100644
index 000000000..0ce6697e1
--- /dev/null
+++ b/frigate/objects.py
@@ -0,0 +1,48 @@
+import time
+import datetime
+import threading
+
+class ObjectParser(threading.Thread):
+    def __init__(self, object_queue, objects_parsed, detected_objects):
+        threading.Thread.__init__(self)
+        self._object_queue = object_queue
+        self._objects_parsed = objects_parsed
+        self._detected_objects = detected_objects
+
+    def run(self):
+        while True:
+            obj = self._object_queue.get()
+            self._detected_objects.append(obj)
+
+            # notify that objects were parsed
+            with self._objects_parsed:
+                self._objects_parsed.notify_all()
+
+class ObjectCleaner(threading.Thread):
+    def __init__(self, objects_parsed, detected_objects):
+        threading.Thread.__init__(self)
+        self._objects_parsed = objects_parsed
+        self._detected_objects = detected_objects
+
+    def run(self):
+        while True:
+
+            # expire the objects that are more than 1 second old
+            now = datetime.datetime.now().timestamp()
+            # look for the first object found within the last second
+            # (newest objects are appended to the end)
+            detected_objects = self._detected_objects.copy()
+            num_to_delete = 0
+            for obj in detected_objects:
+                if now-obj['frame_time']<1:
+                    break
+                num_to_delete += 1
+            if num_to_delete > 0:
+                del self._detected_objects[:num_to_delete]
+
+            # notify that parsed objects were changed
+            with self._objects_parsed:
+                self._objects_parsed.notify_all()
+
+            # wait a bit before checking for more expired frames
+            time.sleep(0.2)
\ No newline at end of file
diff --git a/frigate/util.py b/frigate/util.py
new file mode 100644
index 000000000..984d37193
--- /dev/null
+++ b/frigate/util.py
@@ -0,0 +1,5 @@
+import numpy as np
+
+# convert shared memory array into numpy array
+def tonumpyarray(mp_arr):
+    return np.frombuffer(mp_arr.get_obj(), dtype=np.uint16)
\ No newline at end of file
diff --git a/frigate/video.py b/frigate/video.py
new file mode 100644
index 000000000..f9c95c718
--- /dev/null
+++ b/frigate/video.py
@@ -0,0 +1,41 @@
+import time
+import datetime
+import cv2
+from . util import tonumpyarray
+
+# fetch the frames as fast as possible, only decoding the frames when the
+# detection_process has consumed the current frame
+def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape, rtsp_url):
+    # convert shared memory array into numpy and shape into image array
+    arr = tonumpyarray(shared_arr).reshape(frame_shape)
+
+    # start the video capture
+    video = cv2.VideoCapture()
+    video.open(rtsp_url)
+    # keep the buffer small so we minimize old data
+    video.set(cv2.CAP_PROP_BUFFERSIZE,1)
+
+    while True:
+        # check if the video stream is still open, and reopen if needed
+        if not video.isOpened():
+            success = video.open(rtsp_url)
+            if not success:
+                time.sleep(1)
+                continue
+        # grab the frame, but don't decode it yet
+        ret = video.grab()
+        # snapshot the time the frame was grabbed
+        frame_time = datetime.datetime.now()
+        if ret:
+            # go ahead and decode the current frame
+            ret, frame = video.retrieve()
+            if ret:
+                # Lock access and update frame
+                with frame_lock:
+                    arr[:] = frame
+                    shared_frame_time.value = frame_time.timestamp()
+                # Notify with the condition that a new frame is ready
+                with frame_ready:
+                    frame_ready.notify_all()
+
+    video.release()
\ No newline at end of file
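
Taken together, the new modules can also be exercised outside the full service; for example, fetch_frames can be run on its own to confirm that frames are landing in shared memory. A rough harness, assuming the frigate package is importable, an RTSP_URL environment variable, and a 1080p BGR frame shape (adjust to the camera):

import ctypes
import datetime
import multiprocessing as mp
import os

from frigate.util import tonumpyarray
from frigate.video import fetch_frames

if __name__ == '__main__':
    rtsp_url = os.environ['RTSP_URL']  # assumed to point at a reachable camera
    frame_shape = (1080, 1920, 3)      # assumed resolution; must match the stream

    shared_arr = mp.Array(ctypes.c_uint16, frame_shape[0] * frame_shape[1] * frame_shape[2])
    shared_frame_time = mp.Value('d', 0.0)
    frame_lock = mp.Lock()
    frame_ready = mp.Condition()

    capture = mp.Process(target=fetch_frames, args=(shared_arr, shared_frame_time,
        frame_lock, frame_ready, frame_shape, rtsp_url))
    capture.daemon = True
    capture.start()

    frame = tonumpyarray(shared_arr).reshape(frame_shape)
    for _ in range(10):
        with frame_ready:
            frame_ready.wait()
        with frame_lock:
            age = datetime.datetime.now().timestamp() - shared_frame_time.value
            print('got frame, mean pixel {:.1f}, {:.3f}s old'.format(frame.mean(), age))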