From 4c3fea25a5badce1a170ea54dfd2080d71263d85 Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Tue, 3 Nov 2020 21:26:39 -0600 Subject: [PATCH] use a queue for logging --- frigate/__main__.py | 22 +++++++++++++++++++--- frigate/config.py | 2 -- frigate/edgetpu.py | 15 +++++++++------ frigate/events.py | 9 ++++++--- frigate/log.py | 25 +++++++++++++++++++++++++ frigate/mqtt.py | 13 ++++++++----- frigate/object_processing.py | 5 ++++- frigate/util.py | 30 ------------------------------ frigate/video.py | 29 ++++++++++++++++------------- frigate/watchdog.py | 9 ++++++--- 10 files changed, 93 insertions(+), 66 deletions(-) create mode 100644 frigate/log.py diff --git a/frigate/__main__.py b/frigate/__main__.py index 95fff2cbd..7d24f07cd 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,9 +1,12 @@ import faulthandler; faulthandler.enable() import os import json +import logging import yaml import multiprocessing as mp +import sys +from logging.handlers import QueueHandler from playhouse.sqlite_ext import SqliteExtDatabase from typing import Dict, List @@ -11,12 +14,18 @@ from frigate.config import FrigateConfig from frigate.edgetpu import EdgeTPUProcess from frigate.events import EventProcessor from frigate.http import create_app +from frigate.log import root_configurer, log_process from frigate.models import Event from frigate.mqtt import create_mqtt_client from frigate.object_processing import TrackedObjectProcessor from frigate.video import track_camera, capture_camera from frigate.watchdog import FrigateWatchdog +logger = logging.getLogger(__name__) + +cli = sys.modules['flask.cli'] +cli.show_server_banner = lambda *x: None + class FrigateApp(): def __init__(self): self.stop_event = mp.Event() @@ -25,8 +34,14 @@ class FrigateApp(): self.detectors: Dict[str, EdgeTPUProcess] = {} self.detection_out_events: Dict[str, mp.Event] = {} self.detection_shms: List[mp.shared_memory.SharedMemory] = [] + self.log_queue = mp.Queue() self.camera_metrics = {} + def init_logger(self): + self.log_process = mp.Process(target=log_process, args=(self.log_queue,)) + self.log_process.start() + root_configurer(self.log_queue) + def init_config(self): config_file = os.environ.get('CONFIG_FILE', '/config/config.yml') self.config = FrigateConfig(config_file=config_file) @@ -90,7 +105,7 @@ class FrigateApp(): camera_process.daemon = True self.camera_metrics[name]['process'] = camera_process camera_process.start() - print(f"Camera processor started for {name}: {camera_process.pid}") + logger.info(f"Camera processor started for {name}: {camera_process.pid}") def start_camera_capture_processes(self): for name, config in self.config.cameras.items(): @@ -99,7 +114,7 @@ class FrigateApp(): capture_process.daemon = True self.camera_metrics[name]['capture_process'] = capture_process capture_process.start() - print(f"Capture process started for {name}: {capture_process.pid}") + logger.info(f"Capture process started for {name}: {capture_process.pid}") def start_event_processor(self): self.event_processor = EventProcessor(self.config, self.camera_metrics, self.event_queue, self.stop_event) @@ -110,6 +125,7 @@ class FrigateApp(): self.frigate_watchdog.start() def start(self): + self.init_logger() self.init_config() self.init_queues() self.init_database() @@ -125,7 +141,7 @@ class FrigateApp(): self.stop() def stop(self): - print(f"Stopping...") + logger.info(f"Stopping...") self.stop_event.set() self.detected_frames_processor.join() diff --git a/frigate/config.py b/frigate/config.py index 1184a21c3..d00cccc50 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -10,8 +10,6 @@ import matplotlib.pyplot as plt import numpy as np import voluptuous as vol -from frigate.util import get_frame_shape - DETECTORS_SCHEMA = vol.Schema( { vol.Required(str): { diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py index 97beee3aa..103fba817 100644 --- a/frigate/edgetpu.py +++ b/frigate/edgetpu.py @@ -3,6 +3,7 @@ import datetime import hashlib import multiprocessing as mp import queue +import logging from multiprocessing.connection import Connection from abc import ABC, abstractmethod from typing import Dict @@ -11,6 +12,8 @@ import tflite_runtime.interpreter as tflite from tflite_runtime.interpreter import load_delegate from frigate.util import EventsPerSecond, listen, SharedMemoryFrameManager +logger = logging.getLogger(__name__) + def load_labels(path, encoding='utf-8'): """Loads labels from file (with or without index numbers). Args: @@ -51,11 +54,11 @@ class LocalObjectDetector(ObjectDetector): if tf_device != 'cpu': try: - print(f"Attempting to load TPU as {device_config['device']}") + logging.info(f"Attempting to load TPU as {device_config['device']}") edge_tpu_delegate = load_delegate('libedgetpu.so.1.0', device_config) - print("TPU found") + logging.info("TPU found") except ValueError: - print("No EdgeTPU detected. Falling back to CPU.") + logging.info("No EdgeTPU detected. Falling back to CPU.") if edge_tpu_delegate is None: self.interpreter = tflite.Interpreter( @@ -100,7 +103,7 @@ class LocalObjectDetector(ObjectDetector): return detections def run_detector(detection_queue: mp.Queue, out_events: Dict[str, mp.Event], avg_speed, start, tf_device): - print(f"Starting detection process: {os.getpid()}") + logging.info(f"Starting detection process: {os.getpid()}") listen() frame_manager = SharedMemoryFrameManager() object_detector = LocalObjectDetector(tf_device=tf_device) @@ -143,10 +146,10 @@ class EdgeTPUProcess(): def stop(self): self.detect_process.terminate() - print("Waiting for detection process to exit gracefully...") + logging.info("Waiting for detection process to exit gracefully...") self.detect_process.join(timeout=30) if self.detect_process.exitcode is None: - print("Detection process didnt exit. Force killing...") + logging.info("Detection process didnt exit. Force killing...") self.detect_process.kill() self.detect_process.join() diff --git a/frigate/events.py b/frigate/events.py index dc2743ea5..98e331bc4 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -2,6 +2,7 @@ import os import time import psutil import threading +import logging from collections import defaultdict import json import datetime @@ -10,6 +11,8 @@ import queue from frigate.models import Event +logger = logging.getLogger(__name__) + class EventProcessor(threading.Thread): def __init__(self, config, camera_processes, event_queue, stop_event): threading.Thread.__init__(self) @@ -60,7 +63,7 @@ class EventProcessor(threading.Thread): if p_status == 0: duration = float(output.decode('utf-8').strip()) else: - print(f"bad file: {f}") + logger.info(f"bad file: {f}") os.remove(os.path.join(self.cache_dir,f)) continue @@ -133,7 +136,7 @@ class EventProcessor(threading.Thread): p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True) if p.returncode != 0: - print(p.stderr) + logger.error(p.stderr) return with open(f"{os.path.join(self.clips_dir, clip_name)}.json", 'w') as outfile: @@ -151,7 +154,7 @@ class EventProcessor(threading.Thread): def run(self): while True: if self.stop_event.is_set(): - print(f"Exiting event processor...") + logger.info(f"Exiting event processor...") break try: diff --git a/frigate/log.py b/frigate/log.py new file mode 100644 index 000000000..f562c5018 --- /dev/null +++ b/frigate/log.py @@ -0,0 +1,25 @@ +# adapted from https://medium.com/@jonathonbao/python3-logging-with-multiprocessing-f51f460b8778 +import logging +import threading +from logging import handlers + +def listener_configurer(): + root = logging.getLogger() + console_handler = logging.StreamHandler() + formatter = logging.Formatter('%(processName)-12s %(threadName)-12s %(name)-16s %(levelname)-8s: %(message)s') + console_handler.setFormatter(formatter) + root.addHandler(console_handler) + root.setLevel(logging.INFO) + +def root_configurer(queue): + h = handlers.QueueHandler(queue) + root = logging.getLogger() + root.addHandler(h) + root.setLevel(logging.INFO) + +def log_process(queue): + listener_configurer() + while True: + record = queue.get() + logger = logging.getLogger(record.name) + logger.handle(record) diff --git a/frigate/mqtt.py b/frigate/mqtt.py index a803fe992..858093f41 100644 --- a/frigate/mqtt.py +++ b/frigate/mqtt.py @@ -1,21 +1,24 @@ +import logging import paho.mqtt.client as mqtt from frigate.config import MqttConfig +logger = logging.getLogger(__name__) + def create_mqtt_client(config: MqttConfig): client = mqtt.Client(client_id=config.client_id) def on_connect(client, userdata, flags, rc): # TODO: use logging library - print("On connect called") + logger.info("On connect called") if rc != 0: if rc == 3: - print ("MQTT Server unavailable") + logger.error("MQTT Server unavailable") elif rc == 4: - print ("MQTT Bad username or password") + logger.error("MQTT Bad username or password") elif rc == 5: - print ("MQTT Not authorized") + logger.error("MQTT Not authorized") else: - print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc)) + logger.error("Unable to connect to MQTT: Connection refused. Error code: " + str(rc)) client.publish(config.topic_prefix+'/available', 'online', retain=True) client.on_connect = on_connect client.will_set(config.topic_prefix+'/available', payload='offline', qos=1, retain=True) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index f354308bf..d05a4de29 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -5,6 +5,7 @@ import time import copy import cv2 import threading +import logging import queue import copy import numpy as np @@ -17,6 +18,8 @@ from frigate.config import CameraConfig from typing import Callable, Dict from statistics import mean, median +logger = logging.getLogger(__name__) + PATH_TO_LABELS = '/labelmap.txt' LABELS = load_labels(PATH_TO_LABELS) @@ -366,7 +369,7 @@ class TrackedObjectProcessor(threading.Thread): def run(self): while True: if self.stop_event.is_set(): - print(f"Exiting object processor...") + logger.info(f"Exiting object processor...") break try: diff --git a/frigate/util.py b/frigate/util.py index 39254637d..6f8689860 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -14,36 +14,6 @@ import hashlib from multiprocessing import shared_memory from typing import AnyStr -def get_frame_shape(source): - ffprobe_cmd = " ".join([ - 'ffprobe', - '-v', - 'panic', - '-show_error', - '-show_streams', - '-of', - 'json', - '"'+source+'"' - ]) - print(ffprobe_cmd) - p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True) - (output, err) = p.communicate() - p_status = p.wait() - info = json.loads(output) - print(info) - - video_info = [s for s in info['streams'] if s['codec_type'] == 'video'][0] - - if video_info['height'] != 0 and video_info['width'] != 0: - return (video_info['height'], video_info['width'], 3) - - # fallback to using opencv if ffprobe didnt succeed - video = cv2.VideoCapture(source) - ret, frame = video.read() - frame_shape = frame.shape - video.release() - return frame_shape - def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'): if color is None: color = (0,0,255) diff --git a/frigate/video.py b/frigate/video.py index 5f1ac9d73..f850b25fd 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -4,6 +4,7 @@ import datetime import cv2 import queue import threading +import logging import ctypes import multiprocessing as mp import subprocess as sp @@ -20,6 +21,8 @@ from frigate.objects import ObjectTracker from frigate.edgetpu import RemoteObjectDetector from frigate.motion import MotionDetector +logger = logging.getLogger(__name__) + def filtered(obj, objects_to_track, object_filters, mask=None): object_name = obj[0] @@ -66,19 +69,19 @@ def create_tensor_input(frame, region): def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None): if not ffmpeg_process is None: - print("Terminating the existing ffmpeg process...") + logger.info("Terminating the existing ffmpeg process...") ffmpeg_process.terminate() try: - print("Waiting for ffmpeg to exit gracefully...") + logger.info("Waiting for ffmpeg to exit gracefully...") ffmpeg_process.communicate(timeout=30) except sp.TimeoutExpired: - print("FFmpeg didnt exit. Force killing...") + logger.info("FFmpeg didnt exit. Force killing...") ffmpeg_process.kill() ffmpeg_process.communicate() ffmpeg_process = None - print("Creating ffmpeg process...") - print(" ".join(ffmpeg_cmd)) + logger.info("Creating ffmpeg process...") + logger.info(" ".join(ffmpeg_cmd)) process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True) return process @@ -100,10 +103,10 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram try: frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) except: - print(f"{camera_name}: ffmpeg sent a broken frame. something is wrong.") + logger.info(f"{camera_name}: ffmpeg sent a broken frame. something is wrong.") if ffmpeg_process.poll() != None: - print(f"{camera_name}: ffmpeg process is not running. exiting capture thread...") + logger.info(f"{camera_name}: ffmpeg process is not running. exiting capture thread...") frame_manager.delete(frame_name) break @@ -145,13 +148,13 @@ class CameraWatchdog(threading.Thread): if not self.capture_thread.is_alive(): self.start_ffmpeg() elif now - self.capture_thread.current_frame.value > 5: - print(f"No frames received from {self.name} in 5 seconds. Exiting ffmpeg...") + logger.info(f"No frames received from {self.name} in 5 seconds. Exiting ffmpeg...") self.ffmpeg_process.terminate() try: - print("Waiting for ffmpeg to exit gracefully...") + logger.info("Waiting for ffmpeg to exit gracefully...") self.ffmpeg_process.communicate(timeout=30) except sp.TimeoutExpired: - print("FFmpeg didnt exit. Force killing...") + logger.info("FFmpeg didnt exit. Force killing...") self.ffmpeg_process.kill() self.ffmpeg_process.communicate() @@ -209,7 +212,7 @@ def track_camera(name, config: CameraConfig, detection_queue, result_connection, process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector, object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask) - print(f"{name}: exiting subprocess") + logger.info(f"{name}: exiting subprocess") def reduce_boxes(boxes): if len(boxes) == 0: @@ -256,7 +259,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, while True: if exit_on_empty and frame_queue.empty(): - print(f"Exiting track_objects...") + logger.info(f"Exiting track_objects...") break try: @@ -269,7 +272,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, frame = frame_manager.get(f"{camera_name}{frame_time}", (frame_shape[0]*3//2, frame_shape[1])) if frame is None: - print(f"{camera_name}: frame {frame_time} is not in memory store.") + logger.info(f"{camera_name}: frame {frame_time} is not in memory store.") continue # look for motion diff --git a/frigate/watchdog.py b/frigate/watchdog.py index 1b54eab64..f12268dd4 100644 --- a/frigate/watchdog.py +++ b/frigate/watchdog.py @@ -1,7 +1,10 @@ import datetime +import logging import time import threading +logger = logging.getLogger(__name__) + class FrigateWatchdog(threading.Thread): def __init__(self, detectors, stop_event): threading.Thread.__init__(self) @@ -15,7 +18,7 @@ class FrigateWatchdog(threading.Thread): time.sleep(10) if self.stop_event.is_set(): - print(f"Exiting watchdog...") + logger.info(f"Exiting watchdog...") break now = datetime.datetime.now().timestamp() @@ -25,8 +28,8 @@ class FrigateWatchdog(threading.Thread): detection_start = detector.detection_start.value if (detection_start > 0.0 and now - detection_start > 10): - print("Detection appears to be stuck. Restarting detection process") + logger.info("Detection appears to be stuck. Restarting detection process") detector.start_or_restart() elif not detector.detect_process.is_alive(): - print("Detection appears to have stopped. Restarting detection process") + logger.info("Detection appears to have stopped. Restarting detection process") detector.start_or_restart()