mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-07 00:06:57 +01:00
8163c036ef
* Use zmq for events ended * Cleanup * Update deps * formatting
104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
"""Facilitates communication between processes."""
|
|
|
|
import threading
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
import zmq
|
|
|
|
SOCKET_CONTROL = "inproc://control.detections_updater"
|
|
SOCKET_PUB = "ipc:///tmp/cache/detect_pub"
|
|
SOCKET_SUB = "ipc:///tmp/cache/detect_sub"
|
|
|
|
|
|
class DetectionTypeEnum(str, Enum):
|
|
all = ""
|
|
api = "api"
|
|
video = "video"
|
|
audio = "audio"
|
|
|
|
|
|
class DetectionProxyRunner(threading.Thread):
|
|
def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
|
|
threading.Thread.__init__(self)
|
|
self.name = "detection_proxy"
|
|
self.context = context
|
|
|
|
def run(self) -> None:
|
|
"""Run the proxy."""
|
|
control = self.context.socket(zmq.SUB)
|
|
control.connect(SOCKET_CONTROL)
|
|
control.setsockopt_string(zmq.SUBSCRIBE, "")
|
|
incoming = self.context.socket(zmq.XSUB)
|
|
incoming.bind(SOCKET_PUB)
|
|
outgoing = self.context.socket(zmq.XPUB)
|
|
outgoing.bind(SOCKET_SUB)
|
|
|
|
zmq.proxy_steerable(
|
|
incoming, outgoing, None, control
|
|
) # blocking, will unblock terminate message is received
|
|
incoming.close()
|
|
outgoing.close()
|
|
|
|
|
|
class DetectionProxy:
|
|
"""Proxies video and audio detections."""
|
|
|
|
def __init__(self) -> None:
|
|
self.context = zmq.Context()
|
|
self.control = self.context.socket(zmq.PUB)
|
|
self.control.bind(SOCKET_CONTROL)
|
|
self.runner = DetectionProxyRunner(self.context)
|
|
self.runner.start()
|
|
|
|
def stop(self) -> None:
|
|
self.control.send_string("TERMINATE") # tell the proxy to stop
|
|
self.runner.join()
|
|
self.context.destroy()
|
|
|
|
|
|
class DetectionPublisher:
|
|
"""Simplifies receiving video and audio detections."""
|
|
|
|
def __init__(self, topic: DetectionTypeEnum) -> None:
|
|
self.topic = topic
|
|
self.context = zmq.Context()
|
|
self.socket = self.context.socket(zmq.PUB)
|
|
self.socket.connect(SOCKET_PUB)
|
|
|
|
def send_data(self, payload: any) -> None:
|
|
"""Publish detection."""
|
|
self.socket.send_string(self.topic.value, flags=zmq.SNDMORE)
|
|
self.socket.send_pyobj(payload)
|
|
|
|
def stop(self) -> None:
|
|
self.socket.close()
|
|
self.context.destroy()
|
|
|
|
|
|
class DetectionSubscriber:
|
|
"""Simplifies receiving video and audio detections."""
|
|
|
|
def __init__(self, topic: DetectionTypeEnum) -> None:
|
|
self.context = zmq.Context()
|
|
self.socket = self.context.socket(zmq.SUB)
|
|
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value)
|
|
self.socket.connect(SOCKET_SUB)
|
|
|
|
def get_data(self, timeout: float = None) -> Optional[tuple[str, any]]:
|
|
"""Returns detections or None if no update."""
|
|
try:
|
|
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
|
|
|
|
if has_update:
|
|
topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)]
|
|
return (topic, self.socket.recv_pyobj())
|
|
except zmq.ZMQError:
|
|
pass
|
|
|
|
return (None, None)
|
|
|
|
def stop(self) -> None:
|
|
self.socket.close()
|
|
self.context.destroy()
|