From b909ff4ecb5374f99e06cd0b6b154b6477afb7b6 Mon Sep 17 00:00:00 2001 From: jpverdejo Date: Thu, 13 Jul 2023 19:52:33 -0500 Subject: [PATCH] Expose dBFS when doing audio analysis (#6979) * Expose dBFS when doing audio analysis * Implement metadata communicator * revert test changes * Reverting the tests changes. For real this time * Address feedback * Address feedback * Address feedback * Address feedback --- frigate/app.py | 19 +++++++++++++++- frigate/comms/dispatcher.py | 2 ++ frigate/comms/inter_process.py | 40 ++++++++++++++++++++++++++++++++++ frigate/events/audio.py | 30 ++++++++++++++++++++++--- 4 files changed, 87 insertions(+), 4 deletions(-) create mode 100644 frigate/comms/inter_process.py diff --git a/frigate/app.py b/frigate/app.py index 87a55aca2..b0ec223b4 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -18,6 +18,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.dispatcher import Communicator, Dispatcher +from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient from frigate.comms.ws import WebSocketClient from frigate.config import FrigateConfig @@ -226,6 +227,9 @@ class FrigateApp: # Queue for timeline events self.timeline_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) + # Queue for inter process communication + self.inter_process_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) + def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: db.execute_sql("VACUUM;") @@ -314,6 +318,11 @@ class FrigateApp: self.config, self.event_queue ) + def init_inter_process_communicator(self) -> None: + self.inter_process_communicator = InterProcessCommunicator( + self.inter_process_queue + ) + def init_web_server(self) -> None: self.flask_app = create_app( self.config, @@ -336,6 +345,8 @@ class FrigateApp: comms.append(MqttClient(self.config)) comms.append(WebSocketClient(self.config)) + comms.append(self.inter_process_communicator) + self.dispatcher = Dispatcher( self.config, self.onvif_controller, @@ -466,7 +477,11 @@ class FrigateApp: audio_process = mp.Process( target=listen_to_audio, name="audio_capture", - args=(self.config, self.feature_metrics), + args=( + self.config, + self.feature_metrics, + self.inter_process_communicator, + ), ) audio_process.daemon = True audio_process.start() @@ -559,6 +574,7 @@ class FrigateApp: self.init_recording_manager() self.init_go2rtc() self.bind_database() + self.init_inter_process_communicator() self.init_dispatcher() except Exception as e: print(e) @@ -630,6 +646,7 @@ class FrigateApp: self.detected_frames_queue, self.recordings_info_queue, self.log_queue, + self.inter_process_queue, ]: while not queue.empty(): queue.get_nowait() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index fc4561d6d..f2fe40c5d 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -86,6 +86,8 @@ class Dispatcher: return elif topic == "restart": restart_frigate() + else: + self.publish(topic, payload, retain=False) def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Handle publishing to communicators.""" diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py new file mode 100644 index 000000000..ff4a1180a --- /dev/null +++ b/frigate/comms/inter_process.py @@ -0,0 +1,40 @@ +import multiprocessing as mp +import queue +import threading +from multiprocessing.synchronize import Event as MpEvent +from typing import Callable + +from faster_fifo import Queue + +from frigate.comms.dispatcher import Communicator + + +class InterProcessCommunicator(Communicator): + def __init__(self, queue: Queue) -> None: + self.queue = queue + self.stop_event: MpEvent = mp.Event() + + def publish(self, topic: str, payload: str, retain: bool) -> None: + """There is no communication back to the processes.""" + pass + + def subscribe(self, receiver: Callable) -> None: + self._dispatcher = receiver + self.reader_thread = threading.Thread(target=self.read) + self.reader_thread.start() + + def read(self) -> None: + while not self.stop_event.is_set(): + try: + ( + topic, + value, + ) = self.queue.get(True, 1) + except queue.Empty: + continue + + self._dispatcher(topic, value) + + def stop(self) -> None: + self.stop_event.set() + self.reader_thread.join() diff --git a/frigate/events/audio.py b/frigate/events/audio.py index decd17ca4..631c4349f 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -7,12 +7,13 @@ import os import signal import threading from types import FrameType -from typing import Optional +from typing import Optional, Tuple import numpy as np import requests from setproctitle import setproctitle +from frigate.comms.inter_process import InterProcessCommunicator from frigate.config import CameraConfig, FrigateConfig from frigate.const import ( AUDIO_DURATION, @@ -51,6 +52,7 @@ def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> lis def listen_to_audio( config: FrigateConfig, process_info: dict[str, FeatureMetricsTypes], + inter_process_communicator: InterProcessCommunicator, ) -> None: stop_event = mp.Event() audio_threads: list[threading.Thread] = [] @@ -74,7 +76,9 @@ def listen_to_audio( for camera in config.cameras.values(): if camera.enabled and camera.audio.enabled_in_config: - audio = AudioEventMaintainer(camera, process_info, stop_event) + audio = AudioEventMaintainer( + camera, process_info, stop_event, inter_process_communicator + ) audio_threads.append(audio) audio.start() @@ -144,11 +148,13 @@ class AudioEventMaintainer(threading.Thread): camera: CameraConfig, feature_metrics: dict[str, FeatureMetricsTypes], stop_event: mp.Event, + inter_process_communicator: InterProcessCommunicator, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" self.config = camera self.feature_metrics = feature_metrics + self.inter_process_communicator = inter_process_communicator self.detections: dict[dict[str, any]] = feature_metrics self.stop_event = stop_event self.detector = AudioTfl(stop_event) @@ -169,7 +175,8 @@ class AudioEventMaintainer(threading.Thread): if not self.feature_metrics[self.config.name]["audio_enabled"].value: return - rms = np.sqrt(np.mean(np.absolute(np.square(audio.astype(np.float32))))) + audio_as_float = audio.astype(np.float32) + rms, _ = self.calculate_audio_levels(audio_as_float) # only run audio detection when volume is above min_volume if rms >= self.config.audio.min_volume: @@ -184,6 +191,23 @@ class AudioEventMaintainer(threading.Thread): self.expire_detections() + def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]: + # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude + # Note: np.float32 isn't serializable, we must use np.float64 to publish the message + rms = np.sqrt(np.mean(np.absolute(audio_as_float**2))) + + # Transform RMS to dBFS (decibels relative to full scale) + dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE) + + self.inter_process_communicator.queue.put( + (f"{self.config.name}/audio/dBFS", float(dBFS)) + ) + self.inter_process_communicator.queue.put( + (f"{self.config.name}/audio/rms", float(rms)) + ) + + return float(rms), float(dBFS) + def handle_detection(self, label: str, score: float) -> None: if self.detections.get(label): self.detections[label][