blakeblackshear.frigate/frigate/video.py
Josh Hawkins 531042467a
Dynamically enable/disable cameras (#16894)
* config options

* metrics

* stop and restart ffmpeg processes

* dispatcher

* frontend websocket

* buttons for testing

* don't recreate log pipe

* add/remove cam from birdseye when enabling/disabling

* end all objects and send empty camera activity

* enable/disable switch in ui

* disable buttons when camera is disabled

* use enabled_in_config for some frontend checks

* tweaks

* handle settings pane with disabled cameras

* frontend tweaks

* change to debug log

* mqtt docs

* tweak

* ensure all ffmpeg processes are initially started

* clean up

* use zmq

* remove camera metrics

* remove camera metrics

* tweaks

* frontend tweaks
2025-03-03 08:30:52 -07:00

943 lines
32 KiB
Python
Executable File

import datetime
import logging
import multiprocessing as mp
import os
import queue
import signal
import subprocess as sp
import threading
import time
import cv2
from setproctitle import setproctitle
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, DetectConfig, ModelConfig
from frigate.const import (
CACHE_DIR,
CACHE_SEGMENT_FORMAT,
REQUEST_REGION_GRID,
)
from frigate.log import LogPipe
from frigate.motion import MotionDetector
from frigate.motion.improved_motion import ImprovedMotionDetector
from frigate.object_detection import RemoteObjectDetector
from frigate.ptz.autotrack import ptz_moving_at_frame_time
from frigate.track import ObjectTracker
from frigate.track.norfair_tracker import NorfairTracker
from frigate.track.tracked_object import TrackedObjectAttribute
from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_time
from frigate.util.image import (
FrameManager,
SharedMemoryFrameManager,
draw_box_with_label,
)
from frigate.util.object import (
create_tensor_input,
get_cluster_candidates,
get_cluster_region,
get_cluster_region_from_grid,
get_min_region_size,
get_startup_regions,
inside_any,
intersects_any,
is_object_filtered,
reduce_detections,
)
from frigate.util.services import listen
logger = logging.getLogger(__name__)
def stop_ffmpeg(ffmpeg_process, logger):
logger.info("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate()
try:
logger.info("Waiting for ffmpeg to exit gracefully...")
ffmpeg_process.communicate(timeout=30)
except sp.TimeoutExpired:
logger.info("FFmpeg didn't exit. Force killing...")
ffmpeg_process.kill()
ffmpeg_process.communicate()
ffmpeg_process = None
def start_or_restart_ffmpeg(
ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None
):
if ffmpeg_process is not None:
stop_ffmpeg(ffmpeg_process, logger)
if frame_size is None:
process = sp.Popen(
ffmpeg_cmd,
stdout=sp.DEVNULL,
stderr=logpipe,
stdin=sp.DEVNULL,
start_new_session=True,
)
else:
process = sp.Popen(
ffmpeg_cmd,
stdout=sp.PIPE,
stderr=logpipe,
stdin=sp.DEVNULL,
bufsize=frame_size * 10,
start_new_session=True,
)
return process
def capture_frames(
ffmpeg_process,
config: CameraConfig,
shm_frame_count: int,
frame_index: int,
frame_shape: tuple[int, int],
frame_manager: FrameManager,
frame_queue,
fps: mp.Value,
skipped_fps: mp.Value,
current_frame: mp.Value,
stop_event: mp.Event,
):
frame_size = frame_shape[0] * frame_shape[1]
frame_rate = EventsPerSecond()
frame_rate.start()
skipped_eps = EventsPerSecond()
skipped_eps.start()
config_subscriber = ConfigSubscriber(f"config/enabled/{config.name}", True)
def get_enabled_state():
"""Fetch the latest enabled state from ZMQ."""
_, config_data = config_subscriber.check_for_update()
if config_data:
return config_data.enabled
return config.enabled
while not stop_event.is_set():
if not get_enabled_state():
logger.debug(f"Stopping capture thread for disabled {config.name}")
break
fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{config.name}_frame{frame_index}"
frame_buffer = frame_manager.write(frame_name)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
except Exception:
# shutdown has been initiated
if stop_event.is_set():
break
logger.error(f"{config.name}: Unable to read frames from ffmpeg process.")
if ffmpeg_process.poll() is not None:
logger.error(
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
)
break
continue
frame_rate.update()
# don't lock the queue to check, just try since it should rarely be full
try:
# add to the queue
frame_queue.put((frame_name, current_frame.value), False)
frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()
frame_index = 0 if frame_index == shm_frame_count - 1 else frame_index + 1
class CameraWatchdog(threading.Thread):
def __init__(
self,
camera_name,
config: CameraConfig,
shm_frame_count: int,
frame_queue: mp.Queue,
camera_fps,
skipped_fps,
ffmpeg_pid,
stop_event,
):
threading.Thread.__init__(self)
self.logger = logging.getLogger(f"watchdog.{camera_name}")
self.camera_name = camera_name
self.config = config
self.shm_frame_count = shm_frame_count
self.capture_thread = None
self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
self.ffmpeg_other_processes: list[dict[str, any]] = []
self.camera_fps = camera_fps
self.skipped_fps = skipped_fps
self.ffmpeg_pid = ffmpeg_pid
self.frame_queue = frame_queue
self.frame_shape = self.config.frame_shape_yuv
self.frame_size = self.frame_shape[0] * self.frame_shape[1]
self.fps_overflow_count = 0
self.frame_index = 0
self.stop_event = stop_event
self.sleeptime = self.config.ffmpeg.retry_interval
self.config_subscriber = ConfigSubscriber(f"config/enabled/{camera_name}", True)
self.was_enabled = self.config.enabled
def _update_enabled_state(self) -> bool:
"""Fetch the latest config and update enabled state."""
_, config_data = self.config_subscriber.check_for_update()
if config_data:
enabled = config_data.enabled
return enabled
return self.was_enabled if self.was_enabled is not None else self.config.enabled
def run(self):
if self._update_enabled_state():
self.start_all_ffmpeg()
time.sleep(self.sleeptime)
while not self.stop_event.wait(self.sleeptime):
enabled = self._update_enabled_state()
if enabled != self.was_enabled:
if enabled:
self.logger.debug(f"Enabling camera {self.camera_name}")
self.start_all_ffmpeg()
else:
self.logger.debug(f"Disabling camera {self.camera_name}")
self.stop_all_ffmpeg()
self.was_enabled = enabled
continue
if not enabled:
continue
now = datetime.datetime.now().timestamp()
if not self.capture_thread.is_alive():
self.camera_fps.value = 0
self.logger.error(
f"Ffmpeg process crashed unexpectedly for {self.camera_name}."
)
self.logger.error(
"The following ffmpeg logs include the last 100 lines prior to exit."
)
self.logpipe.dump()
self.start_ffmpeg_detect()
elif now - self.capture_thread.current_frame.value > 20:
self.camera_fps.value = 0
self.logger.info(
f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..."
)
self.ffmpeg_detect_process.terminate()
try:
self.logger.info("Waiting for ffmpeg to exit gracefully...")
self.ffmpeg_detect_process.communicate(timeout=30)
except sp.TimeoutExpired:
self.logger.info("FFmpeg did not exit. Force killing...")
self.ffmpeg_detect_process.kill()
self.ffmpeg_detect_process.communicate()
elif self.camera_fps.value >= (self.config.detect.fps + 10):
self.fps_overflow_count += 1
if self.fps_overflow_count == 3:
self.fps_overflow_count = 0
self.camera_fps.value = 0
self.logger.info(
f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..."
)
self.ffmpeg_detect_process.terminate()
try:
self.logger.info("Waiting for ffmpeg to exit gracefully...")
self.ffmpeg_detect_process.communicate(timeout=30)
except sp.TimeoutExpired:
self.logger.info("FFmpeg did not exit. Force killing...")
self.ffmpeg_detect_process.kill()
self.ffmpeg_detect_process.communicate()
else:
# process is running normally
self.fps_overflow_count = 0
for p in self.ffmpeg_other_processes:
poll = p["process"].poll()
if self.config.record.enabled and "record" in p["roles"]:
latest_segment_time = self.get_latest_segment_datetime(
p.get(
"latest_segment_time",
datetime.datetime.now().astimezone(datetime.timezone.utc),
)
)
if datetime.datetime.now().astimezone(datetime.timezone.utc) > (
latest_segment_time + datetime.timedelta(seconds=120)
):
self.logger.error(
f"No new recording segments were created for {self.camera_name} in the last 120s. restarting the ffmpeg record process..."
)
p["process"] = start_or_restart_ffmpeg(
p["cmd"],
self.logger,
p["logpipe"],
ffmpeg_process=p["process"],
)
continue
else:
p["latest_segment_time"] = latest_segment_time
if poll is None:
continue
p["logpipe"].dump()
p["process"] = start_or_restart_ffmpeg(
p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
)
self.stop_all_ffmpeg()
self.logpipe.close()
self.config_subscriber.stop()
def start_ffmpeg_detect(self):
ffmpeg_cmd = [
c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"]
][0]
self.ffmpeg_detect_process = start_or_restart_ffmpeg(
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
)
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture(
self.config,
self.shm_frame_count,
self.frame_index,
self.ffmpeg_detect_process,
self.frame_shape,
self.frame_queue,
self.camera_fps,
self.skipped_fps,
self.stop_event,
)
self.capture_thread.start()
def start_all_ffmpeg(self):
"""Start all ffmpeg processes (detection and others)."""
logger.debug(f"Starting all ffmpeg processes for {self.camera_name}")
self.start_ffmpeg_detect()
for c in self.config.ffmpeg_cmds:
if "detect" in c["roles"]:
continue
logpipe = LogPipe(
f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}"
)
self.ffmpeg_other_processes.append(
{
"cmd": c["cmd"],
"roles": c["roles"],
"logpipe": logpipe,
"process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe),
}
)
def stop_all_ffmpeg(self):
"""Stop all ffmpeg processes (detection and others)."""
logger.debug(f"Stopping all ffmpeg processes for {self.camera_name}")
if self.capture_thread is not None and self.capture_thread.is_alive():
self.capture_thread.join(timeout=5)
if self.capture_thread.is_alive():
self.logger.warning(
f"Capture thread for {self.camera_name} did not stop gracefully."
)
if self.ffmpeg_detect_process is not None:
stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
self.ffmpeg_detect_process = None
for p in self.ffmpeg_other_processes[:]:
if p["process"] is not None:
stop_ffmpeg(p["process"], self.logger)
p["logpipe"].close()
self.ffmpeg_other_processes.clear()
def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int:
"""Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("preview_")
]
)
newest_segment_time = latest_segment
for file in cache_files:
if self.camera_name in file:
basename = os.path.splitext(file)[0]
_, date = basename.rsplit("@", maxsplit=1)
segment_time = datetime.datetime.strptime(
date, CACHE_SEGMENT_FORMAT
).astimezone(datetime.timezone.utc)
if segment_time > newest_segment_time:
newest_segment_time = segment_time
return newest_segment_time
class CameraCapture(threading.Thread):
def __init__(
self,
config: CameraConfig,
shm_frame_count: int,
frame_index: int,
ffmpeg_process,
frame_shape: tuple[int, int],
frame_queue: mp.Queue,
fps,
skipped_fps,
stop_event,
):
threading.Thread.__init__(self)
self.name = f"capture:{config.name}"
self.config = config
self.shm_frame_count = shm_frame_count
self.frame_index = frame_index
self.frame_shape = frame_shape
self.frame_queue = frame_queue
self.fps = fps
self.stop_event = stop_event
self.skipped_fps = skipped_fps
self.frame_manager = SharedMemoryFrameManager()
self.ffmpeg_process = ffmpeg_process
self.current_frame = mp.Value("d", 0.0)
self.last_frame = 0
def run(self):
capture_frames(
self.ffmpeg_process,
self.config,
self.shm_frame_count,
self.frame_index,
self.frame_shape,
self.frame_manager,
self.frame_queue,
self.fps,
self.skipped_fps,
self.current_frame,
self.stop_event,
)
def capture_camera(
name, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"capture:{name}"
setproctitle(f"frigate.capture:{name}")
camera_watchdog = CameraWatchdog(
name,
config,
shm_frame_count,
camera_metrics.frame_queue,
camera_metrics.camera_fps,
camera_metrics.skipped_fps,
camera_metrics.ffmpeg_pid,
stop_event,
)
camera_watchdog.start()
camera_watchdog.join()
def track_camera(
name,
config: CameraConfig,
model_config,
labelmap,
detection_queue,
result_connection,
detected_objects_queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
region_grid,
):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"process:{name}"
setproctitle(f"frigate.process:{name}")
listen()
frame_queue = camera_metrics.frame_queue
frame_shape = config.frame_shape
objects_to_track = config.objects.track
object_filters = config.objects.filters
motion_detector = ImprovedMotionDetector(
frame_shape,
config.motion,
config.detect.fps,
name=config.name,
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, result_connection, model_config, stop_event
)
object_tracker = NorfairTracker(config, ptz_metrics)
frame_manager = SharedMemoryFrameManager()
# create communication for region grid updates
requestor = InterProcessRequestor()
process_frames(
name,
requestor,
frame_queue,
frame_shape,
model_config,
config.detect,
frame_manager,
motion_detector,
object_detector,
object_tracker,
detected_objects_queue,
camera_metrics,
objects_to_track,
object_filters,
stop_event,
ptz_metrics,
region_grid,
)
# empty the frame queue
logger.info(f"{name}: emptying frame queue")
while not frame_queue.empty():
(frame_name, _) = frame_queue.get(False)
frame_manager.delete(frame_name)
logger.info(f"{name}: exiting subprocess")
def detect(
detect_config: DetectConfig,
object_detector,
frame,
model_config: ModelConfig,
region,
objects_to_track,
object_filters,
):
tensor_input = create_tensor_input(frame, model_config, region)
detections = []
region_detections = object_detector.detect(tensor_input)
for d in region_detections:
box = d[2]
size = region[2] - region[0]
x_min = int(max(0, (box[1] * size) + region[0]))
y_min = int(max(0, (box[0] * size) + region[1]))
x_max = int(min(detect_config.width - 1, (box[3] * size) + region[0]))
y_max = int(min(detect_config.height - 1, (box[2] * size) + region[1]))
# ignore objects that were detected outside the frame
if (x_min >= detect_config.width - 1) or (y_min >= detect_config.height - 1):
continue
width = x_max - x_min
height = y_max - y_min
area = width * height
ratio = width / max(1, height)
det = (d[0], d[1], (x_min, y_min, x_max, y_max), area, ratio, region)
# apply object filters
if is_object_filtered(det, objects_to_track, object_filters):
continue
detections.append(det)
return detections
def process_frames(
camera_name: str,
requestor: InterProcessRequestor,
frame_queue: mp.Queue,
frame_shape,
model_config: ModelConfig,
detect_config: DetectConfig,
frame_manager: FrameManager,
motion_detector: MotionDetector,
object_detector: RemoteObjectDetector,
object_tracker: ObjectTracker,
detected_objects_queue: mp.Queue,
camera_metrics: CameraMetrics,
objects_to_track: list[str],
object_filters,
stop_event,
ptz_metrics: PTZMetrics,
region_grid,
exit_on_empty: bool = False,
):
next_region_update = get_tomorrow_at_time(2)
detect_config_subscriber = ConfigSubscriber(f"config/detect/{camera_name}", True)
enabled_config_subscriber = ConfigSubscriber(f"config/enabled/{camera_name}", True)
fps_tracker = EventsPerSecond()
fps_tracker.start()
startup_scan = True
stationary_frame_counter = 0
region_min_size = get_min_region_size(model_config)
prev_enabled = None
while not stop_event.is_set():
_, enabled_config = enabled_config_subscriber.check_for_update()
current_enabled = (
enabled_config.enabled
if enabled_config
else (prev_enabled if prev_enabled is not None else True)
)
if prev_enabled is None:
prev_enabled = current_enabled
if prev_enabled and not current_enabled and camera_metrics.frame_queue.empty():
logger.debug(f"Camera {camera_name} disabled, clearing tracked objects")
# Clear norfair's dictionaries
object_tracker.tracked_objects.clear()
object_tracker.disappeared.clear()
object_tracker.stationary_box_history.clear()
object_tracker.positions.clear()
object_tracker.track_id_map.clear()
# Clear internal norfair states
for trackers_by_type in object_tracker.trackers.values():
for tracker in trackers_by_type.values():
tracker.tracked_objects = []
for tracker in object_tracker.default_tracker.values():
tracker.tracked_objects = []
prev_enabled = current_enabled
if not current_enabled:
time.sleep(0.1)
continue
# check for updated detect config
_, updated_detect_config = detect_config_subscriber.check_for_update()
if updated_detect_config:
detect_config = updated_detect_config
if (
datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update
):
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name)
next_region_update = get_tomorrow_at_time(2)
try:
if exit_on_empty:
frame_name, frame_time = frame_queue.get(False)
else:
frame_name, frame_time = frame_queue.get(True, 1)
except queue.Empty:
if exit_on_empty:
logger.info("Exiting track_objects...")
break
continue
camera_metrics.detection_frame.value = frame_time
ptz_metrics.frame_time.value = frame_time
frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1]))
if frame is None:
logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.")
continue
# look for motion if enabled
motion_boxes = motion_detector.detect(frame)
regions = []
consolidated_detections = []
# if detection is disabled
if not detect_config.enabled:
object_tracker.match_and_update(frame_name, frame_time, [])
else:
# get stationary object ids
# check every Nth frame for stationary objects
# disappeared objects are not stationary
# also check for overlapping motion boxes
if stationary_frame_counter == detect_config.stationary.interval:
stationary_frame_counter = 0
stationary_object_ids = []
else:
stationary_frame_counter += 1
stationary_object_ids = [
obj["id"]
for obj in object_tracker.tracked_objects.values()
# if it has exceeded the stationary threshold
if obj["motionless_count"] >= detect_config.stationary.threshold
# and it hasn't disappeared
and object_tracker.disappeared[obj["id"]] == 0
# and it doesn't overlap with any current motion boxes when not calibrating
and not intersects_any(
obj["box"],
[] if motion_detector.is_calibrating() else motion_boxes,
)
]
# get tracked object boxes that aren't stationary
tracked_object_boxes = [
(
# use existing object box for stationary objects
obj["estimate"]
if obj["motionless_count"] < detect_config.stationary.threshold
else obj["box"]
)
for obj in object_tracker.tracked_objects.values()
if obj["id"] not in stationary_object_ids
]
object_boxes = tracked_object_boxes + object_tracker.untracked_object_boxes
# get consolidated regions for tracked objects
regions = [
get_cluster_region(
frame_shape, region_min_size, candidate, object_boxes
)
for candidate in get_cluster_candidates(
frame_shape, region_min_size, object_boxes
)
]
# only add in the motion boxes when not calibrating and a ptz is not moving via autotracking
# ptz_moving_at_frame_time() always returns False for non-autotracking cameras
if not motion_detector.is_calibrating() and not ptz_moving_at_frame_time(
frame_time,
ptz_metrics.start_time.value,
ptz_metrics.stop_time.value,
):
# find motion boxes that are not inside tracked object regions
standalone_motion_boxes = [
b for b in motion_boxes if not inside_any(b, regions)
]
if standalone_motion_boxes:
motion_clusters = get_cluster_candidates(
frame_shape,
region_min_size,
standalone_motion_boxes,
)
motion_regions = [
get_cluster_region_from_grid(
frame_shape,
region_min_size,
candidate,
standalone_motion_boxes,
region_grid,
)
for candidate in motion_clusters
]
regions += motion_regions
# if starting up, get the next startup scan region
if startup_scan:
for region in get_startup_regions(
frame_shape, region_min_size, region_grid
):
regions.append(region)
startup_scan = False
# resize regions and detect
# seed with stationary objects
detections = [
(
obj["label"],
obj["score"],
obj["box"],
obj["area"],
obj["ratio"],
obj["region"],
)
for obj in object_tracker.tracked_objects.values()
if obj["id"] in stationary_object_ids
]
for region in regions:
detections.extend(
detect(
detect_config,
object_detector,
frame,
model_config,
region,
objects_to_track,
object_filters,
)
)
consolidated_detections = reduce_detections(frame_shape, detections)
# if detection was run on this frame, consolidate
if len(regions) > 0:
tracked_detections = [
d
for d in consolidated_detections
if d[0] not in model_config.all_attributes
]
# now that we have refined our detections, we need to track objects
object_tracker.match_and_update(
frame_name, frame_time, tracked_detections
)
# else, just update the frame times for the stationary objects
else:
object_tracker.update_frame_times(frame_name, frame_time)
# group the attribute detections based on what label they apply to
attribute_detections: dict[str, list[TrackedObjectAttribute]] = {}
for label, attribute_labels in model_config.attributes_map.items():
attribute_detections[label] = [
TrackedObjectAttribute(d)
for d in consolidated_detections
if d[0] in attribute_labels
]
# build detections
detections = {}
for obj in object_tracker.tracked_objects.values():
detections[obj["id"]] = {**obj, "attributes": []}
# find the best object for each attribute to be assigned to
all_objects: list[dict[str, any]] = object_tracker.tracked_objects.values()
for attributes in attribute_detections.values():
for attribute in attributes:
filtered_objects = filter(
lambda o: attribute.label
in model_config.attributes_map.get(o["label"], []),
all_objects,
)
selected_object_id = attribute.find_best_object(filtered_objects)
if selected_object_id is not None:
detections[selected_object_id]["attributes"].append(
attribute.get_tracking_data()
)
# debug object tracking
if False:
bgr_frame = cv2.cvtColor(
frame,
cv2.COLOR_YUV2BGR_I420,
)
object_tracker.debug_draw(bgr_frame, frame_time)
cv2.imwrite(
f"debug/frames/track-{'{:.6f}'.format(frame_time)}.jpg", bgr_frame
)
# debug
if False:
bgr_frame = cv2.cvtColor(
frame,
cv2.COLOR_YUV2BGR_I420,
)
for m_box in motion_boxes:
cv2.rectangle(
bgr_frame,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
for b in tracked_object_boxes:
cv2.rectangle(
bgr_frame,
(b[0], b[1]),
(b[2], b[3]),
(255, 0, 0),
2,
)
for obj in object_tracker.tracked_objects.values():
if obj["frame_time"] == frame_time:
thickness = 2
color = model_config.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw the bounding boxes on the frame
box = obj["box"]
draw_box_with_label(
bgr_frame,
box[0],
box[1],
box[2],
box[3],
obj["label"],
obj["id"],
thickness=thickness,
color=color,
)
for region in regions:
cv2.rectangle(
bgr_frame,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
cv2.imwrite(
f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg",
bgr_frame,
)
# add to the queue if not full
if detected_objects_queue.full():
frame_manager.close(frame_name)
continue
else:
fps_tracker.update()
camera_metrics.process_fps.value = fps_tracker.eps()
detected_objects_queue.put(
(
camera_name,
frame_name,
frame_time,
detections,
motion_boxes,
regions,
)
)
camera_metrics.detection_fps.value = object_detector.fps.eps()
frame_manager.close(frame_name)
motion_detector.stop()
requestor.stop()
detect_config_subscriber.stop()
enabled_config_subscriber.stop()