mirror of
synced 2024-11-21 19:07:46 +01:00
add watchdog for camera processes
This commit is contained in:
@ -2,6 +2,7 @@ import cv2
import time
import time
import queue
import queue
import yaml
import yaml
import threading
import multiprocessing as mp
import multiprocessing as mp
import subprocess as sp
import subprocess as sp
import numpy as np
import numpy as np
@ -50,10 +51,40 @@ GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
WEB_PORT = CONFIG.get('web_port', 5000)
WEB_PORT = CONFIG.get('web_port', 5000)
DEBUG = (CONFIG.get('debug', '0') == '1')
DEBUG = (CONFIG.get('debug', '0') == '1')
# TODO: make CPU/Coral switching more seamless
# MODEL_PATH = CONFIG.get('tflite_model', '/lab/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite')
# MODEL_PATH = CONFIG.get('tflite_model', '/lab/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite')
MODEL_PATH = CONFIG.get('tflite_model', '/lab/detect.tflite')
MODEL_PATH = CONFIG.get('tflite_model', '/lab/detect.tflite')
LABEL_MAP = CONFIG.get('label_map', '/lab/labelmap.txt')
LABEL_MAP = CONFIG.get('label_map', '/lab/labelmap.txt')
class CameraWatchdog(threading.Thread):
def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue):
self.camera_processes = camera_processes
self.config = config
self.tflite_process = tflite_process
self.tracked_objects_queue = tracked_objects_queue
def run(self):
while True:
# wait a bit before checking
for name, camera_process in self.camera_processes.items():
process = camera_process['process']
if not process.is_alive():
print(f"Process for {name} is not alive. Starting again...")
camera_process['fps'].value = 10.0
camera_process['skipped_fps'].value = 0.0
process = mp.Process(target=track_camera, args=(name, self.config[name], FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
self.tflite_process.detect_lock, self.tflite_process.detect_ready, self.tflite_process.frame_ready, self.tracked_objects_queue,
camera_process['fps'], camera_process['skipped_fps']))
process.daemon = True
camera_process['process'] = process
print(f"Camera_process started for {name}: {process.pid}")
def main():
def main():
# connect to mqtt and setup last will
# connect to mqtt and setup last will
def on_connect(client, userdata, flags, rc):
def on_connect(client, userdata, flags, rc):
@ -101,22 +132,24 @@ def main():
tflite_process = EdgeTPUProcess(MODEL_PATH)
tflite_process = EdgeTPUProcess(MODEL_PATH)
# start the camera processes
# start the camera processes
camera_processes = []
camera_processes = {}
camera_stats_values = {}
for name, config in CONFIG['cameras'].items():
for name, config in CONFIG['cameras'].items():
camera_stats_values[name] = {
camera_processes[name] = {
'fps': mp.Value('d', 10.0),
'fps': mp.Value('d', 10.0),
'skipped_fps': mp.Value('d', 0.0)
'skipped_fps': mp.Value('d', 0.0)
camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
camera_process = mp.Process(target=track_camera, args=(name, config, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG,
tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue,
tflite_process.detect_lock, tflite_process.detect_ready, tflite_process.frame_ready, tracked_objects_queue,
camera_stats_values[name]['fps'], camera_stats_values[name]['skipped_fps']))
camera_processes[name]['fps'], camera_processes[name]['skipped_fps']))
camera_process.daemon = True
camera_process.daemon = True
camera_processes[name]['process'] = camera_process
for camera_process in camera_processes:
for name, camera_process in camera_processes.items():
print(f"Camera_process started {camera_process.pid}")
print(f"Camera_process started for {name}: {camera_process['process'].pid}")
camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue)
object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue)
object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue)
@ -138,7 +171,7 @@ def main():
for name, camera_stats in camera_stats_values.items():
for name, camera_stats in camera_processes.items():
stats[name] = {
stats[name] = {
'fps': camera_stats['fps'].value,
'fps': camera_stats['fps'].value,
'skipped_fps': camera_stats['skipped_fps'].value
'skipped_fps': camera_stats['skipped_fps'].value
@ -183,8 +216,7 @@ def main():
app.run(host='', port=WEB_PORT, debug=False)
app.run(host='', port=WEB_PORT, debug=False)
for camera_process in camera_processes:
@ -55,534 +55,6 @@ def get_ffmpeg_input(ffmpeg_input):
frigate_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
frigate_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
return ffmpeg_input.format(**frigate_vars)
return ffmpeg_input.format(**frigate_vars)
<<<<<<< HEAD
class CameraWatchdog(threading.Thread):
def __init__(self, camera):
self.camera = camera
def run(self):
while True:
# wait a bit before checking
if self.camera.frame_time.value != 0.0 and (datetime.datetime.now().timestamp() - self.camera.frame_time.value) > self.camera.watchdog_timeout:
print(self.camera.name + ": last frame is more than 5 minutes old, restarting camera capture...")
# Thread to read the stdout of the ffmpeg process and update the current frame
class CameraCapture(threading.Thread):
def __init__(self, camera):
self.camera = camera
def run(self):
frame_num = 0
while True:
if self.camera.ffmpeg_process.poll() != None:
print(self.camera.name + ": ffmpeg process is not running. exiting capture thread...")
raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size)
if len(raw_image) == 0:
print(self.camera.name + ": ffmpeg didnt return a frame. something is wrong. exiting capture thread...")
frame_num += 1
if (frame_num % self.camera.take_frame) != 0:
with self.camera.frame_lock:
# TODO: use frame_queue instead
self.camera.frame_time.value = datetime.datetime.now().timestamp()
self.camera.frame_cache[self.camera.frame_time.value] = (
.frombuffer(raw_image, np.uint8)
# Notify with the condition that a new frame is ready
with self.camera.frame_ready:
class VideoWriter(threading.Thread):
def __init__(self, camera):
self.camera = camera
def run(self):
while True:
(frame_time, tracked_objects) = self.camera.frame_output_queue.get()
# if len(tracked_objects) == 0:
# continue
# f = open(f"/debug/output/{self.camera.name}-{str(format(frame_time, '.8f'))}.jpg", 'wb')
# f.write(self.camera.frame_with_objects(frame_time, tracked_objects))
# f.close()
class Camera:
def __init__(self, name, ffmpeg_config, global_objects_config, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
self.name = name
self.config = config
self.detected_objects = defaultdict(lambda: [])
self.frame_cache = {}
self.last_processed_frame = None
# queue for re-assembling frames in order
self.frame_queue = queue.Queue()
# track how many regions have been requested for a frame so we know when a frame is complete
self.regions_in_process = {}
# Lock to control access
self.regions_in_process_lock = mp.Lock()
self.finished_frame_queue = queue.Queue()
self.refined_frame_queue = queue.Queue()
self.frame_output_queue = queue.Queue()
self.ffmpeg = config.get('ffmpeg', {})
self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input'])
self.ffmpeg_global_args = self.ffmpeg.get('global_args', ffmpeg_config['global_args'])
self.ffmpeg_hwaccel_args = self.ffmpeg.get('hwaccel_args', ffmpeg_config['hwaccel_args'])
self.ffmpeg_input_args = self.ffmpeg.get('input_args', ffmpeg_config['input_args'])
self.ffmpeg_output_args = self.ffmpeg.get('output_args', ffmpeg_config['output_args'])
camera_objects_config = config.get('objects', {})
self.take_frame = self.config.get('take_frame', 1)
self.watchdog_timeout = self.config.get('watchdog_timeout', 300)
self.snapshot_config = {
'show_timestamp': self.config.get('snapshots', {}).get('show_timestamp', True)
self.regions = self.config['regions']
if 'width' in self.config and 'height' in self.config:
self.frame_shape = (self.config['height'], self.config['width'], 3)
self.frame_shape = get_frame_shape(self.ffmpeg_input)
self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
self.mqtt_client = mqtt_client
self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)
# create shared value for storing the frame_time
self.frame_time = mp.Value('d', 0.0)
# Lock to control access to the frame
self.frame_lock = mp.Lock()
# Condition for notifying that a new frame is ready
self.frame_ready = mp.Condition()
# Condition for notifying that objects were tracked
self.objects_tracked = mp.Condition()
# Queue for prepped frames, max size set to (number of regions * 5)
self.resize_queue = queue.Queue()
# Queue for raw detected objects
self.detected_objects_queue = queue.Queue()
self.detected_objects_processor = DetectedObjectsProcessor(self)
# initialize the frame cache
self.cached_frame_with_objects = {
'frame_bytes': [],
'frame_time': 0
self.ffmpeg_process = None
self.capture_thread = None
self.fps = EventsPerSecond()
self.skipped_region_tracker = EventsPerSecond()
# combine tracked objects lists
self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', []))
# merge object filters
global_object_filters = global_objects_config.get('filters', {})
camera_object_filters = camera_objects_config.get('filters', {})
objects_with_config = set().union(global_object_filters.keys(), camera_object_filters.keys())
self.object_filters = {}
for obj in objects_with_config:
self.object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}
# start a thread to track objects
self.object_tracker = ObjectTracker(self, 10)
# start a thread to write tracked frames to disk
self.video_writer = VideoWriter(self)
# start a thread to queue resize requests for regions
self.region_requester = RegionRequester(self)
# start a thread to cache recent frames for processing
self.frame_tracker = FrameTracker(self.frame_time,
self.frame_ready, self.frame_lock, self.frame_cache)
# start a thread to resize regions
self.region_prepper = RegionPrepper(self, self.frame_cache, self.resize_queue, prepped_frame_queue)
# start a thread to store the highest scoring recent frames for monitored object types
self.best_frames = BestFrames(self)
# start a thread to expire objects from the detected objects list
self.object_cleaner = ObjectCleaner(self)
# start a thread to refine regions when objects are clipped
self.dynamic_region_fps = EventsPerSecond()
self.region_refiner = RegionRefiner(self)
# start a thread to publish object scores
mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self)
# create a watchdog thread for capture process
self.watchdog = CameraWatchdog(self)
# load in the mask for object detection
if 'mask' in self.config:
self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE)
self.mask = None
if self.mask is None:
self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8)
self.mask[:] = 255
def start_or_restart_capture(self):
if not self.ffmpeg_process is None:
print("Terminating the existing ffmpeg process...")
print("Waiting for ffmpeg to exit gracefully...")
except sp.TimeoutExpired:
print("FFmpeg didnt exit. Force killing...")
print("Waiting for the capture thread to exit...")
self.ffmpeg_process = None
self.capture_thread = None
# # Thread to read the stdout of the ffmpeg process and update the current frame
# class CameraCapture(threading.Thread):
# def __init__(self, camera):
# threading.Thread.__init__(self)
# self.camera = camera
# def run(self):
# prctl.set_name(self.__class__.__name__)
# frame_num = 0
# while True:
# if self.camera.ffmpeg_process.poll() != None:
# print(self.camera.name + ": ffmpeg process is not running. exiting capture thread...")
# break
# raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size)
# if len(raw_image) == 0:
# print(self.camera.name + ": ffmpeg didnt return a frame. something is wrong. exiting capture thread...")
# break
# frame_num += 1
# if (frame_num % self.camera.take_frame) != 0:
# continue
# with self.camera.frame_lock:
# # TODO: use frame_queue instead
# self.camera.frame_time.value = datetime.datetime.now().timestamp()
# self.camera.frame_cache[self.camera.frame_time.value] = (
# np
# .frombuffer(raw_image, np.uint8)
# .reshape(self.camera.frame_shape)
# )
# self.camera.frame_queue.put(self.camera.frame_time.value)
# # Notify with the condition that a new frame is ready
# with self.camera.frame_ready:
# self.camera.frame_ready.notify_all()
# self.camera.fps.update()
# class VideoWriter(threading.Thread):
# def __init__(self, camera):
# threading.Thread.__init__(self)
# self.camera = camera
# def run(self):
# prctl.set_name(self.__class__.__name__)
# while True:
# (frame_time, tracked_objects) = self.camera.frame_output_queue.get()
# # if len(tracked_objects) == 0:
# # continue
# # f = open(f"/debug/output/{self.camera.name}-{str(format(frame_time, '.8f'))}.jpg", 'wb')
# # f.write(self.camera.frame_with_objects(frame_time, tracked_objects))
# # f.close()
# class Camera:
# def __init__(self, name, ffmpeg_config, global_objects_config, config, tflite_process, mqtt_client, mqtt_prefix):
# self.name = name
# self.config = config
# self.detected_objects = defaultdict(lambda: [])
# self.frame_cache = {}
# self.last_processed_frame = None
# # queue for re-assembling frames in order
# self.frame_queue = queue.Queue()
# # track how many regions have been requested for a frame so we know when a frame is complete
# self.regions_in_process = {}
# # Lock to control access
# self.regions_in_process_lock = mp.Lock()
# self.finished_frame_queue = queue.Queue()
# self.refined_frame_queue = queue.Queue()
# self.frame_output_queue = queue.Queue()
# self.ffmpeg = config.get('ffmpeg', {})
# self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input'])
# self.ffmpeg_global_args = self.ffmpeg.get('global_args', ffmpeg_config['global_args'])
# self.ffmpeg_hwaccel_args = self.ffmpeg.get('hwaccel_args', ffmpeg_config['hwaccel_args'])
# self.ffmpeg_input_args = self.ffmpeg.get('input_args', ffmpeg_config['input_args'])
# self.ffmpeg_output_args = self.ffmpeg.get('output_args', ffmpeg_config['output_args'])
# camera_objects_config = config.get('objects', {})
# self.take_frame = self.config.get('take_frame', 1)
# self.watchdog_timeout = self.config.get('watchdog_timeout', 300)
# self.snapshot_config = {
# 'show_timestamp': self.config.get('snapshots', {}).get('show_timestamp', True)
# }
# self.regions = self.config['regions']
# self.frame_shape = get_frame_shape(self.ffmpeg_input)
# self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
# self.mqtt_client = mqtt_client
# self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)
# # create shared value for storing the frame_time
# self.frame_time = mp.Value('d', 0.0)
# # Lock to control access to the frame
# self.frame_lock = mp.Lock()
# # Condition for notifying that a new frame is ready
# self.frame_ready = mp.Condition()
# # Condition for notifying that objects were tracked
# self.objects_tracked = mp.Condition()
# # Queue for prepped frames, max size set to (number of regions * 5)
# self.resize_queue = queue.Queue()
# # Queue for raw detected objects
# self.detected_objects_queue = queue.Queue()
# self.detected_objects_processor = DetectedObjectsProcessor(self)
# self.detected_objects_processor.start()
# # initialize the frame cache
# self.cached_frame_with_objects = {
# 'frame_bytes': [],
# 'frame_time': 0
# }
# self.ffmpeg_process = None
# self.capture_thread = None
# self.fps = EventsPerSecond()
# self.skipped_region_tracker = EventsPerSecond()
# # combine tracked objects lists
# self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', []))
# # merge object filters
# global_object_filters = global_objects_config.get('filters', {})
# camera_object_filters = camera_objects_config.get('filters', {})
# objects_with_config = set().union(global_object_filters.keys(), camera_object_filters.keys())
# self.object_filters = {}
# for obj in objects_with_config:
# self.object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}
# # start a thread to track objects
# self.object_tracker = ObjectTracker(self, 10)
# self.object_tracker.start()
# # start a thread to write tracked frames to disk
# self.video_writer = VideoWriter(self)
# self.video_writer.start()
# # start a thread to queue resize requests for regions
# self.region_requester = RegionRequester(self)
# self.region_requester.start()
# # start a thread to cache recent frames for processing
# self.frame_tracker = FrameTracker(self.frame_time,
# self.frame_ready, self.frame_lock, self.frame_cache)
# self.frame_tracker.start()
# # start a thread to resize regions
# self.region_prepper = RegionPrepper(self, self.frame_cache, self.resize_queue, prepped_frame_queue)
# self.region_prepper.start()
# # start a thread to store the highest scoring recent frames for monitored object types
# self.best_frames = BestFrames(self)
# self.best_frames.start()
# # start a thread to expire objects from the detected objects list
# self.object_cleaner = ObjectCleaner(self)
# self.object_cleaner.start()
# # start a thread to refine regions when objects are clipped
# self.dynamic_region_fps = EventsPerSecond()
# self.region_refiner = RegionRefiner(self)
# self.region_refiner.start()
# self.dynamic_region_fps.start()
# # start a thread to publish object scores
# mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self)
# mqtt_publisher.start()
# # create a watchdog thread for capture process
# self.watchdog = CameraWatchdog(self)
# # load in the mask for object detection
# if 'mask' in self.config:
# self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE)
# else:
# self.mask = None
# if self.mask is None:
# self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8)
# self.mask[:] = 255
# def start_or_restart_capture(self):
# if not self.ffmpeg_process is None:
# print("Terminating the existing ffmpeg process...")
# self.ffmpeg_process.terminate()
# try:
# print("Waiting for ffmpeg to exit gracefully...")
# self.ffmpeg_process.wait(timeout=30)
# except sp.TimeoutExpired:
# print("FFmpeg didnt exit. Force killing...")
# self.ffmpeg_process.kill()
# self.ffmpeg_process.wait()
# print("Waiting for the capture thread to exit...")
# self.capture_thread.join()
# self.ffmpeg_process = None
# self.capture_thread = None
# # create the process to capture frames from the input stream and store in a shared array
# print("Creating a new ffmpeg process...")
# self.start_ffmpeg()
# print("Creating a new capture thread...")
# self.capture_thread = CameraCapture(self)
# print("Starting a new capture thread...")
# self.capture_thread.start()
# self.fps.start()
# self.skipped_region_tracker.start()
# def start_ffmpeg(self):
# ffmpeg_cmd = (['ffmpeg'] +
# self.ffmpeg_global_args +
# self.ffmpeg_hwaccel_args +
# self.ffmpeg_input_args +
# ['-i', self.ffmpeg_input] +
# self.ffmpeg_output_args +
# ['pipe:'])
# print(" ".join(ffmpeg_cmd))
# self.ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=self.frame_size)
# def start(self):
# self.start_or_restart_capture()
# self.watchdog.start()
# def join(self):
# self.capture_thread.join()
# def get_capture_pid(self):
# return self.ffmpeg_process.pid
# def get_best(self, label):
# return self.best_frames.best_frames.get(label)
# def stats(self):
# # TODO: anything else?
# return {
# 'camera_fps': self.fps.eps(60),
# 'resize_queue': self.resize_queue.qsize(),
# 'frame_queue': self.frame_queue.qsize(),
# 'finished_frame_queue': self.finished_frame_queue.qsize(),
# 'refined_frame_queue': self.refined_frame_queue.qsize(),
# 'regions_in_process': self.regions_in_process,
# 'dynamic_regions_per_sec': self.dynamic_region_fps.eps(),
# 'skipped_regions_per_sec': self.skipped_region_tracker.eps(60)
# }
# def frame_with_objects(self, frame_time, tracked_objects=None):
# if not frame_time in self.frame_cache:
# frame = np.zeros(self.frame_shape, np.uint8)
# else:
# frame = self.frame_cache[frame_time].copy()
# detected_objects = self.detected_objects[frame_time].copy()
# for region in self.regions:
# color = (255,255,255)
# cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
# (region['x_offset']+region['size'], region['y_offset']+region['size']),
# color, 2)
# # draw the bounding boxes on the screen
# if tracked_objects is None:
# with self.object_tracker.tracked_objects_lock:
# tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects)
# for obj in detected_objects:
# draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], "{}% {}".format(int(obj['score']*100), obj['area']), thickness=3)
# for id, obj in tracked_objects.items():
# color = (0, 255,0) if obj['frame_time'] == frame_time else (255, 0, 0)
# draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], id, color=color, thickness=1, position='bl')
# # print a timestamp
# time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
# cv2.putText(frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
# # print fps
# cv2.putText(frame, str(self.fps.eps())+'FPS', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
# # convert to BGR
# frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# # encode the image into a jpg
# ret, jpg = cv2.imencode('.jpg', frame)
# return jpg.tobytes()
# def get_current_frame_with_objects(self):
# frame_time = self.last_processed_frame
# if frame_time == self.cached_frame_with_objects['frame_time']:
# return self.cached_frame_with_objects['frame_bytes']
# frame_bytes = self.frame_with_objects(frame_time)
# self.cached_frame_with_objects = {
# 'frame_bytes': frame_bytes,
# 'frame_time': frame_time
# }
# return frame_bytes
>>>>>>> 2a2fbe7... cleanup old code
def filtered(obj, objects_to_track, object_filters, mask):
def filtered(obj, objects_to_track, object_filters, mask):
object_name = obj[0]
object_name = obj[0]
Reference in New Issue
Block a user