From e580aca4405296f407c86ab7c7ffcf564e1c04aa Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Thu, 9 Jan 2020 20:53:04 -0600 Subject: [PATCH] switch everything to run off of tracked objects --- frigate/mqtt.py | 32 +++++++-------- frigate/object_detection.py | 8 ++-- frigate/objects.py | 77 ++++++++++++++++++------------------- frigate/video.py | 33 ++++++++-------- 4 files changed, 72 insertions(+), 78 deletions(-) diff --git a/frigate/mqtt.py b/frigate/mqtt.py index 9299e319d..6dc8491c2 100644 --- a/frigate/mqtt.py +++ b/frigate/mqtt.py @@ -6,39 +6,35 @@ from collections import Counter, defaultdict import itertools class MqttObjectPublisher(threading.Thread): - def __init__(self, client, topic_prefix, objects_parsed, detected_objects, best_frames): + def __init__(self, client, topic_prefix, camera): threading.Thread.__init__(self) self.client = client self.topic_prefix = topic_prefix - self.objects_parsed = objects_parsed - self._detected_objects = detected_objects - self.best_frames = best_frames + self.camera = camera def run(self): - prctl.set_name("MqttObjectPublisher") + prctl.set_name(self.__class__.__name__) current_object_status = defaultdict(lambda: 'OFF') while True: - # wait until objects have been parsed - with self.objects_parsed: - self.objects_parsed.wait() + # wait until objects have been tracked + with self.camera.objects_tracked: + self.camera.objects_tracked.wait() - # make a copy of detected objects - detected_objects = self._detected_objects.copy() - - # total up all scores by object type + # count objects with more than 2 entries in history by type obj_counter = Counter() - for obj in itertools.chain.from_iterable(detected_objects.values()): - obj_counter[obj['name']] += obj['score'] + for obj in self.camera.object_tracker.tracked_objects.values(): + if len(obj['history']) > 1: + obj_counter[obj['name']] += 1 # report on detected objects - for obj_name, total_score in obj_counter.items(): - new_status = 'ON' if int(total_score*100) > 100 else 'OFF' + for obj_name, count in obj_counter.items(): + new_status = 'ON' if count > 0 else 'OFF' if new_status != current_object_status[obj_name]: current_object_status[obj_name] = new_status self.client.publish(self.topic_prefix+'/'+obj_name, new_status, retain=False) # send the snapshot over mqtt if we have it as well - if obj_name in self.best_frames.best_frames: - best_frame = cv2.cvtColor(self.best_frames.best_frames[obj_name], cv2.COLOR_RGB2BGR) + if obj_name in self.camera.best_frames.best_frames: + best_frame = cv2.cvtColor(self.camera.best_frames.best_frames[obj_name], cv2.COLOR_RGB2BGR) ret, jpg = cv2.imencode('.jpg', best_frame) if ret: jpg_bytes = jpg.tobytes() diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 9ff56424d..85d85d7a2 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -23,7 +23,7 @@ class PreppedQueueProcessor(threading.Thread): self.avg_inference_speed = 10 def run(self): - prctl.set_name("PreppedQueueProcessor") + prctl.set_name(self.__class__.__name__) # process queue... while True: if self.prepped_frame_queue.full(): @@ -44,7 +44,7 @@ class RegionRequester(threading.Thread): self.camera = camera def run(self): - prctl.set_name("RegionRequester") + prctl.set_name(self.__class__.__name__) frame_time = 0.0 while True: now = datetime.datetime.now().timestamp() @@ -58,7 +58,7 @@ class RegionRequester(threading.Thread): frame_time = self.camera.frame_time.value # grab the current tracked objects - tracked_objects = self.camera.object_tracker.tracked_objects.values().copy() + tracked_objects = list(self.camera.object_tracker.tracked_objects.values()).copy() with self.camera.regions_in_process_lock: self.camera.regions_in_process[frame_time] = len(self.camera.config['regions']) @@ -100,7 +100,7 @@ class RegionPrepper(threading.Thread): self.prepped_frame_queue = prepped_frame_queue def run(self): - prctl.set_name("RegionPrepper") + prctl.set_name(self.__class__.__name__) while True: resize_request = self.resize_request_queue.get() diff --git a/frigate/objects.py b/frigate/objects.py index 08ae6b23b..2b12f330e 100644 --- a/frigate/objects.py +++ b/frigate/objects.py @@ -10,10 +10,9 @@ from scipy.spatial import distance as dist from frigate.util import draw_box_with_label, LABELS, compute_intersection_rectangle, compute_intersection_over_union, calculate_region class ObjectCleaner(threading.Thread): - def __init__(self, objects_parsed, detected_objects): + def __init__(self, camera): threading.Thread.__init__(self) - self._objects_parsed = objects_parsed - self._detected_objects = detected_objects + self.camera = camera def run(self): prctl.set_name("ObjectCleaner") @@ -22,22 +21,9 @@ class ObjectCleaner(threading.Thread): # wait a bit before checking for expired frames time.sleep(0.2) - # expire the objects that are more than 1 second old - now = datetime.datetime.now().timestamp() - # look for the first object found within the last second - # (newest objects are appended to the end) - detected_objects = self._detected_objects.copy() - - objects_removed = False - for frame_time in detected_objects.keys(): - if now-frame_time>2: - del self._detected_objects[frame_time] - objects_removed = True - - if objects_removed: - # notify that parsed objects were changed - with self._objects_parsed: - self._objects_parsed.notify_all() + for frame_time in list(self.camera.detected_objects.keys()).copy(): + if not frame_time in self.camera.frame_cache: + del self.camera.detected_objects[frame_time] class DetectedObjectsProcessor(threading.Thread): def __init__(self, camera): @@ -168,9 +154,6 @@ class RegionRefiner(threading.Thread): selected_objects = [o for o in selected_objects if not self.filtered(o)] self.camera.detected_objects[frame_time] = selected_objects - - with self.camera.objects_parsed: - self.camera.objects_parsed.notify_all() # print(f"{frame_time} is actually finished") @@ -247,11 +230,16 @@ class ObjectTracker(threading.Thread): while True: frame_time = self.camera.refined_frame_queue.get() self.match_and_update(self.camera.detected_objects[frame_time]) - self.camera.frame_tracked_queue.put(frame_time) + self.camera.frame_output_queue.put(frame_time) + if len(self.tracked_objects) > 0: + with self.camera.objects_tracked: + self.camera.objects_tracked.notify_all() def register(self, index, obj): id = f"{str(obj['frame_time'])}-{index}" obj['id'] = id + obj['top_score'] = obj['score'] + self.add_history(obj) self.tracked_objects[id] = obj self.disappeared[id] = 0 @@ -261,7 +249,22 @@ class ObjectTracker(threading.Thread): def update(self, id, new_obj): self.tracked_objects[id].update(new_obj) - # TODO: am i missing anything? history? + self.add_history(self.tracked_objects[id]) + if self.tracked_objects[id]['score'] > self.tracked_objects[id]['top_score']: + self.tracked_objects[id]['top_score'] = self.tracked_objects[id]['score'] + + def add_history(self, obj): + entry = { + 'score': obj['score'], + 'box': obj['box'], + 'region': obj['region'], + 'centroid': obj['centroid'], + 'frame_time': obj['frame_time'] + } + if 'history' in obj: + obj['history'].append(entry) + else: + obj['history'] = [entry] def match_and_update(self, new_objects): # check to see if the list of input bounding box rectangles @@ -384,26 +387,23 @@ class ObjectTracker(threading.Thread): # Maintains the frame and object with the highest score class BestFrames(threading.Thread): - def __init__(self, objects_parsed, recent_frames, detected_objects): + def __init__(self, camera): threading.Thread.__init__(self) - self.objects_parsed = objects_parsed - self.recent_frames = recent_frames - self.detected_objects = detected_objects + self.camera = camera self.best_objects = {} self.best_frames = {} def run(self): - prctl.set_name("BestFrames") + prctl.set_name(self.__class__.__name__) while True: - - # wait until objects have been parsed - with self.objects_parsed: - self.objects_parsed.wait() + # wait until objects have been tracked + with self.camera.objects_tracked: + self.camera.objects_tracked.wait() # make a copy of detected objects - detected_objects = self.detected_objects.copy() + detected_objects = list(self.camera.object_tracker.tracked_objects.values()).copy() - for obj in itertools.chain.from_iterable(detected_objects.values()): + for obj in detected_objects: if obj['name'] in self.best_objects: now = datetime.datetime.now().timestamp() # if the object is a higher score than the current best score @@ -413,12 +413,9 @@ class BestFrames(threading.Thread): else: self.best_objects[obj['name']] = obj - # make a copy of the recent frames - recent_frames = self.recent_frames.copy() - for name, obj in self.best_objects.items(): - if obj['frame_time'] in recent_frames: - best_frame = recent_frames[obj['frame_time']] #, np.zeros((720,1280,3), np.uint8)) + if obj['frame_time'] in self.camera.frame_cache: + best_frame = self.camera.frame_cache[obj['frame_time']] draw_box_with_label(best_frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], f"{int(obj['score']*100)}% {obj['area']}") diff --git a/frigate/video.py b/frigate/video.py index 99bd59c26..66eab52d7 100644 --- a/frigate/video.py +++ b/frigate/video.py @@ -26,17 +26,18 @@ class FrameTracker(threading.Thread): self.recent_frames = recent_frames def run(self): - prctl.set_name("FrameTracker") + prctl.set_name(self.__class__.__name__) while True: # wait for a frame with self.frame_ready: self.frame_ready.wait() - now = datetime.datetime.now().timestamp() # delete any old frames stored_frame_times = list(self.recent_frames.keys()) - for k in stored_frame_times: - if (now - k) > 10: + stored_frame_times.sort(reverse=True) + if len(stored_frame_times) > 100: + frames_to_delete = stored_frame_times[50:] + for k in frames_to_delete: del self.recent_frames[k] def get_frame_shape(source): @@ -58,7 +59,7 @@ class CameraWatchdog(threading.Thread): self.camera = camera def run(self): - prctl.set_name("CameraWatchdog") + prctl.set_name(self.__class__.__name__) while True: # wait a bit before checking time.sleep(10) @@ -75,7 +76,7 @@ class CameraCapture(threading.Thread): self.camera = camera def run(self): - prctl.set_name("CameraCapture") + prctl.set_name(self.__class__.__name__) frame_num = 0 while True: if self.camera.ffmpeg_process.poll() != None: @@ -113,10 +114,10 @@ class VideoWriter(threading.Thread): self.camera = camera def run(self): - prctl.set_name("VideoWriter") + prctl.set_name(self.__class__.__name__) while True: - frame_time = self.camera.frame_tracked_queue.get() - if len(self.camera.detected_objects[frame_time]) == 0: + frame_time = self.camera.frame_output_queue.get() + if len(self.camera.object_tracker.tracked_objects) == 0: continue f = open(f"/debug/{self.camera.name}-{str(frame_time)}.jpg", 'wb') f.write(self.camera.frame_with_objects(frame_time)) @@ -137,7 +138,7 @@ class Camera: self.regions_in_process_lock = mp.Lock() self.finished_frame_queue = queue.Queue() self.refined_frame_queue = queue.Queue() - self.frame_tracked_queue = queue.Queue() + self.frame_output_queue = queue.Queue() self.ffmpeg = config.get('ffmpeg', {}) self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input']) @@ -161,8 +162,8 @@ class Camera: self.frame_lock = mp.Lock() # Condition for notifying that a new frame is ready self.frame_ready = mp.Condition() - # Condition for notifying that objects were parsed - self.objects_parsed = mp.Condition() + # Condition for notifying that objects were tracked + self.objects_tracked = mp.Condition() # Queue for prepped frames, max size set to (number of regions * 5) max_queue_size = len(self.config['regions'])*5 @@ -208,11 +209,11 @@ class Camera: self.region_prepper.start() # start a thread to store the highest scoring recent frames for monitored object types - self.best_frames = BestFrames(self.objects_parsed, self.frame_cache, self.detected_objects) + self.best_frames = BestFrames(self) self.best_frames.start() # start a thread to expire objects from the detected objects list - self.object_cleaner = ObjectCleaner(self.objects_parsed, self.detected_objects) + self.object_cleaner = ObjectCleaner(self) self.object_cleaner.start() # start a thread to refine regions when objects are clipped @@ -230,7 +231,7 @@ class Camera: self.video_writer.start() # start a thread to publish object scores - mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self.objects_parsed, self.detected_objects, self.best_frames) + mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self) mqtt_publisher.start() # create a watchdog thread for capture process @@ -321,7 +322,7 @@ class Camera: color, 2) # draw the bounding boxes on the screen - for id, obj in self.object_tracker.tracked_objects.items(): + for id, obj in list(self.object_tracker.tracked_objects.items()): # for obj in detected_objects[frame_time]: cv2.rectangle(frame, (obj['region']['xmin'], obj['region']['ymin']), (obj['region']['xmax'], obj['region']['ymax']),