From d28ad0f0c87bd4f7fad991bb010482944487a1d1 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 24 Jul 2024 08:58:23 -0600 Subject: [PATCH] Use JSON instead of pickle for serialization (#12590) --- frigate/comms/config_updater.py | 4 ++-- frigate/comms/detections_updater.py | 4 ++-- frigate/comms/events_updater.py | 8 ++++---- frigate/comms/inter_process.py | 10 +++++----- frigate/record/maintainer.py | 22 +++++++++++----------- frigate/review/maintainer.py | 24 ++++++++++++------------ 6 files changed, 36 insertions(+), 36 deletions(-) diff --git a/frigate/comms/config_updater.py b/frigate/comms/config_updater.py index 273103911..6b35b149e 100644 --- a/frigate/comms/config_updater.py +++ b/frigate/comms/config_updater.py @@ -21,7 +21,7 @@ class ConfigPublisher: def publish(self, topic: str, payload: any) -> None: """There is no communication back to the processes.""" self.socket.send_string(topic, flags=zmq.SNDMORE) - self.socket.send_pyobj(payload) + self.socket.send_json(payload) def stop(self) -> None: self.stop_event.set() @@ -42,7 +42,7 @@ class ConfigSubscriber: """Returns updated config or None if no update.""" try: topic = self.socket.recv_string(flags=zmq.NOBLOCK) - return (topic, self.socket.recv_pyobj()) + return (topic, self.socket.recv_json()) except zmq.ZMQError: return (None, None) diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py index d9022ebab..af7b7b65d 100644 --- a/frigate/comms/detections_updater.py +++ b/frigate/comms/detections_updater.py @@ -68,7 +68,7 @@ class DetectionPublisher: def send_data(self, payload: any) -> None: """Publish detection.""" self.socket.send_string(self.topic.value, flags=zmq.SNDMORE) - self.socket.send_pyobj(payload) + self.socket.send_json(payload) def stop(self) -> None: self.socket.close() @@ -91,7 +91,7 @@ class DetectionSubscriber: if has_update: topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] - return (topic, self.socket.recv_pyobj()) + return (topic, self.socket.recv_json()) except zmq.ZMQError: pass diff --git a/frigate/comms/events_updater.py b/frigate/comms/events_updater.py index 29207df33..dd8caf8a3 100644 --- a/frigate/comms/events_updater.py +++ b/frigate/comms/events_updater.py @@ -20,7 +20,7 @@ class EventUpdatePublisher: self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] ) -> None: """There is no communication back to the processes.""" - self.socket.send_pyobj(payload) + self.socket.send_json(payload) def stop(self) -> None: self.socket.close() @@ -43,7 +43,7 @@ class EventUpdateSubscriber: has_update, _, _ = zmq.select([self.socket], [], [], timeout) if has_update: - return self.socket.recv_pyobj() + return self.socket.recv_json() except zmq.ZMQError: pass @@ -66,7 +66,7 @@ class EventEndPublisher: self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] ) -> None: """There is no communication back to the processes.""" - self.socket.send_pyobj(payload) + self.socket.send_json(payload) def stop(self) -> None: self.socket.close() @@ -89,7 +89,7 @@ class EventEndSubscriber: has_update, _, _ = zmq.select([self.socket], [], [], timeout) if has_update: - return self.socket.recv_pyobj() + return self.socket.recv_json() except zmq.ZMQError: pass diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py index deec77a40..32cec49e4 100644 --- a/frigate/comms/inter_process.py +++ b/frigate/comms/inter_process.py @@ -37,14 +37,14 @@ class InterProcessCommunicator(Communicator): break try: - (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) + (topic, value) = self.socket.recv_json(flags=zmq.NOBLOCK) response = self._dispatcher(topic, value) if response is not None: - self.socket.send_pyobj(response) + self.socket.send_json(response) else: - self.socket.send_pyobj([]) + self.socket.send_json([]) except zmq.ZMQError: break @@ -65,8 +65,8 @@ class InterProcessRequestor: def send_data(self, topic: str, data: any) -> any: """Sends data and then waits for reply.""" - self.socket.send_pyobj((topic, data)) - return self.socket.recv_pyobj() + self.socket.send_json((topic, data)) + return self.socket.recv_json() def stop(self) -> None: self.socket.close() diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 188c1c3eb..50ead905c 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -419,19 +419,19 @@ class RecordingMaintainer(threading.Thread): ) return { - Recordings.id: f"{start_time.timestamp()}-{rand_id}", - Recordings.camera: camera, - Recordings.path: file_path, - Recordings.start_time: start_time.timestamp(), - Recordings.end_time: end_time.timestamp(), - Recordings.duration: duration, - Recordings.motion: segment_info.motion_count, + Recordings.id.name: f"{start_time.timestamp()}-{rand_id}", + Recordings.camera.name: camera, + Recordings.path.name: file_path, + Recordings.start_time.name: start_time.timestamp(), + Recordings.end_time.name: end_time.timestamp(), + Recordings.duration.name: duration, + Recordings.motion.name: segment_info.motion_count, # TODO: update this to store list of active objects at some point - Recordings.objects: segment_info.active_object_count + Recordings.objects.name: segment_info.active_object_count + (1 if manual_event else 0), - Recordings.regions: segment_info.region_count, - Recordings.dBFS: segment_info.average_dBFS, - Recordings.segment_size: segment_size, + Recordings.regions.name: segment_info.region_count, + Recordings.dBFS.name: segment_info.average_dBFS, + Recordings.segment_size.name: segment_size, } except Exception as e: logger.error(f"Unable to store recording segment {cache_path}") diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 28bc22610..8fb1df362 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -127,13 +127,13 @@ class PendingReviewSegment: def get_data(self, ended: bool) -> dict: return { - ReviewSegment.id: self.id, - ReviewSegment.camera: self.camera, - ReviewSegment.start_time: self.start_time, - ReviewSegment.end_time: self.last_update if ended else None, - ReviewSegment.severity: self.severity.value, - ReviewSegment.thumb_path: self.frame_path, - ReviewSegment.data: { + ReviewSegment.id.name: self.id, + ReviewSegment.camera.name: self.camera, + ReviewSegment.start_time.name: self.start_time, + ReviewSegment.end_time.name: self.last_update if ended else None, + ReviewSegment.severity.name: self.severity.value, + ReviewSegment.thumb_path.name: self.frame_path, + ReviewSegment.data.name: { "detections": list(set(self.detections.keys())), "objects": list(set(self.detections.values())), "sub_labels": list(self.sub_labels), @@ -176,7 +176,7 @@ class ReviewSegmentMaintainer(threading.Thread): """New segment.""" new_data = segment.get_data(ended=False) self.requestor.send_data(UPSERT_REVIEW_SEGMENT, new_data) - start_data = {k.name: v for k, v in new_data.items()} + start_data = {k: v for k, v in new_data.items()} self.requestor.send_data( "reviews", json.dumps( @@ -207,8 +207,8 @@ class ReviewSegmentMaintainer(threading.Thread): json.dumps( { "type": "update", - "before": {k.name: v for k, v in prev_data.items()}, - "after": {k.name: v for k, v in new_data.items()}, + "before": {k: v for k, v in prev_data.items()}, + "after": {k: v for k, v in new_data.items()}, } ), ) @@ -226,8 +226,8 @@ class ReviewSegmentMaintainer(threading.Thread): json.dumps( { "type": "end", - "before": {k.name: v for k, v in prev_data.items()}, - "after": {k.name: v for k, v in final_data.items()}, + "before": {k: v for k, v in prev_data.items()}, + "after": {k: v for k, v in final_data.items()}, } ), )