From 9016a48dc7e8e2c8a946b53a240b11e832daaeae Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 26 Jul 2023 04:51:45 -0600 Subject: [PATCH] Refactor Audio Events To Use stdout Pipe (#7291) * Cleanup audio detection * Read audio frames from ffmpeg process directly * Handle case where process has stopped --- frigate/events/audio.py | 60 ++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 9fb134fc8..20a5f95f6 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -3,9 +3,9 @@ import datetime import logging import multiprocessing as mp -import os import signal import threading +import time from types import FrameType from typing import Optional, Tuple @@ -21,7 +21,6 @@ from frigate.const import ( AUDIO_MAX_BIT_RANGE, AUDIO_MIN_CONFIDENCE, AUDIO_SAMPLE_RATE, - CACHE_DIR, FRIGATE_LOCALHOST, ) from frigate.ffmpeg_presets import parse_preset_input @@ -40,12 +39,12 @@ except ModuleNotFoundError: logger = logging.getLogger(__name__) -def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> list[str]: +def get_ffmpeg_command(input_args: list[str], input_path: str) -> list[str]: return get_ffmpeg_arg_list( f"ffmpeg {{}} -i {{}} -f {AUDIO_FORMAT} -ar {AUDIO_SAMPLE_RATE} -ac 1 -y {{}}".format( " ".join(input_args), input_path, - pipe, + "pipe:", ) ) @@ -168,14 +167,12 @@ class AudioEventMaintainer(threading.Thread): self.detector = AudioTfl(stop_event) self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),) self.chunk_size = int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE * 2)) - self.pipe = f"{CACHE_DIR}/{self.config.name}-audio" + self.logger = logging.getLogger(f"audio.{self.config.name}") self.ffmpeg_cmd = get_ffmpeg_command( get_ffmpeg_arg_list(self.config.ffmpeg.global_args) + parse_preset_input("preset-rtsp-audio-only", 1), [i.path for i in self.config.ffmpeg.inputs if "audio" in i.roles][0], - self.pipe, ) - self.pipe_file = None self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio") self.audio_listener = None @@ -274,37 +271,52 @@ class AudioEventMaintainer(threading.Thread): if resp.status_code == 200: self.detections[detection["label"]] = None else: - logger.warn( + self.logger.warn( f"Failed to end audio event {detection['id']} with status code {resp.status_code}" ) - def restart_audio_pipe(self) -> None: - try: - os.mkfifo(self.pipe) - except FileExistsError: - pass - + def start_or_restart_ffmpeg(self) -> None: self.audio_listener = start_or_restart_ffmpeg( - self.ffmpeg_cmd, logger, self.logpipe, None, self.audio_listener + self.ffmpeg_cmd, + self.logger, + self.logpipe, + self.chunk_size, + self.audio_listener, ) def read_audio(self) -> None: - if self.pipe_file is None: - self.pipe_file = open(self.pipe, "rb") + def log_and_restart() -> None: + if self.stop_event.is_set(): + return + + time.sleep(self.config.ffmpeg.retry_interval) + self.logpipe.dump() + self.start_or_restart_ffmpeg() try: - audio = np.frombuffer(self.pipe_file.read(self.chunk_size), dtype=np.int16) + chunk = self.audio_listener.stdout.read(self.chunk_size) + + if not chunk: + if self.audio_listener.poll() is not None: + self.logger.error("ffmpeg process is not running, restarting...") + log_and_restart() + return + + return + + audio = np.frombuffer(chunk, dtype=np.int16) self.detect_audio(audio) - except BrokenPipeError: - self.logpipe.dump() - self.restart_audio_pipe() + except Exception: + self.logger.error( + "Error reading audio data from ffmpeg process, restarting..." + ) + log_and_restart() def run(self) -> None: - self.restart_audio_pipe() + self.start_or_restart_ffmpeg() while not self.stop_event.is_set(): self.read_audio() - self.pipe_file.close() - stop_ffmpeg(self.audio_listener, logger) + stop_ffmpeg(self.audio_listener, self.logger) self.logpipe.close()