blakeblackshear.frigate/frigate/comms/inter_process.py

41 lines
1.1 KiB
Python
Raw Normal View History

import multiprocessing as mp
import queue
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from faster_fifo import Queue
from frigate.comms.dispatcher import Communicator
class InterProcessCommunicator(Communicator):
def __init__(self, queue: Queue) -> None:
self.queue = queue
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None:
"""There is no communication back to the processes."""
pass
def subscribe(self, receiver: Callable) -> None:
self._dispatcher = receiver
self.reader_thread = threading.Thread(target=self.read)
self.reader_thread.start()
def read(self) -> None:
while not self.stop_event.is_set():
try:
(
topic,
value,
) = self.queue.get(True, 1)
except queue.Empty:
continue
self._dispatcher(topic, value)
def stop(self) -> None:
self.stop_event.set()
self.reader_thread.join()