diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt index 2764eca43..59cc1ab9c 100644 --- a/docker/main/requirements-wheels.txt +++ b/docker/main/requirements-wheels.txt @@ -71,3 +71,8 @@ prometheus-client == 0.21.* # TFLite tflite_runtime @ https://github.com/frigate-nvr/TFlite-builds/releases/download/v2.17.1/tflite_runtime-2.17.1-cp311-cp311-linux_x86_64.whl; platform_machine == 'x86_64' tflite_runtime @ https://github.com/feranick/TFlite-builds/releases/download/v2.17.1/tflite_runtime-2.17.1-cp311-cp311-linux_aarch64.whl; platform_machine == 'aarch64' +# audio transcription +sherpa-onnx==1.12.* +faster-whisper==1.1.* +librosa==0.11.* +soundfile==0.13.* \ No newline at end of file diff --git a/docs/docs/configuration/audio_detectors.md b/docs/docs/configuration/audio_detectors.md index b783daa69..2f4d43a6a 100644 --- a/docs/docs/configuration/audio_detectors.md +++ b/docs/docs/configuration/audio_detectors.md @@ -72,3 +72,77 @@ audio: - speech - yell ``` + +### Audio Transcription + +Frigate supports fully local audio transcription using either `sherpa-onnx` or OpenAI’s open-source Whisper models via `faster-whisper`. To enable transcription, it is recommended to only configure the features at the global level, and enable it at the individual camera level. + +```yaml +audio_transcription: + enabled: False + device: ... + model_size: ... +``` + +Enable audio transcription for select cameras at the camera level: + +```yaml +cameras: + back_yard: + ... + audio_transcription: + enabled: True +``` + +:::note + +Audio detection must be enabled and configured as described above in order to use audio transcription features. + +::: + +The optional config parameters that can be set at the global level include: + +- **`enabled`**: Enable or disable the audio transcription feature. + - Default: `False` + - It is recommended to only configure the features at the global level, and enable it at the individual camera level. +- **`device`**: Device to use to run transcription and translation models. + - Default: `CPU` + - This can be `CPU` or `GPU`. The `sherpa-onnx` models are lightweight and run on the CPU only. The `whisper` models can run on GPU but are only supported on CUDA hardware. +- **`model_size`**: The size of the model used for live transcription. + - Default: `small` + - This can be `small` or `large`. The `small` setting uses `sherpa-onnx` models that are fast, lightweight, and always run on the CPU but are not as accurate as the `whisper` model. + - The + - This config option applies to **live transcription only**. Recorded `speech` events will always use a different `whisper` model (and can be accelerated for CUDA hardware if available with `device: GPU`). +- **`language`**: Defines the language used by `whisper` to translate `speech` audio events (and live audio only if using the `large` model). + - Default: `en` + - You must use a valid [language code](https://github.com/openai/whisper/blob/main/whisper/tokenizer.py#L10). + - Transcriptions for `speech` events are translated. + - Live audio is translated only if you are using the `large` model. The `small` `sherpa-onnx` model is English-only. + +The only field that is valid at the camera level is `enabled`. + +#### Live transcription + +The single camera Live view in the Frigate UI supports live transcription of audio for streams defined with the `audio` role. Use the Enable/Disable Live Audio Transcription button/switch to toggle transcription processing. 
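+
+Live transcription can also be switched per camera over MQTT: the dispatcher added in this PR registers an `audio_transcription` settings handler that accepts `ON`/`OFF` payloads and publishes the new state on `frigate/<camera_name>/audio_transcription/state`. The sketch below assumes Frigate's usual `frigate/<camera_name>/<setting>/set` topic convention and uses the `paho-mqtt` client purely for illustration; the camera name and broker hostname are placeholders, not values from this PR.
+
+```python
+# Minimal sketch: toggle live audio transcription for one camera over MQTT.
+# Assumptions: a broker reachable at "mqtt.local", a camera named "back_yard",
+# and the paho-mqtt client library (none of these come from this PR).
+import paho.mqtt.publish as publish
+
+publish.single(
+    topic="frigate/back_yard/audio_transcription/set",
+    payload="ON",  # "OFF" turns live transcription back off
+    hostname="mqtt.local",
+)
+```
+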
When speech is heard, the UI will display a black box over the top of the camera stream with text. The MQTT topic `frigate//audio/transcription` will also be updated in real-time with transcribed text. + +Results can be error-prone due to a number of factors, including: + +- Poor quality camera microphone +- Distance of the audio source to the camera microphone +- Low audio bitrate setting in the camera +- Background noise +- Using the `small` model - it's fast, but not accurate for poor quality audio + +For speech sources close to the camera with minimal background noise, use the `small` model. + +If you have CUDA hardware, you can experiment with the `large` `whisper` model on GPU. Performance is not quite as fast as the `sherpa-onnx` `small` model, but live transcription is far more accurate. Using the `large` model with CPU will likely be too slow for real-time transcription. + +#### Transcription and translation of `speech` audio events + +Any `speech` events in Explore can be transcribed and/or translated through the Transcribe button in the Tracked Object Details pane. + +In order to use transcription and translation for past events, you must enable audio detection and define `speech` as an audio type to listen for in your config. To have `speech` events translated into the language of your choice, set the `language` config parameter with the correct [language code](https://github.com/openai/whisper/blob/main/whisper/tokenizer.py#L10). + +The transcribed/translated speech will appear in the description box in the Tracked Object Details pane. If Semantic Search is enabled, embeddings are generated for the transcription text and are fully searchable using the description search type. + +Recorded `speech` events will always use a `whisper` model, regardless of the `model_size` config setting. Without a GPU, generating transcriptions for longer `speech` events may take a fair amount of time, so be patient. diff --git a/docs/docs/configuration/reference.md b/docs/docs/configuration/reference.md index 5f6644bdb..b8360c902 100644 --- a/docs/docs/configuration/reference.md +++ b/docs/docs/configuration/reference.md @@ -620,6 +620,19 @@ genai: object_prompts: person: "My special person prompt." +# Optional: Configuration for audio transcription +# NOTE: only the enabled option can be overridden at the camera level +audio_transcription: + # Optional: Enable license plate recognition (default: shown below) + enabled: False + # Optional: The device to run the models on (default: shown below) + device: CPU + # Optional: Set the model size used for transcription. (default: shown below) + model_size: small + # Optional: Set the language used for transcription translation. 
(default: shown below) + # List of language codes: https://github.com/openai/whisper/blob/main/whisper/tokenizer.py#L10 + language: en + # Optional: Restream configuration # Uses https://github.com/AlexxIT/go2rtc (v1.9.9) # NOTE: The default go2rtc API port (1984) must be used, diff --git a/docs/docs/integrations/mqtt.md b/docs/docs/integrations/mqtt.md index d6dcaa3fb..56f31a021 100644 --- a/docs/docs/integrations/mqtt.md +++ b/docs/docs/integrations/mqtt.md @@ -125,7 +125,7 @@ Message published for updates to tracked object metadata, for example: "name": "John", "score": 0.95, "camera": "front_door_cam", - "timestamp": 1607123958.748393, + "timestamp": 1607123958.748393 } ``` @@ -139,7 +139,7 @@ Message published for updates to tracked object metadata, for example: "plate": "123ABC", "score": 0.95, "camera": "driveway_cam", - "timestamp": 1607123958.748393, + "timestamp": 1607123958.748393 } ``` @@ -255,6 +255,12 @@ Publishes the rms value for audio detected on this camera. **NOTE:** Requires audio detection to be enabled +### `frigate//audio/transcription` + +Publishes transcribed text for audio detected on this camera. + +**NOTE:** Requires audio detection and transcription to be enabled + ### `frigate//enabled/set` Topic to turn Frigate's processing of a camera on and off. Expected values are `ON` and `OFF`. diff --git a/frigate/api/classification.py b/frigate/api/classification.py index 75ca13735..81112933c 100644 --- a/frigate/api/classification.py +++ b/frigate/api/classification.py @@ -14,7 +14,10 @@ from peewee import DoesNotExist from playhouse.shortcuts import model_to_dict from frigate.api.auth import require_role -from frigate.api.defs.request.classification_body import RenameFaceBody +from frigate.api.defs.request.classification_body import ( + AudioTranscriptionBody, + RenameFaceBody, +) from frigate.api.defs.tags import Tags from frigate.config.camera import DetectConfig from frigate.const import FACE_DIR @@ -366,3 +369,58 @@ def reindex_embeddings(request: Request): }, status_code=500, ) + + +@router.put("/audio/transcribe") +def transcribe_audio(request: Request, body: AudioTranscriptionBody): + event_id = body.event_id + + try: + event = Event.get(Event.id == event_id) + except DoesNotExist: + message = f"Event {event_id} not found" + logger.error(message) + return JSONResponse( + content=({"success": False, "message": message}), status_code=404 + ) + + if not request.app.frigate_config.cameras[event.camera].audio_transcription.enabled: + message = f"Audio transcription is not enabled for {event.camera}." + logger.error(message) + return JSONResponse( + content=( + { + "success": False, + "message": message, + } + ), + status_code=400, + ) + + context: EmbeddingsContext = request.app.embeddings + response = context.transcribe_audio(model_to_dict(event)) + + if response == "started": + return JSONResponse( + content={ + "success": True, + "message": "Audio transcription has started.", + }, + status_code=202, # 202 Accepted + ) + elif response == "in_progress": + return JSONResponse( + content={ + "success": False, + "message": "Audio transcription for a speech event is currently in progress. 
Try again later.", + }, + status_code=409, # 409 Conflict + ) + else: + return JSONResponse( + content={ + "success": False, + "message": "Failed to transcribe audio.", + }, + status_code=500, + ) diff --git a/frigate/api/defs/request/classification_body.py b/frigate/api/defs/request/classification_body.py index c4a32c332..31c5688bf 100644 --- a/frigate/api/defs/request/classification_body.py +++ b/frigate/api/defs/request/classification_body.py @@ -3,3 +3,7 @@ from pydantic import BaseModel class RenameFaceBody(BaseModel): new_name: str + + +class AudioTranscriptionBody(BaseModel): + event_id: str diff --git a/frigate/app.py b/frigate/app.py index ebbc003e8..65dc19472 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -494,7 +494,9 @@ class FrigateApp: ] if audio_cameras: - self.audio_process = AudioProcessor(audio_cameras, self.camera_metrics) + self.audio_process = AudioProcessor( + self.config, audio_cameras, self.camera_metrics + ) self.audio_process.start() self.processes["audio_detector"] = self.audio_process.pid or 0 diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index c94ce690f..6fee166b7 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -58,6 +58,7 @@ class Dispatcher: self._camera_settings_handlers: dict[str, Callable] = { "audio": self._on_audio_command, + "audio_transcription": self._on_audio_transcription_command, "detect": self._on_detect_command, "enabled": self._on_enabled_command, "improve_contrast": self._on_motion_improve_contrast_command, @@ -181,6 +182,9 @@ class Dispatcher: "snapshots": self.config.cameras[camera].snapshots.enabled, "record": self.config.cameras[camera].record.enabled, "audio": self.config.cameras[camera].audio.enabled, + "audio_transcription": self.config.cameras[ + camera + ].audio_transcription.live_enabled, "notifications": self.config.cameras[camera].notifications.enabled, "notifications_suspended": int( self.web_push_client.suspended_cameras.get(camera, 0) @@ -465,6 +469,37 @@ class Dispatcher: ) self.publish(f"{camera_name}/audio/state", payload, retain=True) + def _on_audio_transcription_command(self, camera_name: str, payload: str) -> None: + """Callback for live audio transcription topic.""" + audio_transcription_settings = self.config.cameras[ + camera_name + ].audio_transcription + + if payload == "ON": + if not self.config.cameras[ + camera_name + ].audio_transcription.enabled_in_config: + logger.error( + "Audio transcription must be enabled in the config to be turned on via MQTT." 
+ ) + return + + if not audio_transcription_settings.live_enabled: + logger.info(f"Turning on live audio transcription for {camera_name}") + audio_transcription_settings.live_enabled = True + elif payload == "OFF": + if audio_transcription_settings.live_enabled: + logger.info(f"Turning off live audio transcription for {camera_name}") + audio_transcription_settings.live_enabled = False + + self.config_updater.publish_update( + CameraConfigUpdateTopic( + CameraConfigUpdateEnum.audio_transcription, camera_name + ), + audio_transcription_settings, + ) + self.publish(f"{camera_name}/audio_transcription/state", payload, retain=True) + def _on_recordings_command(self, camera_name: str, payload: str) -> None: """Callback for recordings topic.""" record_settings = self.config.cameras[camera_name].record diff --git a/frigate/comms/embeddings_updater.py b/frigate/comms/embeddings_updater.py index 74a87e60f..00bc88b3d 100644 --- a/frigate/comms/embeddings_updater.py +++ b/frigate/comms/embeddings_updater.py @@ -18,6 +18,7 @@ class EmbeddingsRequestEnum(Enum): reprocess_face = "reprocess_face" reprocess_plate = "reprocess_plate" reindex = "reindex" + transcribe_audio = "transcribe_audio" class EmbeddingsResponder: diff --git a/frigate/config/camera/camera.py b/frigate/config/camera/camera.py index 3b24dabac..33ad312a2 100644 --- a/frigate/config/camera/camera.py +++ b/frigate/config/camera/camera.py @@ -19,6 +19,7 @@ from frigate.util.builtin import ( from ..base import FrigateBaseModel from ..classification import ( + AudioTranscriptionConfig, CameraFaceRecognitionConfig, CameraLicensePlateRecognitionConfig, ) @@ -56,6 +57,9 @@ class CameraConfig(FrigateBaseModel): audio: AudioConfig = Field( default_factory=AudioConfig, title="Audio events configuration." ) + audio_transcription: AudioTranscriptionConfig = Field( + default_factory=AudioTranscriptionConfig, title="Audio transcription config." + ) birdseye: BirdseyeCameraConfig = Field( default_factory=BirdseyeCameraConfig, title="Birdseye camera configuration." 
) diff --git a/frigate/config/camera/updater.py b/frigate/config/camera/updater.py index 140e02207..5ddc26d44 100644 --- a/frigate/config/camera/updater.py +++ b/frigate/config/camera/updater.py @@ -12,6 +12,7 @@ class CameraConfigUpdateEnum(str, Enum): """Supported camera config update types.""" audio = "audio" + audio_transcription = "audio_transcription" birdseye = "birdseye" detect = "detect" enabled = "enabled" @@ -74,6 +75,8 @@ class CameraConfigUpdateSubscriber: if update_type == CameraConfigUpdateEnum.audio: config.audio = updated_config + if update_type == CameraConfigUpdateEnum.audio_transcription: + config.audio_transcription = updated_config elif update_type == CameraConfigUpdateEnum.birdseye: config.birdseye = updated_config elif update_type == CameraConfigUpdateEnum.detect: diff --git a/frigate/config/classification.py b/frigate/config/classification.py index 4af60df4f..29568f5cd 100644 --- a/frigate/config/classification.py +++ b/frigate/config/classification.py @@ -19,11 +19,32 @@ class SemanticSearchModelEnum(str, Enum): jinav2 = "jinav2" -class LPRDeviceEnum(str, Enum): +class EnrichmentsDeviceEnum(str, Enum): GPU = "GPU" CPU = "CPU" +class AudioTranscriptionConfig(FrigateBaseModel): + enabled: bool = Field(default=False, title="Enable audio transcription.") + language: str = Field( + default="en", + title="Language abbreviation to use for audio event transcription/translation.", + ) + device: Optional[EnrichmentsDeviceEnum] = Field( + default=EnrichmentsDeviceEnum.CPU, + title="The device used for license plate recognition.", + ) + model_size: str = Field( + default="small", title="The size of the embeddings model used." + ) + enabled_in_config: Optional[bool] = Field( + default=None, title="Keep track of original state of camera." + ) + live_enabled: Optional[bool] = Field( + default=False, title="Enable live transcriptions." + ) + + class BirdClassificationConfig(FrigateBaseModel): enabled: bool = Field(default=False, title="Enable bird classification.") threshold: float = Field( @@ -144,8 +165,8 @@ class CameraFaceRecognitionConfig(FrigateBaseModel): class LicensePlateRecognitionConfig(FrigateBaseModel): enabled: bool = Field(default=False, title="Enable license plate recognition.") - device: Optional[LPRDeviceEnum] = Field( - default=LPRDeviceEnum.CPU, + device: Optional[EnrichmentsDeviceEnum] = Field( + default=EnrichmentsDeviceEnum.CPU, title="The device used for license plate recognition.", ) model_size: str = Field( diff --git a/frigate/config/config.py b/frigate/config/config.py index 58427f5d5..5bca436b6 100644 --- a/frigate/config/config.py +++ b/frigate/config/config.py @@ -54,6 +54,7 @@ from .camera.snapshots import SnapshotsConfig from .camera.timestamp import TimestampStyleConfig from .camera_group import CameraGroupConfig from .classification import ( + AudioTranscriptionConfig, ClassificationConfig, FaceRecognitionConfig, LicensePlateRecognitionConfig, @@ -419,6 +420,9 @@ class FrigateConfig(FrigateBaseModel): ) # Classification Config + audio_transcription: AudioTranscriptionConfig = Field( + default_factory=AudioTranscriptionConfig, title="Audio transcription config." + ) classification: ClassificationConfig = Field( default_factory=ClassificationConfig, title="Object classification config." 
) @@ -472,6 +476,7 @@ class FrigateConfig(FrigateBaseModel): global_config = self.model_dump( include={ "audio": ..., + "audio_transcription": ..., "birdseye": ..., "face_recognition": ..., "lpr": ..., @@ -528,6 +533,7 @@ class FrigateConfig(FrigateBaseModel): allowed_fields_map = { "face_recognition": ["enabled", "min_area"], "lpr": ["enabled", "expire_time", "min_area", "enhancement"], + "audio_transcription": ["enabled", "live_enabled"], } for section in allowed_fields_map: @@ -609,6 +615,9 @@ class FrigateConfig(FrigateBaseModel): # set config pre-value camera_config.enabled_in_config = camera_config.enabled camera_config.audio.enabled_in_config = camera_config.audio.enabled + camera_config.audio_transcription.enabled_in_config = ( + camera_config.audio_transcription.enabled + ) camera_config.record.enabled_in_config = camera_config.record.enabled camera_config.notifications.enabled_in_config = ( camera_config.notifications.enabled @@ -701,6 +710,21 @@ class FrigateConfig(FrigateBaseModel): self.model.create_colormap(sorted(self.objects.all_objects)) self.model.check_and_load_plus_model(self.plus_api) + # Check audio transcription and audio detection requirements + if self.audio_transcription.enabled: + # If audio transcription is enabled globally, at least one camera must have audio detection enabled + if not any(camera.audio.enabled for camera in self.cameras.values()): + raise ValueError( + "Audio transcription is enabled globally, but no cameras have audio detection enabled. At least one camera must have audio detection enabled." + ) + else: + # If audio transcription is disabled globally, check each camera with audio_transcription enabled + for camera in self.cameras.values(): + if camera.audio_transcription.enabled and not camera.audio.enabled: + raise ValueError( + f"Camera {camera.name} has audio transcription enabled, but audio detection is not enabled for this camera. Audio detection must be enabled for cameras with audio transcription when it is disabled globally." 
+ ) + if self.plus_api and not self.snapshots.clean_copy: logger.warning( "Frigate+ is configured but clean snapshots are not enabled, submissions to Frigate+ will not be possible./" diff --git a/frigate/data_processing/post/audio_transcription.py b/frigate/data_processing/post/audio_transcription.py new file mode 100644 index 000000000..146b4e0f1 --- /dev/null +++ b/frigate/data_processing/post/audio_transcription.py @@ -0,0 +1,212 @@ +"""Handle post-processing for audio transcription.""" + +import logging +import os +import threading +import time +from typing import Optional + +from faster_whisper import WhisperModel +from peewee import DoesNotExist + +from frigate.comms.embeddings_updater import EmbeddingsRequestEnum +from frigate.comms.inter_process import InterProcessRequestor +from frigate.config import FrigateConfig +from frigate.const import ( + CACHE_DIR, + MODEL_CACHE_DIR, + UPDATE_EVENT_DESCRIPTION, +) +from frigate.data_processing.types import PostProcessDataEnum +from frigate.types import TrackedObjectUpdateTypesEnum +from frigate.util.audio import get_audio_from_recording + +from ..types import DataProcessorMetrics +from .api import PostProcessorApi + +logger = logging.getLogger(__name__) + + +class AudioTranscriptionPostProcessor(PostProcessorApi): + def __init__( + self, + config: FrigateConfig, + requestor: InterProcessRequestor, + metrics: DataProcessorMetrics, + ): + super().__init__(config, metrics, None) + self.config = config + self.requestor = requestor + self.recognizer = None + self.transcription_lock = threading.Lock() + self.transcription_thread = None + self.transcription_running = False + + # faster-whisper handles model downloading automatically + self.model_path = os.path.join(MODEL_CACHE_DIR, "whisper") + os.makedirs(self.model_path, exist_ok=True) + + self.__build_recognizer() + + def __build_recognizer(self) -> None: + try: + self.recognizer = WhisperModel( + model_size_or_path="small", + device="cuda" + if self.config.audio_transcription.device == "GPU" + else "cpu", + download_root=self.model_path, + local_files_only=False, # Allow downloading if not cached + compute_type="int8", + ) + logger.debug("Audio transcription (recordings) initialized") + except Exception as e: + logger.error(f"Failed to initialize recordings audio transcription: {e}") + self.recognizer = None + + def process_data( + self, data: dict[str, any], data_type: PostProcessDataEnum + ) -> None: + """Transcribe audio from a recording. + + Args: + data (dict): Contains data about the input (event_id, camera, etc.). + data_type (enum): Describes the data being processed (recording or tracked_object). 
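+            For tracked_object input, data["event"] must provide "id", "camera",
+            "start_time" and "data"; "end_time" is optional and defaults to
+            start_time + 60 seconds.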
+ + Returns: + None + """ + event_id = data["event_id"] + camera_name = data["camera"] + + if data_type == PostProcessDataEnum.recording: + start_ts = data["frame_time"] + recordings_available_through = data["recordings_available"] + end_ts = min(recordings_available_through, start_ts + 60) # Default 60s + + elif data_type == PostProcessDataEnum.tracked_object: + obj_data = data["event"]["data"] + obj_data["id"] = data["event"]["id"] + obj_data["camera"] = data["event"]["camera"] + start_ts = data["event"]["start_time"] + end_ts = data["event"].get( + "end_time", start_ts + 60 + ) # Use end_time if available + + else: + logger.error("No data type passed to audio transcription post-processing") + return + + try: + audio_data = get_audio_from_recording( + self.config.cameras[camera_name].ffmpeg, + camera_name, + start_ts, + end_ts, + sample_rate=16000, + ) + + if not audio_data: + logger.debug(f"No audio data extracted for {event_id}") + return + + transcription = self.__transcribe_audio(audio_data) + if not transcription: + logger.debug("No transcription generated from audio") + return + + logger.debug(f"Transcribed audio for {event_id}: '{transcription}'") + + self.requestor.send_data( + UPDATE_EVENT_DESCRIPTION, + { + "type": TrackedObjectUpdateTypesEnum.description, + "id": event_id, + "description": transcription, + "camera": camera_name, + }, + ) + + # Embed the description + self.requestor.send_data( + EmbeddingsRequestEnum.embed_description.value, + {"id": event_id, "description": transcription}, + ) + + except DoesNotExist: + logger.debug("No recording found for audio transcription post-processing") + return + except Exception as e: + logger.error(f"Error in audio transcription post-processing: {e}") + + def __transcribe_audio(self, audio_data: bytes) -> Optional[tuple[str, float]]: + """Transcribe WAV audio data using faster-whisper.""" + if not self.recognizer: + logger.debug("Recognizer not initialized") + return None + + try: + # Save audio data to a temporary wav (faster-whisper expects a file) + temp_wav = os.path.join(CACHE_DIR, f"temp_audio_{int(time.time())}.wav") + with open(temp_wav, "wb") as f: + f.write(audio_data) + + segments, info = self.recognizer.transcribe( + temp_wav, + language=self.config.audio_transcription.language, + beam_size=5, + ) + + os.remove(temp_wav) + + # Combine all segment texts + text = " ".join(segment.text.strip() for segment in segments) + if not text: + return None + + logger.debug( + "Detected language '%s' with probability %f" + % (info.language, info.language_probability) + ) + + return text + except Exception as e: + logger.error(f"Error transcribing audio: {e}") + return None + + def _transcription_wrapper(self, event: dict[str, any]) -> None: + """Wrapper to run transcription and reset running flag when done.""" + try: + self.process_data( + { + "event_id": event["id"], + "camera": event["camera"], + "event": event, + }, + PostProcessDataEnum.tracked_object, + ) + finally: + with self.transcription_lock: + self.transcription_running = False + self.transcription_thread = None + + def handle_request(self, topic: str, request_data: dict[str, any]) -> str | None: + if topic == "transcribe_audio": + event = request_data["event"] + + with self.transcription_lock: + if self.transcription_running: + logger.warning( + "Audio transcription for a speech event is already running." 
+ ) + return "in_progress" + + # Mark as running and start the thread + self.transcription_running = True + self.transcription_thread = threading.Thread( + target=self._transcription_wrapper, args=(event,), daemon=True + ) + self.transcription_thread.start() + return "started" + + return None diff --git a/frigate/data_processing/real_time/audio_transcription.py b/frigate/data_processing/real_time/audio_transcription.py new file mode 100644 index 000000000..7ed644498 --- /dev/null +++ b/frigate/data_processing/real_time/audio_transcription.py @@ -0,0 +1,276 @@ +"""Handle processing audio for speech transcription using sherpa-onnx with FFmpeg pipe.""" + +import logging +import os +import queue +import threading +from typing import Optional + +import numpy as np +import sherpa_onnx + +from frigate.comms.inter_process import InterProcessRequestor +from frigate.config import CameraConfig, FrigateConfig +from frigate.const import MODEL_CACHE_DIR +from frigate.util.downloader import ModelDownloader + +from ..types import DataProcessorMetrics +from .api import RealTimeProcessorApi +from .whisper_online import FasterWhisperASR, OnlineASRProcessor + +logger = logging.getLogger(__name__) + + +class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): + def __init__( + self, + config: FrigateConfig, + camera_config: CameraConfig, + requestor: InterProcessRequestor, + metrics: DataProcessorMetrics, + stop_event: threading.Event, + ): + super().__init__(config, metrics) + self.config = config + self.camera_config = camera_config + self.requestor = requestor + self.recognizer = None + self.stream = None + self.transcription_segments = [] + self.audio_queue = queue.Queue() + self.stop_event = stop_event + + if self.config.audio_transcription.model_size == "large": + self.asr = FasterWhisperASR( + modelsize="tiny", + device="cuda" + if self.config.audio_transcription.device == "GPU" + else "cpu", + lan=config.audio_transcription.language, + model_dir=os.path.join(MODEL_CACHE_DIR, "whisper"), + ) + self.asr.use_vad() # Enable Silero VAD for low-RMS audio + + else: + # small model as default + download_path = os.path.join(MODEL_CACHE_DIR, "sherpa-onnx") + HF_ENDPOINT = os.environ.get("HF_ENDPOINT", "https://huggingface.co") + self.model_files = { + "encoder.onnx": f"{HF_ENDPOINT}/csukuangfj/sherpa-onnx-streaming-zipformer-en-2023-06-26/resolve/main/encoder-epoch-99-avg-1-chunk-16-left-128.onnx", + "decoder.onnx": f"{HF_ENDPOINT}/csukuangfj/sherpa-onnx-streaming-zipformer-en-2023-06-26/resolve/main/decoder-epoch-99-avg-1-chunk-16-left-128.onnx", + "joiner.onnx": f"{HF_ENDPOINT}/csukuangfj/sherpa-onnx-streaming-zipformer-en-2023-06-26/resolve/main/joiner-epoch-99-avg-1-chunk-16-left-128.onnx", + "tokens.txt": f"{HF_ENDPOINT}/csukuangfj/sherpa-onnx-streaming-zipformer-en-2023-06-26/resolve/main/tokens.txt", + } + + if not all( + os.path.exists(os.path.join(download_path, n)) + for n in self.model_files.keys() + ): + self.downloader = ModelDownloader( + model_name="sherpa-onnx", + download_path=download_path, + file_names=self.model_files.keys(), + download_func=self.__download_models, + complete_func=self.__build_recognizer, + ) + self.downloader.ensure_model_files() + + self.__build_recognizer() + + def __download_models(self, path: str) -> None: + try: + file_name = os.path.basename(path) + ModelDownloader.download_from_url(self.model_files[file_name], path) + except Exception as e: + logger.error(f"Failed to download {path}: {e}") + + def __build_recognizer(self) -> None: + try: + if 
self.config.audio_transcription.model_size == "large": + self.online = OnlineASRProcessor( + asr=self.asr, + ) + else: + self.recognizer = sherpa_onnx.OnlineRecognizer.from_transducer( + tokens=os.path.join(MODEL_CACHE_DIR, "sherpa-onnx/tokens.txt"), + encoder=os.path.join(MODEL_CACHE_DIR, "sherpa-onnx/encoder.onnx"), + decoder=os.path.join(MODEL_CACHE_DIR, "sherpa-onnx/decoder.onnx"), + joiner=os.path.join(MODEL_CACHE_DIR, "sherpa-onnx/joiner.onnx"), + num_threads=2, + sample_rate=16000, + feature_dim=80, + enable_endpoint_detection=True, + rule1_min_trailing_silence=2.4, + rule2_min_trailing_silence=1.2, + rule3_min_utterance_length=300, + decoding_method="greedy_search", + provider="cpu", + ) + self.stream = self.recognizer.create_stream() + logger.debug("Audio transcription (live) initialized") + except Exception as e: + logger.error( + f"Failed to initialize live streaming audio transcription: {e}" + ) + self.recognizer = None + + def __process_audio_stream( + self, audio_data: np.ndarray + ) -> Optional[tuple[str, bool]]: + if (not self.recognizer or not self.stream) and not self.online: + logger.debug( + "Audio transcription (streaming) recognizer or stream not initialized" + ) + return None + + try: + if audio_data.dtype != np.float32: + audio_data = audio_data.astype(np.float32) + + if audio_data.max() > 1.0 or audio_data.min() < -1.0: + audio_data = audio_data / 32768.0 # Normalize from int16 + + rms = float(np.sqrt(np.mean(np.absolute(np.square(audio_data))))) + logger.debug(f"Audio chunk size: {audio_data.size}, RMS: {rms:.4f}") + + if self.config.audio_transcription.model_size == "large": + # large model + self.online.insert_audio_chunk(audio_data) + output = self.online.process_iter() + text = output[2].strip() + is_endpoint = text.endswith((".", "!", "?")) + + if text: + self.transcription_segments.append(text) + concatenated_text = " ".join(self.transcription_segments) + logger.debug(f"Concatenated transcription: '{concatenated_text}'") + text = concatenated_text + + else: + # small model + self.stream.accept_waveform(16000, audio_data) + + while self.recognizer.is_ready(self.stream): + self.recognizer.decode_stream(self.stream) + + text = self.recognizer.get_result(self.stream).strip() + is_endpoint = self.recognizer.is_endpoint(self.stream) + + logger.debug(f"Transcription result: '{text}'") + + if not text: + logger.debug("No transcription, returning") + return None + + logger.debug(f"Endpoint detected: {is_endpoint}") + + if is_endpoint and self.config.audio_transcription.model_size == "small": + # reset sherpa if we've reached an endpoint + self.recognizer.reset(self.stream) + + return text, is_endpoint + except Exception as e: + logger.error(f"Error processing audio stream: {e}") + return None + + def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None: + pass + + def process_audio(self, obj_data: dict[str, any], audio: np.ndarray) -> bool | None: + if audio is None or audio.size == 0: + logger.debug("No audio data provided for transcription") + return None + + # enqueue audio data for processing in the thread + self.audio_queue.put((obj_data, audio)) + return None + + def run(self) -> None: + """Run method for the transcription thread to process queued audio data.""" + logger.debug( + f"Starting audio transcription thread for {self.camera_config.name}" + ) + while not self.stop_event.is_set(): + try: + # Get audio data from queue with a timeout to check stop_event + obj_data, audio = self.audio_queue.get(timeout=0.1) + result = 
self.__process_audio_stream(audio) + + if not result: + continue + + text, is_endpoint = result + logger.debug(f"Transcribed audio: '{text}', Endpoint: {is_endpoint}") + + self.requestor.send_data( + f"{self.camera_config.name}/audio/transcription", text + ) + + self.audio_queue.task_done() + + if is_endpoint: + self.reset(obj_data["camera"]) + + except queue.Empty: + continue + except Exception as e: + logger.error(f"Error processing audio in thread: {e}") + self.audio_queue.task_done() + + logger.debug( + f"Stopping audio transcription thread for {self.camera_config.name}" + ) + + def reset(self, camera: str) -> None: + if self.config.audio_transcription.model_size == "large": + # get final output from whisper + output = self.online.finish() + self.transcription_segments = [] + + self.requestor.send_data( + f"{self.camera_config.name}/audio/transcription", + (output[2].strip() + " "), + ) + + # reset whisper + self.online.init() + else: + # reset sherpa + self.recognizer.reset(self.stream) + + # Clear the audio queue + while not self.audio_queue.empty(): + try: + self.audio_queue.get_nowait() + self.audio_queue.task_done() + except queue.Empty: + break + + logger.debug("Stream reset") + + def stop(self) -> None: + """Stop the transcription thread and clean up.""" + self.stop_event.set() + # Clear the queue to prevent processing stale data + while not self.audio_queue.empty(): + try: + self.audio_queue.get_nowait() + self.audio_queue.task_done() + except queue.Empty: + break + logger.debug( + f"Transcription thread stop signaled for {self.camera_config.name}" + ) + + def handle_request( + self, topic: str, request_data: dict[str, any] + ) -> dict[str, any] | None: + if topic == "clear_audio_recognizer": + self.recognizer = None + self.stream = None + self.__build_recognizer() + return {"message": "Audio recognizer cleared and rebuilt", "success": True} + return None + + def expire_object(self, object_id: str) -> None: + pass diff --git a/frigate/data_processing/real_time/whisper_online.py b/frigate/data_processing/real_time/whisper_online.py new file mode 100644 index 000000000..96c1ce0cf --- /dev/null +++ b/frigate/data_processing/real_time/whisper_online.py @@ -0,0 +1,1155 @@ +# imported to Frigate from https://github.com/ufal/whisper_streaming +# with only minor modifications +import io +import logging +import math +import sys +import time +from functools import lru_cache + +import librosa +import numpy as np +import soundfile as sf + +logger = logging.getLogger(__name__) + + +@lru_cache(10**6) +def load_audio(fname): + a, _ = librosa.load(fname, sr=16000, dtype=np.float32) + return a + + +def load_audio_chunk(fname, beg, end): + audio = load_audio(fname) + beg_s = int(beg * 16000) + end_s = int(end * 16000) + return audio[beg_s:end_s] + + +# Whisper backend + + +class ASRBase: + sep = "" # join transcribe words with this character (" " for whisper_timestamped, + # "" for faster-whisper because it emits the spaces when neeeded) + + def __init__( + self, + lan, + modelsize=None, + cache_dir=None, + model_dir=None, + logfile=sys.stderr, + device="cpu", + ): + self.logfile = logfile + + self.transcribe_kargs = {} + if lan == "auto": + self.original_language = None + else: + self.original_language = lan + + self.model = self.load_model(modelsize, cache_dir, model_dir, device) + + def load_model(self, modelsize, cache_dir): + raise NotImplementedError("must be implemented in the child class") + + def transcribe(self, audio, init_prompt=""): + raise NotImplementedError("must be implemented 
in the child class") + + def use_vad(self): + raise NotImplementedError("must be implemented in the child class") + + +class WhisperTimestampedASR(ASRBase): + """Uses whisper_timestamped library as the backend. Initially, we tested the code on this backend. It worked, but slower than faster-whisper. + On the other hand, the installation for GPU could be easier. + """ + + sep = " " + + def load_model(self, modelsize=None, cache_dir=None, model_dir=None): + import whisper + from whisper_timestamped import transcribe_timestamped + + self.transcribe_timestamped = transcribe_timestamped + if model_dir is not None: + logger.debug("ignoring model_dir, not implemented") + return whisper.load_model(modelsize, download_root=cache_dir) + + def transcribe(self, audio, init_prompt=""): + result = self.transcribe_timestamped( + self.model, + audio, + language=self.original_language, + initial_prompt=init_prompt, + verbose=None, + condition_on_previous_text=True, + **self.transcribe_kargs, + ) + return result + + def ts_words(self, r): + # return: transcribe result object to [(beg,end,"word1"), ...] + o = [] + for s in r["segments"]: + for w in s["words"]: + t = (w["start"], w["end"], w["text"]) + o.append(t) + return o + + def segments_end_ts(self, res): + return [s["end"] for s in res["segments"]] + + def use_vad(self): + self.transcribe_kargs["vad"] = True + + def set_translate_task(self): + self.transcribe_kargs["task"] = "translate" + + +class FasterWhisperASR(ASRBase): + """Uses faster-whisper library as the backend. Works much faster, appx 4-times (in offline mode). For GPU, it requires installation with a specific CUDNN version.""" + + sep = "" + + def load_model(self, modelsize=None, cache_dir=None, model_dir=None, device="cpu"): + from faster_whisper import WhisperModel + + logging.getLogger("faster_whisper").setLevel(logging.WARNING) + + # this worked fast and reliably on NVIDIA L40 + model = WhisperModel( + model_size_or_path="small" if device == "cuda" else "tiny", + device=device, + compute_type="float16" if device == "cuda" else "int8", + local_files_only=False, + download_root=model_dir, + ) + + # or run on GPU with INT8 + # tested: the transcripts were different, probably worse than with FP16, and it was slightly (appx 20%) slower + # model = WhisperModel(model_size, device="cuda", compute_type="int8_float16") + + # or run on CPU with INT8 + # tested: works, but slow, appx 10-times than cuda FP16 + # model = WhisperModel(modelsize, device="cpu", compute_type="int8") #, download_root="faster-disk-cache-dir/") + return model + + def transcribe(self, audio, init_prompt=""): + # tested: beam_size=5 is faster and better than 1 (on one 200 second document from En ESIC, min chunk 0.01) + segments, info = self.model.transcribe( + audio, + language=self.original_language, + initial_prompt=init_prompt, + beam_size=5, + word_timestamps=True, + condition_on_previous_text=True, + **self.transcribe_kargs, + ) + # print(info) # info contains language detection result + + return list(segments) + + def ts_words(self, segments): + o = [] + for segment in segments: + for word in segment.words: + if segment.no_speech_prob > 0.9: + continue + # not stripping the spaces -- should not be merged with them! 
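+                # faster-whisper emits word.word with its leading space preserved
+                # (e.g. " hello"), which is why sep == "" for this backend; each
+                # tuple appended below is (start_sec, end_sec, text).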
+ w = word.word + t = (word.start, word.end, w) + o.append(t) + return o + + def segments_end_ts(self, res): + return [s.end for s in res] + + def use_vad(self): + self.transcribe_kargs["vad_filter"] = True + + def set_translate_task(self): + self.transcribe_kargs["task"] = "translate" + + +class MLXWhisper(ASRBase): + """ + Uses MLX Whisper library as the backend, optimized for Apple Silicon. + Models available: https://huggingface.co/collections/mlx-community/whisper-663256f9964fbb1177db93dc + Significantly faster than faster-whisper (without CUDA) on Apple M1. + """ + + sep = " " + + def load_model(self, modelsize=None, cache_dir=None, model_dir=None): + """ + Loads the MLX-compatible Whisper model. + + Args: + modelsize (str, optional): The size or name of the Whisper model to load. + If provided, it will be translated to an MLX-compatible model path using the `translate_model_name` method. + Example: "large-v3-turbo" -> "mlx-community/whisper-large-v3-turbo". + cache_dir (str, optional): Path to the directory for caching models. + **Note**: This is not supported by MLX Whisper and will be ignored. + model_dir (str, optional): Direct path to a custom model directory. + If specified, it overrides the `modelsize` parameter. + """ + import mlx.core as mx # Is installed with mlx-whisper + from mlx_whisper.transcribe import ModelHolder, transcribe + + if model_dir is not None: + logger.debug( + f"Loading whisper model from model_dir {model_dir}. modelsize parameter is not used." + ) + model_size_or_path = model_dir + elif modelsize is not None: + model_size_or_path = self.translate_model_name(modelsize) + logger.debug( + f"Loading whisper model {modelsize}. You use mlx whisper, so {model_size_or_path} will be used." + ) + + self.model_size_or_path = model_size_or_path + + # Note: ModelHolder.get_model loads the model into a static class variable, + # making it a global resource. This means: + # - Only one model can be loaded at a time; switching models requires reloading. + # - This approach may not be suitable for scenarios requiring multiple models simultaneously, + # such as using whisper-streaming as a module with varying model sizes. + dtype = mx.float16 # Default to mx.float16. In mlx_whisper.transcribe: dtype = mx.float16 if decode_options.get("fp16", True) else mx.float32 + ModelHolder.get_model( + model_size_or_path, dtype + ) # Model is preloaded to avoid reloading during transcription + + return transcribe + + def translate_model_name(self, model_name): + """ + Translates a given model name to its corresponding MLX-compatible model path. + + Args: + model_name (str): The name of the model to translate. + + Returns: + str: The MLX-compatible model path. 
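+        Example (per the mapping below):
+            "small" -> "mlx-community/whisper-small-mlx"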
+ """ + # Dictionary mapping model names to MLX-compatible paths + model_mapping = { + "tiny.en": "mlx-community/whisper-tiny.en-mlx", + "tiny": "mlx-community/whisper-tiny-mlx", + "base.en": "mlx-community/whisper-base.en-mlx", + "base": "mlx-community/whisper-base-mlx", + "small.en": "mlx-community/whisper-small.en-mlx", + "small": "mlx-community/whisper-small-mlx", + "medium.en": "mlx-community/whisper-medium.en-mlx", + "medium": "mlx-community/whisper-medium-mlx", + "large-v1": "mlx-community/whisper-large-v1-mlx", + "large-v2": "mlx-community/whisper-large-v2-mlx", + "large-v3": "mlx-community/whisper-large-v3-mlx", + "large-v3-turbo": "mlx-community/whisper-large-v3-turbo", + "large": "mlx-community/whisper-large-mlx", + } + + # Retrieve the corresponding MLX model path + mlx_model_path = model_mapping.get(model_name) + + if mlx_model_path: + return mlx_model_path + else: + raise ValueError( + f"Model name '{model_name}' is not recognized or not supported." + ) + + def transcribe(self, audio, init_prompt=""): + segments = self.model( + audio, + language=self.original_language, + initial_prompt=init_prompt, + word_timestamps=True, + condition_on_previous_text=True, + path_or_hf_repo=self.model_size_or_path, + **self.transcribe_kargs, + ) + return segments.get("segments", []) + + def ts_words(self, segments): + """ + Extract timestamped words from transcription segments and skips words with high no-speech probability. + """ + return [ + (word["start"], word["end"], word["word"]) + for segment in segments + for word in segment.get("words", []) + if segment.get("no_speech_prob", 0) <= 0.9 + ] + + def segments_end_ts(self, res): + return [s["end"] for s in res] + + def use_vad(self): + self.transcribe_kargs["vad_filter"] = True + + def set_translate_task(self): + self.transcribe_kargs["task"] = "translate" + + +class OpenaiApiASR(ASRBase): + """Uses OpenAI's Whisper API for audio transcription.""" + + def __init__(self, lan=None, temperature=0, logfile=sys.stderr): + self.logfile = logfile + + self.modelname = "whisper-1" + self.original_language = ( + None if lan == "auto" else lan + ) # ISO-639-1 language code + self.response_format = "verbose_json" + self.temperature = temperature + + self.load_model() + + self.use_vad_opt = False + + # reset the task in set_translate_task + self.task = "transcribe" + + def load_model(self, *args, **kwargs): + from openai import OpenAI + + self.client = OpenAI() + + self.transcribed_seconds = ( + 0 # for logging how many seconds were processed by API, to know the cost + ) + + def ts_words(self, segments): + no_speech_segments = [] + if self.use_vad_opt: + for segment in segments.segments: + # TODO: threshold can be set from outside + if segment["no_speech_prob"] > 0.8: + no_speech_segments.append( + (segment.get("start"), segment.get("end")) + ) + + o = [] + for word in segments.words: + start = word.start + end = word.end + if any(s[0] <= start <= s[1] for s in no_speech_segments): + # print("Skipping word", word.get("word"), "because it's in a no-speech segment") + continue + o.append((start, end, word.word)) + return o + + def segments_end_ts(self, res): + return [s.end for s in res.words] + + def transcribe(self, audio_data, prompt=None, *args, **kwargs): + # Write the audio data to a buffer + buffer = io.BytesIO() + buffer.name = "temp.wav" + sf.write(buffer, audio_data, samplerate=16000, format="WAV", subtype="PCM_16") + buffer.seek(0) # Reset buffer's position to the beginning + + self.transcribed_seconds += math.ceil( + len(audio_data) / 16000 
+ ) # it rounds up to the whole seconds + + params = { + "model": self.modelname, + "file": buffer, + "response_format": self.response_format, + "temperature": self.temperature, + "timestamp_granularities": ["word", "segment"], + } + if self.task != "translate" and self.original_language: + params["language"] = self.original_language + if prompt: + params["prompt"] = prompt + + if self.task == "translate": + proc = self.client.audio.translations + else: + proc = self.client.audio.transcriptions + + # Process transcription/translation + transcript = proc.create(**params) + logger.debug( + f"OpenAI API processed accumulated {self.transcribed_seconds} seconds" + ) + + return transcript + + def use_vad(self): + self.use_vad_opt = True + + def set_translate_task(self): + self.task = "translate" + + +class HypothesisBuffer: + def __init__(self, logfile=sys.stderr): + self.commited_in_buffer = [] + self.buffer = [] + self.new = [] + + self.last_commited_time = 0 + self.last_commited_word = None + + self.logfile = logfile + + def insert(self, new, offset): + # compare self.commited_in_buffer and new. It inserts only the words in new that extend the commited_in_buffer, it means they are roughly behind last_commited_time and new in content + # the new tail is added to self.new + + new = [(a + offset, b + offset, t) for a, b, t in new] + self.new = [(a, b, t) for a, b, t in new if a > self.last_commited_time - 0.1] + + if len(self.new) >= 1: + a, b, t = self.new[0] + if abs(a - self.last_commited_time) < 1: + if self.commited_in_buffer: + # it's going to search for 1, 2, ..., 5 consecutive words (n-grams) that are identical in commited and new. If they are, they're dropped. + cn = len(self.commited_in_buffer) + nn = len(self.new) + for i in range(1, min(min(cn, nn), 5) + 1): # 5 is the maximum + c = " ".join( + [self.commited_in_buffer[-j][2] for j in range(1, i + 1)][ + ::-1 + ] + ) + tail = " ".join(self.new[j - 1][2] for j in range(1, i + 1)) + if c == tail: + words = [] + for j in range(i): + words.append(repr(self.new.pop(0))) + words_msg = " ".join(words) + logger.debug(f"removing last {i} words: {words_msg}") + break + + def flush(self): + # returns commited chunk = the longest common prefix of 2 last inserts. + + commit = [] + while self.new: + na, nb, nt = self.new[0] + + if len(self.buffer) == 0: + break + + if nt == self.buffer[0][2]: + commit.append((na, nb, nt)) + self.last_commited_word = nt + self.last_commited_time = nb + self.buffer.pop(0) + self.new.pop(0) + else: + break + self.buffer = self.new + self.new = [] + self.commited_in_buffer.extend(commit) + return commit + + def pop_commited(self, time): + while self.commited_in_buffer and self.commited_in_buffer[0][1] <= time: + self.commited_in_buffer.pop(0) + + def complete(self): + return self.buffer + + +class OnlineASRProcessor: + SAMPLING_RATE = 16000 + + def __init__( + self, asr, tokenizer=None, buffer_trimming=("segment", 15), logfile=sys.stderr + ): + """asr: WhisperASR object + tokenizer: sentence tokenizer object for the target language. Must have a method *split* that behaves like the one of MosesTokenizer. It can be None, if "segment" buffer trimming option is used, then tokenizer is not used at all. + ("segment", 15) + buffer_trimming: a pair of (option, seconds), where option is either "sentence" or "segment", and seconds is a number. Buffer is trimmed if it is longer than "seconds" threshold. Default is the most recommended option. + logfile: where to store the log. 
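+        A typical streaming loop (sketch): call insert_audio_chunk(chunk) for each
+        incoming chunk, call process_iter() to get newly committed text, and call
+        finish() once the stream ends.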
+ """ + self.asr = asr + self.tokenizer = tokenizer + self.logfile = logfile + + self.init() + + self.buffer_trimming_way, self.buffer_trimming_sec = buffer_trimming + + def init(self, offset=None): + """run this when starting or restarting processing""" + self.audio_buffer = np.array([], dtype=np.float32) + self.transcript_buffer = HypothesisBuffer(logfile=self.logfile) + self.buffer_time_offset = 0 + if offset is not None: + self.buffer_time_offset = offset + self.transcript_buffer.last_commited_time = self.buffer_time_offset + self.commited = [] + + def insert_audio_chunk(self, audio): + self.audio_buffer = np.append(self.audio_buffer, audio) + + def prompt(self): + """Returns a tuple: (prompt, context), where "prompt" is a 200-character suffix of commited text that is inside of the scrolled away part of audio buffer. + "context" is the commited text that is inside the audio buffer. It is transcribed again and skipped. It is returned only for debugging and logging reasons. + """ + k = max(0, len(self.commited) - 1) + while k > 0 and self.commited[k - 1][1] > self.buffer_time_offset: + k -= 1 + + p = self.commited[:k] + p = [t for _, _, t in p] + prompt = [] + y = 0 + while p and y < 200: # 200 characters prompt size + x = p.pop(-1) + y += len(x) + 1 + prompt.append(x) + non_prompt = self.commited[k:] + return self.asr.sep.join(prompt[::-1]), self.asr.sep.join( + t for _, _, t in non_prompt + ) + + def process_iter(self): + """Runs on the current audio buffer. + Returns: a tuple (beg_timestamp, end_timestamp, "text"), or (None, None, ""). + The non-emty text is confirmed (committed) partial transcript. + """ + + prompt, non_prompt = self.prompt() + logger.debug(f"PROMPT: {prompt}") + logger.debug(f"CONTEXT: {non_prompt}") + logger.debug( + f"transcribing {len(self.audio_buffer) / self.SAMPLING_RATE:2.2f} seconds from {self.buffer_time_offset:2.2f}" + ) + res = self.asr.transcribe(self.audio_buffer, init_prompt=prompt) + + # transform to [(beg,end,"word1"), ...] 
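+        # e.g. tsw == [(0.0, 0.48, " Hello"), (0.48, 0.9, " there")] -- timestamps
+        # are seconds relative to the current audio buffer; insert() below shifts
+        # them by buffer_time_offset.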
+ tsw = self.asr.ts_words(res) + + self.transcript_buffer.insert(tsw, self.buffer_time_offset) + o = self.transcript_buffer.flush() + self.commited.extend(o) + completed = self.to_flush(o) + logger.debug(f">>>>COMPLETE NOW: {completed}") + the_rest = self.to_flush(self.transcript_buffer.complete()) + logger.debug(f"INCOMPLETE: {the_rest}") + + # there is a newly confirmed text + + if o and self.buffer_trimming_way == "sentence": # trim the completed sentences + if ( + len(self.audio_buffer) / self.SAMPLING_RATE > self.buffer_trimming_sec + ): # longer than this + self.chunk_completed_sentence() + + if self.buffer_trimming_way == "segment": + s = self.buffer_trimming_sec # trim the completed segments longer than s, + else: + s = 30 # if the audio buffer is longer than 30s, trim it + + if len(self.audio_buffer) / self.SAMPLING_RATE > s: + self.chunk_completed_segment(res) + + # alternative: on any word + # l = self.buffer_time_offset + len(self.audio_buffer)/self.SAMPLING_RATE - 10 + # let's find commited word that is less + # k = len(self.commited)-1 + # while k>0 and self.commited[k][1] > l: + # k -= 1 + # t = self.commited[k][1] + logger.debug("chunking segment") + # self.chunk_at(t) + + logger.debug( + f"len of buffer now: {len(self.audio_buffer) / self.SAMPLING_RATE:2.2f}" + ) + return self.to_flush(o) + + def chunk_completed_sentence(self): + if self.commited == []: + return + logger.debug(self.commited) + sents = self.words_to_sentences(self.commited) + for s in sents: + logger.debug(f"\t\tSENT: {s}") + if len(sents) < 2: + return + while len(sents) > 2: + sents.pop(0) + # we will continue with audio processing at this timestamp + chunk_at = sents[-2][1] + + logger.debug(f"--- sentence chunked at {chunk_at:2.2f}") + self.chunk_at(chunk_at) + + def chunk_completed_segment(self, res): + if self.commited == []: + return + + ends = self.asr.segments_end_ts(res) + + t = self.commited[-1][1] + + if len(ends) > 1: + e = ends[-2] + self.buffer_time_offset + while len(ends) > 2 and e > t: + ends.pop(-1) + e = ends[-2] + self.buffer_time_offset + if e <= t: + logger.debug(f"--- segment chunked at {e:2.2f}") + self.chunk_at(e) + else: + logger.debug("--- last segment not within commited area") + else: + logger.debug("--- not enough segments to chunk") + + def chunk_at(self, time): + """trims the hypothesis and audio buffer at "time" """ + self.transcript_buffer.pop_commited(time) + cut_seconds = time - self.buffer_time_offset + self.audio_buffer = self.audio_buffer[int(cut_seconds * self.SAMPLING_RATE) :] + self.buffer_time_offset = time + + def words_to_sentences(self, words): + """Uses self.tokenizer for sentence segmentation of words. + Returns: [(beg,end,"sentence 1"),...] + """ + + cwords = [w for w in words] + t = " ".join(o[2] for o in cwords) + s = self.tokenizer.split(t) + out = [] + while s: + beg = None + end = None + sent = s.pop(0).strip() + fsent = sent + while cwords: + b, e, w = cwords.pop(0) + w = w.strip() + if beg is None and sent.startswith(w): + beg = b + elif end is None and sent == w: + end = e + out.append((beg, end, fsent)) + break + sent = sent[len(w) :].strip() + return out + + def finish(self): + """Flush the incomplete text when the whole processing ends. 
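+        Call this once after the last insert_audio_chunk(); it also advances
+        buffer_time_offset past whatever audio is still in the buffer.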
+ Returns: the same format as self.process_iter() + """ + o = self.transcript_buffer.complete() + f = self.to_flush(o) + logger.debug(f"last, noncommited: {f}") + self.buffer_time_offset += len(self.audio_buffer) / 16000 + return f + + def to_flush( + self, + sents, + sep=None, + offset=0, + ): + # concatenates the timestamped words or sentences into one sequence that is flushed in one line + # sents: [(beg1, end1, "sentence1"), ...] or [] if empty + # return: (beg1,end-of-last-sentence,"concatenation of sentences") or (None, None, "") if empty + if sep is None: + sep = self.asr.sep + t = sep.join(s[2] for s in sents) + if len(sents) == 0: + b = None + e = None + else: + b = offset + sents[0][0] + e = offset + sents[-1][1] + return (b, e, t) + + +class VACOnlineASRProcessor(OnlineASRProcessor): + """Wraps OnlineASRProcessor with VAC (Voice Activity Controller). + + It works the same way as OnlineASRProcessor: it receives chunks of audio (e.g. 0.04 seconds), + it runs VAD and continuously detects whether there is speech or not. + When it detects end of speech (non-voice for 500ms), it makes OnlineASRProcessor to end the utterance immediately. + """ + + def __init__(self, online_chunk_size, *a, **kw): + self.online_chunk_size = online_chunk_size + + self.online = OnlineASRProcessor(*a, **kw) + + # VAC: + import torch + + model, _ = torch.hub.load(repo_or_dir="snakers4/silero-vad", model="silero_vad") + from silero_vad_iterator import FixedVADIterator + + self.vac = FixedVADIterator( + model + ) # we use the default options there: 500ms silence, 100ms padding, etc. + + self.logfile = self.online.logfile + self.init() + + def init(self): + self.online.init() + self.vac.reset_states() + self.current_online_chunk_buffer_size = 0 + + self.is_currently_final = False + + self.status = None # or "voice" or "nonvoice" + self.audio_buffer = np.array([], dtype=np.float32) + self.buffer_offset = 0 # in frames + + def clear_buffer(self): + self.buffer_offset += len(self.audio_buffer) + self.audio_buffer = np.array([], dtype=np.float32) + + def insert_audio_chunk(self, audio): + res = self.vac(audio) + self.audio_buffer = np.append(self.audio_buffer, audio) + + if res is not None: + frame = list(res.values())[0] - self.buffer_offset + if "start" in res and "end" not in res: + self.status = "voice" + send_audio = self.audio_buffer[frame:] + self.online.init( + offset=(frame + self.buffer_offset) / self.SAMPLING_RATE + ) + self.online.insert_audio_chunk(send_audio) + self.current_online_chunk_buffer_size += len(send_audio) + self.clear_buffer() + elif "end" in res and "start" not in res: + self.status = "nonvoice" + send_audio = self.audio_buffer[:frame] + self.online.insert_audio_chunk(send_audio) + self.current_online_chunk_buffer_size += len(send_audio) + self.is_currently_final = True + self.clear_buffer() + else: + beg = res["start"] - self.buffer_offset + end = res["end"] - self.buffer_offset + self.status = "nonvoice" + send_audio = self.audio_buffer[beg:end] + self.online.init(offset=(beg + self.buffer_offset) / self.SAMPLING_RATE) + self.online.insert_audio_chunk(send_audio) + self.current_online_chunk_buffer_size += len(send_audio) + self.is_currently_final = True + self.clear_buffer() + else: + if self.status == "voice": + self.online.insert_audio_chunk(self.audio_buffer) + self.current_online_chunk_buffer_size += len(self.audio_buffer) + self.clear_buffer() + else: + # We keep 1 second because VAD may later find start of voice in it. + # But we trim it to prevent OOM. 
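+                # e.g. with a 3 s backlog at 16 kHz: buffer_offset advances by
+                # 48000 - 16000 samples and only the last 16000 samples (1 s) are kept.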
+ self.buffer_offset += max( + 0, len(self.audio_buffer) - self.SAMPLING_RATE + ) + self.audio_buffer = self.audio_buffer[-self.SAMPLING_RATE :] + + def process_iter(self): + if self.is_currently_final: + return self.finish() + elif ( + self.current_online_chunk_buffer_size + > self.SAMPLING_RATE * self.online_chunk_size + ): + self.current_online_chunk_buffer_size = 0 + ret = self.online.process_iter() + return ret + else: + print("no online update, only VAD", self.status, file=self.logfile) + return (None, None, "") + + def finish(self): + ret = self.online.finish() + self.current_online_chunk_buffer_size = 0 + self.is_currently_final = False + return ret + + +WHISPER_LANG_CODES = "af,am,ar,as,az,ba,be,bg,bn,bo,br,bs,ca,cs,cy,da,de,el,en,es,et,eu,fa,fi,fo,fr,gl,gu,ha,haw,he,hi,hr,ht,hu,hy,id,is,it,ja,jw,ka,kk,km,kn,ko,la,lb,ln,lo,lt,lv,mg,mi,mk,ml,mn,mr,ms,mt,my,ne,nl,nn,no,oc,pa,pl,ps,pt,ro,ru,sa,sd,si,sk,sl,sn,so,sq,sr,su,sv,sw,ta,te,tg,th,tk,tl,tr,tt,uk,ur,uz,vi,yi,yo,zh".split( + "," +) + + +def create_tokenizer(lan): + """returns an object that has split function that works like the one of MosesTokenizer""" + + assert lan in WHISPER_LANG_CODES, ( + "language must be Whisper's supported lang code: " + + " ".join(WHISPER_LANG_CODES) + ) + + if lan == "uk": + import tokenize_uk + + class UkrainianTokenizer: + def split(self, text): + return tokenize_uk.tokenize_sents(text) + + return UkrainianTokenizer() + + # supported by fast-mosestokenizer + if ( + lan + in "as bn ca cs de el en es et fi fr ga gu hi hu is it kn lt lv ml mni mr nl or pa pl pt ro ru sk sl sv ta te yue zh".split() + ): + from mosestokenizer import MosesTokenizer + + return MosesTokenizer(lan) + + # the following languages are in Whisper, but not in wtpsplit: + if ( + lan + in "as ba bo br bs fo haw hr ht jw lb ln lo mi nn oc sa sd sn so su sw tk tl tt".split() + ): + logger.debug( + f"{lan} code is not supported by wtpsplit. Going to use None lang_code option." + ) + lan = None + + from wtpsplit import WtP + + # downloads the model from huggingface on the first use + wtp = WtP("wtp-canine-s-12l-no-adapters") + + class WtPtok: + def split(self, sent): + return wtp.split(sent, lang_code=lan) + + return WtPtok() + + +def add_shared_args(parser): + """shared args for simulation (this entry point) and server + parser: argparse.ArgumentParser object + """ + parser.add_argument( + "--min-chunk-size", + type=float, + default=1.0, + help="Minimum audio chunk size in seconds. It waits up to this time to do processing. If the processing takes shorter time, it waits, otherwise it processes the whole segment that was received by this time.", + ) + parser.add_argument( + "--model", + type=str, + default="large-v2", + choices="tiny.en,tiny,base.en,base,small.en,small,medium.en,medium,large-v1,large-v2,large-v3,large,large-v3-turbo".split( + "," + ), + help="Name size of the Whisper model to use (default: large-v2). The model is automatically downloaded from the model hub if not present in model cache dir.", + ) + parser.add_argument( + "--model_cache_dir", + type=str, + default=None, + help="Overriding the default model cache dir where models downloaded from the hub are saved", + ) + parser.add_argument( + "--model_dir", + type=str, + default=None, + help="Dir where Whisper model.bin and other files are saved. This option overrides --model and --model_cache_dir parameter.", + ) + parser.add_argument( + "--lan", + "--language", + type=str, + default="auto", + help="Source language code, e.g. 
en,de,cs, or 'auto' for language detection.", + ) + parser.add_argument( + "--task", + type=str, + default="transcribe", + choices=["transcribe", "translate"], + help="Transcribe or translate.", + ) + parser.add_argument( + "--backend", + type=str, + default="faster-whisper", + choices=["faster-whisper", "whisper_timestamped", "mlx-whisper", "openai-api"], + help="Load only this backend for Whisper processing.", + ) + parser.add_argument( + "--vac", + action="store_true", + default=False, + help="Use VAC = voice activity controller. Recommended. Requires torch.", + ) + parser.add_argument( + "--vac-chunk-size", type=float, default=0.04, help="VAC sample size in seconds." + ) + parser.add_argument( + "--vad", + action="store_true", + default=False, + help="Use VAD = voice activity detection, with the default parameters.", + ) + parser.add_argument( + "--buffer_trimming", + type=str, + default="segment", + choices=["sentence", "segment"], + help='Buffer trimming strategy -- trim completed sentences marked with punctuation mark and detected by sentence segmenter, or the completed segments returned by Whisper. Sentence segmenter must be installed for "sentence" option.', + ) + parser.add_argument( + "--buffer_trimming_sec", + type=float, + default=15, + help="Buffer trimming length threshold in seconds. If buffer length is longer, trimming sentence/segment is triggered.", + ) + parser.add_argument( + "-l", + "--log-level", + dest="log_level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Set the log level", + default="DEBUG", + ) + + +def asr_factory(args, logfile=sys.stderr): + """ + Creates and configures an ASR and ASR Online instance based on the specified backend and arguments. + """ + backend = args.backend + if backend == "openai-api": + logger.debug("Using OpenAI API.") + asr = OpenaiApiASR(lan=args.lan) + else: + if backend == "faster-whisper": + asr_cls = FasterWhisperASR + elif backend == "mlx-whisper": + asr_cls = MLXWhisper + else: + asr_cls = WhisperTimestampedASR + + # Only for FasterWhisperASR and WhisperTimestampedASR + size = args.model + t = time.time() + logger.info(f"Loading Whisper {size} model for {args.lan}...") + asr = asr_cls( + modelsize=size, + lan=args.lan, + cache_dir=args.model_cache_dir, + model_dir=args.model_dir, + ) + e = time.time() + logger.info(f"done. 
It took {round(e - t, 2)} seconds.") + + # Apply common configurations + if getattr(args, "vad", False): # Checks if VAD argument is present and True + logger.info("Setting VAD filter") + asr.use_vad() + + language = args.lan + if args.task == "translate": + asr.set_translate_task() + tgt_language = "en" # Whisper translates into English + else: + tgt_language = language # Whisper transcribes in this language + + # Create the tokenizer + if args.buffer_trimming == "sentence": + tokenizer = create_tokenizer(tgt_language) + else: + tokenizer = None + + # Create the OnlineASRProcessor + if args.vac: + online = VACOnlineASRProcessor( + args.min_chunk_size, + asr, + tokenizer, + logfile=logfile, + buffer_trimming=(args.buffer_trimming, args.buffer_trimming_sec), + ) + else: + online = OnlineASRProcessor( + asr, + tokenizer, + logfile=logfile, + buffer_trimming=(args.buffer_trimming, args.buffer_trimming_sec), + ) + + return asr, online + + +def set_logging(args, logger, other="_server"): + logging.basicConfig( # format='%(name)s + format="%(levelname)s\t%(message)s" + ) + logger.setLevel(args.log_level) + logging.getLogger("whisper_online" + other).setLevel(args.log_level) + + +# logging.getLogger("whisper_online_server").setLevel(args.log_level) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "audio_path", + type=str, + help="Filename of 16kHz mono channel wav, on which live streaming is simulated.", + ) + add_shared_args(parser) + parser.add_argument( + "--start_at", + type=float, + default=0.0, + help="Start processing audio at this time.", + ) + parser.add_argument( + "--offline", action="store_true", default=False, help="Offline mode." + ) + parser.add_argument( + "--comp_unaware", + action="store_true", + default=False, + help="Computationally unaware simulation.", + ) + + args = parser.parse_args() + + # reset to store stderr to different file stream, e.g. open(os.devnull,"w") + logfile = sys.stderr + + if args.offline and args.comp_unaware: + logger.error( + "No or one option from --offline and --comp_unaware are available, not both. Exiting." + ) + sys.exit(1) + + # if args.log_level: + # logging.basicConfig(format='whisper-%(levelname)s:%(name)s: %(message)s', + # level=getattr(logging, args.log_level)) + + set_logging(args, logger) + + audio_path = args.audio_path + + SAMPLING_RATE = 16000 + duration = len(load_audio(audio_path)) / SAMPLING_RATE + logger.info("Audio duration is: %2.2f seconds" % duration) + + asr, online = asr_factory(args, logfile=logfile) + if args.vac: + min_chunk = args.vac_chunk_size + else: + min_chunk = args.min_chunk_size + + # load the audio into the LRU cache before we start the timer + a = load_audio_chunk(audio_path, 0, 1) + + # warm up the ASR because the very first transcribe takes much more time than the other + asr.transcribe(a) + + beg = args.start_at + start = time.time() - beg + + def output_transcript(o, now=None): + # output format in stdout is like: + # 4186.3606 0 1720 Takhle to je + # - the first three words are: + # - emission time from beginning of processing, in milliseconds + # - beg and end timestamp of the text segment, as estimated by Whisper model. 
The timestamps are not accurate, but they're useful anyway + # - the next words: segment transcript + if now is None: + now = time.time() - start + if o[0] is not None: + print( + "%1.4f %1.0f %1.0f %s" % (now * 1000, o[0] * 1000, o[1] * 1000, o[2]), + file=logfile, + flush=True, + ) + print( + "%1.4f %1.0f %1.0f %s" % (now * 1000, o[0] * 1000, o[1] * 1000, o[2]), + flush=True, + ) + else: + # No text, so no output + pass + + if args.offline: ## offline mode processing (for testing/debugging) + a = load_audio(audio_path) + online.insert_audio_chunk(a) + try: + o = online.process_iter() + except AssertionError as e: + logger.error(f"assertion error: {repr(e)}") + else: + output_transcript(o) + now = None + elif args.comp_unaware: # computational unaware mode + end = beg + min_chunk + while True: + a = load_audio_chunk(audio_path, beg, end) + online.insert_audio_chunk(a) + try: + o = online.process_iter() + except AssertionError as e: + logger.error(f"assertion error: {repr(e)}") + pass + else: + output_transcript(o, now=end) + + logger.debug(f"## last processed {end:.2f}s") + + if end >= duration: + break + + beg = end + + if end + min_chunk > duration: + end = duration + else: + end += min_chunk + now = duration + + else: # online = simultaneous mode + end = 0 + while True: + now = time.time() - start + if now < end + min_chunk: + time.sleep(min_chunk + end - now) + end = time.time() - start + a = load_audio_chunk(audio_path, beg, end) + beg = end + online.insert_audio_chunk(a) + + try: + o = online.process_iter() + except AssertionError as e: + logger.error(f"assertion error: {e}") + pass + else: + output_transcript(o) + now = time.time() - start + logger.debug( + f"## last processed {end:.2f} s, now is {now:.2f}, the latency is {now - end:.2f}" + ) + + if end >= duration: + break + now = None + + o = online.finish() + output_transcript(o, now=now) diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 0c118879c..bc1887e2c 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -291,3 +291,8 @@ class EmbeddingsContext: def reindex_embeddings(self) -> dict[str, Any]: return self.requestor.send_data(EmbeddingsRequestEnum.reindex.value, {}) + + def transcribe_audio(self, event: dict[str, any]) -> dict[str, any]: + return self.requestor.send_data( + EmbeddingsRequestEnum.transcribe_audio.value, {"event": event} + ) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 436d0e7df..25601f014 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -37,6 +37,9 @@ from frigate.data_processing.common.license_plate.model import ( LicensePlateModelRunner, ) from frigate.data_processing.post.api import PostProcessorApi +from frigate.data_processing.post.audio_transcription import ( + AudioTranscriptionPostProcessor, +) from frigate.data_processing.post.license_plate import ( LicensePlatePostProcessor, ) @@ -176,6 +179,14 @@ class EmbeddingMaintainer(threading.Thread): ) ) + if any( + c.enabled_in_config and c.audio_transcription.enabled + for c in self.config.cameras.values() + ): + self.post_processors.append( + AudioTranscriptionPostProcessor(self.config, self.requestor, metrics) + ) + self.stop_event = stop_event self.tracked_events: dict[str, list[Any]] = {} self.early_request_sent: dict[str, bool] = {} @@ -372,6 +383,8 @@ class EmbeddingMaintainer(threading.Thread): }, PostProcessDataEnum.recording, ) + elif isinstance(processor, AudioTranscriptionPostProcessor): + continue 
else: processor.process_data(event_id, PostProcessDataEnum.event_id) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 8a929c8ff..dc6ee7128 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -18,7 +18,7 @@ from frigate.comms.event_metadata_updater import ( EventMetadataTypeEnum, ) from frigate.comms.inter_process import InterProcessRequestor -from frigate.config import CameraConfig, CameraInput, FfmpegConfig +from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig from frigate.config.camera.updater import ( CameraConfigUpdateEnum, CameraConfigUpdateSubscriber, @@ -30,6 +30,9 @@ from frigate.const import ( AUDIO_MIN_CONFIDENCE, AUDIO_SAMPLE_RATE, ) +from frigate.data_processing.real_time.audio_transcription import ( + AudioTranscriptionRealTimeProcessor, +) from frigate.ffmpeg_presets import parse_preset_input from frigate.log import LogPipe from frigate.object_detection.base import load_labels @@ -75,6 +78,7 @@ class AudioProcessor(util.Process): def __init__( self, + config: FrigateConfig, cameras: list[CameraConfig], camera_metrics: dict[str, CameraMetrics], ): @@ -82,6 +86,7 @@ class AudioProcessor(util.Process): self.camera_metrics = camera_metrics self.cameras = cameras + self.config = config def run(self) -> None: audio_threads: list[AudioEventMaintainer] = [] @@ -94,6 +99,7 @@ class AudioProcessor(util.Process): for camera in self.cameras: audio_thread = AudioEventMaintainer( camera, + self.config, self.camera_metrics, self.stop_event, ) @@ -122,46 +128,71 @@ class AudioEventMaintainer(threading.Thread): def __init__( self, camera: CameraConfig, + config: FrigateConfig, camera_metrics: dict[str, CameraMetrics], stop_event: threading.Event, ) -> None: super().__init__(name=f"{camera.name}_audio_event_processor") - self.config = camera + self.config = config + self.camera_config = camera self.camera_metrics = camera_metrics self.detections: dict[dict[str, Any]] = {} self.stop_event = stop_event - self.detector = AudioTfl(stop_event, self.config.audio.num_threads) + self.detector = AudioTfl(stop_event, self.camera_config.audio.num_threads) self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),) self.chunk_size = int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE * 2)) - self.logger = logging.getLogger(f"audio.{self.config.name}") - self.ffmpeg_cmd = get_ffmpeg_command(self.config.ffmpeg) - self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio") + self.logger = logging.getLogger(f"audio.{self.camera_config.name}") + self.ffmpeg_cmd = get_ffmpeg_command(self.camera_config.ffmpeg) + self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio") self.audio_listener = None + self.transcription_processor = None + self.transcription_thread = None # create communication for audio detections self.requestor = InterProcessRequestor() self.config_subscriber = CameraConfigUpdateSubscriber( - {self.config.name: self.config}, - [CameraConfigUpdateEnum.audio, CameraConfigUpdateEnum.enabled], + {self.camera_config.name: self.camera_config}, + [ + CameraConfigUpdateEnum.audio, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.audio_transcription, + ], ) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) self.event_metadata_publisher = EventMetadataPublisher() + if self.camera_config.audio_transcription.enabled_in_config: + # init the transcription processor for this camera + self.transcription_processor = AudioTranscriptionRealTimeProcessor( + config=self.config, + camera_config=self.camera_config, + 
requestor=self.requestor, + metrics=self.camera_metrics[self.camera_config.name], + stop_event=self.stop_event, + ) + + self.transcription_thread = threading.Thread( + target=self.transcription_processor.run, + name=f"{self.camera_config.name}_transcription_processor", + daemon=True, + ) + self.transcription_thread.start() + self.was_enabled = camera.enabled def detect_audio(self, audio) -> None: - if not self.config.audio.enabled or self.stop_event.is_set(): + if not self.camera_config.audio.enabled or self.stop_event.is_set(): return audio_as_float = audio.astype(np.float32) rms, dBFS = self.calculate_audio_levels(audio_as_float) - self.camera_metrics[self.config.name].audio_rms.value = rms - self.camera_metrics[self.config.name].audio_dBFS.value = dBFS + self.camera_metrics[self.camera_config.name].audio_rms.value = rms + self.camera_metrics[self.camera_config.name].audio_dBFS.value = dBFS # only run audio detection when volume is above min_volume - if rms >= self.config.audio.min_volume: + if rms >= self.camera_config.audio.min_volume: # create waveform relative to max range and look for detections waveform = (audio / AUDIO_MAX_BIT_RANGE).astype(np.float32) model_detections = self.detector.detect(waveform) @@ -169,28 +200,42 @@ class AudioEventMaintainer(threading.Thread): for label, score, _ in model_detections: self.logger.debug( - f"{self.config.name} heard {label} with a score of {score}" + f"{self.camera_config.name} heard {label} with a score of {score}" ) - if label not in self.config.audio.listen: + if label not in self.camera_config.audio.listen: continue - if score > dict((self.config.audio.filters or {}).get(label, {})).get( - "threshold", 0.8 - ): + if score > dict( + (self.camera_config.audio.filters or {}).get(label, {}) + ).get("threshold", 0.8): self.handle_detection(label, score) audio_detections.append(label) # send audio detection data self.detection_publisher.publish( ( - self.config.name, + self.camera_config.name, datetime.datetime.now().timestamp(), dBFS, audio_detections, ) ) + # run audio transcription + if self.transcription_processor is not None and ( + self.camera_config.audio_transcription.live_enabled + ): + self.transcribing = True + # process audio until we've reached the endpoint + self.transcription_processor.process_audio( + { + "id": f"{self.camera_config.name}_audio", + "camera": self.camera_config.name, + }, + audio, + ) + self.expire_detections() def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]: @@ -204,8 +249,8 @@ class AudioEventMaintainer(threading.Thread): else: dBFS = 0 - self.requestor.send_data(f"{self.config.name}/audio/dBFS", float(dBFS)) - self.requestor.send_data(f"{self.config.name}/audio/rms", float(rms)) + self.requestor.send_data(f"{self.camera_config.name}/audio/dBFS", float(dBFS)) + self.requestor.send_data(f"{self.camera_config.name}/audio/rms", float(rms)) return float(rms), float(dBFS) @@ -220,13 +265,13 @@ class AudioEventMaintainer(threading.Thread): random.choices(string.ascii_lowercase + string.digits, k=6) ) event_id = f"{now}-{rand_id}" - self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON") + self.requestor.send_data(f"{self.camera_config.name}/audio/{label}", "ON") self.event_metadata_publisher.publish( EventMetadataTypeEnum.manual_event_create, ( now, - self.config.name, + self.camera_config.name, label, event_id, True, @@ -252,10 +297,10 @@ class AudioEventMaintainer(threading.Thread): if ( now - detection.get("last_detection", now) - > self.config.audio.max_not_heard + 
> self.camera_config.audio.max_not_heard ): self.requestor.send_data( - f"{self.config.name}/audio/{detection['label']}", "OFF" + f"{self.camera_config.name}/audio/{detection['label']}", "OFF" ) self.event_metadata_publisher.publish( @@ -264,12 +309,21 @@ class AudioEventMaintainer(threading.Thread): ) self.detections[detection["label"]] = None + # clear real-time transcription + if self.transcription_processor is not None: + self.transcription_processor.reset(self.camera_config.name) + self.requestor.send_data( + f"{self.camera_config.name}/audio/transcription", "" + ) + def expire_all_detections(self) -> None: """Immediately end all current detections""" now = datetime.datetime.now().timestamp() for label, detection in list(self.detections.items()): if detection: - self.requestor.send_data(f"{self.config.name}/audio/{label}", "OFF") + self.requestor.send_data( + f"{self.camera_config.name}/audio/{label}", "OFF" + ) self.event_metadata_publisher.publish( EventMetadataTypeEnum.manual_event_end, (detection["id"], now), @@ -290,7 +344,7 @@ class AudioEventMaintainer(threading.Thread): if self.stop_event.is_set(): return - time.sleep(self.config.ffmpeg.retry_interval) + time.sleep(self.camera_config.ffmpeg.retry_interval) self.logpipe.dump() self.start_or_restart_ffmpeg() @@ -312,20 +366,20 @@ class AudioEventMaintainer(threading.Thread): log_and_restart() def run(self) -> None: - if self.config.enabled: + if self.camera_config.enabled: self.start_or_restart_ffmpeg() while not self.stop_event.is_set(): - enabled = self.config.enabled + enabled = self.camera_config.enabled if enabled != self.was_enabled: if enabled: self.logger.debug( - f"Enabling audio detections for {self.config.name}" + f"Enabling audio detections for {self.camera_config.name}" ) self.start_or_restart_ffmpeg() else: self.logger.debug( - f"Disabling audio detections for {self.config.name}, ending events" + f"Disabling audio detections for {self.camera_config.name}, ending events" ) self.expire_all_detections() stop_ffmpeg(self.audio_listener, self.logger) @@ -344,6 +398,12 @@ class AudioEventMaintainer(threading.Thread): if self.audio_listener: stop_ffmpeg(self.audio_listener, self.logger) + if self.transcription_thread: + self.transcription_thread.join(timeout=2) + if self.transcription_thread.is_alive(): + self.logger.warning( + f"Audio transcription thread {self.transcription_thread.name} is still alive" + ) self.logpipe.close() self.requestor.stop() self.config_subscriber.stop() diff --git a/frigate/util/audio.py b/frigate/util/audio.py new file mode 100644 index 000000000..eede9c0ea --- /dev/null +++ b/frigate/util/audio.py @@ -0,0 +1,116 @@ +"""Utilities for creating and manipulating audio.""" + +import logging +import os +import subprocess as sp +from typing import Optional + +from pathvalidate import sanitize_filename + +from frigate.const import CACHE_DIR +from frigate.models import Recordings + +logger = logging.getLogger(__name__) + + +def get_audio_from_recording( + ffmpeg, + camera_name: str, + start_ts: float, + end_ts: float, + sample_rate: int = 16000, +) -> Optional[bytes]: + """Extract audio from recording files between start_ts and end_ts in WAV format suitable for sherpa-onnx. 
+ + Args: + ffmpeg: FFmpeg configuration object + camera_name: Name of the camera + start_ts: Start timestamp in seconds + end_ts: End timestamp in seconds + sample_rate: Sample rate for output audio (default 16kHz for sherpa-onnx) + + Returns: + Bytes of WAV audio data or None if extraction failed + """ + # Fetch all relevant recording segments + recordings = ( + Recordings.select( + Recordings.path, + Recordings.start_time, + Recordings.end_time, + ) + .where( + (Recordings.start_time.between(start_ts, end_ts)) + | (Recordings.end_time.between(start_ts, end_ts)) + | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) + ) + .where(Recordings.camera == camera_name) + .order_by(Recordings.start_time.asc()) + ) + + if not recordings: + logger.debug( + f"No recordings found for {camera_name} between {start_ts} and {end_ts}" + ) + return None + + # Generate concat playlist file + file_name = sanitize_filename( + f"audio_playlist_{camera_name}_{start_ts}-{end_ts}.txt" + ) + file_path = os.path.join(CACHE_DIR, file_name) + try: + with open(file_path, "w") as file: + for clip in recordings: + file.write(f"file '{clip.path}'\n") + if clip.start_time < start_ts: + file.write(f"inpoint {int(start_ts - clip.start_time)}\n") + if clip.end_time > end_ts: + file.write(f"outpoint {int(end_ts - clip.start_time)}\n") + + ffmpeg_cmd = [ + ffmpeg.ffmpeg_path, + "-hide_banner", + "-loglevel", + "warning", + "-protocol_whitelist", + "pipe,file", + "-f", + "concat", + "-safe", + "0", + "-i", + file_path, + "-vn", # No video + "-acodec", + "pcm_s16le", # 16-bit PCM encoding + "-ar", + str(sample_rate), + "-ac", + "1", # Mono audio + "-f", + "wav", + "-", + ] + + process = sp.run( + ffmpeg_cmd, + capture_output=True, + ) + + if process.returncode == 0: + logger.debug( + f"Successfully extracted audio for {camera_name} from {start_ts} to {end_ts}" + ) + return process.stdout + else: + logger.error(f"Failed to extract audio: {process.stderr.decode()}") + return None + except Exception as e: + logger.error(f"Error extracting audio from recordings: {e}") + return None + finally: + try: + os.unlink(file_path) + except OSError: + pass diff --git a/web/public/locales/en/views/explore.json b/web/public/locales/en/views/explore.json index 7e2381445..8a61dcf58 100644 --- a/web/public/locales/en/views/explore.json +++ b/web/public/locales/en/views/explore.json @@ -103,12 +103,14 @@ "success": { "regenerate": "A new description has been requested from {{provider}}. Depending on the speed of your provider, the new description may take some time to regenerate.", "updatedSublabel": "Successfully updated sub label.", - "updatedLPR": "Successfully updated license plate." + "updatedLPR": "Successfully updated license plate.", + "audioTranscription": "Successfully requested audio transcription." 
}, "error": { "regenerate": "Failed to call {{provider}} for a new description: {{errorMessage}}", "updatedSublabelFailed": "Failed to update sub label: {{errorMessage}}", - "updatedLPRFailed": "Failed to update license plate: {{errorMessage}}" + "updatedLPRFailed": "Failed to update license plate: {{errorMessage}}", + "audioTranscription": "Failed to request audio transcription: {{errorMessage}}" } } }, @@ -173,6 +175,10 @@ "label": "Find similar", "aria": "Find similar tracked objects" }, + "audioTranscription": { + "label": "Transcribe", + "aria": "Request audio transcription" + }, "submitToPlus": { "label": "Submit to Frigate+", "aria": "Submit to Frigate Plus" diff --git a/web/public/locales/en/views/live.json b/web/public/locales/en/views/live.json index 1790467d2..fea120601 100644 --- a/web/public/locales/en/views/live.json +++ b/web/public/locales/en/views/live.json @@ -69,6 +69,10 @@ "enable": "Enable Audio Detect", "disable": "Disable Audio Detect" }, + "transcription": { + "enable": "Enable Live Audio Transcription", + "disable": "Disable Live Audio Transcription" + }, "autotracking": { "enable": "Enable Autotracking", "disable": "Disable Autotracking" @@ -135,6 +139,7 @@ "recording": "Recording", "snapshots": "Snapshots", "audioDetection": "Audio Detection", + "transcription": "Audio Transcription", "autotracking": "Autotracking" }, "history": { diff --git a/web/src/api/ws.tsx b/web/src/api/ws.tsx index 3e9c8c14f..79bf9e79d 100644 --- a/web/src/api/ws.tsx +++ b/web/src/api/ws.tsx @@ -8,6 +8,7 @@ import { FrigateReview, ModelState, ToggleableSetting, + TrackedObjectUpdateReturnType, } from "@/types/ws"; import { FrigateStats } from "@/types/stats"; import { createContainer } from "react-tracked"; @@ -60,6 +61,7 @@ function useValue(): useValueReturn { enabled, snapshots, audio, + audio_transcription, notifications, notifications_suspended, autotracking, @@ -71,6 +73,9 @@ function useValue(): useValueReturn { cameraStates[`${name}/detect/state`] = detect ? "ON" : "OFF"; cameraStates[`${name}/snapshots/state`] = snapshots ? "ON" : "OFF"; cameraStates[`${name}/audio/state`] = audio ? "ON" : "OFF"; + cameraStates[`${name}/audio_transcription/state`] = audio_transcription + ? "ON" + : "OFF"; cameraStates[`${name}/notifications/state`] = notifications ? 
"ON" : "OFF"; @@ -220,6 +225,20 @@ export function useAudioState(camera: string): { return { payload: payload as ToggleableSetting, send }; } +export function useAudioTranscriptionState(camera: string): { + payload: ToggleableSetting; + send: (payload: ToggleableSetting, retain?: boolean) => void; +} { + const { + value: { payload }, + send, + } = useWs( + `${camera}/audio_transcription/state`, + `${camera}/audio_transcription/set`, + ); + return { payload: payload as ToggleableSetting, send }; +} + export function useAutotrackingState(camera: string): { payload: ToggleableSetting; send: (payload: ToggleableSetting, retain?: boolean) => void; @@ -421,6 +440,15 @@ export function useAudioActivity(camera: string): { payload: number } { return { payload: payload as number }; } +export function useAudioLiveTranscription(camera: string): { + payload: string; +} { + const { + value: { payload }, + } = useWs(`${camera}/audio/transcription`, ""); + return { payload: payload as string }; +} + export function useMotionThreshold(camera: string): { payload: string; send: (payload: number, retain?: boolean) => void; @@ -463,11 +491,16 @@ export function useImproveContrast(camera: string): { return { payload: payload as ToggleableSetting, send }; } -export function useTrackedObjectUpdate(): { payload: string } { +export function useTrackedObjectUpdate(): { + payload: TrackedObjectUpdateReturnType; +} { const { value: { payload }, } = useWs("tracked_object_update", ""); - return useDeepMemo(JSON.parse(payload as string)); + const parsed = payload + ? JSON.parse(payload as string) + : { type: "", id: "", camera: "" }; + return { payload: useDeepMemo(parsed) }; } export function useNotifications(camera: string): { diff --git a/web/src/components/overlay/detail/SearchDetailDialog.tsx b/web/src/components/overlay/detail/SearchDetailDialog.tsx index 28ef5c45b..02957a8de 100644 --- a/web/src/components/overlay/detail/SearchDetailDialog.tsx +++ b/web/src/components/overlay/detail/SearchDetailDialog.tsx @@ -77,6 +77,7 @@ import { Trans, useTranslation } from "react-i18next"; import { TbFaceId } from "react-icons/tb"; import { useIsAdmin } from "@/hooks/use-is-admin"; import FaceSelectionDialog from "../FaceSelectionDialog"; +import { CgTranscript } from "react-icons/cg"; const SEARCH_TABS = [ "details", @@ -709,6 +710,34 @@ function ObjectDetailsTab({ [search, t], ); + // speech transcription + + const onTranscribe = useCallback(() => { + axios + .put(`/audio/transcribe`, { event_id: search.id }) + .then((resp) => { + if (resp.status == 202) { + toast.success(t("details.item.toast.success.audioTranscription"), { + position: "top-center", + }); + } + }) + .catch((error) => { + const errorMessage = + error.response?.data?.message || + error.response?.data?.detail || + "Unknown error"; + toast.error( + t("details.item.toast.error.audioTranscription", { + errorMessage, + }), + { + position: "top-center", + }, + ); + }); + }, [search, t]); + return (
@@ -894,6 +923,16 @@ function ObjectDetailsTab({ )} + {config?.cameras[search?.camera].audio_transcription.enabled && + search?.label == "speech" && + search?.end_time && ( + + )}
diff --git a/web/src/pages/Explore.tsx b/web/src/pages/Explore.tsx index 7e412201e..a911819b2 100644 --- a/web/src/pages/Explore.tsx +++ b/web/src/pages/Explore.tsx @@ -246,15 +246,13 @@ export default function Explore() { // mutation and revalidation - const trackedObjectUpdate = useTrackedObjectUpdate(); + const { payload: wsUpdate } = useTrackedObjectUpdate(); useEffect(() => { - if (trackedObjectUpdate) { + if (wsUpdate && wsUpdate.type == "description") { mutate(); } - // mutate / revalidate when event description updates come in - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [trackedObjectUpdate]); + }, [wsUpdate, mutate]); // embeddings reindex progress diff --git a/web/src/types/frigateConfig.ts b/web/src/types/frigateConfig.ts index 355d4cb72..cf2bf1476 100644 --- a/web/src/types/frigateConfig.ts +++ b/web/src/types/frigateConfig.ts @@ -41,6 +41,11 @@ export interface CameraConfig { min_volume: number; num_threads: number; }; + audio_transcription: { + enabled: boolean; + enabled_in_config: boolean; + live_enabled: boolean; + }; best_image_timeout: number; birdseye: { enabled: boolean; @@ -296,6 +301,10 @@ export interface FrigateConfig { num_threads: number; }; + audio_transcription: { + enabled: boolean; + }; + birdseye: BirdseyeConfig; cameras: { diff --git a/web/src/types/ws.ts b/web/src/types/ws.ts index 3badd961d..d1e810494 100644 --- a/web/src/types/ws.ts +++ b/web/src/types/ws.ts @@ -58,6 +58,7 @@ export interface FrigateCameraState { snapshots: boolean; record: boolean; audio: boolean; + audio_transcription: boolean; notifications: boolean; notifications_suspended: number; autotracking: boolean; @@ -84,3 +85,21 @@ export type EmbeddingsReindexProgressType = { }; export type ToggleableSetting = "ON" | "OFF"; + +export type TrackedObjectUpdateType = + | "description" + | "lpr" + | "transcription" + | "face"; + +export type TrackedObjectUpdateReturnType = { + type: TrackedObjectUpdateType; + id: string; + camera: string; + description?: string; + name?: string; + plate?: string; + score?: number; + timestamp?: number; + text?: string; +} | null; diff --git a/web/src/views/explore/ExploreView.tsx b/web/src/views/explore/ExploreView.tsx index afe5001af..f5cdb220d 100644 --- a/web/src/views/explore/ExploreView.tsx +++ b/web/src/views/explore/ExploreView.tsx @@ -74,13 +74,13 @@ export default function ExploreView({ }, {}); }, [events]); - const trackedObjectUpdate = useTrackedObjectUpdate(); + const { payload: wsUpdate } = useTrackedObjectUpdate(); useEffect(() => { - mutate(); - // mutate / revalidate when event description updates come in - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [trackedObjectUpdate]); + if (wsUpdate && wsUpdate.type == "description") { + mutate(); + } + }, [wsUpdate, mutate]); // update search detail when results change diff --git a/web/src/views/live/LiveCameraView.tsx b/web/src/views/live/LiveCameraView.tsx index b57f0c8a7..039265f65 100644 --- a/web/src/views/live/LiveCameraView.tsx +++ b/web/src/views/live/LiveCameraView.tsx @@ -1,5 +1,7 @@ import { + useAudioLiveTranscription, useAudioState, + useAudioTranscriptionState, useAutotrackingState, useDetectState, useEnabledState, @@ -90,6 +92,8 @@ import { LuX, } from "react-icons/lu"; import { + MdClosedCaption, + MdClosedCaptionDisabled, MdNoPhotography, MdOutlineRestartAlt, MdPersonOff, @@ -196,6 +200,29 @@ export default function LiveCameraView({ const { payload: enabledState } = useEnabledState(camera.name); const cameraEnabled = enabledState === "ON"; + // for audio 
transcriptions + + const { payload: audioTranscriptionState, send: sendTranscription } = + useAudioTranscriptionState(camera.name); + const { payload: transcription } = useAudioLiveTranscription(camera.name); + const transcriptionRef = useRef(null); + + useEffect(() => { + if (transcription) { + if (transcriptionRef.current) { + transcriptionRef.current.scrollTop = + transcriptionRef.current.scrollHeight; + } + } + }, [transcription]); + + useEffect(() => { + return () => { + // disable transcriptions when unmounting + if (audioTranscriptionState == "ON") sendTranscription("OFF"); + }; + }, [audioTranscriptionState, sendTranscription]); + // click overlay for ptzs const [clickOverlay, setClickOverlay] = useState(false); @@ -566,6 +593,9 @@ export default function LiveCameraView({ autotrackingEnabled={ camera.onvif.autotracking.enabled_in_config } + transcriptionEnabled={ + camera.audio_transcription.enabled_in_config + } fullscreen={fullscreen} streamName={streamName ?? ""} setStreamName={setStreamName} @@ -625,6 +655,16 @@ export default function LiveCameraView({ /> + {camera?.audio?.enabled_in_config && + audioTranscriptionState == "ON" && + transcription != null && ( +
+ {transcription} +
+ )} {camera.onvif.host != "" && ( @@ -983,6 +1023,7 @@ type FrigateCameraFeaturesProps = { recordingEnabled: boolean; audioDetectEnabled: boolean; autotrackingEnabled: boolean; + transcriptionEnabled: boolean; fullscreen: boolean; streamName: string; setStreamName?: (value: string | undefined) => void; @@ -1002,6 +1043,7 @@ function FrigateCameraFeatures({ recordingEnabled, audioDetectEnabled, autotrackingEnabled, + transcriptionEnabled, fullscreen, streamName, setStreamName, @@ -1033,6 +1075,8 @@ function FrigateCameraFeatures({ const { payload: audioState, send: sendAudio } = useAudioState(camera.name); const { payload: autotrackingState, send: sendAutotracking } = useAutotrackingState(camera.name); + const { payload: transcriptionState, send: sendTranscription } = + useAudioTranscriptionState(camera.name); // roles @@ -1196,6 +1240,27 @@ function FrigateCameraFeatures({ disabled={!cameraEnabled} /> )} + {audioDetectEnabled && transcriptionEnabled && ( + + sendTranscription(transcriptionState == "ON" ? "OFF" : "ON") + } + disabled={!cameraEnabled || audioState == "OFF"} + /> + )} {autotrackingEnabled && ( )} + {audioDetectEnabled && transcriptionEnabled && ( + + sendTranscription(transcriptionState == "ON" ? "OFF" : "ON") + } + /> + )} {autotrackingEnabled && (
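
The `get_audio_from_recording()` helper added in `frigate/util/audio.py` returns raw WAV bytes (16 kHz, mono, 16-bit PCM) rather than a decoded waveform, so any consumer still has to convert those bytes before handing them to a speech model. As a rough illustration only — the function name `wav_bytes_to_waveform` and its use of `soundfile` are assumptions for this sketch, not part of the diff — a caller could decode the bytes like this:

```python
import io

import numpy as np
import soundfile as sf  # assumed available; not shown being imported anywhere in this diff


def wav_bytes_to_waveform(wav_bytes: bytes) -> np.ndarray:
    """Decode in-memory WAV bytes into a mono float32 waveform in [-1.0, 1.0].

    Hypothetical helper for illustration; it mirrors what a transcription
    post-processor might do with the output of get_audio_from_recording().
    """
    samples, sample_rate = sf.read(io.BytesIO(wav_bytes), dtype="float32")

    # get_audio_from_recording() already asks ffmpeg for mono output ("-ac 1"),
    # but guard against multi-channel audio by averaging the channels.
    if samples.ndim > 1:
        samples = samples.mean(axis=1)

    if sample_rate != 16000:
        raise ValueError(f"expected 16 kHz audio, got {sample_rate} Hz")

    return samples
```

The 16 kHz assumption matches the `sample_rate` default of `get_audio_from_recording()` and the `SAMPLING_RATE = 16000` constant used by the streaming ASR code earlier in this diff.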