From ada8ffccf9755eca7e77e89a6d668b68eafce498 Mon Sep 17 00:00:00 2001 From: blakeblackshear Date: Mon, 25 Mar 2019 20:35:44 -0500 Subject: [PATCH] fix for queue size growing too large --- detect_objects.py | 8 ++++---- frigate/object_detection.py | 29 +++++++++++++++++++---------- frigate/objects.py | 14 +++++++++++++- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/detect_objects.py b/detect_objects.py index 7c52f115a..50acd5a7d 100644 --- a/detect_objects.py +++ b/detect_objects.py @@ -29,9 +29,9 @@ MQTT_USER = os.getenv('MQTT_USER') MQTT_PASS = os.getenv('MQTT_PASS') MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX') -# REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50" +REGIONS = "300,0,0,2000,200,no-mask-300.bmp:300,300,0,2000,200,no-mask-300.bmp:300,600,0,2000,200,no-mask-300.bmp:300,900,0,2000,200,no-mask-300.bmp" # REGIONS = "400,350,250,50" -REGIONS = os.getenv('REGIONS') +# REGIONS = os.getenv('REGIONS') DEBUG = (os.getenv('DEBUG') == '1') @@ -70,7 +70,7 @@ def main(): print("Unable to capture video stream") exit(1) video.release() - + # compute the flattened array length from the array shape flat_array_length = frame_shape[0] * frame_shape[1] * frame_shape[2] # create shared array for storing the full frame image data @@ -95,7 +95,7 @@ def main(): # Queue for detected objects object_queue = mp.Queue() # Queue for prepped frames - prepped_frame_queue = queue.Queue() + prepped_frame_queue = queue.Queue(len(regions)*2) prepped_frame_box = mp.Array(ctypes.c_uint16, 3) # shape current frame so it can be treated as an image diff --git a/frigate/object_detection.py b/frigate/object_detection.py index faad222da..235739d7a 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -1,4 +1,5 @@ import datetime +import time import cv2 import threading import numpy as np @@ -33,7 +34,6 @@ def detect_objects(prepped_frame_array, prepped_frame_time, region_box = [0,0,0] while True: # wait until a frame is ready - prepped_frame_grabbed.clear() prepped_frame_ready.wait() prepped_frame_copy = prepped_frame_np.copy() @@ -41,10 +41,13 @@ def detect_objects(prepped_frame_array, prepped_frame_time, region_box[:] = prepped_frame_box prepped_frame_grabbed.set() + # print("Grabbed " + str(region_box[1]) + "," + str(region_box[2])) # Actual detection. objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3) - # print(engine.get_inference_time()) + # time.sleep(0.1) + # objects = [] + print(engine.get_inference_time()) # put detected objects in the queue if objects: for obj in objects: @@ -90,14 +93,16 @@ class PreppedQueueProcessor(threading.Thread): # process queue... while True: frame = self.prepped_frame_queue.get() - print(self.prepped_frame_queue.qsize()) + # print(self.prepped_frame_queue.qsize()) prepped_frame_np[:] = frame['frame'] self.prepped_frame_time.value = frame['frame_time'] self.prepped_frame_box[0] = frame['region_size'] self.prepped_frame_box[1] = frame['region_x_offset'] self.prepped_frame_box[2] = frame['region_y_offset'] + # print("Passed " + str(frame['region_x_offset']) + "," + str(frame['region_x_offset'])) self.prepped_frame_ready.set() self.prepped_frame_grabbed.wait() + self.prepped_frame_grabbed.clear() self.prepped_frame_ready.clear() @@ -145,11 +150,15 @@ class FramePrepper(threading.Thread): # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0) + # print("Prepped frame at " + str(self.region_x_offset) + "," + str(self.region_y_offset)) # add the frame to the queue - self.prepped_frame_queue.put({ - 'frame_time': frame_time, - 'frame': frame_expanded.flatten().copy(), - 'region_size': self.region_size, - 'region_x_offset': self.region_x_offset, - 'region_y_offset': self.region_y_offset - }) + if not self.prepped_frame_queue.full(): + self.prepped_frame_queue.put({ + 'frame_time': frame_time, + 'frame': frame_expanded.flatten().copy(), + 'region_size': self.region_size, + 'region_x_offset': self.region_x_offset, + 'region_y_offset': self.region_y_offset + }) + # else: + # print("queue full. moving on") diff --git a/frigate/objects.py b/frigate/objects.py index f3b0e0520..9c602430b 100644 --- a/frigate/objects.py +++ b/frigate/objects.py @@ -11,8 +11,18 @@ class ObjectParser(threading.Thread): self._detected_objects = detected_objects def run(self): + # frame_times = {} while True: obj = self._object_queue.get() + # frame_time = obj['frame_time'] + # if frame_time in frame_times: + # if frame_times[frame_time] == 7: + # del frame_times[frame_time] + # else: + # frame_times[frame_time] += 1 + # else: + # frame_times[frame_time] = 1 + # print(frame_times) self._detected_objects.append(obj) # notify that objects were parsed @@ -40,9 +50,11 @@ class ObjectCleaner(threading.Thread): # look for the first object found within the last second # (newest objects are appended to the end) detected_objects = self._detected_objects.copy() + + #print([round(now-obj['frame_time'],2) for obj in detected_objects]) num_to_delete = 0 for obj in detected_objects: - if now-obj['frame_time']<1: + if now-obj['frame_time']<2: break num_to_delete += 1 if num_to_delete > 0: