From 01fa1777ac31ea17ac4445dc942be906b904f43d Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 19 Aug 2024 15:23:38 -0600 Subject: [PATCH] Fix ZMQ race condition with events (#13198) --- frigate/comms/zmq_proxy.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/frigate/comms/zmq_proxy.py b/frigate/comms/zmq_proxy.py index b6012966f..bbe660160 100644 --- a/frigate/comms/zmq_proxy.py +++ b/frigate/comms/zmq_proxy.py @@ -1,5 +1,6 @@ """Facilitates communication over zmq proxy.""" +import json import threading from typing import Optional @@ -58,8 +59,7 @@ class Publisher: def publish(self, payload: any, sub_topic: str = "") -> None: """Publish message.""" - self.socket.send_string(f"{self.topic}{sub_topic}", flags=zmq.SNDMORE) - self.socket.send_json(payload) + self.socket.send_string(f"{self.topic}{sub_topic} {json.dumps(payload)}") def stop(self) -> None: self.socket.close() @@ -84,9 +84,8 @@ class Subscriber: has_update, _, _ = zmq.select([self.socket], [], [], timeout) if has_update: - topic = self.socket.recv_string(flags=zmq.NOBLOCK) - payload = self.socket.recv_json() - return self._return_object(topic, payload) + parts = self.socket.recv_string(flags=zmq.NOBLOCK).split(maxsplit=1) + return self._return_object(parts[0], json.loads(parts[1])) except zmq.ZMQError: pass