blakeblackshear.frigate/frigate/comms/events_updater.py
Nicolas Mowen 687b68246b
Improve comms typing (#18599)
* Enable mypy for comms

* Make zmq data types consistent

* Cleanup inter process typing issues

* Cleanup embeddings typing

* Cleanup config updater

* Cleanup recordings updator

* Make publisher have a generic type

* Cleanup event metadata updater

* Cleanup event metadata updater

* Cleanup detections updater

* Cleanup websocket

* Cleanup mqtt

* Cleanup webpush

* Cleanup dispatcher

* Formatting

* Remove unused

* Add return type

* Fix tests

* Fix semantic triggers config typing

* Cleanup
2025-08-08 07:08:37 -05:00

62 lines
1.4 KiB
Python

"""Facilitates communication between processes."""
from typing import Any
from frigate.events.types import EventStateEnum, EventTypeEnum
from .zmq_proxy import Publisher, Subscriber
class EventUpdatePublisher(
Publisher[tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]]
):
"""Publishes events (objects, audio, manual)."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("update")
def publish(
self,
payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]],
sub_topic: str = "",
) -> None:
super().publish(payload, sub_topic)
class EventUpdateSubscriber(Subscriber):
"""Receives event updates."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("update")
class EventEndPublisher(
Publisher[tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]]]
):
"""Publishes events that have ended."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("finalized")
def publish(
self,
payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]],
sub_topic: str = "",
) -> None:
super().publish(payload, sub_topic)
class EventEndSubscriber(Subscriber):
"""Receives events that have ended."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("finalized")