mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-07-30 13:48:07 +02:00
* Fix the `Any` typing hint treewide There has been confusion between the Any type[1] and the any function[2] in typing hints. [1] https://docs.python.org/3/library/typing.html#typing.Any [2] https://docs.python.org/3/library/functions.html#any * Fix typing for various frame_shape members Frame shapes are most likely defined by height and width, so a single int cannot express that. * Wrap gpu stats functions in Optional[] These can return `None`, so they need to be `Type | None`, which is what `Optional` expresses very nicely. * Fix return type in get_latest_segment_datetime Returns a datetime object, not an integer. * Make the return type of FrameManager.write optional This is necessary since the SharedMemoryFrameManager.write function can return None. * Fix total_seconds() return type in get_tz_modifiers The function returns a float, not an int. https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds * Account for floating point results in to_relative_box Because the function uses division the return types may either be int or float. * Resolve ruff deprecation warning The config has been split into formatter and linter, and the global options are deprecated.
103 lines
2.9 KiB
Python
103 lines
2.9 KiB
Python
"""Facilitates communication over zmq proxy."""
|
|
|
|
import json
|
|
import threading
|
|
from typing import Any, Optional
|
|
|
|
import zmq
|
|
|
|
from frigate.const import FAST_QUEUE_TIMEOUT
|
|
|
|
SOCKET_PUB = "ipc:///tmp/cache/proxy_pub"
|
|
SOCKET_SUB = "ipc:///tmp/cache/proxy_sub"
|
|
|
|
|
|
class ZmqProxyRunner(threading.Thread):
|
|
def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
|
|
super().__init__(name="detection_proxy")
|
|
self.context = context
|
|
|
|
def run(self) -> None:
|
|
"""Run the proxy."""
|
|
incoming = self.context.socket(zmq.XSUB)
|
|
incoming.bind(SOCKET_PUB)
|
|
outgoing = self.context.socket(zmq.XPUB)
|
|
outgoing.bind(SOCKET_SUB)
|
|
|
|
# Blocking: This will unblock (via exception) when we destroy the context
|
|
# The incoming and outgoing sockets will be closed automatically
|
|
# when the context is destroyed as well.
|
|
try:
|
|
zmq.proxy(incoming, outgoing)
|
|
except zmq.ZMQError:
|
|
pass
|
|
|
|
|
|
class ZmqProxy:
|
|
"""Proxies video and audio detections."""
|
|
|
|
def __init__(self) -> None:
|
|
self.context = zmq.Context()
|
|
self.runner = ZmqProxyRunner(self.context)
|
|
self.runner.start()
|
|
|
|
def stop(self) -> None:
|
|
# destroying the context will tell the proxy to stop
|
|
self.context.destroy()
|
|
self.runner.join()
|
|
|
|
|
|
class Publisher:
|
|
"""Publishes messages."""
|
|
|
|
topic_base: str = ""
|
|
|
|
def __init__(self, topic: str = "") -> None:
|
|
self.topic = f"{self.topic_base}{topic}"
|
|
self.context = zmq.Context()
|
|
self.socket = self.context.socket(zmq.PUB)
|
|
self.socket.connect(SOCKET_PUB)
|
|
|
|
def publish(self, payload: Any, sub_topic: str = "") -> None:
|
|
"""Publish message."""
|
|
self.socket.send_string(f"{self.topic}{sub_topic} {json.dumps(payload)}")
|
|
|
|
def stop(self) -> None:
|
|
self.socket.close()
|
|
self.context.destroy()
|
|
|
|
|
|
class Subscriber:
|
|
"""Receives messages."""
|
|
|
|
topic_base: str = ""
|
|
|
|
def __init__(self, topic: str = "") -> None:
|
|
self.topic = f"{self.topic_base}{topic}"
|
|
self.context = zmq.Context()
|
|
self.socket = self.context.socket(zmq.SUB)
|
|
self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic)
|
|
self.socket.connect(SOCKET_SUB)
|
|
|
|
def check_for_update(
|
|
self, timeout: float = FAST_QUEUE_TIMEOUT
|
|
) -> Optional[tuple[str, Any]]:
|
|
"""Returns message or None if no update."""
|
|
try:
|
|
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
|
|
|
|
if has_update:
|
|
parts = self.socket.recv_string(flags=zmq.NOBLOCK).split(maxsplit=1)
|
|
return self._return_object(parts[0], json.loads(parts[1]))
|
|
except zmq.ZMQError:
|
|
pass
|
|
|
|
return self._return_object("", None)
|
|
|
|
def stop(self) -> None:
|
|
self.socket.close()
|
|
self.context.destroy()
|
|
|
|
def _return_object(self, topic: str, payload: Any) -> Any:
|
|
return payload
|