Limited shm frame count (#12346)

* Only keep 2x detect fps frames in SHM

* Don't delete previous shm frames in output

* Catch case where images do not exist

* Ensure files are closed

* Clear out all frames when shutting down

* Correct the number of frames saved

* Simplify empty shm error handling

* Improve frame safety
This commit is contained in:
Nicolas Mowen 2024-07-09 06:44:53 -06:00 committed by GitHub
parent 0ce596ec8f
commit 34812b7439
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 56 additions and 37 deletions

View File

@ -659,6 +659,7 @@ class CameraState:
def update(self, frame_time, current_detections, motion_boxes, regions): def update(self, frame_time, current_detections, motion_boxes, regions):
# get the new frame # get the new frame
frame_id = f"{self.name}{frame_time}" frame_id = f"{self.name}{frame_time}"
current_frame = self.frame_manager.get( current_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv frame_id, self.camera_config.frame_shape_yuv
) )
@ -693,7 +694,7 @@ class CameraState:
for c in self.callbacks["autotrack"]: for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_time) c(self.name, updated_obj, frame_time)
if thumb_update: if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache # ensure this frame is stored in the cache
if ( if (
updated_obj.thumbnail_data["frame_time"] == frame_time updated_obj.thumbnail_data["frame_time"] == frame_time
@ -850,12 +851,16 @@ class CameraState:
with self.current_frame_lock: with self.current_frame_lock:
self.tracked_objects = tracked_objects self.tracked_objects = tracked_objects
self.current_frame_time = frame_time
self.motion_boxes = motion_boxes self.motion_boxes = motion_boxes
self.regions = regions self.regions = regions
self._current_frame = current_frame
if self.previous_frame_id is not None: if current_frame is not None:
self.frame_manager.close(self.previous_frame_id) self.current_frame_time = frame_time
self._current_frame = current_frame
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_id self.previous_frame_id = frame_id

View File

@ -45,7 +45,6 @@ def output_frames(
signal.signal(signal.SIGINT, receiveSignal) signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager() frame_manager = SharedMemoryFrameManager()
previous_frames = {}
# start a websocket server on 8082 # start a websocket server on 8082
WebSocketWSGIHandler.http_version = "1.1" WebSocketWSGIHandler.http_version = "1.1"
@ -97,6 +96,9 @@ def output_frames(
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
if frame is None:
continue
# send camera frame to ffmpeg process if websockets are connected # send camera frame to ffmpeg process if websockets are connected
if any( if any(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
@ -124,12 +126,7 @@ def output_frames(
preview_recorders[camera].write_data( preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame current_tracked_objects, motion_boxes, frame_time, frame
) )
frame_manager.close(frame_id)
# delete frames after they have been used for output
if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}")
previous_frames[camera] = frame_time
move_preview_frames("clips") move_preview_frames("clips")
@ -149,7 +146,7 @@ def output_frames(
frame_id = f"{camera}{frame_time}" frame_id = f"{camera}{frame_time}"
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id) frame_manager.close(frame_id)
detection_subscriber.stop() detection_subscriber.stop()

View File

@ -687,27 +687,31 @@ class DictFrameManager(FrameManager):
class SharedMemoryFrameManager(FrameManager): class SharedMemoryFrameManager(FrameManager):
def __init__(self): def __init__(self):
self.shm_store = {} self.shm_store: dict[str, shared_memory.SharedMemory] = {}
def create(self, name, size) -> AnyStr: def create(self, name: str, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size) shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm self.shm_store[name] = shm
return shm.buf return shm.buf
def get(self, name, shape): def get(self, name: str, shape) -> Optional[np.ndarray]:
if name in self.shm_store: try:
shm = self.shm_store[name] if name in self.shm_store:
else: shm = self.shm_store[name]
shm = shared_memory.SharedMemory(name=name) else:
self.shm_store[name] = shm shm = shared_memory.SharedMemory(name=name)
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
except FileNotFoundError:
logger.error(f"Failed to get {name} from SHM")
return None
def close(self, name): def close(self, name: str):
if name in self.shm_store: if name in self.shm_store:
self.shm_store[name].close() self.shm_store[name].close()
del self.shm_store[name] del self.shm_store[name]
def delete(self, name): def delete(self, name: str):
if name in self.shm_store: if name in self.shm_store:
self.shm_store[name].close() self.shm_store[name].close()
self.shm_store[name].unlink() self.shm_store[name].unlink()

View File

@ -94,7 +94,7 @@ def start_or_restart_ffmpeg(
def capture_frames( def capture_frames(
ffmpeg_process, ffmpeg_process,
camera_name, config: CameraConfig,
frame_shape, frame_shape,
frame_manager: FrameManager, frame_manager: FrameManager,
frame_queue, frame_queue,
@ -108,24 +108,36 @@ def capture_frames(
frame_rate.start() frame_rate.start()
skipped_eps = EventsPerSecond() skipped_eps = EventsPerSecond()
skipped_eps.start() skipped_eps.start()
shm_count = max(10, config.detect.fps * 2)
shm_frames: list[str] = []
while True: while True:
fps.value = frame_rate.eps() fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps() skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp() current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{camera_name}{current_frame.value}" frame_name = f"{config.name}{current_frame.value}"
frame_buffer = frame_manager.create(frame_name, frame_size) frame_buffer = frame_manager.create(frame_name, frame_size)
try: try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
# update frame cache and cleanup existing frames
shm_frames.append(frame_name)
if len(shm_frames) > shm_count:
expired_frame_name = shm_frames.pop(0)
frame_manager.delete(expired_frame_name)
except Exception: except Exception:
frame_manager.delete(frame_name)
# shutdown has been initiated # shutdown has been initiated
if stop_event.is_set(): if stop_event.is_set():
break break
logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.") logger.error(f"{config.name}: Unable to read frames from ffmpeg process.")
if ffmpeg_process.poll() is not None: if ffmpeg_process.poll() is not None:
logger.error( logger.error(
f"{camera_name}: ffmpeg process is not running. exiting capture thread..." f"{config.name}: ffmpeg process is not running. exiting capture thread..."
) )
frame_manager.delete(frame_name) frame_manager.delete(frame_name)
break break
@ -137,12 +149,13 @@ def capture_frames(
try: try:
# add to the queue # add to the queue
frame_queue.put(current_frame.value, False) frame_queue.put(current_frame.value, False)
# close the frame
frame_manager.close(frame_name)
except queue.Full: except queue.Full:
# if the queue is full, skip this frame # if the queue is full, skip this frame
skipped_eps.update() skipped_eps.update()
frame_manager.delete(frame_name)
# clear out frames
for frame in shm_frames:
frame_manager.delete(frame)
class CameraWatchdog(threading.Thread): class CameraWatchdog(threading.Thread):
@ -282,7 +295,7 @@ class CameraWatchdog(threading.Thread):
) )
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture( self.capture_thread = CameraCapture(
self.camera_name, self.config,
self.ffmpeg_detect_process, self.ffmpeg_detect_process,
self.frame_shape, self.frame_shape,
self.frame_queue, self.frame_queue,
@ -321,7 +334,7 @@ class CameraWatchdog(threading.Thread):
class CameraCapture(threading.Thread): class CameraCapture(threading.Thread):
def __init__( def __init__(
self, self,
camera_name, config: CameraConfig,
ffmpeg_process, ffmpeg_process,
frame_shape, frame_shape,
frame_queue, frame_queue,
@ -330,8 +343,8 @@ class CameraCapture(threading.Thread):
stop_event, stop_event,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = f"capture:{camera_name}" self.name = f"capture:{config.name}"
self.camera_name = camera_name self.config = config
self.frame_shape = frame_shape self.frame_shape = frame_shape
self.frame_queue = frame_queue self.frame_queue = frame_queue
self.fps = fps self.fps = fps
@ -345,7 +358,7 @@ class CameraCapture(threading.Thread):
def run(self): def run(self):
capture_frames( capture_frames(
self.ffmpeg_process, self.ffmpeg_process,
self.camera_name, self.config,
self.frame_shape, self.frame_shape,
self.frame_manager, self.frame_manager,
self.frame_queue, self.frame_queue,