make object processor resilient to plasma failures

This commit is contained in:
Blake Blackshear 2020-03-13 16:13:01 -05:00
parent 6de8e3bd1f
commit e37eba49ff
3 changed files with 127 additions and 111 deletions

View File

@ -51,12 +51,6 @@ RUN wget -q https://storage.googleapis.com/download.tensorflow.org/models/tflite
mv /detect.tflite /cpu_model.tflite && \ mv /detect.tflite /cpu_model.tflite && \
rm /cpu_model.zip rm /cpu_model.zip
RUN apt -qq update && apt -qq install --no-install-recommends -y \
gdb \
python3.7-dbg \
&& rm -rf /var/lib/apt/lists/* \
&& (apt-get autoremove -y; apt-get autoclean -y)
WORKDIR /opt/frigate/ WORKDIR /opt/frigate/
ADD frigate frigate/ ADD frigate frigate/
COPY detect_objects.py . COPY detect_objects.py .

View File

@ -71,13 +71,12 @@ def start_plasma_store():
return plasma_process return plasma_process
class CameraWatchdog(threading.Thread): class CameraWatchdog(threading.Thread):
def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, object_processor, plasma_process): def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.camera_processes = camera_processes self.camera_processes = camera_processes
self.config = config self.config = config
self.tflite_process = tflite_process self.tflite_process = tflite_process
self.tracked_objects_queue = tracked_objects_queue self.tracked_objects_queue = tracked_objects_queue
self.object_processor = object_processor
self.plasma_process = plasma_process self.plasma_process = plasma_process
def run(self): def run(self):
@ -202,7 +201,7 @@ def main():
object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue) object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue)
object_processor.start() object_processor.start()
camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, object_processor, plasma_process) camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process)
camera_watchdog.start() camera_watchdog.start()
# create a flask app that encodes frames a mjpeg on demand # create a flask app that encodes frames a mjpeg on demand

View File

@ -1,6 +1,7 @@
import json import json
import hashlib import hashlib
import datetime import datetime
import time
import copy import copy
import cv2 import cv2
import threading import threading
@ -45,10 +46,34 @@ class TrackedObjectProcessor(threading.Thread):
def get_current_frame(self, camera): def get_current_frame(self, camera):
return self.camera_data[camera]['current_frame'] return self.camera_data[camera]['current_frame']
def run(self): def connect_plasma_client(self):
while True: while True:
try: try:
self.plasma_client = plasma.connect("/tmp/plasma") self.plasma_client = plasma.connect("/tmp/plasma")
return
except:
print(f"TrackedObjectProcessor: unable to connect plasma client")
time.sleep(10)
def get_from_plasma(self, object_id):
while True:
try:
return self.plasma_client.get(object_id, timeout_ms=0)
except:
self.connect_plasma_client()
time.sleep(1)
def delete_from_plasma(self, object_ids):
while True:
try:
self.plasma_client.delete(object_ids)
return
except:
self.connect_plasma_client()
time.sleep(1)
def run(self):
self.connect_plasma_client()
while True: while True:
camera, frame_time, tracked_objects = self.tracked_objects_queue.get() camera, frame_time, tracked_objects = self.tracked_objects_queue.get()
@ -63,7 +88,7 @@ class TrackedObjectProcessor(threading.Thread):
object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}")) object_id_hash = hashlib.sha1(str.encode(f"{camera}{frame_time}"))
object_id_bytes = object_id_hash.digest() object_id_bytes = object_id_hash.digest()
object_id = plasma.ObjectID(object_id_bytes) object_id = plasma.ObjectID(object_id_bytes)
current_frame = self.plasma_client.get(object_id, timeout_ms=0) current_frame = self.get_from_plasma(object_id)
if not current_frame is plasma.ObjectNotAvailable: if not current_frame is plasma.ObjectNotAvailable:
# draw the bounding boxes on the frame # draw the bounding boxes on the frame
@ -94,7 +119,7 @@ class TrackedObjectProcessor(threading.Thread):
# store the object id, so you can delete it at the next loop # store the object id, so you can delete it at the next loop
previous_object_id = self.camera_data[camera]['object_id'] previous_object_id = self.camera_data[camera]['object_id']
if not previous_object_id is None: if not previous_object_id is None:
self.plasma_client.delete([previous_object_id]) self.delete_from_plasma([previous_object_id])
self.camera_data[camera]['object_id'] = object_id self.camera_data[camera]['object_id'] = object_id
### ###
@ -148,5 +173,3 @@ class TrackedObjectProcessor(threading.Thread):
if ret: if ret:
jpg_bytes = jpg.tobytes() jpg_bytes = jpg.tobytes()
self.client.publish(f"{self.topic_prefix}/{camera}/{obj_name}/snapshot", jpg_bytes, retain=True) self.client.publish(f"{self.topic_prefix}/{camera}/{obj_name}/snapshot", jpg_bytes, retain=True)
except:
pass