2024-02-19 14:26:59 +01:00
|
|
|
"""Facilitates communication between processes."""
|
|
|
|
|
|
|
|
import multiprocessing as mp
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional

import zmq
|
|
|
|
|
|
|
|
# ZeroMQ IPC endpoint shared by both classes in this module:
# ConfigPublisher binds it and ConfigSubscriber connects to it.
SOCKET_PUB_SUB = "ipc:///tmp/cache/config"
|
|
|
|
|
|
|
|
|
|
|
|
class ConfigPublisher:
    """Publishes config changes to different processes.

    Owns a ZeroMQ PUB socket bound to ``SOCKET_PUB_SUB``. Every update is
    sent as a two-frame message: the topic string, then the payload
    serialized by ``send_pyobj``.
    """

    def __init__(self) -> None:
        """Create the ZeroMQ context, bind the PUB socket and create the stop event."""
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.bind(SOCKET_PUB_SUB)
        # Set by stop(); nothing inside this class reads it.
        self.stop_event: MpEvent = mp.Event()

    def publish(self, topic: str, payload: Any) -> None:
        """Send ``payload`` to subscribers of ``topic``.

        There is no communication back to the processes.

        Args:
            topic: Topic string sent as the first frame of the message.
            payload: Object sent as the second frame via ``send_pyobj``.
        """
        # SNDMORE keeps the topic and the payload together as one
        # multipart message.
        self.socket.send_string(topic, flags=zmq.SNDMORE)
        self.socket.send_pyobj(payload)

    def stop(self) -> None:
        """Set the stop event, then close the socket and destroy the context."""
        self.stop_event.set()
        self.socket.close()
        self.context.destroy()
|
|
|
|
|
|
|
|
|
|
|
|
class ConfigSubscriber:
    """Simplifies receiving an updated config.

    Owns a ZeroMQ SUB socket connected to ``SOCKET_PUB_SUB`` and offers a
    non-blocking poll for the next published update.
    """

    def __init__(self, topic: str) -> None:
        """Create the SUB socket, subscribe to ``topic`` and connect.

        Args:
            topic: Subscription filter passed to ``zmq.SUBSCRIBE``
                (ZeroMQ matches subscriptions by message prefix).
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
        self.socket.connect(SOCKET_PUB_SUB)

    def check_for_update(self) -> tuple[Optional[str], Any]:
        """Poll for an updated config without blocking.

        Returns:
            ``(topic, payload)`` when an update was received, otherwise
            ``(None, None)``. A tuple is always returned, so the result
            can be unpacked unconditionally.
        """
        try:
            # NOBLOCK makes this raise instead of waiting when nothing is
            # queued.
            topic = self.socket.recv_string(flags=zmq.NOBLOCK)
            # NOTE(review): recv_pyobj unpickles the frame; this is only
            # safe while every writer to the IPC endpoint is trusted.
            payload = self.socket.recv_pyobj()
        except zmq.ZMQError:
            # Any ZMQError (including the "no message yet" case) is
            # reported to the caller as "no update".
            return (None, None)

        return (topic, payload)

    def stop(self) -> None:
        """Close the socket and destroy the context."""
        self.socket.close()
        self.context.destroy()
|