blakeblackshear.frigate/frigate/events/audio.py
"""Handle creating audio events."""
import datetime
import logging
import random
import string
import threading
import time
from typing import Any

import numpy as np

import frigate.util as util
from frigate.camera import CameraMetrics
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig
from frigate.const import (
AUDIO_DURATION,
AUDIO_FORMAT,
AUDIO_MAX_BIT_RANGE,
AUDIO_MIN_CONFIDENCE,
AUDIO_SAMPLE_RATE,
)
from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe
from frigate.object_detection.base import load_labels
from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg

try:
from tflite_runtime.interpreter import Interpreter
except ModuleNotFoundError:
    from tensorflow.lite.python.interpreter import Interpreter


def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
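    # use the first configured input that carries the audio role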
ffmpeg_input: CameraInput = [i for i in ffmpeg.inputs if "audio" in i.roles][0]
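    # prefer input-level args (preset or explicit) over the camera-wide defaults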
input_args = get_ffmpeg_arg_list(ffmpeg.global_args) + (
parse_preset_input(ffmpeg_input.input_args, 1)
or get_ffmpeg_arg_list(ffmpeg_input.input_args)
or parse_preset_input(ffmpeg.input_args, 1)
or get_ffmpeg_arg_list(ffmpeg.input_args)
)
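    # emit single-channel audio in AUDIO_FORMAT at AUDIO_SAMPLE_RATE on stdout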
return (
[ffmpeg.ffmpeg_path, "-vn", "-threads", "1"]
+ input_args
+ ["-i"]
+ [ffmpeg_input.path]
+ [
"-threads",
"1",
"-f",
f"{AUDIO_FORMAT}",
"-ar",
f"{AUDIO_SAMPLE_RATE}",
"-ac",
"1",
"-y",
"pipe:",
]
    )


class AudioProcessor(util.Process):
    name = "frigate.audio_manager"

    def __init__(
self,
cameras: list[CameraConfig],
camera_metrics: dict[str, CameraMetrics],
):
super().__init__(name="frigate.audio_manager", daemon=True)
self.camera_metrics = camera_metrics
        self.cameras = cameras

    def run(self) -> None:
audio_threads: list[AudioEventMaintainer] = []
threading.current_thread().name = "process:audio_manager"
if len(self.cameras) == 0:
return
for camera in self.cameras:
audio_thread = AudioEventMaintainer(
camera,
self.camera_metrics,
self.stop_event,
)
audio_threads.append(audio_thread)
audio_thread.start()
self.logger.info(f"Audio processor started (pid: {self.pid})")
while not self.stop_event.wait():
pass
for thread in audio_threads:
thread.join(1)
if thread.is_alive():
self.logger.info(f"Waiting for thread {thread.name:s} to exit")
thread.join(10)
for thread in audio_threads:
if thread.is_alive():
self.logger.warning(f"Thread {thread.name} is still alive")
self.logger.info("Exiting audio processor")
class AudioEventMaintainer(threading.Thread):
def __init__(
self,
camera: CameraConfig,
camera_metrics: dict[str, CameraMetrics],
stop_event: threading.Event,
) -> None:
super().__init__(name=f"{camera.name}_audio_event_processor")
self.config = camera
self.camera_metrics = camera_metrics
        self.detections: dict[str, dict[str, Any] | None] = {}
self.stop_event = stop_event
self.detector = AudioTfl(stop_event, self.config.audio.num_threads)
self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),)
self.chunk_size = int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE * 2))
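        # two bytes per 16-bit sample, so one chunk covers a full detection window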
self.logger = logging.getLogger(f"audio.{self.config.name}")
self.ffmpeg_cmd = get_ffmpeg_command(self.config.ffmpeg)
self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio")
self.audio_listener = None
# create communication for audio detections
self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}")
self.enabled_subscriber = ConfigSubscriber(
f"config/enabled/{camera.name}", True
)
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio)
self.event_metadata_publisher = EventMetadataPublisher()
        self.was_enabled = camera.enabled

    def detect_audio(self, audio: np.ndarray) -> None:
if not self.config.audio.enabled or self.stop_event.is_set():
return
audio_as_float = audio.astype(np.float32)
rms, dBFS = self.calculate_audio_levels(audio_as_float)
self.camera_metrics[self.config.name].audio_rms.value = rms
self.camera_metrics[self.config.name].audio_dBFS.value = dBFS
# only run audio detection when volume is above min_volume
if rms >= self.config.audio.min_volume:
# create waveform relative to max range and look for detections
waveform = (audio / AUDIO_MAX_BIT_RANGE).astype(np.float32)
model_detections = self.detector.detect(waveform)
audio_detections = []
for label, score, _ in model_detections:
self.logger.debug(
f"{self.config.name} heard {label} with a score of {score}"
)
if label not in self.config.audio.listen:
continue
if score > dict((self.config.audio.filters or {}).get(label, {})).get(
"threshold", 0.8
):
self.handle_detection(label, score)
audio_detections.append(label)
# send audio detection data
self.detection_publisher.publish(
(
self.config.name,
datetime.datetime.now().timestamp(),
dBFS,
audio_detections,
)
)
        self.expire_detections()

    def calculate_audio_levels(self, audio_as_float: np.ndarray) -> tuple[float, float]:
# Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
        # Note: np.float32 isn't JSON serializable, so cast to plain float before publishing
rms = np.sqrt(np.mean(np.absolute(np.square(audio_as_float))))
# Transform RMS to dBFS (decibels relative to full scale)
if rms > 0:
dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE)
else:
dBFS = 0
self.requestor.send_data(f"{self.config.name}/audio/dBFS", float(dBFS))
self.requestor.send_data(f"{self.config.name}/audio/rms", float(rms))
        return float(rms), float(dBFS)

    def handle_detection(self, label: str, score: float) -> None:
if self.detections.get(label):
self.detections[label]["last_detection"] = (
datetime.datetime.now().timestamp()
)
else:
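            # first time hearing this label: create a manual event with a unique id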
now = datetime.datetime.now().timestamp()
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
event_id = f"{now}-{rand_id}"
self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON")
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
self.config.name,
label,
event_id,
True,
score,
None,
None,
"audio",
{},
),
)
self.detections[label] = {
"id": event_id,
"label": label,
"last_detection": now,
        }

    def expire_detections(self) -> None:
now = datetime.datetime.now().timestamp()
for detection in self.detections.values():
if not detection:
continue
if (
now - detection.get("last_detection", now)
> self.config.audio.max_not_heard
):
self.requestor.send_data(
f"{self.config.name}/audio/{detection['label']}", "OFF"
)
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], detection["last_detection"]),
)
                self.detections[detection["label"]] = None

    def expire_all_detections(self) -> None:
"""Immediately end all current detections"""
now = datetime.datetime.now().timestamp()
for label, detection in list(self.detections.items()):
if detection:
self.requestor.send_data(f"{self.config.name}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], now),
)
                self.detections[label] = None

    def start_or_restart_ffmpeg(self) -> None:
self.audio_listener = start_or_restart_ffmpeg(
self.ffmpeg_cmd,
self.logger,
self.logpipe,
self.chunk_size,
self.audio_listener,
        )

    def read_audio(self) -> None:
def log_and_restart() -> None:
if self.stop_event.is_set():
return
time.sleep(self.config.ffmpeg.retry_interval)
self.logpipe.dump()
            self.start_or_restart_ffmpeg()

        try:
chunk = self.audio_listener.stdout.read(self.chunk_size)
if not chunk:
if self.audio_listener.poll() is not None:
self.logger.error("ffmpeg process is not running, restarting...")
log_and_restart()
return
return
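            # interpret the raw bytes as signed 16-bit samples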
audio = np.frombuffer(chunk, dtype=np.int16)
self.detect_audio(audio)
except Exception as e:
self.logger.error(f"Error reading audio data from ffmpeg process: {e}")
            log_and_restart()

    def _update_enabled_state(self) -> bool:
"""Fetch the latest config and update enabled state."""
_, config_data = self.enabled_subscriber.check_for_update()
if config_data:
self.config.enabled = config_data.enabled
return config_data.enabled
        return self.config.enabled

    def run(self) -> None:
if self._update_enabled_state():
self.start_or_restart_ffmpeg()
while not self.stop_event.is_set():
enabled = self._update_enabled_state()
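            # handle runtime enable/disable toggles without tearing down the thread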
if enabled != self.was_enabled:
if enabled:
self.logger.debug(
f"Enabling audio detections for {self.config.name}"
)
self.start_or_restart_ffmpeg()
else:
self.logger.debug(
f"Disabling audio detections for {self.config.name}, ending events"
)
self.expire_all_detections()
stop_ffmpeg(self.audio_listener, self.logger)
self.audio_listener = None
self.was_enabled = enabled
continue
if not enabled:
time.sleep(0.1)
continue
# check if there is an updated config
(
updated_topic,
updated_audio_config,
) = self.config_subscriber.check_for_update()
if updated_topic:
self.config.audio = updated_audio_config
self.read_audio()
if self.audio_listener:
stop_ffmpeg(self.audio_listener, self.logger)
self.logpipe.close()
self.requestor.stop()
self.config_subscriber.stop()
self.enabled_subscriber.stop()
        self.detection_publisher.stop()


class AudioTfl:
    def __init__(self, stop_event: threading.Event, num_threads: int = 2) -> None:
self.stop_event = stop_event
self.num_threads = num_threads
self.labels = load_labels("/audio-labelmap.txt", prefill=521)
self.interpreter = Interpreter(
model_path="/cpu_audio_model.tflite",
num_threads=self.num_threads,
)
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
        self.tensor_output_details = self.interpreter.get_output_details()

    def _detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
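        # fixed-size output buffer: up to 20 rows of [class_id, score, x1, y1, x2, y2]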
detections = np.zeros((20, 6), np.float32)
res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0]
non_zero_indices = res > 0
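        # select the 20 best-scoring classes, sort them by descending score,
        # and drop any whose score is zero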
class_ids = np.argpartition(-res, 20)[:20]
class_ids = class_ids[np.argsort(-res[class_ids])]
class_ids = class_ids[non_zero_indices[class_ids]]
scores = res[class_ids]
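        # audio detections have no bounding box, so fill with -1 placeholders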
boxes = np.full((scores.shape[0], 4), -1, np.float32)
count = len(scores)
for i in range(count):
if scores[i] < AUDIO_MIN_CONFIDENCE or i == 20:
break
detections[i] = [
class_ids[i],
float(scores[i]),
boxes[i][0],
boxes[i][1],
boxes[i][2],
boxes[i][3],
]
        return detections

    def detect(self, tensor_input: np.ndarray, threshold: float = AUDIO_MIN_CONFIDENCE):
detections = []
if self.stop_event.is_set():
return detections
raw_detections = self._detect_raw(tensor_input)
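        # raw detections are sorted by descending score, so stop at the first one below threshold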
for d in raw_detections:
if d[1] < threshold:
break
detections.append(
(self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
)
return detections