mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-07 00:06:57 +01:00
165 lines
4.6 KiB
Python
165 lines
4.6 KiB
Python
|
import asyncio
|
||
|
import faulthandler
|
||
|
import logging
|
||
|
import multiprocessing as mp
|
||
|
import signal
|
||
|
import sys
|
||
|
import threading
|
||
|
from abc import ABC, abstractmethod
|
||
|
from asyncio.exceptions import TimeoutError
|
||
|
from logging.handlers import QueueHandler
|
||
|
from types import FrameType
|
||
|
from typing import Optional
|
||
|
|
||
|
import frigate.log
|
||
|
|
||
|
from .multiprocessing_waiter import wait as mp_wait
|
||
|
from .service import Service, ServiceManager
|
||
|
|
||
|
DEFAULT_STOP_TIMEOUT = 10 # seconds
|
||
|
|
||
|
|
||
|
class BaseServiceProcess(Service, ABC):
|
||
|
"""A Service the manages a multiprocessing.Process."""
|
||
|
|
||
|
_process: Optional[mp.Process]
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
*,
|
||
|
name: Optional[str] = None,
|
||
|
manager: Optional[ServiceManager] = None,
|
||
|
) -> None:
|
||
|
super().__init__(name=name, manager=manager)
|
||
|
|
||
|
self._process = None
|
||
|
|
||
|
async def on_start(self) -> None:
|
||
|
if self._process is not None:
|
||
|
if self._process.is_alive():
|
||
|
return # Already started.
|
||
|
else:
|
||
|
self._process.close()
|
||
|
|
||
|
# At this point, the process is either stopped or dead, so we can recreate it.
|
||
|
self._process = mp.Process(target=self._run)
|
||
|
self._process.name = self.name
|
||
|
self._process.daemon = True
|
||
|
self.before_start()
|
||
|
self._process.start()
|
||
|
self.after_start()
|
||
|
|
||
|
self.manager.logger.info(f"Started {self.name} (pid: {self._process.pid})")
|
||
|
|
||
|
async def on_stop(
|
||
|
self,
|
||
|
*,
|
||
|
force: bool = False,
|
||
|
timeout: Optional[float] = None,
|
||
|
) -> None:
|
||
|
if timeout is None:
|
||
|
timeout = DEFAULT_STOP_TIMEOUT
|
||
|
|
||
|
if self._process is None:
|
||
|
return # Already stopped.
|
||
|
|
||
|
running = True
|
||
|
|
||
|
if not force:
|
||
|
self._process.terminate()
|
||
|
try:
|
||
|
await asyncio.wait_for(mp_wait(self._process), timeout)
|
||
|
running = False
|
||
|
except TimeoutError:
|
||
|
self.manager.logger.warning(
|
||
|
f"{self.name} is still running after "
|
||
|
f"{timeout} seconds. Killing."
|
||
|
)
|
||
|
|
||
|
if running:
|
||
|
self._process.kill()
|
||
|
await mp_wait(self._process)
|
||
|
|
||
|
self._process.close()
|
||
|
self._process = None
|
||
|
|
||
|
self.manager.logger.info(f"{self.name} stopped")
|
||
|
|
||
|
@property
|
||
|
def pid(self) -> Optional[int]:
|
||
|
return self._process.pid if self._process else None
|
||
|
|
||
|
def _run(self) -> None:
|
||
|
self.before_run()
|
||
|
self.run()
|
||
|
self.after_run()
|
||
|
|
||
|
def before_start(self) -> None:
|
||
|
pass
|
||
|
|
||
|
def after_start(self) -> None:
|
||
|
pass
|
||
|
|
||
|
def before_run(self) -> None:
|
||
|
pass
|
||
|
|
||
|
def after_run(self) -> None:
|
||
|
pass
|
||
|
|
||
|
@abstractmethod
|
||
|
def run(self) -> None:
|
||
|
pass
|
||
|
|
||
|
def __getstate__(self) -> dict:
|
||
|
return {
|
||
|
k: v
|
||
|
for k, v in self.__dict__.items()
|
||
|
if not (k.startswith("_Service__") or k == "_process")
|
||
|
}
|
||
|
|
||
|
|
||
|
class ServiceProcess(BaseServiceProcess):
|
||
|
logger: logging.Logger
|
||
|
|
||
|
@property
|
||
|
def stop_event(self) -> threading.Event:
|
||
|
# Lazily create the stop_event. This allows the signal handler to tell if anyone is
|
||
|
# monitoring the stop event, and to raise a SystemExit if not.
|
||
|
if "stop_event" not in self.__dict__:
|
||
|
stop_event = threading.Event()
|
||
|
self.__dict__["stop_event"] = stop_event
|
||
|
else:
|
||
|
stop_event = self.__dict__["stop_event"]
|
||
|
assert isinstance(stop_event, threading.Event)
|
||
|
|
||
|
return stop_event
|
||
|
|
||
|
def before_start(self) -> None:
|
||
|
if frigate.log.log_listener is None:
|
||
|
raise RuntimeError("Logging has not yet been set up.")
|
||
|
self.__log_queue = frigate.log.log_listener.queue
|
||
|
|
||
|
def before_run(self) -> None:
|
||
|
super().before_run()
|
||
|
|
||
|
faulthandler.enable()
|
||
|
|
||
|
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
|
||
|
# Get the stop_event through the dict to bypass lazy initialization.
|
||
|
stop_event = self.__dict__.get("stop_event")
|
||
|
if stop_event is not None:
|
||
|
# Someone is monitoring stop_event. We should set it.
|
||
|
stop_event.set()
|
||
|
else:
|
||
|
# Nobody is monitoring stop_event. We should raise SystemExit.
|
||
|
sys.exit()
|
||
|
|
||
|
signal.signal(signal.SIGTERM, receiveSignal)
|
||
|
signal.signal(signal.SIGINT, receiveSignal)
|
||
|
|
||
|
self.logger = logging.getLogger(self.name)
|
||
|
|
||
|
logging.basicConfig(handlers=[], force=True)
|
||
|
logging.getLogger().addHandler(QueueHandler(self.__log_queue))
|
||
|
del self.__log_queue
|