fix for queue size growing too large

This commit is contained in:
blakeblackshear 2019-03-25 20:35:44 -05:00
parent bca4e78e9a
commit ada8ffccf9
3 changed files with 36 additions and 15 deletions

View File

@ -29,9 +29,9 @@ MQTT_USER = os.getenv('MQTT_USER')
MQTT_PASS = os.getenv('MQTT_PASS') MQTT_PASS = os.getenv('MQTT_PASS')
MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX') MQTT_TOPIC_PREFIX = os.getenv('MQTT_TOPIC_PREFIX')
# REGIONS = "350,0,300,50:400,350,250,50:400,750,250,50" REGIONS = "300,0,0,2000,200,no-mask-300.bmp:300,300,0,2000,200,no-mask-300.bmp:300,600,0,2000,200,no-mask-300.bmp:300,900,0,2000,200,no-mask-300.bmp"
# REGIONS = "400,350,250,50" # REGIONS = "400,350,250,50"
REGIONS = os.getenv('REGIONS') # REGIONS = os.getenv('REGIONS')
DEBUG = (os.getenv('DEBUG') == '1') DEBUG = (os.getenv('DEBUG') == '1')
@ -95,7 +95,7 @@ def main():
# Queue for detected objects # Queue for detected objects
object_queue = mp.Queue() object_queue = mp.Queue()
# Queue for prepped frames # Queue for prepped frames
prepped_frame_queue = queue.Queue() prepped_frame_queue = queue.Queue(len(regions)*2)
prepped_frame_box = mp.Array(ctypes.c_uint16, 3) prepped_frame_box = mp.Array(ctypes.c_uint16, 3)
# shape current frame so it can be treated as an image # shape current frame so it can be treated as an image

View File

@ -1,4 +1,5 @@
import datetime import datetime
import time
import cv2 import cv2
import threading import threading
import numpy as np import numpy as np
@ -33,7 +34,6 @@ def detect_objects(prepped_frame_array, prepped_frame_time,
region_box = [0,0,0] region_box = [0,0,0]
while True: while True:
# wait until a frame is ready # wait until a frame is ready
prepped_frame_grabbed.clear()
prepped_frame_ready.wait() prepped_frame_ready.wait()
prepped_frame_copy = prepped_frame_np.copy() prepped_frame_copy = prepped_frame_np.copy()
@ -41,10 +41,13 @@ def detect_objects(prepped_frame_array, prepped_frame_time,
region_box[:] = prepped_frame_box region_box[:] = prepped_frame_box
prepped_frame_grabbed.set() prepped_frame_grabbed.set()
# print("Grabbed " + str(region_box[1]) + "," + str(region_box[2]))
# Actual detection. # Actual detection.
objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3) objects = engine.DetectWithInputTensor(prepped_frame_copy, threshold=0.5, top_k=3)
# print(engine.get_inference_time()) # time.sleep(0.1)
# objects = []
print(engine.get_inference_time())
# put detected objects in the queue # put detected objects in the queue
if objects: if objects:
for obj in objects: for obj in objects:
@ -90,14 +93,16 @@ class PreppedQueueProcessor(threading.Thread):
# process queue... # process queue...
while True: while True:
frame = self.prepped_frame_queue.get() frame = self.prepped_frame_queue.get()
print(self.prepped_frame_queue.qsize()) # print(self.prepped_frame_queue.qsize())
prepped_frame_np[:] = frame['frame'] prepped_frame_np[:] = frame['frame']
self.prepped_frame_time.value = frame['frame_time'] self.prepped_frame_time.value = frame['frame_time']
self.prepped_frame_box[0] = frame['region_size'] self.prepped_frame_box[0] = frame['region_size']
self.prepped_frame_box[1] = frame['region_x_offset'] self.prepped_frame_box[1] = frame['region_x_offset']
self.prepped_frame_box[2] = frame['region_y_offset'] self.prepped_frame_box[2] = frame['region_y_offset']
# print("Passed " + str(frame['region_x_offset']) + "," + str(frame['region_x_offset']))
self.prepped_frame_ready.set() self.prepped_frame_ready.set()
self.prepped_frame_grabbed.wait() self.prepped_frame_grabbed.wait()
self.prepped_frame_grabbed.clear()
self.prepped_frame_ready.clear() self.prepped_frame_ready.clear()
@ -145,7 +150,9 @@ class FramePrepper(threading.Thread):
# Expand dimensions since the model expects images to have shape: [1, 300, 300, 3] # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0) frame_expanded = np.expand_dims(cropped_frame_rgb, axis=0)
# print("Prepped frame at " + str(self.region_x_offset) + "," + str(self.region_y_offset))
# add the frame to the queue # add the frame to the queue
if not self.prepped_frame_queue.full():
self.prepped_frame_queue.put({ self.prepped_frame_queue.put({
'frame_time': frame_time, 'frame_time': frame_time,
'frame': frame_expanded.flatten().copy(), 'frame': frame_expanded.flatten().copy(),
@ -153,3 +160,5 @@ class FramePrepper(threading.Thread):
'region_x_offset': self.region_x_offset, 'region_x_offset': self.region_x_offset,
'region_y_offset': self.region_y_offset 'region_y_offset': self.region_y_offset
}) })
# else:
# print("queue full. moving on")

View File

@ -11,8 +11,18 @@ class ObjectParser(threading.Thread):
self._detected_objects = detected_objects self._detected_objects = detected_objects
def run(self): def run(self):
# frame_times = {}
while True: while True:
obj = self._object_queue.get() obj = self._object_queue.get()
# frame_time = obj['frame_time']
# if frame_time in frame_times:
# if frame_times[frame_time] == 7:
# del frame_times[frame_time]
# else:
# frame_times[frame_time] += 1
# else:
# frame_times[frame_time] = 1
# print(frame_times)
self._detected_objects.append(obj) self._detected_objects.append(obj)
# notify that objects were parsed # notify that objects were parsed
@ -40,9 +50,11 @@ class ObjectCleaner(threading.Thread):
# look for the first object found within the last second # look for the first object found within the last second
# (newest objects are appended to the end) # (newest objects are appended to the end)
detected_objects = self._detected_objects.copy() detected_objects = self._detected_objects.copy()
#print([round(now-obj['frame_time'],2) for obj in detected_objects])
num_to_delete = 0 num_to_delete = 0
for obj in detected_objects: for obj in detected_objects:
if now-obj['frame_time']<1: if now-obj['frame_time']<2:
break break
num_to_delete += 1 num_to_delete += 1
if num_to_delete > 0: if num_to_delete > 0: