blakeblackshear.frigate/frigate/comms/zmq_proxy.py
Martin Weinelt 4d4d54d030
Fix various typing issues (#18187)
* Fix the `Any` typing hint treewide

There has been confusion between the Any type[1] and the any function[2]
in typing hints.

[1] https://docs.python.org/3/library/typing.html#typing.Any
[2] https://docs.python.org/3/library/functions.html#any

* Fix typing for various frame_shape members

Frame shapes are most likely defined by height and width, so a single int
cannot express that.

* Wrap gpu stats functions in Optional[]

These can return `None`, so they need to be `Type | None`, which is what
`Optional` expresses very nicely.

* Fix return type in get_latest_segment_datetime

Returns a datetime object, not an integer.

* Make the return type of FrameManager.write optional

This is necessary since the SharedMemoryFrameManager.write function can
return None.

* Fix total_seconds() return type in get_tz_modifiers

The function returns a float, not an int.

https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds

* Account for floating point results in to_relative_box

Because the function uses division the return types may either be int or
float.

* Resolve ruff deprecation warning

The config has been split into formatter and linter, and the global
options are deprecated.
2025-05-13 08:27:20 -06:00

103 lines
2.9 KiB
Python

"""Facilitates communication over zmq proxy."""
import json
import threading
from typing import Any, Optional
import zmq
from frigate.const import FAST_QUEUE_TIMEOUT
# IPC endpoint the proxy binds its XSUB socket to; Publisher sockets connect here.
SOCKET_PUB = "ipc:///tmp/cache/proxy_pub"
# IPC endpoint the proxy binds its XPUB socket to; Subscriber sockets connect here.
SOCKET_SUB = "ipc:///tmp/cache/proxy_sub"
class ZmqProxyRunner(threading.Thread):
    """Thread that runs the blocking zmq XSUB/XPUB proxy loop."""

    def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
        """Store the zmq context whose sockets the proxy will use.

        Args:
            context: zmq context used to create the proxy sockets in ``run``.
        """
        super().__init__(name="detection_proxy")
        self.context = context

    def run(self) -> None:
        """Bind both proxy endpoints and forward messages until interrupted."""
        ctx = self.context

        # Publishers connect to this side.
        frontend = ctx.socket(zmq.XSUB)
        frontend.bind(SOCKET_PUB)

        # Subscribers connect to this side.
        backend = ctx.socket(zmq.XPUB)
        backend.bind(SOCKET_SUB)

        # zmq.proxy() blocks. Destroying the context makes it raise, which is
        # how this thread gets unblocked; both sockets are closed together
        # with the context, so nothing is closed explicitly here.
        try:
            zmq.proxy(frontend, backend)
        except zmq.ZMQError:
            pass
class ZmqProxy:
    """Proxies video and audio detections."""

    def __init__(self) -> None:
        """Create a zmq context and start the proxy thread on it."""
        ctx = zmq.Context()
        self.context = ctx
        self.runner = ZmqProxyRunner(ctx)
        self.runner.start()

    def stop(self) -> None:
        """Shut the proxy down and wait for its thread to finish."""
        # Destroying the context is what tells the proxy thread to stop,
        # so it has to happen before joining the thread.
        self.context.destroy()
        self.runner.join()
class Publisher:
    """Publishes JSON-encoded messages to the zmq proxy under a topic."""

    # Prefix prepended to the topic given to the constructor.
    topic_base: str = ""

    def __init__(self, topic: str = "") -> None:
        """Connect a PUB socket to the proxy's publisher endpoint.

        Args:
            topic: Suffix appended to ``topic_base`` to form this
                publisher's topic.
        """
        self.topic = "{}{}".format(self.topic_base, topic)
        ctx = zmq.Context()
        self.context = ctx
        self.socket = ctx.socket(zmq.PUB)
        self.socket.connect(SOCKET_PUB)

    def publish(self, payload: Any, sub_topic: str = "") -> None:
        """Send ``payload`` as JSON, prefixed by the topic and a single space.

        Args:
            payload: Object serialized with ``json.dumps``.
            sub_topic: Optional suffix appended to this publisher's topic.
        """
        full_topic = f"{self.topic}{sub_topic}"
        body = json.dumps(payload)
        self.socket.send_string(f"{full_topic} {body}")

    def stop(self) -> None:
        """Close the socket, then destroy this publisher's context."""
        self.socket.close()
        self.context.destroy()
class Subscriber:
    """Receives JSON-encoded messages for a topic from the zmq proxy."""

    # Prefix prepended to the topic given to the constructor.
    topic_base: str = ""

    def __init__(self, topic: str = "") -> None:
        """Subscribe a SUB socket to ``topic_base`` + ``topic`` via the proxy.

        Args:
            topic: Suffix appended to ``topic_base`` to form the
                subscription prefix.
        """
        self.topic = f"{self.topic_base}{topic}"
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic)
        self.socket.connect(SOCKET_SUB)

    def check_for_update(
        self, timeout: float = FAST_QUEUE_TIMEOUT
    ) -> Optional[tuple[str, Any]]:
        """Returns message or None if no update.

        Waits up to ``timeout`` for a message, then passes its topic and
        decoded JSON payload through ``_return_object``. When nothing is
        received, or a ``zmq.ZMQError`` occurs, the result of
        ``_return_object("", None)`` is returned instead.

        Args:
            timeout: Passed to ``zmq.select`` as the wait timeout.

        Raises:
            IndexError: If a received frame contains no space separator.
            json.JSONDecodeError: If the payload part is not valid JSON.
        """
        try:
            has_update, _, _ = zmq.select([self.socket], [], [], timeout)

            if has_update:
                message = self.socket.recv_string(flags=zmq.NOBLOCK)
                # Split on the first literal space only. A bare split()
                # drops leading whitespace and splits on whitespace runs, so
                # a frame with an empty topic (" <json>") would be cut
                # inside the JSON payload instead of at the separator.
                parts = message.split(" ", 1)
                return self._return_object(parts[0], json.loads(parts[1]))
        except zmq.ZMQError:
            pass

        return self._return_object("", None)

    def stop(self) -> None:
        """Close the socket, then destroy this subscriber's context."""
        self.socket.close()
        self.context.destroy()

    def _return_object(self, topic: str, payload: Any) -> Any:
        """Shape the value returned by ``check_for_update``.

        This implementation ignores ``topic`` and returns ``payload`` as is.
        NOTE(review): presumably a hook for subclasses to override; confirm.
        """
        return payload