From 0279121d77f13535b5262cf5edc948b531948979 Mon Sep 17 00:00:00 2001
From: blakeblackshear
Date: Fri, 29 Mar 2019 20:49:27 -0500
Subject: [PATCH] WIP: convert to camera class

---
 detect_objects.py           | 215 +++++++++++-------------------------
 frigate/object_detection.py |  35 +++---
 frigate/objects.py          |  46 +-------
 frigate/video.py            | 133 ++++++++++++++++++++++
 4 files changed, 223 insertions(+), 206 deletions(-)

diff --git a/detect_objects.py b/detect_objects.py
index 30cf9662c..9ec382019 100644
--- a/detect_objects.py
+++ b/detect_objects.py
@@ -20,23 +20,16 @@
 from frigate.util import tonumpyarray
 from frigate.mqtt import MqttMotionPublisher, MqttObjectPublisher
 from frigate.objects import ObjectParser, ObjectCleaner, BestPersonFrame
 from frigate.motion import detect_motion
-from frigate.video import fetch_frames, FrameTracker
+from frigate.video import fetch_frames, FrameTracker, Camera
 from frigate.object_detection import FramePrepper, PreppedQueueProcessor
 
 with open('/config/config.yml') as f:
     # use safe_load instead load
     CONFIG = yaml.safe_load(f)
 
-rtsp_camera = CONFIG['cameras']['back']['rtsp']
-if (rtsp_camera['password'].startswith('$')):
-    rtsp_camera['password'] = os.getenv(rtsp_camera['password'][1:])
-RTSP_URL = 'rtsp://{}:{}@{}:{}{}'.format(rtsp_camera['user'],
-    rtsp_camera['password'], rtsp_camera['host'], rtsp_camera['port'],
-    rtsp_camera['path'])
-
 MQTT_HOST = CONFIG['mqtt']['host']
 MQTT_PORT = CONFIG.get('mqtt', {}).get('port', 1883)
-MQTT_TOPIC_PREFIX = CONFIG['mqtt']['topic_prefix'] + '/back'
+MQTT_TOPIC_PREFIX = CONFIG.get('mqtt', {}).get('topic_prefix', 'frigate')
 MQTT_USER = CONFIG.get('mqtt', {}).get('user')
 MQTT_PASS = CONFIG.get('mqtt', {}).get('password')
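Note on the hunk above: the MQTT lookups now fall back to defaults via `CONFIG.get('mqtt', {}).get(...)`, so only `mqtt.host` remains strictly required, and the per-camera RTSP settings move under `cameras`. For reference, a minimal sketch of the `/config/config.yml` shape this script reads; the host names, credentials, and region values below are made-up placeholders:

import yaml

# Hypothetical example of the config structure read by detect_objects.py.
CONFIG = yaml.safe_load("""
mqtt:
  host: mqtt.example.local      # required: CONFIG['mqtt']['host']
  port: 1883                    # optional, defaults to 1883
  topic_prefix: frigate         # optional, defaults to 'frigate'
cameras:
  back:
    rtsp:
      user: admin
      password: $RTSP_PASSWORD  # a leading $ means "read from this env var"
      host: 10.0.0.10
      port: 554
      path: /cam/realmonitor
    regions:
      - size: 300
        x_offset: 0
        y_offset: 0
        min_person_area: 5000
""")

print(CONFIG.get('mqtt', {}).get('port', 1883))  # 1883
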
@@ -44,80 +37,6 @@ WEB_PORT = CONFIG.get('web_port', 5000)
 DEBUG = (CONFIG.get('debug', '0') == '1')
 
 def main():
-    DETECTED_OBJECTS = []
-    recent_frames = {}
-    # Parse selected regions
-    regions = CONFIG['cameras']['back']['regions']
-    # capture a single frame and check the frame shape so the correct array
-    # size can be allocated in memory
-    video = cv2.VideoCapture(RTSP_URL)
-    ret, frame = video.read()
-    if ret:
-        frame_shape = frame.shape
-    else:
-        print("Unable to capture video stream")
-        exit(1)
-    video.release()
-
-    # compute the flattened array length from the array shape
-    flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2]
-    # create shared array for storing the full frame image data
-    shared_arr = mp.Array(ctypes.c_uint8, flat_array_length)
-    # create shared value for storing the frame_time
-    shared_frame_time = mp.Value('d', 0.0)
-    # Lock to control access to the frame
-    frame_lock = mp.Lock()
-    # Condition for notifying that a new frame is ready
-    frame_ready = mp.Condition()
-    # Condition for notifying that objects were parsed
-    objects_parsed = mp.Condition()
-    # Queue for detected objects
-    object_queue = queue.Queue()
-    # Queue for prepped frames
-    prepped_frame_queue = queue.Queue(len(regions)*2)
-
-    # shape current frame so it can be treated as an image
-    frame_arr = tonumpyarray(shared_arr).reshape(frame_shape)
-
-    # start the process to capture frames from the RTSP stream and store in a shared array
-    capture_process = mp.Process(target=fetch_frames, args=(shared_arr,
-        shared_frame_time, frame_lock, frame_ready, frame_shape, RTSP_URL))
-    capture_process.daemon = True
-
-    # for each region, start a separate thread to resize the region and prep for detection
-    detection_prep_threads = []
-    for region in regions:
-        detection_prep_threads.append(FramePrepper(
-            frame_arr,
-            shared_frame_time,
-            frame_ready,
-            frame_lock,
-            region['size'], region['x_offset'], region['y_offset'],
-            prepped_frame_queue
-        ))
-
-    prepped_queue_processor = PreppedQueueProcessor(
-        prepped_frame_queue,
-        object_queue
-    )
-    prepped_queue_processor.start()
-
-    # start a thread to store recent motion frames for processing
-    frame_tracker = FrameTracker(frame_arr, shared_frame_time, frame_ready, frame_lock,
-        recent_frames)
-    frame_tracker.start()
-
-    # start a thread to store the highest scoring recent person frame
-    best_person_frame = BestPersonFrame(objects_parsed, recent_frames, DETECTED_OBJECTS)
-    best_person_frame.start()
-
-    # start a thread to parse objects from the queue
-    object_parser = ObjectParser(object_queue, objects_parsed, DETECTED_OBJECTS, regions)
-    object_parser.start()
-    # start a thread to expire objects from the detected objects list
-    object_cleaner = ObjectCleaner(objects_parsed, DETECTED_OBJECTS)
-    object_cleaner.start()
-
     # connect to mqtt and setup last will
     def on_connect(client, userdata, flags, rc):
         print("On connect called")
@@ -128,84 +47,82 @@ def main():
     client.will_set(MQTT_TOPIC_PREFIX+'/available', payload='offline', qos=1, retain=True)
     if not MQTT_USER is None:
         client.username_pw_set(MQTT_USER, password=MQTT_PASS)
-
     client.connect(MQTT_HOST, MQTT_PORT, 60)
     client.loop_start()
+
+    # Queue for prepped frames
+    # TODO: set length to 1.5x the number of total regions
+    prepped_frame_queue = queue.Queue(6)
 
-    # start a thread to publish object scores (currently only person)
-    mqtt_publisher = MqttObjectPublisher(client, MQTT_TOPIC_PREFIX, objects_parsed, DETECTED_OBJECTS)
-    mqtt_publisher.start()
+    camera = Camera('back', CONFIG['cameras']['back'], prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
 
-    # start the process of capturing frames
-    capture_process.start()
-    print("capture_process pid ", capture_process.pid)
+    cameras = {
+        'back': camera
+    }
 
-    # start the object detection prep threads
-    for detection_prep_thread in detection_prep_threads:
-        detection_prep_thread.start()
+    prepped_queue_processor = PreppedQueueProcessor(
+        cameras,
+        prepped_frame_queue
+    )
+    prepped_queue_processor.start()
+
+    camera.start()
+    camera.join()
 
     # create a flask app that encodes frames a mjpeg on demand
-    app = Flask(__name__)
+    # app = Flask(__name__)
 
-    @app.route('/best_person.jpg')
-    def best_person():
-        frame = np.zeros(frame_shape, np.uint8) if best_person_frame.best_frame is None else best_person_frame.best_frame
-        ret, jpg = cv2.imencode('.jpg', frame)
-        response = make_response(jpg.tobytes())
-        response.headers['Content-Type'] = 'image/jpg'
-        return response
+    # @app.route('/best_person.jpg')
+    # def best_person():
+    #     frame = np.zeros(frame_shape, np.uint8) if camera.get_best_person() is None else camera.get_best_person()
+    #     ret, jpg = cv2.imencode('.jpg', frame)
+    #     response = make_response(jpg.tobytes())
+    #     response.headers['Content-Type'] = 'image/jpg'
+    #     return response
 
-    @app.route('/')
-    def index():
-        # return a multipart response
-        return Response(imagestream(),
-                mimetype='multipart/x-mixed-replace; boundary=frame')
-    def imagestream():
-        while True:
-            # max out at 5 FPS
-            time.sleep(0.2)
-            # make a copy of the current detected objects
-            detected_objects = DETECTED_OBJECTS.copy()
-            # lock and make a copy of the current frame
-            with frame_lock:
-                frame = frame_arr.copy()
-            # convert to RGB for drawing
-            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-            # draw the bounding boxes on the screen
-            for obj in detected_objects:
-                vis_util.draw_bounding_box_on_image_array(frame,
-                    obj['ymin'],
-                    obj['xmin'],
-                    obj['ymax'],
-                    obj['xmax'],
-                    color='red',
-                    thickness=2,
-                    display_str_list=["{}: {}%".format(obj['name'],int(obj['score']*100))],
-                    use_normalized_coordinates=False)
+    # @app.route('/')
+    # def index():
+    #     # return a multipart response
+    #     return Response(imagestream(),
+    #             mimetype='multipart/x-mixed-replace; boundary=frame')
+    # def imagestream():
+    #     while True:
+    #         # max out at 5 FPS
+    #         time.sleep(0.2)
+    #         # make a copy of the current detected objects
+    #         detected_objects = DETECTED_OBJECTS.copy()
+    #         # lock and make a copy of the current frame
+    #         with frame_lock:
+    #             frame = frame_arr.copy()
+    #         # convert to RGB for drawing
+    #         frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
+    #         # draw the bounding boxes on the screen
+    #         for obj in detected_objects:
+    #             vis_util.draw_bounding_box_on_image_array(frame,
+    #                 obj['ymin'],
+    #                 obj['xmin'],
+    #                 obj['ymax'],
+    #                 obj['xmax'],
+    #                 color='red',
+    #                 thickness=2,
+    #                 display_str_list=["{}: {}%".format(obj['name'],int(obj['score']*100))],
+    #                 use_normalized_coordinates=False)
 
-            for region in regions:
-                color = (255,255,255)
-                cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
-                    (region['x_offset']+region['size'], region['y_offset']+region['size']),
-                    color, 2)
+    #         for region in regions:
+    #             color = (255,255,255)
+    #             cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
+    #                 (region['x_offset']+region['size'], region['y_offset']+region['size']),
+    #                 color, 2)
 
-            # convert back to BGR
-            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
-            # encode the image into a jpg
-            ret, jpg = cv2.imencode('.jpg', frame)
-            yield (b'--frame\r\n'
-                b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')
+    #         # convert back to BGR
+    #         frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
+    #         # encode the image into a jpg
+    #         ret, jpg = cv2.imencode('.jpg', frame)
+    #         yield (b'--frame\r\n'
+    #             b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')
 
-    app.run(host='0.0.0.0', port=WEB_PORT, debug=False)
-
-    capture_process.join()
-    for detection_prep_thread in detection_prep_threads:
-        detection_prep_thread.join()
-    frame_tracker.join()
-    best_person_frame.join()
-    object_parser.join()
-    object_cleaner.join()
-    mqtt_publisher.join()
+    # app.run(host='0.0.0.0', port=WEB_PORT, debug=False)
 
 if __name__ == '__main__':
     main()
\ No newline at end of file
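The Flask endpoints above are only commented out while the Camera refactor is in progress, but the intended MJPEG flow is still visible. A minimal self-contained sketch of the same multipart-response pattern; here `get_frame()` is a stand-in for `camera.get_best_person()`, and the port is a placeholder:

import time

import cv2
import numpy as np
from flask import Flask, Response

app = Flask(__name__)

def get_frame():
    # stand-in for camera.get_best_person(); returns a black 720p frame
    return np.zeros((720, 1280, 3), np.uint8)

def imagestream():
    while True:
        time.sleep(0.2)  # max out at 5 FPS, as in the original
        ret, jpg = cv2.imencode('.jpg', get_frame())
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + jpg.tobytes() + b'\r\n\r\n')

@app.route('/')
def index():
    # each yielded part replaces the previous image in the browser
    return Response(imagestream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
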
diff --git a/frigate/object_detection.py b/frigate/object_detection.py
index d4a9c14e8..5c718b456 100644
--- a/frigate/object_detection.py
+++ b/frigate/object_detection.py
@@ -22,11 +22,11 @@ def ReadLabelFile(file_path):
     return ret
 
 class PreppedQueueProcessor(threading.Thread):
-    def __init__(self, prepped_frame_queue, object_queue):
+    def __init__(self, cameras, prepped_frame_queue):
 
         threading.Thread.__init__(self)
+        self.cameras = cameras
         self.prepped_frame_queue = prepped_frame_queue
-        self.object_queue = object_queue
 
         # Load the edgetpu engine and labels
         self.engine = DetectionEngine(PATH_TO_CKPT)
@@ -41,30 +41,32 @@ class PreppedQueueProcessor(threading.Thread):
             objects = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=3)
             # time.sleep(0.1)
             # objects = []
-            # print(engine.get_inference_time())
+            print(self.engine.get_inference_time())
             # put detected objects in the queue
-            if objects:
-                for obj in objects:
-                    box = obj.bounding_box.flatten().tolist()
-                    self.object_queue.put({
-                            'frame_time': frame['frame_time'],
-                            'name': str(self.labels[obj.label_id]),
-                            'score': float(obj.score),
-                            'xmin': int((box[0] * frame['region_size']) + frame['region_x_offset']),
-                            'ymin': int((box[1] * frame['region_size']) + frame['region_y_offset']),
-                            'xmax': int((box[2] * frame['region_size']) + frame['region_x_offset']),
-                            'ymax': int((box[3] * frame['region_size']) + frame['region_y_offset'])
-                        })
+            parsed_objects = []
+            for obj in objects:
+                box = obj.bounding_box.flatten().tolist()
+                parsed_objects.append({
+                        'frame_time': frame['frame_time'],
+                        'name': str(self.labels[obj.label_id]),
+                        'score': float(obj.score),
+                        'xmin': int((box[0] * frame['region_size']) + frame['region_x_offset']),
+                        'ymin': int((box[1] * frame['region_size']) + frame['region_y_offset']),
+                        'xmax': int((box[2] * frame['region_size']) + frame['region_x_offset']),
+                        'ymax': int((box[3] * frame['region_size']) + frame['region_y_offset'])
+                    })
+            self.cameras[frame['camera_name']].add_objects(parsed_objects)
 
 # should this be a region class?
 class FramePrepper(threading.Thread):
-    def __init__(self, shared_frame, frame_time, frame_ready,
+    def __init__(self, camera_name, shared_frame, frame_time, frame_ready,
         frame_lock,
         region_size, region_x_offset, region_y_offset,
         prepped_frame_queue):
 
         threading.Thread.__init__(self)
+        self.camera_name = camera_name
         self.shared_frame = shared_frame
         self.frame_time = frame_time
         self.frame_ready = frame_ready
@@ -101,6 +103,7 @@
             # add the frame to the queue
             if not self.prepped_frame_queue.full():
                 self.prepped_frame_queue.put({
+                    'camera_name': self.camera_name,
                    'frame_time': frame_time,
                     'frame': frame_expanded.flatten().copy(),
                     'region_size': self.region_size,
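PreppedQueueProcessor maps each detection from region-relative, normalized coordinates back to full-frame pixels with `box * region_size + offset`, as seen above. A small worked example of that translation (the region geometry here is made up):

def to_frame_coords(box, region_size, x_offset, y_offset):
    # box is [xmin, ymin, xmax, ymax], normalized to the region (0.0-1.0)
    return {
        'xmin': int(box[0] * region_size + x_offset),
        'ymin': int(box[1] * region_size + y_offset),
        'xmax': int(box[2] * region_size + x_offset),
        'ymax': int(box[3] * region_size + y_offset),
    }

# a detection in the middle of a 300px region anchored at (600, 200)
print(to_frame_coords([0.25, 0.25, 0.75, 0.75], 300, 600, 200))
# {'xmin': 675, 'ymin': 275, 'xmax': 825, 'ymax': 425}
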
diff --git a/frigate/objects.py b/frigate/objects.py
index 17a340ab0..66672cf1d 100644
--- a/frigate/objects.py
+++ b/frigate/objects.py
@@ -4,53 +4,17 @@ import threading
 import cv2
 from object_detection.utils import visualization_utils as vis_util
 
 class ObjectParser(threading.Thread):
-    def __init__(self, object_queue, objects_parsed, detected_objects, regions):
+    def __init__(self, cameras, object_queue, detected_objects, regions):
         threading.Thread.__init__(self)
-        self._object_queue = object_queue
-        self._objects_parsed = objects_parsed
-        self._detected_objects = detected_objects
+        self.cameras = cameras
+        self.object_queue = object_queue
         self.regions = regions
 
     def run(self):
         # frame_times = {}
         while True:
-            obj = self._object_queue.get()
-            # filter out persons
-            # [obj['score'] for obj in detected_objects if obj['name'] == 'person']
-            if obj['name'] == 'person':
-                person_area = (obj['xmax']-obj['xmin'])*(obj['ymax']-obj['ymin'])
-                # find the matching region
-                region = None
-                for r in self.regions:
-                    if (
-                            obj['xmin'] >= r['x_offset'] and
-                            obj['ymin'] >= r['y_offset'] and
-                            obj['xmax'] <= r['x_offset']+r['size'] and
-                            obj['ymax'] <= r['y_offset']+r['size']
-                        ):
-                        region = r
-                        break
-
-                # if the min person area is larger than the
-                # detected person, don't add it to detected objects
-                if region and region['min_person_area'] > person_area:
-                    continue
-
-
-            # frame_time = obj['frame_time']
-            # if frame_time in frame_times:
-            #     if frame_times[frame_time] == 7:
-            #         del frame_times[frame_time]
-            #     else:
-            #         frame_times[frame_time] += 1
-            # else:
-            #     frame_times[frame_time] = 1
-            # print(frame_times)
-            self._detected_objects.append(obj)
-
-            # notify that objects were parsed
-            with self._objects_parsed:
-                self._objects_parsed.notify_all()
+            obj = self.object_queue.get()
+            self.cameras[obj['camera_name']].add_objects([obj])
 
 class ObjectCleaner(threading.Thread):
     def __init__(self, objects_parsed,
        detected_objects):
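With `camera_name` now carried in every queue entry, results can be routed back to the owning Camera through the `cameras` dict. A minimal sketch of that dispatch pattern, using a dummy camera and a hypothetical `objects` payload in place of the real detector output:

import queue

class FakeCamera:
    # stand-in for frigate.video.Camera; only the routing behavior is shown
    def __init__(self, name):
        self.name = name
        self.detected_objects = []

    def add_objects(self, objects):
        self.detected_objects.extend(objects)

cameras = {'back': FakeCamera('back')}
prepped_frame_queue = queue.Queue(6)

# in the real flow this entry also carries the prepped input tensor;
# 'objects' here is a simplification standing in for the engine results
prepped_frame_queue.put({'camera_name': 'back',
                         'objects': [{'name': 'person', 'score': 0.9}]})

entry = prepped_frame_queue.get()
cameras[entry['camera_name']].add_objects(entry['objects'])
print(cameras['back'].detected_objects)  # [{'name': 'person', 'score': 0.9}]
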
diff --git a/frigate/video.py b/frigate/video.py
index a4eeef8e5..bcaee55c1 100644
--- a/frigate/video.py
+++ b/frigate/video.py
@@ -1,8 +1,14 @@
+import os
 import time
 import datetime
 import cv2
 import threading
+import ctypes
+import multiprocessing as mp
 from . util import tonumpyarray
+from . object_detection import FramePrepper
+from . objects import ObjectCleaner, ObjectParser, BestPersonFrame
+from . mqtt import MqttObjectPublisher
 
 # fetch the frames as fast a possible, only decoding the frames when the
 # detection_process has consumed the current frame
@@ -85,3 +91,130 @@ class FrameTracker(threading.Thread):
             for k in stored_frame_times:
                 if (now - k) > 2:
                     del self.recent_frames[k]
+
+def get_frame_shape(rtsp_url):
+    # capture a single frame and check the frame shape so the correct array
+    # size can be allocated in memory
+    video = cv2.VideoCapture(rtsp_url)
+    ret, frame = video.read()
+    frame_shape = frame.shape
+    video.release()
+    return frame_shape
+
+def get_rtsp_url(rtsp_config):
+    if (rtsp_config['password'].startswith('$')):
+        rtsp_config['password'] = os.getenv(rtsp_config['password'][1:])
+    return 'rtsp://{}:{}@{}:{}{}'.format(rtsp_config['user'],
+        rtsp_config['password'], rtsp_config['host'], rtsp_config['port'],
+        rtsp_config['path'])
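`get_rtsp_url` above treats a password beginning with `$` as the name of an environment variable, so secrets can stay out of config.yml. A quick usage sketch; the helper is restated so the block runs standalone, and all credentials are placeholders:

import os

def get_rtsp_url(rtsp_config):
    # a leading $ means the password names an env var holding the secret
    if rtsp_config['password'].startswith('$'):
        rtsp_config['password'] = os.getenv(rtsp_config['password'][1:])
    return 'rtsp://{}:{}@{}:{}{}'.format(
        rtsp_config['user'], rtsp_config['password'],
        rtsp_config['host'], rtsp_config['port'], rtsp_config['path'])

os.environ['RTSP_PASSWORD'] = 'hunter2'  # placeholder secret
print(get_rtsp_url({'user': 'admin', 'password': '$RTSP_PASSWORD',
                    'host': '10.0.0.10', 'port': 554, 'path': '/cam/realmonitor'}))
# rtsp://admin:hunter2@10.0.0.10:554/cam/realmonitor
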
+
+class Camera:
+    def __init__(self, name, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
+        self.name = name
+        self.config = config
+        self.detected_objects = []
+        self.recent_frames = {}
+        self.rtsp_url = get_rtsp_url(self.config['rtsp'])
+        self.regions = self.config['regions']
+        self.frame_shape = get_frame_shape(self.rtsp_url)
+        self.mqtt_client = mqtt_client
+        self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)
+
+        # compute the flattened array length from the shape of the frame
+        flat_array_length = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
+        # create shared array for storing the full frame image data
+        self.shared_frame_array = mp.Array(ctypes.c_uint8, flat_array_length)
+        # create shared value for storing the frame_time
+        self.shared_frame_time = mp.Value('d', 0.0)
+        # Lock to control access to the frame
+        self.frame_lock = mp.Lock()
+        # Condition for notifying that a new frame is ready
+        self.frame_ready = mp.Condition()
+        # Condition for notifying that objects were parsed
+        self.objects_parsed = mp.Condition()
+
+        # shape current frame so it can be treated as a numpy image
+        self.shared_frame_np = tonumpyarray(self.shared_frame_array).reshape(self.frame_shape)
+
+        # create the process to capture frames from the RTSP stream and store in a shared array
+        self.capture_process = mp.Process(target=fetch_frames, args=(self.shared_frame_array,
+            self.shared_frame_time, self.frame_lock, self.frame_ready, self.frame_shape, self.rtsp_url))
+        self.capture_process.daemon = True
+
+        # for each region, create a separate thread to resize the region and prep for detection
+        self.detection_prep_threads = []
+        for region in self.config['regions']:
+            self.detection_prep_threads.append(FramePrepper(
+                self.name,
+                self.shared_frame_np,
+                self.shared_frame_time,
+                self.frame_ready,
+                self.frame_lock,
+                region['size'], region['x_offset'], region['y_offset'],
+                prepped_frame_queue
+            ))
+
+        # start a thread to store recent motion frames for processing
+        self.frame_tracker = FrameTracker(self.shared_frame_np, self.shared_frame_time,
+            self.frame_ready, self.frame_lock, self.recent_frames)
+        self.frame_tracker.start()
+
+        # start a thread to store the highest scoring recent person frame
+        self.best_person_frame = BestPersonFrame(self.objects_parsed, self.recent_frames, self.detected_objects)
+        self.best_person_frame.start()
+
+        # start a thread to expire objects from the detected objects list
+        self.object_cleaner = ObjectCleaner(self.objects_parsed, self.detected_objects)
+        self.object_cleaner.start()
+
+        # start a thread to publish object scores (currently only person)
+        mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self.objects_parsed, self.detected_objects)
+        mqtt_publisher.start()
+
+    def start(self):
+        self.capture_process.start()
+        # start the object detection prep threads
+        for detection_prep_thread in self.detection_prep_threads:
+            detection_prep_thread.start()
+
+    def join(self):
+        self.capture_process.join()
+
+    def get_capture_pid(self):
+        return self.capture_process.pid
+
+    def add_objects(self, objects):
+        if len(objects) == 0:
+            return
+
+        for obj in objects:
+            if obj['name'] == 'person':
+                person_area = (obj['xmax']-obj['xmin'])*(obj['ymax']-obj['ymin'])
+                # find the matching region
+                region = None
+                for r in self.regions:
+                    if (
+                            obj['xmin'] >= r['x_offset'] and
+                            obj['ymin'] >= r['y_offset'] and
+                            obj['xmax'] <= r['x_offset']+r['size'] and
+                            obj['ymax'] <= r['y_offset']+r['size']
+                        ):
+                        region = r
+                        break
+
+                # if the min person area is larger than the
+                # detected person, don't add it to detected objects
+                if region and region['min_person_area'] > person_area:
+                    continue
+
+            self.detected_objects.append(obj)
+
+        with self.objects_parsed:
+            self.objects_parsed.notify_all()
+
+    def get_best_person(self):
+        return self.best_person_frame.best_frame
+
+
+
+
\ No newline at end of file
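The heart of the Camera class is the shared frame buffer: a flat `mp.Array` that the capture process writes into and the prep threads read from, guarded by `frame_lock` and signaled through `frame_ready`. A minimal self-contained sketch of that pattern, assuming a placeholder frame shape and approximating `tonumpyarray` with `np.frombuffer`:

import ctypes
import multiprocessing as mp

import numpy as np

frame_shape = (480, 640, 3)  # placeholder; the real shape comes from get_frame_shape()
flat_len = frame_shape[0] * frame_shape[1] * frame_shape[2]

shared_arr = mp.Array(ctypes.c_uint8, flat_len)
shared_frame_time = mp.Value('d', 0.0)
frame_lock = mp.Lock()
frame_ready = mp.Condition()

# same idea as tonumpyarray(...).reshape(...): a numpy view over the shared buffer
frame_np = np.frombuffer(shared_arr.get_obj(), dtype=np.uint8).reshape(frame_shape)

def publish(frame, frame_time):
    # producer side (fetch_frames): copy the frame in under the lock, then notify
    with frame_lock:
        frame_np[:] = frame
        shared_frame_time.value = frame_time
    with frame_ready:
        frame_ready.notify_all()

def snapshot():
    # consumer side (FramePrepper / FrameTracker): copy out under the lock
    with frame_lock:
        return frame_np.copy(), shared_frame_time.value

publish(np.full(frame_shape, 128, np.uint8), 1553900000.0)
frame, t = snapshot()
print(frame.shape, t)  # (480, 640, 3) 1553900000.0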