mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
36cbffcc5e
* Initial re-implementation of semantic search * put docker-compose back and make reindex match docs * remove debug code and fix import * fix docs * manually build pysqlite3 as binaries are only available for x86-64 * update comment in build_pysqlite3.sh * only embed objects * better error handling when genai fails * ask ollama to pull requested model at startup * update ollama docs * address some PR review comments * fix lint * use IPC to write description, update docs for reindex * remove gemini-pro-vision from docs as it will be unavailable soon * fix OpenAI doc available models * fix api error in gemini and metadata for embeddings
101 lines
2.9 KiB
Python
101 lines
2.9 KiB
Python
"""Facilitates communication over zmq proxy."""
|
|
|
|
import json
import threading
from typing import Any, Optional

import zmq
|
|
|
|
# Endpoint the proxy's XSUB socket binds to; Publisher sockets connect here.
SOCKET_PUB = "ipc:///tmp/cache/proxy_pub"
|
|
# Endpoint the proxy's XPUB socket binds to; Subscriber sockets connect here.
SOCKET_SUB = "ipc:///tmp/cache/proxy_sub"
|
|
|
|
|
|
class ZmqProxyRunner(threading.Thread):
    """Thread that runs a zmq XSUB/XPUB proxy on the module's two endpoints."""

    def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
        """Keep a reference to ``context`` and name the thread ``detection_proxy``."""
        super().__init__()
        self.name = "detection_proxy"
        self.context = context

    def run(self) -> None:
        """Bind both proxy sockets and forward messages until the proxy errors out."""
        frontend = self.context.socket(zmq.XSUB)
        frontend.bind(SOCKET_PUB)
        backend = self.context.socket(zmq.XPUB)
        backend.bind(SOCKET_SUB)

        try:
            # zmq.proxy blocks for the lifetime of the proxy. Destroying the
            # context unblocks it by raising, and the same teardown closes the
            # two sockets created above.
            zmq.proxy(frontend, backend)
        except zmq.ZMQError:
            # Raised when the context is destroyed; treated as the normal
            # shutdown path.
            return
|
|
|
|
|
|
class ZmqProxy:
    """Proxies video and audio detections.

    Owns a zmq context and the ``ZmqProxyRunner`` thread that uses it.
    """

    def __init__(self) -> None:
        """Create the context and start the proxy thread right away."""
        ctx = zmq.Context()
        proxy_thread = ZmqProxyRunner(ctx)
        self.context = ctx
        self.runner = proxy_thread
        proxy_thread.start()

    def stop(self) -> None:
        """Destroy the context, then wait for the proxy thread to finish."""
        # Tearing down the context is the stop signal for the proxy thread.
        self.context.destroy()
        self.runner.join()
|
|
|
|
|
|
class Publisher:
    """Publishes messages."""

    # Prefix prepended to the topic given to __init__.
    # NOTE(review): presumably overridden by subclasses - confirm.
    topic_base: str = ""

    def __init__(self, topic: str = "") -> None:
        """Connect a PUB socket to ``SOCKET_PUB``.

        Args:
            topic: Suffix appended to ``topic_base`` to form this publisher's
                topic prefix.
        """
        self.topic = f"{self.topic_base}{topic}"
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.connect(SOCKET_PUB)

    def publish(self, payload: Any, sub_topic: str = "") -> None:
        """Publish message.

        Sends a two-frame message: the UTF-8 topic (``self.topic`` followed by
        ``sub_topic``) and the JSON-encoded payload.

        Args:
            payload: JSON-serializable object to send.
            sub_topic: Optional suffix appended to this publisher's topic.

        Raises:
            TypeError: If ``payload`` cannot be serialized to JSON. Nothing is
                sent in that case.
        """
        # Serialize before touching the socket. Sending the topic frame with
        # SNDMORE first would leave a half-built multipart message on the
        # socket if serialization failed, and the next publish would be
        # appended to it.
        body = json.dumps(payload).encode("utf-8")
        header = f"{self.topic}{sub_topic}".encode("utf-8")
        self.socket.send_multipart([header, body])

    def stop(self) -> None:
        """Close the socket and destroy this publisher's context."""
        self.socket.close()
        self.context.destroy()
|
|
|
|
|
|
class Subscriber:
    """Receives messages."""

    # Prefix prepended to the topic given to __init__.
    # NOTE(review): presumably overridden by subclasses - confirm.
    topic_base: str = ""

    def __init__(self, topic: str = "") -> None:
        """Subscribe a SUB socket to the topic prefix and connect to ``SOCKET_SUB``.

        Args:
            topic: Suffix appended to ``topic_base`` to form the subscription
                prefix.
        """
        self.topic = f"{self.topic_base}{topic}"
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic)
        self.socket.connect(SOCKET_SUB)

    def check_for_update(self, timeout: float = 1) -> Optional[tuple[str, Any]]:
        """Returns message or None if no update.

        Waits up to ``timeout`` on the socket. When a message is ready, its
        topic and JSON payload are read and passed through ``_return_object``.
        Otherwise, or if a ``zmq.ZMQError`` is raised, the result of
        ``_return_object("", None)`` is returned.
        """
        try:
            has_update, _, _ = zmq.select([self.socket], [], [], timeout)

            if has_update:
                # First frame is the topic string, second is the JSON payload.
                topic = self.socket.recv_string(flags=zmq.NOBLOCK)
                payload = self.socket.recv_json()
                return self._return_object(topic, payload)
        except zmq.ZMQError:
            # Best effort: a zmq error is reported as "no update".
            pass

        return self._return_object("", None)

    def stop(self) -> None:
        """Close the socket and destroy this subscriber's context."""
        self.socket.close()
        self.context.destroy()

    def _return_object(self, topic: str, payload: Any) -> Any:
        """Shape the value returned by ``check_for_update``.

        This implementation ignores ``topic`` and returns ``payload`` as is.
        """
        return payload
|