blakeblackshear.frigate/frigate/comms/inter_process.py
Nicolas Mowen 662025a961
Remove faster_fifo (#7181)
* Remove faster_fifo

* Remove const
2023-07-16 07:42:56 -05:00

40 lines
1.1 KiB
Python

import multiprocessing as mp
import queue
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from frigate.comms.dispatcher import Communicator
class InterProcessCommunicator(Communicator):
def __init__(self, queue: Queue) -> None:
self.queue = queue
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None:
"""There is no communication back to the processes."""
pass
def subscribe(self, receiver: Callable) -> None:
self._dispatcher = receiver
self.reader_thread = threading.Thread(target=self.read)
self.reader_thread.start()
def read(self) -> None:
while not self.stop_event.is_set():
try:
(
topic,
value,
) = self.queue.get(True, 1)
except queue.Empty:
continue
self._dispatcher(topic, value)
def stop(self) -> None:
self.stop_event.set()
self.reader_thread.join()