Add basic typing for multiple modules:

* log.py
 * video.py
 * watchdog.py
 * zeroconf.py
This commit is contained in:
Sebastian Englbrecht 2022-04-12 22:24:45 +02:00 committed by Blake Blackshear
parent c6234bf548
commit 41f58c7692
5 changed files with 74 additions and 35 deletions

View File

@ -4,13 +4,14 @@ import threading
import os
import signal
import queue
import multiprocessing as mp
from multiprocessing.queues import Queue
from logging import handlers
from setproctitle import setproctitle
from typing import Deque
from collections import deque
def listener_configurer():
def listener_configurer() -> None:
root = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
@ -21,14 +22,14 @@ def listener_configurer():
root.setLevel(logging.INFO)
def root_configurer(queue):
def root_configurer(queue: Queue) -> None:
h = handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.INFO)
def log_process(log_queue):
def log_process(log_queue: Queue) -> None:
threading.current_thread().name = f"logger"
setproctitle("frigate.logger")
listener_configurer()
@ -43,34 +44,32 @@ def log_process(log_queue):
# based on https://codereview.stackexchange.com/a/17959
class LogPipe(threading.Thread):
def __init__(self, log_name, level):
"""Setup the object with a logger and a loglevel
and start the thread
"""
def __init__(self, log_name: str):
"""Setup the object with a logger and start the thread"""
threading.Thread.__init__(self)
self.daemon = False
self.logger = logging.getLogger(log_name)
self.level = level
self.deque = deque(maxlen=100)
self.level = logging.ERROR
self.deque: Deque[str] = deque(maxlen=100)
self.fdRead, self.fdWrite = os.pipe()
self.pipeReader = os.fdopen(self.fdRead)
self.start()
def fileno(self):
def fileno(self) -> int:
"""Return the write file descriptor of the pipe"""
return self.fdWrite
def run(self):
def run(self) -> None:
"""Run the thread, logging everything."""
for line in iter(self.pipeReader.readline, ""):
self.deque.append(line.strip("\n"))
self.pipeReader.close()
def dump(self):
def dump(self) -> None:
while len(self.deque) > 0:
self.logger.log(self.level, self.deque.popleft())
def close(self):
def close(self) -> None:
"""Close the write end of the pipe."""
os.close(self.fdWrite)

View File

@ -1,6 +1,40 @@
[mypy]
python_version = 3.9
show_error_codes = true
follow_imports = silent
ignore_missing_imports = true
strict_equality = true
warn_incomplete_stub = true
warn_redundant_casts = true
warn_unused_configs = true
warn_unused_ignores = true
enable_error_code = ignore-without-code
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
no_implicit_reexport = true
[mypy-frigate.*]
ignore_errors = true
[mypy-frigate.const]
ignore_errors = false
[mypy-frigate.log]
ignore_errors = false
[mypy-frigate.version]
ignore_errors = false
[mypy-frigate.watchdog]
ignore_errors = false
disallow_untyped_calls = false
[mypy-frigate.zeroconf]
ignore_errors = false

View File

@ -203,7 +203,7 @@ class CameraWatchdog(threading.Thread):
self.config = config
self.capture_thread = None
self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect", logging.ERROR)
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
self.ffmpeg_other_processes = []
self.camera_fps = camera_fps
self.ffmpeg_pid = ffmpeg_pid
@ -219,8 +219,7 @@ class CameraWatchdog(threading.Thread):
if "detect" in c["roles"]:
continue
logpipe = LogPipe(
f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}",
logging.ERROR,
f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}"
)
self.ffmpeg_other_processes.append(
{

View File

@ -5,21 +5,22 @@ import time
import os
import signal
from frigate.util import (
restart_frigate,
)
from frigate.edgetpu import EdgeTPUProcess
from frigate.util import restart_frigate
from multiprocessing.synchronize import Event
from typing import Dict
logger = logging.getLogger(__name__)
class FrigateWatchdog(threading.Thread):
def __init__(self, detectors, stop_event):
def __init__(self, detectors: Dict[str, EdgeTPUProcess], stop_event: Event):
threading.Thread.__init__(self)
self.name = "frigate_watchdog"
self.detectors = detectors
self.stop_event = stop_event
def run(self):
def run(self) -> None:
time.sleep(10)
while not self.stop_event.wait(10):
now = datetime.datetime.now().timestamp()
@ -32,7 +33,10 @@ class FrigateWatchdog(threading.Thread):
"Detection appears to be stuck. Restarting detection process..."
)
detector.start_or_restart()
elif not detector.detect_process.is_alive():
elif (
detector.detect_process is not None
and not detector.detect_process.is_alive()
):
logger.info("Detection appears to have stopped. Exiting frigate...")
restart_frigate()

View File

@ -14,38 +14,41 @@ logger = logging.getLogger(__name__)
ZEROCONF_TYPE = "_frigate._tcp.local."
# Taken from: http://stackoverflow.com/a/11735897
def get_local_ip() -> str:
def get_local_ip() -> bytes:
"""Try to determine the local IP address of the machine."""
host_ip_str = ""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Use Google Public DNS server to determine own IP
sock.connect(("8.8.8.8", 80))
return sock.getsockname()[0] # type: ignore
host_ip_str = sock.getsockname()[0]
except OSError:
try:
return socket.gethostbyname(socket.gethostname())
host_ip_str = socket.gethostbyname(socket.gethostname())
except socket.gaierror:
return "127.0.0.1"
host_ip_str = "127.0.0.1"
finally:
sock.close()
try:
host_ip_pton = socket.inet_pton(socket.AF_INET, host_ip_str)
except OSError:
host_ip_pton = socket.inet_pton(socket.AF_INET6, host_ip_str)
def broadcast_zeroconf(frigate_id):
return host_ip_pton
def broadcast_zeroconf(frigate_id: str) -> Zeroconf:
zeroconf = Zeroconf(interfaces=InterfaceChoice.Default, ip_version=IPVersion.V4Only)
host_ip = get_local_ip()
try:
host_ip_pton = socket.inet_pton(socket.AF_INET, host_ip)
except OSError:
host_ip_pton = socket.inet_pton(socket.AF_INET6, host_ip)
info = ServiceInfo(
ZEROCONF_TYPE,
name=f"{frigate_id}.{ZEROCONF_TYPE}",
addresses=[host_ip_pton],
addresses=[host_ip],
port=5000,
)
@ -56,4 +59,4 @@ def broadcast_zeroconf(frigate_id):
logger.error(
"Frigate instance with identical name present in the local network"
)
return zeroconf
return zeroconf