Save average dBFS and retain segment with dBFS in motion mode (#7158)

* Hold audio info queue for recordings

* Add dBFS to db

* Cleanup

* Formatting

* Fix check
This commit is contained in:
Nicolas Mowen 2023-07-14 18:05:14 -06:00 committed by GitHub
parent 5bb5e2dc5a
commit 00016b7499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 165 additions and 33 deletions

View File

@ -110,7 +110,7 @@ class FrigateApp:
user_config = FrigateConfig.parse_file(config_file)
self.config = user_config.runtime_config(self.plus_api)
for camera_name, camera_config in self.config.cameras.items():
for camera_name in self.config.cameras.keys():
# create camera_metrics
self.camera_metrics[camera_name] = {
"camera_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item]
@ -227,12 +227,22 @@ class FrigateApp:
maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
)
# Queue for recordings info
self.recordings_info_queue: Queue = ff.Queue(
# Queue for object recordings info
self.object_recordings_info_queue: Queue = ff.Queue(
DEFAULT_QUEUE_BUFFER_SIZE
* sum(camera.enabled for camera in self.config.cameras.values())
)
# Queue for audio recordings info if enabled
self.audio_recordings_info_queue: Optional[Queue] = (
ff.Queue(
DEFAULT_QUEUE_BUFFER_SIZE
* sum(camera.audio.enabled for camera in self.config.cameras.values())
)
if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0
else None
)
# Queue for timeline events
self.timeline_queue: Queue = ff.Queue(
DEFAULT_QUEUE_BUFFER_SIZE
@ -297,7 +307,12 @@ class FrigateApp:
recording_process = mp.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config, self.recordings_info_queue, self.feature_metrics),
args=(
self.config,
self.object_recordings_info_queue,
self.audio_recordings_info_queue,
self.feature_metrics,
),
)
recording_process.daemon = True
self.recording_process = recording_process
@ -422,7 +437,7 @@ class FrigateApp:
self.event_queue,
self.event_processed_queue,
self.video_output_queue,
self.recordings_info_queue,
self.object_recordings_info_queue,
self.ptz_autotracker_thread,
self.stop_event,
)
@ -491,6 +506,7 @@ class FrigateApp:
name="audio_capture",
args=(
self.config,
self.audio_recordings_info_queue,
self.feature_metrics,
self.inter_process_communicator,
),
@ -656,10 +672,12 @@ class FrigateApp:
self.event_processed_queue,
self.video_output_queue,
self.detected_frames_queue,
self.recordings_info_queue,
self.object_recordings_info_queue,
self.audio_recordings_info_queue,
self.log_queue,
self.inter_process_queue,
]:
if queue is not None:
while not queue.empty():
queue.get_nowait()
queue.close()

View File

@ -9,6 +9,7 @@ import threading
from types import FrameType
from typing import Optional, Tuple
import faster_fifo as ff
import numpy as np
import requests
from setproctitle import setproctitle
@ -51,6 +52,7 @@ def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> lis
def listen_to_audio(
config: FrigateConfig,
recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes],
inter_process_communicator: InterProcessCommunicator,
) -> None:
@ -77,7 +79,11 @@ def listen_to_audio(
for camera in config.cameras.values():
if camera.enabled and camera.audio.enabled_in_config:
audio = AudioEventMaintainer(
camera, process_info, stop_event, inter_process_communicator
camera,
recordings_info_queue,
process_info,
stop_event,
inter_process_communicator,
)
audio_threads.append(audio)
audio.start()
@ -146,6 +152,7 @@ class AudioEventMaintainer(threading.Thread):
def __init__(
self,
camera: CameraConfig,
recordings_info_queue: ff.Queue,
feature_metrics: dict[str, FeatureMetricsTypes],
stop_event: mp.Event,
inter_process_communicator: InterProcessCommunicator,
@ -153,6 +160,7 @@ class AudioEventMaintainer(threading.Thread):
threading.Thread.__init__(self)
self.name = f"{camera.name}_audio_event_processor"
self.config = camera
self.recordings_info_queue = recordings_info_queue
self.feature_metrics = feature_metrics
self.inter_process_communicator = inter_process_communicator
self.detections: dict[dict[str, any]] = feature_metrics
@ -176,10 +184,16 @@ class AudioEventMaintainer(threading.Thread):
return
audio_as_float = audio.astype(np.float32)
rms, _ = self.calculate_audio_levels(audio_as_float)
rms, dBFS = self.calculate_audio_levels(audio_as_float)
# only run audio detection when volume is above min_volume
if rms >= self.config.audio.min_volume:
# add audio info to recordings queue
self.recordings_info_queue.put(
(self.config.name, datetime.datetime.now().timestamp(), dBFS)
)
# create waveform relative to max range and look for detections
waveform = (audio / AUDIO_MAX_BIT_RANGE).astype(np.float32)
model_detections = self.detector.detect(waveform)
@ -194,7 +208,7 @@ class AudioEventMaintainer(threading.Thread):
def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]:
# Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
# Note: np.float32 isn't serializable, we must use np.float64 to publish the message
rms = np.sqrt(np.mean(np.absolute(audio_as_float**2)))
rms = np.sqrt(np.mean(np.absolute(np.square(audio_as_float))))
# Transform RMS to dBFS (decibels relative to full scale)
dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE)

View File

@ -66,6 +66,7 @@ class Recordings(Model): # type: ignore[misc]
duration = FloatField()
motion = IntegerField(null=True)
objects = IntegerField(null=True)
dBFS = IntegerField(null=True)
segment_size = FloatField(default=0) # this should be stored as MB

View File

@ -12,9 +12,10 @@ import threading
from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any, Tuple
from typing import Any, Optional, Tuple
import faster_fifo as ff
import numpy as np
import psutil
from frigate.config import FrigateConfig, RetainModeEnum
@ -31,17 +32,20 @@ class RecordingMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
recordings_info_queue: ff.Queue,
object_recordings_info_queue: ff.Queue,
audio_recordings_info_queue: Optional[ff.Queue],
process_info: dict[str, FeatureMetricsTypes],
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "recording_maintainer"
self.config = config
self.recordings_info_queue = recordings_info_queue
self.object_recordings_info_queue = object_recordings_info_queue
self.audio_recordings_info_queue = audio_recordings_info_queue
self.process_info = process_info
self.stop_event = stop_event
self.recordings_info: dict[str, Any] = defaultdict(list)
self.object_recordings_info: dict[str, list] = defaultdict(list)
self.audio_recordings_info: dict[str, list] = defaultdict(list)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
async def move_files(self) -> None:
@ -103,13 +107,21 @@ class RecordingMaintainer(threading.Thread):
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
for camera, recordings in grouped_recordings.items():
# clear out all the recording info for old frames
# clear out all the object recording info for old frames
while (
len(self.recordings_info[camera]) > 0
and self.recordings_info[camera][0][0]
len(self.object_recordings_info[camera]) > 0
and self.object_recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp()
):
self.recordings_info[camera].pop(0)
self.object_recordings_info[camera].pop(0)
# clear out all the audio recording info for old frames
while (
len(self.audio_recordings_info[camera]) > 0
and self.audio_recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp()
):
self.audio_recordings_info[camera].pop(0)
# get all events with the end time after the start of the oldest cache file
# or with end_time None
@ -206,7 +218,9 @@ class RecordingMaintainer(threading.Thread):
# if it ends more than the configured pre_capture for the camera
else:
pre_capture = self.config.cameras[camera].record.events.pre_capture
most_recently_processed_frame_time = self.recordings_info[camera][-1][0]
most_recently_processed_frame_time = self.object_recordings_info[
camera
][-1][0]
retain_cutoff = most_recently_processed_frame_time - pre_capture
if end_time.timestamp() < retain_cutoff:
Path(cache_path).unlink(missing_ok=True)
@ -220,10 +234,10 @@ class RecordingMaintainer(threading.Thread):
def segment_stats(
self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
) -> Tuple[int, int]:
) -> Tuple[int, int, int]:
active_count = 0
motion_count = 0
for frame in self.recordings_info[camera]:
for frame in self.object_recordings_info[camera]:
# frame is after end time of segment
if frame[0] > end_time.timestamp():
break
@ -241,7 +255,21 @@ class RecordingMaintainer(threading.Thread):
motion_count += sum([area(box) for box in frame[2]])
return (motion_count, active_count)
audio_values = []
for frame in self.audio_recordings_info[camera]:
# frame is after end time of segment
if frame[0] > end_time.timestamp():
break
# frame is before start time of segment
if frame[0] < start_time.timestamp():
continue
audio_values.append(frame[1])
average_dBFS = 0 if not audio_values else np.average(audio_values)
return (motion_count, active_count, round(average_dBFS))
def store_segment(
self,
@ -252,11 +280,17 @@ class RecordingMaintainer(threading.Thread):
cache_path: str,
store_mode: RetainModeEnum,
) -> None:
motion_count, active_count = self.segment_stats(camera, start_time, end_time)
motion_count, active_count, averageDBFS = self.segment_stats(
camera, start_time, end_time
)
# check if the segment shouldn't be stored
if (store_mode == RetainModeEnum.motion and motion_count == 0) or (
store_mode == RetainModeEnum.active_objects and active_count == 0
if (
(store_mode == RetainModeEnum.motion and motion_count == 0)
or (
store_mode == RetainModeEnum.motion and averageDBFS < 0
) # dBFS is stored in a negative scale
or (store_mode == RetainModeEnum.active_objects and active_count == 0)
):
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
@ -333,6 +367,7 @@ class RecordingMaintainer(threading.Thread):
motion=motion_count,
# TODO: update this to store list of active objects at some point
objects=active_count,
dBFS=averageDBFS,
segment_size=segment_size,
)
except Exception as e:
@ -349,7 +384,7 @@ class RecordingMaintainer(threading.Thread):
while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp()
# empty the recordings info queue
# empty the object recordings info queue
while True:
try:
(
@ -358,10 +393,10 @@ class RecordingMaintainer(threading.Thread):
current_tracked_objects,
motion_boxes,
regions,
) = self.recordings_info_queue.get(False)
) = self.object_recordings_info_queue.get(False)
if self.process_info[camera]["record_enabled"].value:
self.recordings_info[camera].append(
self.object_recordings_info[camera].append(
(
frame_time,
current_tracked_objects,
@ -372,6 +407,26 @@ class RecordingMaintainer(threading.Thread):
except queue.Empty:
break
# empty the audio recordings info queue if audio is enabled
if self.audio_recordings_info_queue:
while True:
try:
(
camera,
frame_time,
dBFS,
) = self.audio_recordings_info_queue.get(False)
if self.process_info[camera]["record_enabled"].value:
self.audio_recordings_info[camera].append(
(
frame_time,
dBFS,
)
)
except queue.Empty:
break
try:
asyncio.run(self.move_files())
except Exception as e:

View File

@ -23,7 +23,8 @@ logger = logging.getLogger(__name__)
def manage_recordings(
config: FrigateConfig,
recordings_info_queue: ff.Queue,
object_recordings_info_queue: ff.Queue,
audio_recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes],
) -> None:
stop_event = mp.Event()
@ -51,7 +52,11 @@ def manage_recordings(
db.bind(models)
maintainer = RecordingMaintainer(
config, recordings_info_queue, process_info, stop_event
config,
object_recordings_info_queue,
audio_recordings_info_queue,
process_info,
stop_event,
)
maintainer.start()

View File

@ -0,0 +1,39 @@
"""Peewee migrations -- 018_add_dbfs.py.
Some examples (model - class or model name)::
> Model = migrator.orm['model_name'] # Return model in current state by name
> migrator.sql(sql) # Run custom SQL
> migrator.python(func, *args, **kwargs) # Run python code
> migrator.create_model(Model) # Create a model (could be used as decorator)
> migrator.remove_model(model, cascade=True) # Remove a model
> migrator.add_fields(model, **fields) # Add fields to a model
> migrator.change_fields(model, **fields) # Change fields
> migrator.remove_fields(model, *field_names, cascade=True)
> migrator.rename_field(model, old_field_name, new_field_name)
> migrator.rename_table(model, new_table_name)
> migrator.add_index(model, *col_names, unique=False)
> migrator.drop_index(model, *col_names)
> migrator.add_not_null(model, *field_names)
> migrator.drop_not_null(model, *field_names)
> migrator.add_default(model, field_name, default)
"""
import peewee as pw
from frigate.models import Recordings
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.add_fields(
Recordings,
dBFS=pw.IntegerField(null=True),
)
def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_fields(Recordings, ["dBFS"])