mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-07-30 13:48:07 +02:00
321 lines
9.8 KiB
Python
321 lines
9.8 KiB
Python
# In log.py
|
|
import atexit
|
|
import io
|
|
import logging
|
|
import os
|
|
import sys
|
|
import threading
|
|
from collections import deque
|
|
from contextlib import contextmanager
|
|
from enum import Enum
|
|
from functools import wraps
|
|
from logging.handlers import QueueHandler, QueueListener
|
|
from multiprocessing.managers import SyncManager
|
|
from queue import Empty, Queue
|
|
from typing import Any, Callable, Deque, Generator, Optional
|
|
|
|
from frigate.util.builtin import clean_camera_user_pass
|
|
|
|
LOG_HANDLER = logging.StreamHandler()
|
|
LOG_HANDLER.setFormatter(
|
|
logging.Formatter(
|
|
"[%(asctime)s] %(name)-30s %(levelname)-8s: %(message)s",
|
|
"%Y-%m-%d %H:%M:%S",
|
|
)
|
|
)
|
|
|
|
# filter out norfair warning
|
|
LOG_HANDLER.addFilter(
|
|
lambda record: not record.getMessage().startswith(
|
|
"You are using a scalar distance function"
|
|
)
|
|
)
|
|
|
|
# filter out tflite logging
|
|
LOG_HANDLER.addFilter(
|
|
lambda record: "Created TensorFlow Lite XNNPACK delegate for CPU."
|
|
not in record.getMessage()
|
|
)
|
|
|
|
|
|
class LogLevel(str, Enum):
|
|
debug = "debug"
|
|
info = "info"
|
|
warning = "warning"
|
|
error = "error"
|
|
critical = "critical"
|
|
|
|
|
|
log_listener: Optional[QueueListener] = None
|
|
log_queue: Optional[Queue] = None
|
|
|
|
|
|
def setup_logging(manager: SyncManager) -> None:
|
|
global log_listener, log_queue
|
|
log_queue = manager.Queue()
|
|
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
|
|
|
|
atexit.register(_stop_logging)
|
|
log_listener.start()
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
handlers=[],
|
|
force=True,
|
|
)
|
|
|
|
logging.getLogger().addHandler(QueueHandler(log_listener.queue))
|
|
|
|
|
|
def _stop_logging() -> None:
|
|
global log_listener
|
|
if log_listener is not None:
|
|
log_listener.stop()
|
|
log_listener = None
|
|
|
|
|
|
def apply_log_levels(default: str, log_levels: dict[str, LogLevel]) -> None:
|
|
logging.getLogger().setLevel(default)
|
|
|
|
log_levels = {
|
|
"absl": LogLevel.error,
|
|
"httpx": LogLevel.error,
|
|
"matplotlib": LogLevel.error,
|
|
"tensorflow": LogLevel.error,
|
|
"werkzeug": LogLevel.error,
|
|
"ws4py": LogLevel.error,
|
|
**log_levels,
|
|
}
|
|
|
|
for log, level in log_levels.items():
|
|
logging.getLogger(log).setLevel(level.value.upper())
|
|
|
|
|
|
# When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the
|
|
# process is created after a thread (for example a logging thread) is created and the process fork
|
|
# happens while an internal lock is held, the stdout/err flush can cause a deadlock.
|
|
#
|
|
# https://github.com/python/cpython/issues/91776
|
|
def reopen_std_streams() -> None:
|
|
sys.stdout = os.fdopen(1, "w")
|
|
sys.stderr = os.fdopen(2, "w")
|
|
|
|
|
|
os.register_at_fork(after_in_child=reopen_std_streams)
|
|
|
|
|
|
# based on https://codereview.stackexchange.com/a/17959
|
|
class LogPipe(threading.Thread):
|
|
def __init__(self, log_name: str, level: int = logging.ERROR):
|
|
"""Setup the object with a logger and start the thread"""
|
|
super().__init__(daemon=False)
|
|
self.logger = logging.getLogger(log_name)
|
|
self.level = level
|
|
self.deque: Deque[str] = deque(maxlen=100)
|
|
self.fdRead, self.fdWrite = os.pipe()
|
|
self.pipeReader = os.fdopen(self.fdRead)
|
|
self.start()
|
|
|
|
def cleanup_log(self, log: str) -> str:
|
|
"""Cleanup the log line to remove sensitive info and string tokens."""
|
|
log = clean_camera_user_pass(log).strip("\n")
|
|
return log
|
|
|
|
def fileno(self) -> int:
|
|
"""Return the write file descriptor of the pipe"""
|
|
return self.fdWrite
|
|
|
|
def run(self) -> None:
|
|
"""Run the thread, logging everything."""
|
|
for line in iter(self.pipeReader.readline, ""):
|
|
self.deque.append(self.cleanup_log(line))
|
|
|
|
self.pipeReader.close()
|
|
|
|
def dump(self) -> None:
|
|
while len(self.deque) > 0:
|
|
self.logger.log(self.level, self.deque.popleft())
|
|
|
|
def close(self) -> None:
|
|
"""Close the write end of the pipe."""
|
|
os.close(self.fdWrite)
|
|
|
|
|
|
class LogRedirect(io.StringIO):
|
|
"""
|
|
A custom file-like object to capture stdout and process it.
|
|
It extends io.StringIO to capture output and then processes it
|
|
line by line.
|
|
"""
|
|
|
|
def __init__(self, logger_instance: logging.Logger, level: int):
|
|
super().__init__()
|
|
self.logger = logger_instance
|
|
self.log_level = level
|
|
self._line_buffer: list[str] = []
|
|
|
|
def write(self, s: Any) -> int:
|
|
if not isinstance(s, str):
|
|
s = str(s)
|
|
|
|
self._line_buffer.append(s)
|
|
|
|
# Process output line by line if a newline is present
|
|
if "\n" in s:
|
|
full_output = "".join(self._line_buffer)
|
|
lines = full_output.splitlines(keepends=True)
|
|
self._line_buffer = []
|
|
|
|
for line in lines:
|
|
if line.endswith("\n"):
|
|
self._process_line(line.rstrip("\n"))
|
|
else:
|
|
self._line_buffer.append(line)
|
|
|
|
return len(s)
|
|
|
|
def _process_line(self, line: str) -> None:
|
|
self.logger.log(self.log_level, line)
|
|
|
|
def flush(self) -> None:
|
|
if self._line_buffer:
|
|
full_output = "".join(self._line_buffer)
|
|
self._line_buffer = []
|
|
if full_output: # Only process if there's content
|
|
self._process_line(full_output)
|
|
|
|
def __enter__(self) -> "LogRedirect":
|
|
"""Context manager entry point."""
|
|
return self
|
|
|
|
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
|
"""Context manager exit point. Ensures buffered content is flushed."""
|
|
self.flush()
|
|
|
|
|
|
@contextmanager
|
|
def __redirect_fd_to_queue(queue: Queue[str]) -> Generator[None, None, None]:
|
|
"""Redirect file descriptor 1 (stdout) to a pipe and capture output in a queue."""
|
|
stdout_fd = os.dup(1)
|
|
read_fd, write_fd = os.pipe()
|
|
os.dup2(write_fd, 1)
|
|
os.close(write_fd)
|
|
|
|
stop_event = threading.Event()
|
|
|
|
def reader() -> None:
|
|
"""Read from pipe and put lines in queue until stop_event is set."""
|
|
try:
|
|
with os.fdopen(read_fd, "r") as pipe:
|
|
while not stop_event.is_set():
|
|
line = pipe.readline()
|
|
if not line: # EOF
|
|
break
|
|
queue.put(line.strip())
|
|
except OSError as e:
|
|
queue.put(f"Reader error: {e}")
|
|
finally:
|
|
if not stop_event.is_set():
|
|
stop_event.set()
|
|
|
|
reader_thread = threading.Thread(target=reader, daemon=False)
|
|
reader_thread.start()
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
os.dup2(stdout_fd, 1)
|
|
os.close(stdout_fd)
|
|
stop_event.set()
|
|
reader_thread.join(timeout=1.0)
|
|
try:
|
|
os.close(read_fd)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def redirect_output_to_logger(logger: logging.Logger, level: int) -> Any:
|
|
"""Decorator to redirect both Python sys.stdout/stderr and C-level stdout to logger."""
|
|
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
|
queue: Queue[str] = Queue()
|
|
|
|
log_redirect = LogRedirect(logger, level)
|
|
old_stdout = sys.stdout
|
|
old_stderr = sys.stderr
|
|
sys.stdout = log_redirect
|
|
sys.stderr = log_redirect
|
|
|
|
try:
|
|
# Redirect C-level stdout
|
|
with __redirect_fd_to_queue(queue):
|
|
result = func(*args, **kwargs)
|
|
finally:
|
|
# Restore Python stdout/stderr
|
|
sys.stdout = old_stdout
|
|
sys.stderr = old_stderr
|
|
log_redirect.flush()
|
|
|
|
# Log C-level output from queue
|
|
while True:
|
|
try:
|
|
logger.log(level, queue.get_nowait())
|
|
except Empty:
|
|
break
|
|
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
def suppress_os_output(func: Callable) -> Callable:
|
|
"""
|
|
A decorator that suppresses all output (stdout and stderr)
|
|
at the operating system file descriptor level for the decorated function.
|
|
This is useful for silencing noisy C/C++ libraries.
|
|
Note: This is a Unix-specific solution using os.dup2 and os.pipe.
|
|
It temporarily redirects file descriptors 1 (stdout) and 2 (stderr)
|
|
to a non-read pipe, effectively discarding their output.
|
|
"""
|
|
|
|
@wraps(func)
|
|
def wrapper(*args: tuple, **kwargs: dict[str, Any]) -> Any:
|
|
# Save the original file descriptors for stdout (1) and stderr (2)
|
|
original_stdout_fd = os.dup(1)
|
|
original_stderr_fd = os.dup(2)
|
|
|
|
# Create dummy pipes. We only need the write ends to redirect to.
|
|
# The data written to these pipes will be discarded as nothing
|
|
# will read from the read ends.
|
|
devnull_read_fd, devnull_write_fd = os.pipe()
|
|
|
|
try:
|
|
# Redirect stdout (FD 1) and stderr (FD 2) to the write end of our dummy pipe
|
|
os.dup2(devnull_write_fd, 1) # Redirect stdout to devnull pipe
|
|
os.dup2(devnull_write_fd, 2) # Redirect stderr to devnull pipe
|
|
|
|
# Execute the original function
|
|
result = func(*args, **kwargs)
|
|
|
|
finally:
|
|
# Restore original stdout and stderr file descriptors (1 and 2)
|
|
# This is crucial to ensure normal printing resumes after the decorated function.
|
|
os.dup2(original_stdout_fd, 1)
|
|
os.dup2(original_stderr_fd, 2)
|
|
|
|
# Close all duplicated and pipe file descriptors to prevent resource leaks.
|
|
# It's important to close the read end of the dummy pipe too,
|
|
# as nothing is explicitly reading from it.
|
|
os.close(original_stdout_fd)
|
|
os.close(original_stderr_fd)
|
|
os.close(devnull_read_fd)
|
|
os.close(devnull_write_fd)
|
|
|
|
return result
|
|
|
|
return wrapper
|