"""Manages ffmpeg processes for camera frame capture."""

import logging
import queue
import subprocess as sp
import threading
import time
from collections import deque
from datetime import datetime, timedelta, timezone
from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent
from typing import Any

from frigate.camera import CameraMetrics
from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
    RecordingsDataSubscriber,
    RecordingsDataTypeEnum,
)
from frigate.config import CameraConfig, LoggerConfig
from frigate.config.camera.updater import (
    CameraConfigUpdateEnum,
    CameraConfigUpdateSubscriber,
)
from frigate.const import PROCESS_PRIORITY_HIGH
from frigate.log import LogPipe
from frigate.util.builtin import EventsPerSecond
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
from frigate.util.image import (
    FrameManager,
    SharedMemoryFrameManager,
)
from frigate.util.process import FrigateProcess

logger = logging.getLogger(__name__)

# After recording is enabled (or the record ffmpeg process is restarted) the
# segment staleness checks are skipped for this long, so ffmpeg has time to
# start and write its first segment.
_RECORD_GRACE_PERIOD = timedelta(seconds=90)
# A record process is considered unhealthy when no (valid) segment has been
# seen for this long. Keep in sync with the "120s" in the restart log message.
_RECORD_STALE_AFTER = timedelta(seconds=120)


def capture_frames(
    ffmpeg_process: sp.Popen[Any],
    config: CameraConfig,
    shm_frame_count: int,
    frame_index: int,
    frame_shape: tuple[int, int],
    frame_manager: FrameManager,
    frame_queue,
    fps: Value,
    skipped_fps: Value,
    current_frame: Value,
    stop_event: MpEvent,
) -> None:
    """Read raw frames from ffmpeg's stdout and hand them to the frame queue.

    Loops until ``stop_event`` is set, the camera is disabled, or the ffmpeg
    process exits. Each iteration reads ``frame_shape[0] * frame_shape[1]``
    bytes into the ``frame_manager`` buffer named ``"<camera>_frame<index>"``
    and pushes ``(frame_name, timestamp)`` onto ``frame_queue`` without
    blocking. Frames that do not fit in the queue are counted as skipped.
    The buffer index wraps around after ``shm_frame_count`` slots.

    Args:
        ffmpeg_process: running ffmpeg process whose stdout yields raw frames.
        config: camera configuration; ``config.enabled`` is refreshed from the
            config update subscriber on every iteration.
        shm_frame_count: number of frame buffers to cycle through.
        frame_index: buffer index to start writing at.
        frame_shape: shape whose product is the byte size of one frame.
        frame_manager: provides writable frame buffers by name.
        frame_queue: destination queue for ``(frame_name, timestamp)`` tuples.
        fps: updated with the measured capture rate.
        skipped_fps: updated with the rate of frames dropped on a full queue.
        current_frame: updated with the wall-clock time of the current read.
        stop_event: set to request shutdown.
    """
    frame_size = frame_shape[0] * frame_shape[1]
    frame_rate = EventsPerSecond()
    frame_rate.start()
    skipped_eps = EventsPerSecond()
    skipped_eps.start()
    config_subscriber = CameraConfigUpdateSubscriber(
        None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
    )

    def get_enabled_state():
        """Apply pending config updates and return whether the camera is enabled."""
        config_subscriber.check_for_updates()
        return config.enabled

    try:
        while not stop_event.is_set():
            if not get_enabled_state():
                logger.debug(f"Stopping capture thread for disabled {config.name}")
                break

            fps.value = frame_rate.eps()
            skipped_fps.value = skipped_eps.eps()
            current_frame.value = datetime.now().timestamp()
            frame_name = f"{config.name}_frame{frame_index}"
            frame_buffer = frame_manager.write(frame_name)
            try:
                frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
            except Exception:
                # shutdown has been initiated
                if stop_event.is_set():
                    break

                logger.error(
                    f"{config.name}: Unable to read frames from ffmpeg process."
                )

                if ffmpeg_process.poll() is not None:
                    logger.error(
                        f"{config.name}: ffmpeg process is not running. exiting capture thread..."
                    )
                    break

                continue

            frame_rate.update()

            # don't lock the queue to check, just try since it should rarely be full
            try:
                # add to the queue
                frame_queue.put((frame_name, current_frame.value), False)
                frame_manager.close(frame_name)
            except queue.Full:
                # if the queue is full, skip this frame
                skipped_eps.update()

            frame_index = 0 if frame_index == shm_frame_count - 1 else frame_index + 1
    finally:
        config_subscriber.stop()


class CameraWatchdog(threading.Thread):
    """Supervise one camera's ffmpeg processes and publish their health.

    Once per second the watchdog:

    * applies enabled/ffmpeg/record config updates, starting or stopping
      processes as needed;
    * restarts the detect process when its capture thread died, the measured
      fps overshoots ``detect.fps + 10`` three times in a row, or no frame was
      read for 20 seconds (rate-limited to one restart per ``retry_interval``);
    * restarts other ffmpeg processes that exited, and the record process when
      recording segments stop arriving;
    * publishes detect/record status and the reconnect/stall counters for the
      last hour.
    """

    def __init__(
        self,
        config: CameraConfig,
        shm_frame_count: int,
        frame_queue: Queue,
        camera_fps,
        skipped_fps,
        ffmpeg_pid,
        stalls,
        reconnects,
        detection_frame,
        stop_event,
    ):
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(f"watchdog.{config.name}")
        self.config = config
        self.shm_frame_count = shm_frame_count
        self.capture_thread = None
        self.ffmpeg_detect_process = None
        self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.detect")
        self.ffmpeg_other_processes: list[dict[str, Any]] = []
        self.camera_fps = camera_fps
        self.skipped_fps = skipped_fps
        self.ffmpeg_pid = ffmpeg_pid
        self.frame_queue = frame_queue
        self.frame_shape = self.config.frame_shape_yuv
        self.frame_size = self.frame_shape[0] * self.frame_shape[1]
        self.fps_overflow_count = 0
        self.frame_index = 0
        self.stop_event = stop_event
        self.sleeptime = self.config.ffmpeg.retry_interval
        self.reconnect_timestamps = deque()
        self.stalls = stalls
        self.reconnects = reconnects
        self.detection_frame = detection_frame

        self.config_subscriber = CameraConfigUpdateSubscriber(
            None,
            {config.name: config},
            [
                CameraConfigUpdateEnum.enabled,
                CameraConfigUpdateEnum.ffmpeg,
                CameraConfigUpdateEnum.record,
            ],
        )
        self.requestor = InterProcessRequestor()
        self.was_enabled = self.config.enabled
        # Previous record.enabled value, used to detect runtime re-enables.
        self._was_record_enabled: bool = self.config.record.enabled

        self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
        self.latest_valid_segment_time: float = 0
        self.latest_invalid_segment_time: float = 0
        self.latest_cache_segment_time: float = 0
        self.record_enable_time: datetime | None = None

        # Stall tracking (based on last processed frame)
        self._stall_timestamps: deque[float] = deque()
        self._stall_active: bool = False

        # Status caching to reduce message volume. Detect and record keep
        # separate timestamps so one status's periodic refresh cannot starve
        # the other.
        self._last_detect_status: str | None = None
        self._last_record_status: str | None = None
        self._last_detect_status_time: float = 0.0
        self._last_record_status_time: float = 0.0

    def _send_detect_status(self, status: str, now: float) -> None:
        """Send detect status only if changed or retry_interval has elapsed."""
        if (
            status != self._last_detect_status
            or (now - self._last_detect_status_time) >= self.sleeptime
        ):
            self.requestor.send_data(f"{self.config.name}/status/detect", status)
            self._last_detect_status = status
            self._last_detect_status_time = now

    def _send_record_status(self, status: str, now: float) -> None:
        """Send record status only if changed or retry_interval has elapsed."""
        if (
            status != self._last_record_status
            or (now - self._last_record_status_time) >= self.sleeptime
        ):
            self.requestor.send_data(f"{self.config.name}/status/record", status)
            self._last_record_status = status
            self._last_record_status_time = now

    def _send_roles_offline(self, roles, now: float) -> None:
        """Publish "offline" for every role of a non-detect ffmpeg process.

        The record role also updates the record status cache, so the next
        "online" is recognized as a change and published immediately.
        """
        for role in roles:
            self.requestor.send_data(
                f"{self.config.name}/status/{role.value}", "offline"
            )
            if role.value == "record":
                self._last_record_status = "offline"
                self._last_record_status_time = now

    def _check_config_updates(self) -> dict[str, list[str]]:
        """Check for config updates and return the update dict."""
        return self.config_subscriber.check_for_updates()

    def _update_enabled_state(self) -> bool:
        """Fetch the latest config and update enabled state."""
        self._check_config_updates()
        return self.config.enabled

    def _reset_segment_tracking(self) -> None:
        """Forget seen segment times and start a new record grace period."""
        self.latest_valid_segment_time = 0
        self.latest_invalid_segment_time = 0
        self.latest_cache_segment_time = 0
        self.record_enable_time = datetime.now().astimezone(timezone.utc)

    def _track_record_enable(self) -> None:
        """Start a new grace period when recording is switched on at runtime."""
        record_enabled = self.config.record.enabled
        if record_enabled and not self._was_record_enabled:
            self.record_enable_time = datetime.now().astimezone(timezone.utc)
        self._was_record_enabled = record_enabled

    def _update_reconnect_metric(self, now: float) -> None:
        """Drop reconnects older than one hour and publish the remaining count."""
        while self.reconnect_timestamps and self.reconnect_timestamps[0] < now - 3600:
            self.reconnect_timestamps.popleft()
        if self.reconnects:
            self.reconnects.value = len(self.reconnect_timestamps)

    def reset_capture_thread(
        self, terminate: bool = True, drain_output: bool = True
    ) -> None:
        """Restart the detect ffmpeg process and its capture thread.

        Args:
            terminate: terminate the running detect process first (False when
                it is already known to have exited).
            drain_output: use ``communicate()`` (reads remaining output) rather
                than ``wait()`` while waiting for the process to exit.
        """
        if terminate:
            self.ffmpeg_detect_process.terminate()
            try:
                self.logger.info("Waiting for ffmpeg to exit gracefully...")

                if drain_output:
                    self.ffmpeg_detect_process.communicate(timeout=30)
                else:
                    self.ffmpeg_detect_process.wait(timeout=30)
            except sp.TimeoutExpired:
                self.logger.info("FFmpeg did not exit. Force killing...")
                self.ffmpeg_detect_process.kill()

                if drain_output:
                    self.ffmpeg_detect_process.communicate()
                else:
                    self.ffmpeg_detect_process.wait()

        # Every restart of the detect process counts as a reconnect, including
        # restarts after an unexpected exit (terminate=False).
        now = datetime.now().timestamp()
        self.reconnect_timestamps.append(now)
        self._update_reconnect_metric(now)

        # Wait for old capture thread to fully exit before starting a new one
        if self.capture_thread is not None and self.capture_thread.is_alive():
            self.logger.info("Waiting for capture thread to exit...")
            self.capture_thread.join(timeout=5)

            if self.capture_thread.is_alive():
                self.logger.warning(
                    f"Capture thread for {self.config.name} did not exit in time"
                )

        self.logger.error(
            "The following ffmpeg logs include the last 100 lines prior to exit."
        )
        self.logpipe.dump()
        self.logger.info("Restarting ffmpeg...")
        self.start_ffmpeg_detect()

    def run(self) -> None:
        """Start processes if enabled, then supervise them once per second."""
        if self._update_enabled_state():
            self.start_all_ffmpeg()
            # If recording is enabled at startup, set the grace period timer
            if self.config.record.enabled:
                self.record_enable_time = datetime.now().astimezone(timezone.utc)

        time.sleep(self.sleeptime)
        last_restart_time = datetime.now().timestamp()

        # 1 second watchdog loop
        while not self.stop_event.wait(1):
            updates = self._check_config_updates()
            self._track_record_enable()

            # Handle ffmpeg config changes by restarting all ffmpeg processes
            if "ffmpeg" in updates and self.config.enabled:
                self.logger.debug(
                    "FFmpeg config updated for %s, restarting ffmpeg processes",
                    self.config.name,
                )
                self.stop_all_ffmpeg()
                self.start_all_ffmpeg()
                self._reset_segment_tracking()
                last_restart_time = datetime.now().timestamp()
                # The processes are running now; without this an enable that
                # arrived in the same update would start everything twice.
                self.was_enabled = True
                continue

            enabled = self.config.enabled
            if enabled != self.was_enabled:
                if enabled:
                    self.logger.debug(f"Enabling camera {self.config.name}")
                    self.start_all_ffmpeg()
                    # reset all timestamps and record the enable time for grace period
                    self._reset_segment_tracking()
                else:
                    self.logger.debug(f"Disabling camera {self.config.name}")
                    self.stop_all_ffmpeg()
                    self.record_enable_time = None

                    # update camera status
                    now = datetime.now().timestamp()
                    self._send_detect_status("disabled", now)
                    self._send_record_status("disabled", now)
                self.was_enabled = enabled
                continue

            if not enabled:
                continue

            self._process_segment_updates()

            now = datetime.now().timestamp()
            # Check if enough time has passed to allow ffmpeg restart (backoff pacing)
            can_restart = (now - last_restart_time) >= self.sleeptime
            if self._check_detect_process(now, can_restart):
                last_restart_time = now

            self._check_other_processes(now)

            # Prune expired reconnects and refresh stall metrics
            now = datetime.now().timestamp()
            self._update_reconnect_metric(now)
            self._update_stall_metrics(now)

        self.stop_all_ffmpeg()
        self.logpipe.close()
        self.config_subscriber.stop()
        self.segment_subscriber.stop()

    def _process_segment_updates(self) -> None:
        """Drain pending recording-segment notifications for this camera."""
        while True:
            update = self.segment_subscriber.check_for_update(timeout=0)

            if update == (None, None):
                break

            raw_topic, payload = update
            if not (raw_topic and payload):
                continue

            topic = str(raw_topic)
            camera, segment_time, _ = payload

            if camera != self.config.name:
                continue

            if topic.endswith(RecordingsDataTypeEnum.valid.value):
                self.logger.debug(
                    f"Latest valid recording segment time on {camera}: {segment_time}"
                )
                self.latest_valid_segment_time = segment_time
            elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
                self.logger.warning(
                    f"Invalid recording segment detected for {camera} at {segment_time}"
                )
                self.latest_invalid_segment_time = segment_time
            elif topic.endswith(RecordingsDataTypeEnum.latest.value):
                self.latest_cache_segment_time = (
                    segment_time if segment_time is not None else 0
                )

    def _check_detect_process(self, now: float, can_restart: bool) -> bool:
        """Check the detect pipeline's health; return True if it was restarted."""
        if not self.capture_thread.is_alive():
            self._send_detect_status("offline", now)
            self.camera_fps.value = 0
            self.logger.error(
                f"Ffmpeg process crashed unexpectedly for {self.config.name}."
            )
            if can_restart:
                self.reset_capture_thread(terminate=False)
                return True
        elif self.camera_fps.value >= (self.config.detect.fps + 10):
            self.fps_overflow_count += 1

            if self.fps_overflow_count == 3:
                self._send_detect_status("offline", now)
                self.fps_overflow_count = 0
                self.camera_fps.value = 0
                self.logger.info(
                    f"{self.config.name} exceeded fps limit. Exiting ffmpeg..."
                )
                if can_restart:
                    self.reset_capture_thread(drain_output=False)
                    return True
        elif now - self.capture_thread.current_frame.value > 20:
            self._send_detect_status("offline", now)
            self.camera_fps.value = 0
            self.logger.info(
                f"No frames received from {self.config.name} in 20 seconds. Exiting ffmpeg..."
            )
            if can_restart:
                self.reset_capture_thread()
                return True
        else:
            # process is running normally
            self._send_detect_status("online", now)
            self.fps_overflow_count = 0

        return False

    def _get_record_stale_reason(self, now_utc: datetime) -> str | None:
        """Return why recording looks unhealthy, or None if it looks fine.

        Nothing is reported during the grace period. A segment time of 0
        (none seen yet) is treated as one second old, so it never trips a
        check on its own.
        """
        if (
            self.record_enable_time is not None
            and (now_utc - self.record_enable_time) < _RECORD_GRACE_PERIOD
        ):
            return None

        def to_datetime(ts: float) -> datetime:
            if ts > 0:
                return datetime.fromtimestamp(ts, tz=timezone.utc)
            return now_utc - timedelta(seconds=1)

        # ensure segments are still being created and that they have valid video data
        if now_utc > to_datetime(self.latest_cache_segment_time) + _RECORD_STALE_AFTER:
            return "No new recording segments were created"
        if now_utc > to_datetime(self.latest_valid_segment_time) + _RECORD_STALE_AFTER:
            return "No new valid recording segments were created"
        if (
            self.latest_invalid_segment_time > 0
            and now_utc
            > to_datetime(self.latest_invalid_segment_time) + _RECORD_STALE_AFTER
            and self.latest_valid_segment_time <= self.latest_invalid_segment_time
        ):
            return "No valid segments created since last invalid segment"
        return None

    def _check_other_processes(self, now: float) -> None:
        """Restart non-detect ffmpeg processes that exited or stopped recording."""
        for p in self.ffmpeg_other_processes:
            poll = p["process"].poll()

            if self.config.record.enabled and "record" in p["roles"]:
                now_utc = datetime.now().astimezone(timezone.utc)
                reason = self._get_record_stale_reason(now_utc)

                if reason is not None:
                    self.logger.error(
                        f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..."
                    )
                    p["process"] = start_or_restart_ffmpeg(
                        p["cmd"],
                        self.logger,
                        p["logpipe"],
                        ffmpeg_process=p["process"],
                    )
                    # Give the restarted process time to write a segment;
                    # otherwise the same stale timestamps would restart it
                    # again on the next one-second tick.
                    self.record_enable_time = datetime.now().astimezone(timezone.utc)
                    self._send_roles_offline(p["roles"], now)
                    continue

                self._send_record_status("online", now)
                p["latest_segment_time"] = self.latest_cache_segment_time

            if poll is None:
                continue

            self._send_roles_offline(p["roles"], now)
            p["logpipe"].dump()
            p["process"] = start_or_restart_ffmpeg(
                p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
            )

    def _update_stall_metrics(self, now: float) -> None:
        """Count distinct stalls of frame processing over the last hour.

        A stall starts when the last processed frame is older than
        ``max(2 / fps, 2.0)`` seconds. The observed fps is used, falling back
        to ``detect.fps``. A stall is counted once until processing catches up.
        """
        processed_ts = (
            float(self.detection_frame.value) if self.detection_frame else 0.0
        )
        if processed_ts > 0:
            delta = now - processed_ts
            observed_fps = (
                self.camera_fps.value
                if self.camera_fps.value > 0
                else self.config.detect.fps
            )
            interval = 1.0 / max(observed_fps, 0.1)
            stall_threshold = max(2.0 * interval, 2.0)

            if delta > stall_threshold:
                if not self._stall_active:
                    self._stall_timestamps.append(now)
                    self._stall_active = True
            else:
                self._stall_active = False

        while self._stall_timestamps and self._stall_timestamps[0] < now - 3600:
            self._stall_timestamps.popleft()
        if self.stalls:
            self.stalls.value = len(self._stall_timestamps)

    def start_ffmpeg_detect(self):
        """Start the detect ffmpeg process and a capture thread reading from it."""
        ffmpeg_cmd = [
            c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"]
        ][0]
        self.ffmpeg_detect_process = start_or_restart_ffmpeg(
            ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
        )
        self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
        self.capture_thread = CameraCaptureRunner(
            self.config,
            self.shm_frame_count,
            self.frame_index,
            self.ffmpeg_detect_process,
            self.frame_shape,
            self.frame_queue,
            self.camera_fps,
            self.skipped_fps,
            self.stop_event,
        )
        self.capture_thread.start()

    def start_all_ffmpeg(self):
        """Start all ffmpeg processes (detection and others)."""
        logger.debug(f"Starting all ffmpeg processes for {self.config.name}")
        self.start_ffmpeg_detect()
        for c in self.config.ffmpeg_cmds:
            if "detect" in c["roles"]:
                continue
            logpipe = LogPipe(
                f"ffmpeg.{self.config.name}.{'_'.join(sorted(c['roles']))}"
            )
            self.ffmpeg_other_processes.append(
                {
                    "cmd": c["cmd"],
                    "roles": c["roles"],
                    "logpipe": logpipe,
                    "process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe),
                }
            )

    def stop_all_ffmpeg(self):
        """Stop all ffmpeg processes (detection and others)."""
        logger.debug(f"Stopping all ffmpeg processes for {self.config.name}")
        if self.capture_thread is not None and self.capture_thread.is_alive():
            self.capture_thread.join(timeout=5)
            if self.capture_thread.is_alive():
                self.logger.warning(
                    f"Capture thread for {self.config.name} did not stop gracefully."
                )
        if self.ffmpeg_detect_process is not None:
            stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
            self.ffmpeg_detect_process = None
        for p in self.ffmpeg_other_processes[:]:
            if p["process"] is not None:
                stop_ffmpeg(p["process"], self.logger)
            p["logpipe"].close()
        self.ffmpeg_other_processes.clear()


class CameraCaptureRunner(threading.Thread):
    """Thread wrapper that runs ``capture_frames`` for one ffmpeg process.

    ``current_frame`` holds the wall-clock time of the latest read attempt.
    The watchdog reads it to detect a feed that stopped producing frames.
    """

    def __init__(
        self,
        config: CameraConfig,
        shm_frame_count: int,
        frame_index: int,
        ffmpeg_process,
        frame_shape: tuple[int, int],
        frame_queue: Queue,
        fps: Value,
        skipped_fps: Value,
        stop_event: MpEvent,
    ):
        threading.Thread.__init__(self)
        self.name = f"capture:{config.name}"
        self.config = config
        self.shm_frame_count = shm_frame_count
        self.frame_index = frame_index
        self.frame_shape = frame_shape
        self.frame_queue = frame_queue
        self.fps = fps
        self.stop_event = stop_event
        self.skipped_fps = skipped_fps
        self.frame_manager = SharedMemoryFrameManager()
        self.ffmpeg_process = ffmpeg_process
        self.current_frame = Value("d", 0.0)
        self.last_frame = 0

    def run(self):
        capture_frames(
            self.ffmpeg_process,
            self.config,
            self.shm_frame_count,
            self.frame_index,
            self.frame_shape,
            self.frame_manager,
            self.frame_queue,
            self.fps,
            self.skipped_fps,
            self.current_frame,
            self.stop_event,
        )


class CameraCapture(FrigateProcess):
    """Process that hosts a ``CameraWatchdog`` for a single camera."""

    def __init__(
        self,
        config: CameraConfig,
        shm_frame_count: int,
        camera_metrics: CameraMetrics,
        stop_event: MpEvent,
        log_config: LoggerConfig | None = None,
    ) -> None:
        super().__init__(
            stop_event,
            PROCESS_PRIORITY_HIGH,
            name=f"frigate.capture:{config.name}",
            daemon=True,
        )
        self.config = config
        self.shm_frame_count = shm_frame_count
        self.camera_metrics = camera_metrics
        self.log_config = log_config

    def run(self) -> None:
        """Set up the process, then run the watchdog until it finishes."""
        self.pre_run_setup(self.log_config)
        camera_watchdog = CameraWatchdog(
            self.config,
            self.shm_frame_count,
            self.camera_metrics.frame_queue,
            self.camera_metrics.camera_fps,
            self.camera_metrics.skipped_fps,
            self.camera_metrics.ffmpeg_pid,
            self.camera_metrics.stalls_last_hour,
            self.camera_metrics.reconnects_last_hour,
            self.camera_metrics.detection_frame,
            self.stop_event,
        )
        camera_watchdog.start()
        camera_watchdog.join()