blakeblackshear.frigate/frigate/comms/recordings_updater.py
Nicolas Mowen 687b68246b
Improve comms typing (#18599)
* Enable mypy for comms

* Make zmq data types consistent

* Cleanup inter process typing issues

* Cleanup embeddings typing

* Cleanup config updater

* Cleanup recordings updater

* Make publisher have a generic type

* Cleanup event metadata updater

* Cleanup detections updater

* Cleanup websocket

* Cleanup mqtt

* Cleanup webpush

* Cleanup dispatcher

* Formatting

* Remove unused

* Add return type

* Fix tests

* Fix semantic triggers config typing

* Cleanup
2025-08-08 07:08:37 -05:00

35 lines
882 B
Python

"""Facilitates communication between processes."""

import logging
from enum import Enum

from .zmq_proxy import Publisher, Subscriber

logger = logging.getLogger(__name__)


class RecordingsDataTypeEnum(str, Enum):
    all = ""
    recordings_available_through = "recordings_available_through"


class RecordingsDataPublisher(Publisher[tuple[str, float]]):
    """Publishes latest recording data."""

    topic_base = "recordings/"

    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
        super().__init__(topic.value)

    def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
        super().publish(payload, sub_topic)


class RecordingsDataSubscriber(Subscriber):
    """Receives latest recording data."""

    topic_base = "recordings/"

    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
        super().__init__(topic.value)
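
The topic handling in the classes above can be illustrated without ZMQ. The following is a minimal sketch, not Frigate's implementation: `InMemoryBus`, `SketchPublisher`, and `SketchSubscriber` are hypothetical stand-ins, and it assumes that the base classes in `zmq_proxy` build the full topic as `topic_base` followed by the enum value, and that subscriptions match by topic prefix (as ZMQ SUB sockets do), which would explain why `RecordingsDataTypeEnum.all` is the empty string. The `("front_door", 1700000000.0)` payload is an invented example of a `tuple[str, float]`.

```python
from enum import Enum
from typing import Generic, TypeVar

T = TypeVar("T")


class RecordingsDataTypeEnum(str, Enum):
    all = ""
    recordings_available_through = "recordings_available_through"


class InMemoryBus(Generic[T]):
    """Hypothetical stand-in for the ZMQ proxy; not part of Frigate."""

    def __init__(self) -> None:
        self.messages: list[tuple[str, T]] = []

    def send(self, topic: str, payload: T) -> None:
        self.messages.append((topic, payload))

    def matching(self, prefix: str) -> list[tuple[str, T]]:
        # Mimic ZMQ SUB prefix filtering with str.startswith.
        return [(t, p) for t, p in self.messages if t.startswith(prefix)]


class SketchPublisher(Generic[T]):
    """Hypothetical publisher that mirrors the topic_base pattern."""

    topic_base = "recordings/"

    def __init__(self, bus: InMemoryBus[T], topic: RecordingsDataTypeEnum) -> None:
        self.bus = bus
        # Assumption: full topic = topic_base + enum value.
        self.topic = f"{self.topic_base}{topic.value}"

    def publish(self, payload: T, sub_topic: str = "") -> None:
        self.bus.send(f"{self.topic}{sub_topic}", payload)


class SketchSubscriber(Generic[T]):
    """Hypothetical subscriber that filters by topic prefix."""

    topic_base = "recordings/"

    def __init__(self, bus: InMemoryBus[T], topic: RecordingsDataTypeEnum) -> None:
        self.bus = bus
        self.topic = f"{self.topic_base}{topic.value}"

    def received(self) -> list[tuple[str, T]]:
        return self.bus.matching(self.topic)


bus: InMemoryBus[tuple[str, float]] = InMemoryBus()
publisher = SketchPublisher(bus, RecordingsDataTypeEnum.recordings_available_through)
publisher.publish(("front_door", 1700000000.0))

# Subscribing with `all` yields the prefix "recordings/", which matches
# every topic published under that base.
subscriber = SketchSubscriber(bus, RecordingsDataTypeEnum.all)
messages = subscriber.received()
```

Under these assumptions, a subscriber created with `RecordingsDataTypeEnum.all` sees every message under `recordings/`, while one created with `recordings_available_through` sees only that topic. Typing the publisher generically, as `Publisher[tuple[str, float]]` does above, lets a type checker reject payloads of the wrong shape at the call site.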