blakeblackshear.frigate/frigate/events/audio.py
gtsiam 4bb420d049
Add service manager infrastructure (#14150)
* Add service manager infrastructure

The changes are (This will be a bit long):
- A ServiceManager class that spawns a background thread and deals with
  service lifecycle management. The idea is that service lifecycle code
  will run in async functions, so a single thread is enough to manage
  any (reasonable) number of services.

- A Service class, that offers start(), stop() and restart() methods
  that simply notify the service manager to... well. Start, stop or
  restart a service.

(!) Warning: Note that this differs from mp.Process.start/stop in that
  the service commands are sent asynchronously and will complete
  "eventually". This is good because it means that business logic is
  fast when booting up and shutting down, but we need to make sure
  that code does not rely on start() and stop() being instant
  (Mainly pid assignments).

  Subclasses of the Service class should use the on_start and on_stop
  methods to monitor for service events. These will be run by the
  service manager thread, so we need to be careful not to block
  execution here. Standard async stuff.

(!) Note on service names: Service names should be unique within a
  ServiceManager. Make sure that you pass the name you want to
  super().__init__(name="...") if you plan to spawn multiple instances
  of a service.

- A ServiceProcess class: A Service that wraps a multiprocessing.Process.
  It offers a run() method that subclasses can override, and it can
  support in-place restarting using the service manager.

And finally, I lied a bit about this whole thing using a single thread.
I can't find any way to run python multiprocessing in async, so there is
a MultiprocessingWaiter thread that waits for multiprocessing events and
notifies any pending futures. This was uhhh... fun? No, not really.
But it works. Using this part of the code just involves calling the
provided wait method. See the implementation of ServiceProcess for more
details.

Mirror util.Process hooks onto service process

Remove Service.__name attribute

Do not serialize process object on ServiceProcess start.

asd

* Update frigate dictionary

* Convert AudioProcessor to service process
2024-10-21 10:00:38 -05:00

362 lines
12 KiB
Python

"""Handle creating audio events."""
import datetime
import logging
import threading
import time
from typing import Tuple
import numpy as np
import requests
from frigate.camera import CameraMetrics
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig
from frigate.const import (
AUDIO_DURATION,
AUDIO_FORMAT,
AUDIO_MAX_BIT_RANGE,
AUDIO_MIN_CONFIDENCE,
AUDIO_SAMPLE_RATE,
FRIGATE_LOCALHOST,
)
from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe
from frigate.object_detection import load_labels
from frigate.service_manager import ServiceProcess
from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg
try:
from tflite_runtime.interpreter import Interpreter
except ModuleNotFoundError:
from tensorflow.lite.python.interpreter import Interpreter
def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
    """Assemble the ffmpeg command line used to capture a camera's audio.

    The first input whose roles contain "audio" is used as the source; an
    IndexError is raised when no such input exists. The output side asks for
    a single channel in AUDIO_FORMAT at AUDIO_SAMPLE_RATE, written to "pipe:".
    """
    audio_inputs = [
        candidate for candidate in ffmpeg.inputs if "audio" in candidate.roles
    ]
    ffmpeg_input: CameraInput = audio_inputs[0]

    global_args = get_ffmpeg_arg_list(ffmpeg.global_args)

    # Input arguments are resolved in order of precedence; the first
    # non-empty result wins: per-input preset, per-input raw arguments,
    # camera-level preset, camera-level raw arguments.
    source_args = parse_preset_input(ffmpeg_input.input_args, 1)
    if not source_args:
        source_args = get_ffmpeg_arg_list(ffmpeg_input.input_args)
    if not source_args:
        source_args = parse_preset_input(ffmpeg.input_args, 1)
    if not source_args:
        source_args = get_ffmpeg_arg_list(ffmpeg.input_args)

    input_args = global_args + source_args

    output_args = [
        "-threads",
        "1",
        "-f",
        f"{AUDIO_FORMAT}",
        "-ar",
        f"{AUDIO_SAMPLE_RATE}",
        "-ac",
        "1",
        "-y",
        "pipe:",
    ]

    command = [ffmpeg.ffmpeg_path, "-vn", "-threads", "1"]
    command = command + input_args
    command = command + ["-i", ffmpeg_input.path]
    return command + output_args
class AudioProcessor(ServiceProcess):
    """Service process that runs one AudioEventMaintainer thread per camera."""

    name = "frigate.audio_manager"

    def __init__(
        self,
        cameras: list[CameraConfig],
        camera_metrics: dict[str, CameraMetrics],
    ):
        """Store the cameras to listen on and the shared per-camera metrics."""
        super().__init__()

        self.cameras = cameras
        self.camera_metrics = camera_metrics

    def run(self) -> None:
        """Start the per-camera audio threads and wait for the stop event.

        Returns immediately when there are no cameras. After the stop event
        is set, each thread is given a short join, then a longer one, and any
        thread that is still alive afterwards is reported with a warning.
        """
        workers: list[AudioEventMaintainer] = []

        threading.current_thread().name = "process:audio_manager"

        if not self.cameras:
            return

        for camera in self.cameras:
            worker = AudioEventMaintainer(
                camera,
                self.camera_metrics,
                self.stop_event,
            )
            workers.append(worker)
            worker.start()

        self.logger.info(f"Audio processor started (pid: {self.pid})")

        # Block until shutdown is requested; the wait is retried should it
        # ever return a falsy value.
        while True:
            if self.stop_event.wait():
                break

        for worker in workers:
            worker.join(1)
            if not worker.is_alive():
                continue

            self.logger.info(f"Waiting for thread {worker.name:s} to exit")
            worker.join(10)

        # Report anything that did not exit within the join timeouts above.
        for worker in workers:
            if not worker.is_alive():
                continue
            self.logger.warning(f"Thread {worker.name} is still alive")

        self.logger.info("Exiting audio processor")
class AudioEventMaintainer(threading.Thread):
    """Thread that listens to one camera's audio and maintains audio events.

    Raw audio is read from an ffmpeg process, measured (RMS / dBFS), run
    through the audio classifier when loud enough, and matching labels are
    turned into events through the local HTTP API. Events are ended once a
    label has not been heard for longer than `audio.max_not_heard`.
    """

    # Seconds to wait for the local API before giving up on a request, so a
    # stalled API cannot block the audio loop (and shutdown) indefinitely.
    API_TIMEOUT = 10

    def __init__(
        self,
        camera: CameraConfig,
        camera_metrics: dict[str, CameraMetrics],
        stop_event: threading.Event,
    ) -> None:
        """Set up the detector, ffmpeg command and communication channels.

        Args:
            camera: configuration of the camera whose audio is processed.
            camera_metrics: shared metrics keyed by camera name; the audio
                RMS and dBFS values of this camera are written to it.
            stop_event: event that signals this thread to shut down.
        """
        super().__init__(name=f"{camera.name}_audio_event_processor")

        self.config = camera
        self.camera_metrics = camera_metrics
        # label -> info about the active event ("id", "label",
        # "last_detection"), or None once the event has been ended
        self.detections: dict[str, dict[str, object] | None] = {}
        self.stop_event = stop_event
        self.detector = AudioTfl(stop_event, self.config.audio.num_threads)
        self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),)
        # chunks are decoded as int16 in read_audio, i.e. 2 bytes per sample
        self.chunk_size = int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE * 2))
        self.logger = logging.getLogger(f"audio.{self.config.name}")
        self.ffmpeg_cmd = get_ffmpeg_command(self.config.ffmpeg)
        self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio")
        self.audio_listener = None

        # create communication for audio detections
        self.requestor = InterProcessRequestor()
        self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}")
        self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio)

    def detect_audio(self, audio: np.ndarray) -> None:
        """Measure one chunk of audio and run detection when it is loud enough.

        Does nothing when audio detection is disabled or shutdown was
        requested. Expired detections are cleaned up on every processed chunk,
        whether or not the classifier ran.
        """
        if not self.config.audio.enabled or self.stop_event.is_set():
            return

        audio_as_float = audio.astype(np.float32)
        rms, dBFS = self.calculate_audio_levels(audio_as_float)

        self.camera_metrics[self.config.name].audio_rms.value = rms
        self.camera_metrics[self.config.name].audio_dBFS.value = dBFS

        # only run audio detection when volume is above min_volume
        if rms >= self.config.audio.min_volume:
            # create waveform relative to max range and look for detections
            waveform = (audio / AUDIO_MAX_BIT_RANGE).astype(np.float32)
            model_detections = self.detector.detect(waveform)
            audio_detections = []
            filters = self.config.audio.filters or {}

            for label, score, _ in model_detections:
                self.logger.debug(
                    f"{self.config.name} heard {label} with a score of {score}"
                )

                if label not in self.config.audio.listen:
                    continue

                # per-label threshold, falling back to 0.8 when not configured
                threshold = dict(filters.get(label, {})).get("threshold", 0.8)

                if score > threshold:
                    self.handle_detection(label, score)
                    audio_detections.append(label)

            # send audio detection data
            self.detection_publisher.publish(
                (
                    self.config.name,
                    datetime.datetime.now().timestamp(),
                    dBFS,
                    audio_detections,
                )
            )

        self.expire_detections()

    def calculate_audio_levels(
        self, audio_as_float: np.ndarray
    ) -> Tuple[float, float]:
        """Compute and publish the RMS and dBFS levels of an audio chunk.

        Returns:
            (rms, dBFS) as plain Python floats.
        """
        # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
        # Note: numpy scalars are converted to plain floats before publishing
        rms = np.sqrt(np.mean(np.square(audio_as_float)))

        # Transform RMS to dBFS (decibels relative to full scale)
        if rms > 0:
            dBFS = 20 * np.log10(rms / AUDIO_MAX_BIT_RANGE)
        else:
            # NOTE(review): silence is reported as 0 dBFS here, which is
            # numerically the full-scale value; kept as-is for consumers.
            dBFS = 0

        self.requestor.send_data(f"{self.config.name}/audio/dBFS", float(dBFS))
        self.requestor.send_data(f"{self.config.name}/audio/rms", float(rms))

        return float(rms), float(dBFS)

    def handle_detection(self, label: str, score: float) -> None:
        """Refresh the active event for `label`, or create a new one.

        A failed or timed-out API request is logged and otherwise ignored; no
        detection is recorded, so creation is retried the next time the label
        is heard.
        """
        if self.detections.get(label):
            self.detections[label]["last_detection"] = (
                datetime.datetime.now().timestamp()
            )
            return

        self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON")

        try:
            resp = requests.post(
                f"{FRIGATE_LOCALHOST}/api/events/{self.config.name}/{label}/create",
                json={"duration": None, "score": score, "source_type": "audio"},
                timeout=self.API_TIMEOUT,
            )
        except requests.RequestException as e:
            # Do not let an API hiccup bubble up into read_audio, where it
            # would be treated as an ffmpeg failure and trigger a restart.
            self.logger.warning(f"Failed to create audio event for {label}: {e}")
            return

        if resp.status_code == 200:
            event_id = resp.json()["event_id"]
            self.detections[label] = {
                "id": event_id,
                "label": label,
                "last_detection": datetime.datetime.now().timestamp(),
            }
        else:
            self.logger.warning(
                f"Failed to create audio event for {label} with status code {resp.status_code}"
            )

    def expire_detections(self) -> None:
        """End events whose label was not heard for `audio.max_not_heard`.

        An event is only forgotten once the API confirms it was ended; failed
        requests are logged and retried on the next call.
        """
        now = datetime.datetime.now().timestamp()

        # iterate over a snapshot since entries are reassigned inside the loop
        for detection in list(self.detections.values()):
            if not detection:
                continue

            if (
                now - detection.get("last_detection", now)
                > self.config.audio.max_not_heard
            ):
                self.requestor.send_data(
                    f"{self.config.name}/audio/{detection['label']}", "OFF"
                )

                try:
                    resp = requests.put(
                        f"{FRIGATE_LOCALHOST}/api/events/{detection['id']}/end",
                        json={"end_time": detection["last_detection"]},
                        timeout=self.API_TIMEOUT,
                    )
                except requests.RequestException as e:
                    self.logger.warning(
                        f"Failed to end audio event {detection['id']}: {e}"
                    )
                    continue

                if resp.status_code == 200:
                    self.detections[detection["label"]] = None
                else:
                    self.logger.warning(
                        f"Failed to end audio event {detection['id']} with status code {resp.status_code}"
                    )

    def start_or_restart_ffmpeg(self) -> None:
        """(Re)start the ffmpeg audio process and remember its handle."""
        # the call below resolves to the module-level helper, not this method
        self.audio_listener = start_or_restart_ffmpeg(
            self.ffmpeg_cmd,
            self.logger,
            self.logpipe,
            self.chunk_size,
            self.audio_listener,
        )

    def read_audio(self) -> None:
        """Read one chunk from ffmpeg and process it, restarting on failure."""

        def log_and_restart() -> None:
            # Wait out the retry interval on the stop event instead of
            # sleeping, so a shutdown request ends the wait immediately and
            # no new ffmpeg process is started while shutting down.
            if self.stop_event.wait(self.config.ffmpeg.retry_interval):
                return

            self.logpipe.dump()
            self.start_or_restart_ffmpeg()

        try:
            chunk = self.audio_listener.stdout.read(self.chunk_size)

            if not chunk:
                # an empty read is only a problem when the process has exited
                if self.audio_listener.poll() is not None:
                    self.logger.error("ffmpeg process is not running, restarting...")
                    log_and_restart()

                return

            audio = np.frombuffer(chunk, dtype=np.int16)
            self.detect_audio(audio)
        except Exception as e:
            self.logger.error(f"Error reading audio data from ffmpeg process: {e}")
            log_and_restart()

    def run(self) -> None:
        """Process audio until the stop event is set, then release resources.

        Cleanup runs in a `finally` block so the ffmpeg process and the
        communication channels are released even if the loop raises.
        """
        try:
            self.start_or_restart_ffmpeg()

            while not self.stop_event.is_set():
                # check if there is an updated config
                (
                    updated_topic,
                    updated_audio_config,
                ) = self.config_subscriber.check_for_update()

                if updated_topic:
                    self.config.audio = updated_audio_config

                self.read_audio()
        finally:
            # audio_listener is still None when the initial start failed
            if self.audio_listener is not None:
                stop_ffmpeg(self.audio_listener, self.logger)

            self.logpipe.close()
            self.requestor.stop()
            self.config_subscriber.stop()
            self.detection_publisher.stop()
class AudioTfl:
    """Audio classifier backed by a TFLite interpreter."""

    def __init__(self, stop_event: threading.Event, num_threads=2):
        """Load the label map and the model, and allocate the tensors."""
        self.stop_event = stop_event
        self.num_threads = num_threads
        self.labels = load_labels("/audio-labelmap.txt", prefill=521)
        self.interpreter = Interpreter(
            model_path="/cpu_audio_model.tflite",
            num_threads=self.num_threads,
        )

        self.interpreter.allocate_tensors()

        self.tensor_input_details = self.interpreter.get_input_details()
        self.tensor_output_details = self.interpreter.get_output_details()

    def _detect_raw(self, tensor_input):
        """Run the model and return the top results as a (20, 6) float32 array.

        Each filled row is [class_id, score, -1, -1, -1, -1], ordered by
        descending score. Rows are filled until a score drops below
        AUDIO_MIN_CONFIDENCE; the remaining rows stay zero.
        """
        input_index = self.tensor_input_details[0]["index"]
        self.interpreter.set_tensor(input_index, tensor_input)
        self.interpreter.invoke()

        detections = np.zeros((20, 6), np.float32)

        output_index = self.tensor_output_details[0]["index"]
        output = self.interpreter.get_tensor(output_index)[0]
        is_positive = output > 0

        # pick the 20 highest-scoring classes, order them best-first and
        # drop any whose score is not positive
        top_ids = np.argpartition(-output, 20)[:20]
        top_ids = top_ids[np.argsort(-output[top_ids])]
        top_ids = top_ids[is_positive[top_ids]]
        top_scores = output[top_ids]

        for row, (class_id, score) in enumerate(zip(top_ids, top_scores)):
            if score < AUDIO_MIN_CONFIDENCE or row == 20:
                break

            # audio results have no bounding box; those columns are set to -1
            detections[row] = (class_id, float(score), -1.0, -1.0, -1.0, -1.0)

        return detections

    def detect(self, tensor_input, threshold=AUDIO_MIN_CONFIDENCE):
        """Return (label, score, box) tuples for results scoring >= threshold.

        Returns an empty list without running the model when the stop event
        is set. Iteration stops at the first row below the threshold.
        """
        if self.stop_event.is_set():
            return []

        results = []

        for row in self._detect_raw(tensor_input):
            if row[1] < threshold:
                break

            label = self.labels[int(row[0])]
            box = (row[2], row[3], row[4], row[5])
            results.append((label, float(row[1]), box))

        return results