Restart record process if segments stop being written. (#4604)

* Catch when recording segments are not being written to cache and restart ffmpeg responsible for record

* Ensure this check is only run for role with record

* Fix formatting

* Redo recordings validator to watch segments time and restart if no segment for 30 seconds

* Formatting

* Increase wait time to 120 seconds and improve error message

* Add more config checks for record args and add test

* Formatting

* Specify output args.
This commit is contained in:
Nicolas Mowen 2022-12-08 20:03:54 -07:00 committed by GitHub
parent 964bcc0733
commit cd9f6b074e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 3 deletions

View File

@ -843,6 +843,26 @@ def verify_recording_retention(camera_config: CameraConfig) -> None:
) )
def verify_recording_segments_setup_with_reasonable_time(
camera_config: CameraConfig,
) -> None:
"""Verify that recording segments are setup and segment time is not greater than 60."""
record_args: list[str] = get_ffmpeg_arg_list(
camera_config.ffmpeg.output_args.record
)
seg_arg_index = record_args.index("-segment_time")
if seg_arg_index < 0:
raise ValueError(
f"Camera {camera_config.name} has no segment_time in recording output args, segment args are required for record."
)
if int(record_args[seg_arg_index + 1]) > 60:
raise ValueError(
f"Camera {camera_config.name} has invalid segment_time output arg, segment_time must be 60 or less."
)
def verify_zone_objects_are_tracked(camera_config: CameraConfig) -> None: def verify_zone_objects_are_tracked(camera_config: CameraConfig) -> None:
"""Verify that user has not entered zone objects that are not in the tracking config.""" """Verify that user has not entered zone objects that are not in the tracking config."""
for zone_name, zone in camera_config.zones.items(): for zone_name, zone in camera_config.zones.items():
@ -997,6 +1017,7 @@ class FrigateConfig(FrigateBaseModel):
verify_config_roles(camera_config) verify_config_roles(camera_config)
verify_old_retain_config(camera_config) verify_old_retain_config(camera_config)
verify_recording_retention(camera_config) verify_recording_retention(camera_config)
verify_recording_segments_setup_with_reasonable_time(camera_config)
verify_zone_objects_are_tracked(camera_config) verify_zone_objects_are_tracked(camera_config)
if camera_config.rtmp.enabled: if camera_config.rtmp.enabled:

View File

@ -1426,6 +1426,33 @@ class TestConfig(unittest.TestCase):
ValidationError, lambda: frigate_config.runtime_config.cameras ValidationError, lambda: frigate_config.runtime_config.cameras
) )
def test_fails_on_bad_segment_time(self):
config = {
"mqtt": {"host": "mqtt"},
"record": {"enabled": True},
"cameras": {
"back": {
"ffmpeg": {
"output_args": {
"record": "-f segment -segment_time 70 -segment_format mp4 -reset_timestamps 1 -strftime 1 -c copy -an"
},
"inputs": [
{
"path": "rtsp://10.0.0.1:554/video",
"roles": ["detect"],
},
],
},
}
},
}
frigate_config = FrigateConfig(**config)
self.assertRaises(
ValueError, lambda: frigate_config.runtime_config.ffmpeg.output_args.record
)
def test_fails_zone_defines_untracked_object(self): def test_fails_zone_defines_untracked_object(self):
config = { config = {
"mqtt": {"host": "mqtt"}, "mqtt": {"host": "mqtt"},

View File

@ -1,7 +1,7 @@
import datetime import datetime
import itertools
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import os
import queue import queue
import random import random
import signal import signal
@ -15,6 +15,7 @@ import cv2
from setproctitle import setproctitle from setproctitle import setproctitle
from frigate.config import CameraConfig, DetectConfig, PixelFormatEnum from frigate.config import CameraConfig, DetectConfig, PixelFormatEnum
from frigate.const import CACHE_DIR
from frigate.object_detection import RemoteObjectDetector from frigate.object_detection import RemoteObjectDetector
from frigate.log import LogPipe from frigate.log import LogPipe
from frigate.motion import MotionDetector from frigate.motion import MotionDetector
@ -203,7 +204,13 @@ def capture_frames(
class CameraWatchdog(threading.Thread): class CameraWatchdog(threading.Thread):
def __init__( def __init__(
self, camera_name, config, frame_queue, camera_fps, ffmpeg_pid, stop_event self,
camera_name,
config: CameraConfig,
frame_queue,
camera_fps,
ffmpeg_pid,
stop_event,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.logger = logging.getLogger(f"watchdog.{camera_name}") self.logger = logging.getLogger(f"watchdog.{camera_name}")
@ -212,7 +219,7 @@ class CameraWatchdog(threading.Thread):
self.capture_thread = None self.capture_thread = None
self.ffmpeg_detect_process = None self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
self.ffmpeg_other_processes = [] self.ffmpeg_other_processes: list[dict[str, any]] = []
self.camera_fps = camera_fps self.camera_fps = camera_fps
self.ffmpeg_pid = ffmpeg_pid self.ffmpeg_pid = ffmpeg_pid
self.frame_queue = frame_queue self.frame_queue = frame_queue
@ -232,6 +239,7 @@ class CameraWatchdog(threading.Thread):
self.ffmpeg_other_processes.append( self.ffmpeg_other_processes.append(
{ {
"cmd": c["cmd"], "cmd": c["cmd"],
"roles": c["roles"],
"logpipe": logpipe, "logpipe": logpipe,
"process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe), "process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe),
} }
@ -267,8 +275,33 @@ class CameraWatchdog(threading.Thread):
for p in self.ffmpeg_other_processes: for p in self.ffmpeg_other_processes:
poll = p["process"].poll() poll = p["process"].poll()
if self.config.record.enabled and "record" in p["roles"]:
latest_segment_time = self.get_latest_segment_timestamp(
p.get(
"latest_segment_time", datetime.datetime.now().timestamp()
)
)
if datetime.datetime.now().timestamp() > (
latest_segment_time + 120
):
self.logger.error(
f"No new recording segments were created for {self.camera_name} in the last 120s. restarting the ffmpeg record process..."
)
p["process"] = start_or_restart_ffmpeg(
p["cmd"],
self.logger,
p["logpipe"],
ffmpeg_process=p["process"],
)
continue
else:
p["latest_segment_time"] = latest_segment_time
if poll is None: if poll is None:
continue continue
p["logpipe"].dump() p["logpipe"].dump()
p["process"] = start_or_restart_ffmpeg( p["process"] = start_or_restart_ffmpeg(
p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
@ -297,6 +330,29 @@ class CameraWatchdog(threading.Thread):
) )
self.capture_thread.start() self.capture_thread.start()
def get_latest_segment_timestamp(self, latest_timestamp) -> int:
"""Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("clip_")
]
)
newest_segment_timestamp = latest_timestamp
for file in cache_files:
if self.camera_name in file:
basename = os.path.splitext(file)[0]
_, date = basename.rsplit("-", maxsplit=1)
ts = datetime.datetime.strptime(date, "%Y%m%d%H%M%S").timestamp()
if ts > newest_segment_timestamp:
newest_segment_timestamp = ts
return newest_segment_timestamp
class CameraCapture(threading.Thread): class CameraCapture(threading.Thread):
def __init__(self, camera_name, ffmpeg_process, frame_shape, frame_queue, fps): def __init__(self, camera_name, ffmpeg_process, frame_shape, frame_queue, fps):