use a queue for logging

This commit is contained in:
Blake Blackshear 2020-11-03 21:26:39 -06:00
parent af303cbf2a
commit 4c3fea25a5
10 changed files with 93 additions and 66 deletions

View File

@ -1,9 +1,12 @@
import faulthandler; faulthandler.enable()
import os
import json
import logging
import yaml
import multiprocessing as mp
import sys
from logging.handlers import QueueHandler
from playhouse.sqlite_ext import SqliteExtDatabase
from typing import Dict, List
@ -11,12 +14,18 @@ from frigate.config import FrigateConfig
from frigate.edgetpu import EdgeTPUProcess
from frigate.events import EventProcessor
from frigate.http import create_app
from frigate.log import root_configurer, log_process
from frigate.models import Event
from frigate.mqtt import create_mqtt_client
from frigate.object_processing import TrackedObjectProcessor
from frigate.video import track_camera, capture_camera
from frigate.watchdog import FrigateWatchdog
logger = logging.getLogger(__name__)
cli = sys.modules['flask.cli']
cli.show_server_banner = lambda *x: None
class FrigateApp():
def __init__(self):
self.stop_event = mp.Event()
@ -25,8 +34,14 @@ class FrigateApp():
self.detectors: Dict[str, EdgeTPUProcess] = {}
self.detection_out_events: Dict[str, mp.Event] = {}
self.detection_shms: List[mp.shared_memory.SharedMemory] = []
self.log_queue = mp.Queue()
self.camera_metrics = {}
def init_logger(self):
self.log_process = mp.Process(target=log_process, args=(self.log_queue,))
self.log_process.start()
root_configurer(self.log_queue)
def init_config(self):
config_file = os.environ.get('CONFIG_FILE', '/config/config.yml')
self.config = FrigateConfig(config_file=config_file)
@ -90,7 +105,7 @@ class FrigateApp():
camera_process.daemon = True
self.camera_metrics[name]['process'] = camera_process
camera_process.start()
print(f"Camera processor started for {name}: {camera_process.pid}")
logger.info(f"Camera processor started for {name}: {camera_process.pid}")
def start_camera_capture_processes(self):
for name, config in self.config.cameras.items():
@ -99,7 +114,7 @@ class FrigateApp():
capture_process.daemon = True
self.camera_metrics[name]['capture_process'] = capture_process
capture_process.start()
print(f"Capture process started for {name}: {capture_process.pid}")
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def start_event_processor(self):
self.event_processor = EventProcessor(self.config, self.camera_metrics, self.event_queue, self.stop_event)
@ -110,6 +125,7 @@ class FrigateApp():
self.frigate_watchdog.start()
def start(self):
self.init_logger()
self.init_config()
self.init_queues()
self.init_database()
@ -125,7 +141,7 @@ class FrigateApp():
self.stop()
def stop(self):
print(f"Stopping...")
logger.info(f"Stopping...")
self.stop_event.set()
self.detected_frames_processor.join()

View File

@ -10,8 +10,6 @@ import matplotlib.pyplot as plt
import numpy as np
import voluptuous as vol
from frigate.util import get_frame_shape
DETECTORS_SCHEMA = vol.Schema(
{
vol.Required(str): {

View File

@ -3,6 +3,7 @@ import datetime
import hashlib
import multiprocessing as mp
import queue
import logging
from multiprocessing.connection import Connection
from abc import ABC, abstractmethod
from typing import Dict
@ -11,6 +12,8 @@ import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate
from frigate.util import EventsPerSecond, listen, SharedMemoryFrameManager
logger = logging.getLogger(__name__)
def load_labels(path, encoding='utf-8'):
"""Loads labels from file (with or without index numbers).
Args:
@ -51,11 +54,11 @@ class LocalObjectDetector(ObjectDetector):
if tf_device != 'cpu':
try:
print(f"Attempting to load TPU as {device_config['device']}")
logging.info(f"Attempting to load TPU as {device_config['device']}")
edge_tpu_delegate = load_delegate('libedgetpu.so.1.0', device_config)
print("TPU found")
logging.info("TPU found")
except ValueError:
print("No EdgeTPU detected. Falling back to CPU.")
logging.info("No EdgeTPU detected. Falling back to CPU.")
if edge_tpu_delegate is None:
self.interpreter = tflite.Interpreter(
@ -100,7 +103,7 @@ class LocalObjectDetector(ObjectDetector):
return detections
def run_detector(detection_queue: mp.Queue, out_events: Dict[str, mp.Event], avg_speed, start, tf_device):
print(f"Starting detection process: {os.getpid()}")
logging.info(f"Starting detection process: {os.getpid()}")
listen()
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(tf_device=tf_device)
@ -143,10 +146,10 @@ class EdgeTPUProcess():
def stop(self):
self.detect_process.terminate()
print("Waiting for detection process to exit gracefully...")
logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
print("Detection process didnt exit. Force killing...")
logging.info("Detection process didnt exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()

View File

@ -2,6 +2,7 @@ import os
import time
import psutil
import threading
import logging
from collections import defaultdict
import json
import datetime
@ -10,6 +11,8 @@ import queue
from frigate.models import Event
logger = logging.getLogger(__name__)
class EventProcessor(threading.Thread):
def __init__(self, config, camera_processes, event_queue, stop_event):
threading.Thread.__init__(self)
@ -60,7 +63,7 @@ class EventProcessor(threading.Thread):
if p_status == 0:
duration = float(output.decode('utf-8').strip())
else:
print(f"bad file: {f}")
logger.info(f"bad file: {f}")
os.remove(os.path.join(self.cache_dir,f))
continue
@ -133,7 +136,7 @@ class EventProcessor(threading.Thread):
p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
if p.returncode != 0:
print(p.stderr)
logger.error(p.stderr)
return
with open(f"{os.path.join(self.clips_dir, clip_name)}.json", 'w') as outfile:
@ -151,7 +154,7 @@ class EventProcessor(threading.Thread):
def run(self):
while True:
if self.stop_event.is_set():
print(f"Exiting event processor...")
logger.info(f"Exiting event processor...")
break
try:

25
frigate/log.py Normal file
View File

@ -0,0 +1,25 @@
# adapted from https://medium.com/@jonathonbao/python3-logging-with-multiprocessing-f51f460b8778
import logging
import threading
from logging import handlers
def listener_configurer():
root = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(processName)-12s %(threadName)-12s %(name)-16s %(levelname)-8s: %(message)s')
console_handler.setFormatter(formatter)
root.addHandler(console_handler)
root.setLevel(logging.INFO)
def root_configurer(queue):
h = handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.INFO)
def log_process(queue):
listener_configurer()
while True:
record = queue.get()
logger = logging.getLogger(record.name)
logger.handle(record)

View File

@ -1,21 +1,24 @@
import logging
import paho.mqtt.client as mqtt
from frigate.config import MqttConfig
logger = logging.getLogger(__name__)
def create_mqtt_client(config: MqttConfig):
client = mqtt.Client(client_id=config.client_id)
def on_connect(client, userdata, flags, rc):
# TODO: use logging library
print("On connect called")
logger.info("On connect called")
if rc != 0:
if rc == 3:
print ("MQTT Server unavailable")
logger.error("MQTT Server unavailable")
elif rc == 4:
print ("MQTT Bad username or password")
logger.error("MQTT Bad username or password")
elif rc == 5:
print ("MQTT Not authorized")
logger.error("MQTT Not authorized")
else:
print ("Unable to connect to MQTT: Connection refused. Error code: " + str(rc))
logger.error("Unable to connect to MQTT: Connection refused. Error code: " + str(rc))
client.publish(config.topic_prefix+'/available', 'online', retain=True)
client.on_connect = on_connect
client.will_set(config.topic_prefix+'/available', payload='offline', qos=1, retain=True)

View File

@ -5,6 +5,7 @@ import time
import copy
import cv2
import threading
import logging
import queue
import copy
import numpy as np
@ -17,6 +18,8 @@ from frigate.config import CameraConfig
from typing import Callable, Dict
from statistics import mean, median
logger = logging.getLogger(__name__)
PATH_TO_LABELS = '/labelmap.txt'
LABELS = load_labels(PATH_TO_LABELS)
@ -366,7 +369,7 @@ class TrackedObjectProcessor(threading.Thread):
def run(self):
while True:
if self.stop_event.is_set():
print(f"Exiting object processor...")
logger.info(f"Exiting object processor...")
break
try:

View File

@ -14,36 +14,6 @@ import hashlib
from multiprocessing import shared_memory
from typing import AnyStr
def get_frame_shape(source):
ffprobe_cmd = " ".join([
'ffprobe',
'-v',
'panic',
'-show_error',
'-show_streams',
'-of',
'json',
'"'+source+'"'
])
print(ffprobe_cmd)
p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True)
(output, err) = p.communicate()
p_status = p.wait()
info = json.loads(output)
print(info)
video_info = [s for s in info['streams'] if s['codec_type'] == 'video'][0]
if video_info['height'] != 0 and video_info['width'] != 0:
return (video_info['height'], video_info['width'], 3)
# fallback to using opencv if ffprobe didnt succeed
video = cv2.VideoCapture(source)
ret, frame = video.read()
frame_shape = frame.shape
video.release()
return frame_shape
def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
if color is None:
color = (0,0,255)

View File

@ -4,6 +4,7 @@ import datetime
import cv2
import queue
import threading
import logging
import ctypes
import multiprocessing as mp
import subprocess as sp
@ -20,6 +21,8 @@ from frigate.objects import ObjectTracker
from frigate.edgetpu import RemoteObjectDetector
from frigate.motion import MotionDetector
logger = logging.getLogger(__name__)
def filtered(obj, objects_to_track, object_filters, mask=None):
object_name = obj[0]
@ -66,19 +69,19 @@ def create_tensor_input(frame, region):
def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
if not ffmpeg_process is None:
print("Terminating the existing ffmpeg process...")
logger.info("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate()
try:
print("Waiting for ffmpeg to exit gracefully...")
logger.info("Waiting for ffmpeg to exit gracefully...")
ffmpeg_process.communicate(timeout=30)
except sp.TimeoutExpired:
print("FFmpeg didnt exit. Force killing...")
logger.info("FFmpeg didnt exit. Force killing...")
ffmpeg_process.kill()
ffmpeg_process.communicate()
ffmpeg_process = None
print("Creating ffmpeg process...")
print(" ".join(ffmpeg_cmd))
logger.info("Creating ffmpeg process...")
logger.info(" ".join(ffmpeg_cmd))
process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, stdin = sp.DEVNULL, bufsize=frame_size*10, start_new_session=True)
return process
@ -100,10 +103,10 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
except:
print(f"{camera_name}: ffmpeg sent a broken frame. something is wrong.")
logger.info(f"{camera_name}: ffmpeg sent a broken frame. something is wrong.")
if ffmpeg_process.poll() != None:
print(f"{camera_name}: ffmpeg process is not running. exiting capture thread...")
logger.info(f"{camera_name}: ffmpeg process is not running. exiting capture thread...")
frame_manager.delete(frame_name)
break
@ -145,13 +148,13 @@ class CameraWatchdog(threading.Thread):
if not self.capture_thread.is_alive():
self.start_ffmpeg()
elif now - self.capture_thread.current_frame.value > 5:
print(f"No frames received from {self.name} in 5 seconds. Exiting ffmpeg...")
logger.info(f"No frames received from {self.name} in 5 seconds. Exiting ffmpeg...")
self.ffmpeg_process.terminate()
try:
print("Waiting for ffmpeg to exit gracefully...")
logger.info("Waiting for ffmpeg to exit gracefully...")
self.ffmpeg_process.communicate(timeout=30)
except sp.TimeoutExpired:
print("FFmpeg didnt exit. Force killing...")
logger.info("FFmpeg didnt exit. Force killing...")
self.ffmpeg_process.kill()
self.ffmpeg_process.communicate()
@ -209,7 +212,7 @@ def track_camera(name, config: CameraConfig, detection_queue, result_connection,
process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector,
object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask)
print(f"{name}: exiting subprocess")
logger.info(f"{name}: exiting subprocess")
def reduce_boxes(boxes):
if len(boxes) == 0:
@ -256,7 +259,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
while True:
if exit_on_empty and frame_queue.empty():
print(f"Exiting track_objects...")
logger.info(f"Exiting track_objects...")
break
try:
@ -269,7 +272,7 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
frame = frame_manager.get(f"{camera_name}{frame_time}", (frame_shape[0]*3//2, frame_shape[1]))
if frame is None:
print(f"{camera_name}: frame {frame_time} is not in memory store.")
logger.info(f"{camera_name}: frame {frame_time} is not in memory store.")
continue
# look for motion

View File

@ -1,7 +1,10 @@
import datetime
import logging
import time
import threading
logger = logging.getLogger(__name__)
class FrigateWatchdog(threading.Thread):
def __init__(self, detectors, stop_event):
threading.Thread.__init__(self)
@ -15,7 +18,7 @@ class FrigateWatchdog(threading.Thread):
time.sleep(10)
if self.stop_event.is_set():
print(f"Exiting watchdog...")
logger.info(f"Exiting watchdog...")
break
now = datetime.datetime.now().timestamp()
@ -25,8 +28,8 @@ class FrigateWatchdog(threading.Thread):
detection_start = detector.detection_start.value
if (detection_start > 0.0 and
now - detection_start > 10):
print("Detection appears to be stuck. Restarting detection process")
logger.info("Detection appears to be stuck. Restarting detection process")
detector.start_or_restart()
elif not detector.detect_process.is_alive():
print("Detection appears to have stopped. Restarting detection process")
logger.info("Detection appears to have stopped. Restarting detection process")
detector.start_or_restart()