Refactor Audio Events To Use stdout Pipe (#7291)

* Cleanup audio detection

* Read audio frames from ffmpeg process directly

* Handle case where process has stopped
This commit is contained in:
Nicolas Mowen 2023-07-26 04:51:45 -06:00 committed by GitHub
parent a96a951e23
commit 9016a48dc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@@ -3,9 +3,9 @@
import datetime import datetime
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import os
import signal import signal
import threading import threading
import time
from types import FrameType from types import FrameType
from typing import Optional, Tuple from typing import Optional, Tuple
@@ -21,7 +21,6 @@ from frigate.const import (
AUDIO_MAX_BIT_RANGE, AUDIO_MAX_BIT_RANGE,
AUDIO_MIN_CONFIDENCE, AUDIO_MIN_CONFIDENCE,
AUDIO_SAMPLE_RATE, AUDIO_SAMPLE_RATE,
CACHE_DIR,
FRIGATE_LOCALHOST, FRIGATE_LOCALHOST,
) )
from frigate.ffmpeg_presets import parse_preset_input from frigate.ffmpeg_presets import parse_preset_input
@@ -40,12 +39,12 @@ except ModuleNotFoundError:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> list[str]: def get_ffmpeg_command(input_args: list[str], input_path: str) -> list[str]:
return get_ffmpeg_arg_list( return get_ffmpeg_arg_list(
f"ffmpeg {{}} -i {{}} -f {AUDIO_FORMAT} -ar {AUDIO_SAMPLE_RATE} -ac 1 -y {{}}".format( f"ffmpeg {{}} -i {{}} -f {AUDIO_FORMAT} -ar {AUDIO_SAMPLE_RATE} -ac 1 -y {{}}".format(
" ".join(input_args), " ".join(input_args),
input_path, input_path,
pipe, "pipe:",
) )
) )
@@ -168,14 +167,12 @@ class AudioEventMaintainer(threading.Thread):
self.detector = AudioTfl(stop_event) self.detector = AudioTfl(stop_event)
self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),) self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),)
self.chunk_size = int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE * 2)) self.chunk_size = int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE * 2))
self.pipe = f"{CACHE_DIR}/{self.config.name}-audio" self.logger = logging.getLogger(f"audio.{self.config.name}")
self.ffmpeg_cmd = get_ffmpeg_command( self.ffmpeg_cmd = get_ffmpeg_command(
get_ffmpeg_arg_list(self.config.ffmpeg.global_args) get_ffmpeg_arg_list(self.config.ffmpeg.global_args)
+ parse_preset_input("preset-rtsp-audio-only", 1), + parse_preset_input("preset-rtsp-audio-only", 1),
[i.path for i in self.config.ffmpeg.inputs if "audio" in i.roles][0], [i.path for i in self.config.ffmpeg.inputs if "audio" in i.roles][0],
self.pipe,
) )
self.pipe_file = None
self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio") self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio")
self.audio_listener = None self.audio_listener = None
@@ -274,37 +271,52 @@ class AudioEventMaintainer(threading.Thread):
if resp.status_code == 200: if resp.status_code == 200:
self.detections[detection["label"]] = None self.detections[detection["label"]] = None
else: else:
logger.warn( self.logger.warn(
f"Failed to end audio event {detection['id']} with status code {resp.status_code}" f"Failed to end audio event {detection['id']} with status code {resp.status_code}"
) )
def restart_audio_pipe(self) -> None: def start_or_restart_ffmpeg(self) -> None:
try:
os.mkfifo(self.pipe)
except FileExistsError:
pass
self.audio_listener = start_or_restart_ffmpeg( self.audio_listener = start_or_restart_ffmpeg(
self.ffmpeg_cmd, logger, self.logpipe, None, self.audio_listener self.ffmpeg_cmd,
self.logger,
self.logpipe,
self.chunk_size,
self.audio_listener,
) )
def read_audio(self) -> None: def read_audio(self) -> None:
if self.pipe_file is None: def log_and_restart() -> None:
self.pipe_file = open(self.pipe, "rb") if self.stop_event.is_set():
return
time.sleep(self.config.ffmpeg.retry_interval)
self.logpipe.dump()
self.start_or_restart_ffmpeg()
try: try:
audio = np.frombuffer(self.pipe_file.read(self.chunk_size), dtype=np.int16) chunk = self.audio_listener.stdout.read(self.chunk_size)
if not chunk:
if self.audio_listener.poll() is not None:
self.logger.error("ffmpeg process is not running, restarting...")
log_and_restart()
return
return
audio = np.frombuffer(chunk, dtype=np.int16)
self.detect_audio(audio) self.detect_audio(audio)
except BrokenPipeError: except Exception:
self.logpipe.dump() self.logger.error(
self.restart_audio_pipe() "Error reading audio data from ffmpeg process, restarting..."
)
log_and_restart()
def run(self) -> None: def run(self) -> None:
self.restart_audio_pipe() self.start_or_restart_ffmpeg()
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self.read_audio() self.read_audio()
self.pipe_file.close() stop_ffmpeg(self.audio_listener, self.logger)
stop_ffmpeg(self.audio_listener, logger)
self.logpipe.close() self.logpipe.close()