mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-09-05 17:51:36 +02:00
* Enable mypy for comms * Make zmq data types consistent * Cleanup inter process typing issues * Cleanup embeddings typing * Cleanup config updater * Cleanup recordings updator * Make publisher have a generic type * Cleanup event metadata updater * Cleanup event metadata updater * Cleanup detections updater * Cleanup websocket * Cleanup mqtt * Cleanup webpush * Cleanup dispatcher * Formatting * Remove unused * Add return type * Fix tests * Fix semantic triggers config typing * Cleanup
35 lines
882 B
Python
35 lines
882 B
Python
"""Facilitates communication between processes."""
|
|
|
|
import logging
|
|
from enum import Enum
|
|
|
|
from .zmq_proxy import Publisher, Subscriber
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RecordingsDataTypeEnum(str, Enum):
    """Topic names used for recordings data messages.

    Members are also ``str`` instances, so each member compares equal to its
    raw topic string.
    """

    # Empty topic string.
    # NOTE(review): presumably acts as a match-everything prefix when
    # subscribing -- confirm against the zmq_proxy implementation.
    all = ""
    recordings_available_through = "recordings_available_through"
|
|
|
|
|
|
class RecordingsDataPublisher(Publisher[tuple[str, float]]):
    """Publishes latest recording data."""

    topic_base = "recordings/"

    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
        """Create a publisher for the given recordings topic.

        Args:
            topic: Recordings topic; its string value is handed to the base
                ``Publisher`` constructor.
        """
        super().__init__(topic.value)

    def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
        """Publish ``payload`` by delegating to the base ``Publisher``.

        The override adds no behavior of its own; it pins the payload type
        for this publisher.

        Args:
            payload: A ``(str, float)`` pair.
                NOTE(review): presumably (camera name, timestamp) --
                confirm with callers.
            sub_topic: Optional sub-topic, forwarded unchanged.
        """
        super().publish(payload, sub_topic)
|
|
|
|
|
|
class RecordingsDataSubscriber(Subscriber):
    """Receives latest recording data."""

    topic_base = "recordings/"

    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
        """Create a subscriber for the given recordings topic.

        Args:
            topic: Recordings topic; its string value is handed to the base
                ``Subscriber`` constructor.
        """
        super().__init__(topic.value)
|