From add68b88608d0917806c7c25cc162b96cf892576 Mon Sep 17 00:00:00 2001
From: Nicolas Mowen
Date: Wed, 25 Jun 2025 07:24:45 -0600
Subject: [PATCH] Improve logging (#18867)

* Ignore numpy get limits warning

* Add function wrapper to redirect stdout and stderr to logpipe

* Save stderr too

* Add more to catch

* run logpipe

* Use other logging redirect class

* Use other logging redirect class

* add decorator for redirecting c/c++ level output to logger

* fix typing

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
---
 docker/main/Dockerfile                        |   3 +
 frigate/data_processing/common/face/model.py  |   2 +
 frigate/detectors/plugins/cpu_tfl.py          |   2 +
 frigate/embeddings/onnx/face_embedding.py     |   2 +
 frigate/events/audio.py                       |   6 +-
 frigate/log.py                                | 190 ++++++++++++++++++-
 frigate/util/classification.py                |  19 +-
 7 files changed, 206 insertions(+), 18 deletions(-)

diff --git a/docker/main/Dockerfile b/docker/main/Dockerfile
index 959ff59d4..7850e6c18 100644
--- a/docker/main/Dockerfile
+++ b/docker/main/Dockerfile
@@ -224,6 +224,9 @@ ENV TRANSFORMERS_NO_ADVISORY_WARNINGS=1
 # Set OpenCV ffmpeg loglevel to fatal: https://ffmpeg.org/doxygen/trunk/log_8h.html
 ENV OPENCV_FFMPEG_LOGLEVEL=8
 
+# Set NumPy to ignore getlimits warning
+ENV PYTHONWARNINGS="ignore:::numpy.core.getlimits"
+
 # Set HailoRT to disable logging
 ENV HAILORT_LOGGER_PATH=NONE
 
diff --git a/frigate/data_processing/common/face/model.py b/frigate/data_processing/common/face/model.py
index 0aeb76792..9c66ad3a0 100644
--- a/frigate/data_processing/common/face/model.py
+++ b/frigate/data_processing/common/face/model.py
@@ -11,6 +11,7 @@ from scipy import stats
 from frigate.config import FrigateConfig
 from frigate.const import MODEL_CACHE_DIR
 from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding, FaceNetEmbedding
+from frigate.log import redirect_output_to_logger
 
 logger = logging.getLogger(__name__)
 
@@ -37,6 +38,7 @@ class FaceRecognizer(ABC):
     def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
         pass
 
+    @redirect_output_to_logger(logger, logging.DEBUG)
     def init_landmark_detector(self) -> None:
         landmark_model = os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
 
diff --git a/frigate/detectors/plugins/cpu_tfl.py b/frigate/detectors/plugins/cpu_tfl.py
index fc8db0f4b..37cc10777 100644
--- a/frigate/detectors/plugins/cpu_tfl.py
+++ b/frigate/detectors/plugins/cpu_tfl.py
@@ -5,6 +5,7 @@ from typing_extensions import Literal
 
 from frigate.detectors.detection_api import DetectionApi
 from frigate.detectors.detector_config import BaseDetectorConfig
+from frigate.log import redirect_output_to_logger
 
 from ..detector_utils import tflite_detect_raw, tflite_init
 
@@ -27,6 +28,7 @@ class CpuDetectorConfig(BaseDetectorConfig):
 class CpuTfl(DetectionApi):
     type_key = DETECTOR_KEY
 
+    @redirect_output_to_logger(logger, logging.DEBUG)
     def __init__(self, detector_config: CpuDetectorConfig):
         interpreter = Interpreter(
             model_path=detector_config.model.path,
diff --git a/frigate/embeddings/onnx/face_embedding.py b/frigate/embeddings/onnx/face_embedding.py
index c0f35a581..acb4507a2 100644
--- a/frigate/embeddings/onnx/face_embedding.py
+++ b/frigate/embeddings/onnx/face_embedding.py
@@ -6,6 +6,7 @@ import os
 import numpy as np
 
 from frigate.const import MODEL_CACHE_DIR
+from frigate.log import redirect_output_to_logger
 from frigate.util.downloader import ModelDownloader
 
 from .base_embedding import BaseEmbedding
@@ -53,6 +54,7 @@ class FaceNetEmbedding(BaseEmbedding):
             self._load_model_and_utils()
             logger.debug(f"models are already downloaded for {self.model_name}")
 
+    @redirect_output_to_logger(logger, logging.DEBUG)
     def _load_model_and_utils(self):
         if self.runner is None:
             if self.downloader:
diff --git a/frigate/events/audio.py b/frigate/events/audio.py
index 03c750a06..f99e6fe41 100644
--- a/frigate/events/audio.py
+++ b/frigate/events/audio.py
@@ -37,7 +37,7 @@ from frigate.data_processing.real_time.audio_transcription import (
     AudioTranscriptionRealTimeProcessor,
 )
 from frigate.ffmpeg_presets import parse_preset_input
-from frigate.log import LogPipe
+from frigate.log import LogPipe, redirect_output_to_logger
 from frigate.object_detection.base import load_labels
 from frigate.util.builtin import get_ffmpeg_arg_list
 from frigate.util.process import FrigateProcess
@@ -49,6 +49,9 @@ except ModuleNotFoundError:
     from tensorflow.lite.python.interpreter import Interpreter
 
 
+logger = logging.getLogger(__name__)
+
+
 def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
     ffmpeg_input: CameraInput = [i for i in ffmpeg.inputs if "audio" in i.roles][0]
     input_args = get_ffmpeg_arg_list(ffmpeg.global_args) + (
@@ -423,6 +426,7 @@ class AudioEventMaintainer(threading.Thread):
 
 
 class AudioTfl:
+    @redirect_output_to_logger(logger, logging.DEBUG)
     def __init__(self, stop_event: threading.Event, num_threads=2):
         self.stop_event = stop_event
         self.num_threads = num_threads
diff --git a/frigate/log.py b/frigate/log.py
index 2e9c781f1..11f2da254 100644
--- a/frigate/log.py
+++ b/frigate/log.py
@@ -1,15 +1,18 @@
 # In log.py
 import atexit
+import io
 import logging
 import os
 import sys
 import threading
 from collections import deque
+from contextlib import contextmanager
 from enum import Enum
+from functools import wraps
 from logging.handlers import QueueHandler, QueueListener
 from multiprocessing.managers import SyncManager
-from queue import Queue
-from typing import Deque, Optional
+from queue import Empty, Queue
+from typing import Any, Callable, Deque, Generator, Optional
 
 from frigate.util.builtin import clean_camera_user_pass
 
@@ -102,11 +105,11 @@ os.register_at_fork(after_in_child=reopen_std_streams)
 
 # based on https://codereview.stackexchange.com/a/17959
 class LogPipe(threading.Thread):
-    def __init__(self, log_name: str):
+    def __init__(self, log_name: str, level: int = logging.ERROR):
         """Setup the object with a logger and start the thread"""
         super().__init__(daemon=False)
         self.logger = logging.getLogger(log_name)
-        self.level = logging.ERROR
+        self.level = level
         self.deque: Deque[str] = deque(maxlen=100)
         self.fdRead, self.fdWrite = os.pipe()
         self.pipeReader = os.fdopen(self.fdRead)
@@ -135,3 +138,182 @@ class LogPipe(threading.Thread):
     def close(self) -> None:
         """Close the write end of the pipe."""
         os.close(self.fdWrite)
+
+
+class LogRedirect(io.StringIO):
+    """
+    A custom file-like object to capture stdout and process it.
+    It extends io.StringIO to capture output and then processes it
+    line by line.
+    """
+
+    def __init__(self, logger_instance: logging.Logger, level: int):
+        super().__init__()
+        self.logger = logger_instance
+        self.log_level = level
+        self._line_buffer: list[str] = []
+
+    def write(self, s: Any) -> int:
+        if not isinstance(s, str):
+            s = str(s)
+
+        self._line_buffer.append(s)
+
+        # Process output line by line if a newline is present
+        if "\n" in s:
+            full_output = "".join(self._line_buffer)
+            lines = full_output.splitlines(keepends=True)
+            self._line_buffer = []
+
+            for line in lines:
+                if line.endswith("\n"):
+                    self._process_line(line.rstrip("\n"))
+                else:
+                    self._line_buffer.append(line)
+
+        return len(s)
+
+    def _process_line(self, line: str) -> None:
+        self.logger.log(self.log_level, line)
+
+    def flush(self) -> None:
+        if self._line_buffer:
+            full_output = "".join(self._line_buffer)
+            self._line_buffer = []
+            if full_output:  # Only process if there's content
+                self._process_line(full_output)
+
+    def __enter__(self) -> "LogRedirect":
+        """Context manager entry point."""
+        return self
+
+    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+        """Context manager exit point. Ensures buffered content is flushed."""
+        self.flush()
+
+
+@contextmanager
+def redirect_fd_to_queue(queue: Queue[str]) -> Generator[None, None, None]:
+    """Redirect file descriptor 1 (stdout) to a pipe and capture output in a queue."""
+    stdout_fd = os.dup(1)
+    read_fd, write_fd = os.pipe()
+    os.dup2(write_fd, 1)
+    os.close(write_fd)
+
+    stop_event = threading.Event()
+
+    def reader() -> None:
+        """Read from pipe and put lines in queue until stop_event is set."""
+        try:
+            with os.fdopen(read_fd, "r") as pipe:
+                while not stop_event.is_set():
+                    line = pipe.readline()
+                    if not line:  # EOF
+                        break
+                    queue.put(line.strip())
+        except OSError as e:
+            queue.put(f"Reader error: {e}")
+        finally:
+            if not stop_event.is_set():
+                stop_event.set()
+
+    reader_thread = threading.Thread(target=reader, daemon=False)
+    reader_thread.start()
+
+    try:
+        yield
+    finally:
+        os.dup2(stdout_fd, 1)
+        os.close(stdout_fd)
+        stop_event.set()
+        reader_thread.join(timeout=1.0)
+        try:
+            os.close(read_fd)
+        except OSError:
+            pass
+
+
+def redirect_output_to_logger(logger: logging.Logger, level: int) -> Any:
+    """Decorator to redirect both Python sys.stdout/stderr and C-level stdout to logger."""
+
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
+            queue: Queue[str] = Queue()
+
+            log_redirect = LogRedirect(logger, level)
+            old_stdout = sys.stdout
+            old_stderr = sys.stderr
+            sys.stdout = log_redirect
+            sys.stderr = log_redirect
+
+            try:
+                # Redirect C-level stdout
+                with redirect_fd_to_queue(queue):
+                    result = func(*args, **kwargs)
+            finally:
+                # Restore Python stdout/stderr
+                sys.stdout = old_stdout
+                sys.stderr = old_stderr
+                log_redirect.flush()
+
+            # Log C-level output from queue
+            while True:
+                try:
+                    logger.log(level, queue.get_nowait())
+                except Empty:
+                    break
+
+            return result
+
+        return wrapper
+
+    return decorator
+
+
+def suppress_os_output(func: Callable) -> Callable:
+    """
+    A decorator that suppresses all output (stdout and stderr)
+    at the operating system file descriptor level for the decorated function.
+    This is useful for silencing noisy C/C++ libraries.
+    Note: This is a Unix-specific solution using os.dup2 and os.pipe.
+    It temporarily redirects file descriptors 1 (stdout) and 2 (stderr)
+    to a non-read pipe, effectively discarding their output.
+    """
+
+    @wraps(func)
+    def wrapper(*args: tuple, **kwargs: dict[str, Any]) -> Any:
+        # Save the original file descriptors for stdout (1) and stderr (2)
+        original_stdout_fd = os.dup(1)
+        original_stderr_fd = os.dup(2)
+
+        # Create dummy pipes. We only need the write ends to redirect to.
+        # The data written to these pipes will be discarded as nothing
+        # will read from the read ends.
+        devnull_read_fd, devnull_write_fd = os.pipe()
+
+        try:
+            # Redirect stdout (FD 1) and stderr (FD 2) to the write end of our dummy pipe
+            os.dup2(devnull_write_fd, 1)  # Redirect stdout to devnull pipe
+            os.dup2(devnull_write_fd, 2)  # Redirect stderr to devnull pipe
+
+            # Execute the original function
+            result = func(*args, **kwargs)
+
+        finally:
+            # Restore original stdout and stderr file descriptors (1 and 2)
+            # This is crucial to ensure normal printing resumes after the decorated function.
+            os.dup2(original_stdout_fd, 1)
+            os.dup2(original_stderr_fd, 2)
+
+            # Close all duplicated and pipe file descriptors to prevent resource leaks.
+            # It's important to close the read end of the dummy pipe too,
+            # as nothing is explicitly reading from it.
+            os.close(original_stdout_fd)
+            os.close(original_stderr_fd)
+            os.close(devnull_read_fd)
+            os.close(devnull_write_fd)
+
+        return result
+
+    return wrapper
diff --git a/frigate/util/classification.py b/frigate/util/classification.py
index a2ba1bf26..3c030a986 100644
--- a/frigate/util/classification.py
+++ b/frigate/util/classification.py
@@ -1,7 +1,7 @@
 """Util for classification models."""
 
+import logging
 import os
-import sys
 
 import cv2
 import numpy as np
@@ -9,6 +9,7 @@ import numpy as np
 from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
 from frigate.comms.inter_process import InterProcessRequestor
 from frigate.const import CLIPS_DIR, MODEL_CACHE_DIR, UPDATE_MODEL_STATE
+from frigate.log import redirect_output_to_logger
 from frigate.types import ModelStatusTypesEnum
 from frigate.util.process import FrigateProcess
 
@@ -16,6 +17,8 @@ BATCH_SIZE = 16
 EPOCHS = 50
 LEARNING_RATE = 0.001
 
+logger = logging.getLogger(__name__)
+
 
 def __generate_representative_dataset_factory(dataset_dir: str):
     def generate_representative_dataset():
@@ -36,6 +39,7 @@ def __generate_representative_dataset_factory(dataset_dir: str):
     return generate_representative_dataset
 
 
+@redirect_output_to_logger(logger, logging.DEBUG)
 def __train_classification_model(model_name: str) -> bool:
     """Train a classification model."""
 
@@ -55,14 +59,6 @@ def __train_classification_model(model_name: str) -> bool:
         ]
     )
 
-    # TF and Keras are very loud with logging
-    # we want to avoid these logs so we
-    # temporarily redirect stdout / stderr
-    original_stdout = sys.stdout
-    original_stderr = sys.stderr
-    sys.stdout = open(os.devnull, "w")
-    sys.stderr = open(os.devnull, "w")
-
     # Start with imagenet base model with 35% of channels in each layer
     base_model = MobileNetV2(
         input_shape=(224, 224, 3),
@@ -124,10 +120,6 @@ def __train_classification_model(model_name: str) -> bool:
     with open(os.path.join(model_dir, "model.tflite"), "wb") as f:
         f.write(tflite_model)
 
-    # restore original stdout / stderr
-    sys.stdout = original_stdout
-    sys.stderr = original_stderr
-
 
 @staticmethod
 def kickoff_model_training(
@@ -146,6 +138,7 @@ def kickoff_model_training(
     # tensorflow will free CPU / GPU memory
     # upon training completion
     training_process = FrigateProcess(
+        None,
         target=__train_classification_model,
         name=f"model_training:{model_name}",
         args=(model_name,),
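
Usage note (not part of the patch): a minimal sketch of how the new helpers in frigate/log.py are meant to be applied to code that wraps a noisy C/C++ library. The function names below are hypothetical placeholders, not symbols from the Frigate codebase.

import logging

from frigate.log import redirect_output_to_logger, suppress_os_output

logger = logging.getLogger(__name__)


@redirect_output_to_logger(logger, logging.DEBUG)
def load_noisy_native_model() -> None:
    # Hypothetical example: anything written to sys.stdout/sys.stderr or to
    # file descriptor 1 while this function runs is captured and logged at
    # DEBUG level instead of reaching the process's real stdout.
    print("banner printed by a native library")


@suppress_os_output
def load_silently() -> None:
    # Alternative: discard fd 1/2 output entirely rather than logging it.
    print("this output is dropped")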