2023-07-14 02:52:33 +02:00
|
|
|
import multiprocessing as mp
|
|
|
|
import queue
|
|
|
|
import threading
|
2023-07-16 14:42:56 +02:00
|
|
|
from multiprocessing import Queue
|
2023-07-14 02:52:33 +02:00
|
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
|
|
from typing import Callable
|
|
|
|
|
|
|
|
from frigate.comms.dispatcher import Communicator
|
|
|
|
|
|
|
|
|
|
|
|
class InterProcessCommunicator(Communicator):
|
|
|
|
def __init__(self, queue: Queue) -> None:
|
|
|
|
self.queue = queue
|
|
|
|
self.stop_event: MpEvent = mp.Event()
|
|
|
|
|
|
|
|
def publish(self, topic: str, payload: str, retain: bool) -> None:
|
|
|
|
"""There is no communication back to the processes."""
|
|
|
|
pass
|
|
|
|
|
|
|
|
def subscribe(self, receiver: Callable) -> None:
|
|
|
|
self._dispatcher = receiver
|
|
|
|
self.reader_thread = threading.Thread(target=self.read)
|
|
|
|
self.reader_thread.start()
|
|
|
|
|
|
|
|
def read(self) -> None:
|
|
|
|
while not self.stop_event.is_set():
|
|
|
|
try:
|
|
|
|
(
|
|
|
|
topic,
|
|
|
|
value,
|
|
|
|
) = self.queue.get(True, 1)
|
|
|
|
except queue.Empty:
|
|
|
|
continue
|
|
|
|
|
|
|
|
self._dispatcher(topic, value)
|
|
|
|
|
|
|
|
def stop(self) -> None:
|
|
|
|
self.stop_event.set()
|
|
|
|
self.reader_thread.join()
|