blakeblackshear.frigate/frigate/comms/inter_process.py
Nicolas Mowen dd3dc7949a
Use zmq for inter process communication (#9309)
* Use zmq for inter process communication

* Use localhost for reply and request

* Use pyobj instead of json and Need to use separate requestors for each audio listener

* Cleanup port defining
2024-02-14 18:24:36 -06:00

73 lines
2.3 KiB
Python

"""Facilitates communication between processes."""
import multiprocessing as mp
import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
import zmq
from frigate.comms.dispatcher import Communicator
from frigate.const import PORT_INTER_PROCESS_COMM
class InterProcessCommunicator(Communicator):
def __init__(self) -> None:
INTER_PROCESS_COMM_PORT = (
os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}")
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None:
"""There is no communication back to the processes."""
pass
def subscribe(self, receiver: Callable) -> None:
self._dispatcher = receiver
self.reader_thread = threading.Thread(target=self.read)
self.reader_thread.start()
def read(self) -> None:
while not self.stop_event.wait(0.5):
while True: # load all messages that are queued
try:
(topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
response = self._dispatcher(topic, value)
if response is not None:
self.socket.send_pyobj(response)
else:
self.socket.send_pyobj([])
except zmq.ZMQError:
break
def stop(self) -> None:
self.stop_event.set()
self.reader_thread.join()
self.socket.close()
self.context.destroy()
class InterProcessRequestor:
"""Simplifies sending data to InterProcessCommunicator and getting a reply."""
def __init__(self) -> None:
port = os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(f"tcp://127.0.0.1:{port}")
def send_data(self, topic: str, data: any) -> any:
"""Sends data and then waits for reply."""
self.socket.send_pyobj((topic, data))
return self.socket.recv_pyobj()
def stop(self) -> None:
self.socket.close()
self.context.destroy()