mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			164 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			164 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import asyncio
 | |
| import faulthandler
 | |
| import logging
 | |
| import multiprocessing as mp
 | |
| import signal
 | |
| import sys
 | |
| import threading
 | |
| from abc import ABC, abstractmethod
 | |
| from asyncio.exceptions import TimeoutError
 | |
| from logging.handlers import QueueHandler
 | |
| from types import FrameType
 | |
| from typing import Optional
 | |
| 
 | |
| import frigate.log
 | |
| 
 | |
| from .multiprocessing_waiter import wait as mp_wait
 | |
| from .service import Service, ServiceManager
 | |
| 
 | |
# Time on_stop() waits after terminate() before it falls back to kill(), used when the
# caller does not pass an explicit timeout.
DEFAULT_STOP_TIMEOUT = 10  # seconds


class BaseServiceProcess(Service, ABC):
    """A Service that manages a multiprocessing.Process.

    Subclasses implement run(), which is executed as the target of the managed process.
    The before_start/after_start hooks are called from on_start() around Process.start();
    the before_run/after_run hooks are called from _run() around run().
    """

    # The managed process, or None when no process has been started (or after on_stop()).
    _process: Optional[mp.Process]

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        manager: Optional[ServiceManager] = None,
    ) -> None:
        """Initialize the service; both arguments are forwarded to Service.__init__."""
        super().__init__(name=name, manager=manager)

        self._process = None

    async def on_start(self) -> None:
        """Start the managed process unless one is already alive.

        A previous process that is no longer alive is closed and replaced by a new daemon
        process named after this service.

        Raises:
            Whatever before_start() or Process.start() raises. In that case a process that
            never got started is discarded, so a later on_stop() treats the service as
            already stopped instead of failing on an unstarted Process.
        """
        if self._process is not None:
            if self._process.is_alive():
                return  # Already started.
            else:
                self._process.close()

        # At this point, the process is either stopped or dead, so we can recreate it.
        self._process = mp.Process(target=self._run)
        self._process.name = self.name
        self._process.daemon = True
        try:
            self.before_start()
            self._process.start()
        except BaseException:
            # An unstarted Process cannot be terminated (it has no underlying popen
            # object), so on_stop() would crash on it. Drop it, but keep the handle if
            # the OS process was actually spawned so it can still be stopped.
            if self._process.pid is None:
                self._process = None
            raise
        self.after_start()

        self.manager.logger.info(f"Started {self.name} (pid: {self._process.pid})")

    async def on_stop(
        self,
        *,
        force: bool = False,
        timeout: Optional[float] = None,
    ) -> None:
        """Stop the managed process and release it.

        Args:
            force: When False, terminate() is tried first and the process is only killed
                if it has not exited within the timeout. When True, the process is
                killed right away.
            timeout: Seconds to wait after terminate(). None selects
                DEFAULT_STOP_TIMEOUT.
        """
        if timeout is None:
            timeout = DEFAULT_STOP_TIMEOUT

        if self._process is None:
            return  # Already stopped.

        running = True

        if not force:
            self._process.terminate()
            try:
                await asyncio.wait_for(mp_wait(self._process), timeout)
                running = False
            except TimeoutError:
                self.manager.logger.warning(
                    f"{self.name} is still running after {timeout} seconds. Killing."
                )

        if running:
            # Either force was requested or the graceful attempt timed out.
            self._process.kill()
            await mp_wait(self._process)

        self._process.close()
        self._process = None

        self.manager.logger.info(f"{self.name} stopped")

    @property
    def pid(self) -> Optional[int]:
        """Return the pid of the managed process, or None when there is no process."""
        return self._process.pid if self._process else None

    def _run(self) -> None:
        """Target of the managed process: call before_run(), run(), then after_run().

        The calls are strictly sequential, so after_run() is not reached when
        before_run() or run() raises.
        """
        self.before_run()
        self.run()
        self.after_run()

    def before_start(self) -> None:
        """Hook called by on_start() just before the process is started."""
        pass

    def after_start(self) -> None:
        """Hook called by on_start() right after the process has been started."""
        pass

    def before_run(self) -> None:
        """Hook called by _run() before run()."""
        pass

    def after_run(self) -> None:
        """Hook called by _run() after run() has returned."""
        pass

    @abstractmethod
    def run(self) -> None:
        """Body of the managed process; must be implemented by subclasses."""
        pass

    def __getstate__(self) -> dict:
        """Return the instance state without `_Service__*` attributes and `_process`.

        NOTE(review): presumably these are omitted because they hold parent-side state
        that should not be pickled when the instance is sent to the child - confirm.
        """
        return {
            k: v
            for k, v in self.__dict__.items()
            if not (k.startswith("_Service__") or k == "_process")
        }
 | |
| 
 | |
| 
 | |
class ServiceProcess(BaseServiceProcess):
    """BaseServiceProcess that installs signal handlers and queue-based logging.

    before_start() captures the queue of the active log listener; before_run() then
    enables faulthandler, installs SIGTERM/SIGINT handlers and routes the root logger
    through a QueueHandler on that queue.
    """

    logger: logging.Logger

    @property
    def stop_event(self) -> threading.Event:
        """Return the stop event, creating it on first access."""
        # Creation is deferred on purpose: the signal handler checks whether the event
        # exists to decide if anybody is watching it, and raises SystemExit otherwise.
        try:
            event = self.__dict__["stop_event"]
        except KeyError:
            event = threading.Event()
            self.__dict__["stop_event"] = event
        else:
            assert isinstance(event, threading.Event)

        return event

    def before_start(self) -> None:
        """Remember the log listener's queue for use in before_run().

        Raises:
            RuntimeError: If frigate.log.log_listener has not been set.
        """
        listener = frigate.log.log_listener
        if listener is None:
            raise RuntimeError("Logging has not yet been set up.")
        self.__log_queue = listener.queue

    def before_run(self) -> None:
        """Set up fault dumps, signal handling and logging before run() is called."""
        super().before_run()

        faulthandler.enable()

        def handle_signal(signum: int, frame: Optional[FrameType]) -> None:
            # Look in the instance dict directly; going through the property would
            # create the event as a side effect.
            watched = self.__dict__.get("stop_event")
            if watched is None:
                # The event was never requested, so nobody is watching it: raise
                # SystemExit instead.
                sys.exit()
            # The event exists, so someone is watching it: set it.
            watched.set()

        for handled in (signal.SIGTERM, signal.SIGINT):
            signal.signal(handled, handle_signal)

        self.logger = logging.getLogger(self.name)

        # Clear the root logger's handlers, then forward every record to the queue
        # captured in before_start().
        root_logger = logging.getLogger()
        logging.basicConfig(handlers=[], force=True)
        root_logger.addHandler(QueueHandler(self.__log_queue))
        # The queue reference is only needed for this setup step.
        del self.__log_queue
 |