blakeblackshear.frigate/frigate/comms/events_updater.py
Nicolas Mowen 66277fbb6c
Fix embeddings (#15072)
* Fix embeddings reading frames

* Fix event update reading

* Formatting

* Pin AIO http to fix build failure

* Pin starlette
2024-11-19 12:20:04 -06:00

52 lines
1.2 KiB
Python

"""Facilitates communication between processes."""
from frigate.events.types import EventStateEnum, EventTypeEnum
from .zmq_proxy import Publisher, Subscriber
class EventUpdatePublisher(Publisher):
"""Publishes events (objects, audio, manual)."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("update")
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, any]]
) -> None:
super().publish(payload)
class EventUpdateSubscriber(Subscriber):
"""Receives event updates."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("update")
class EventEndPublisher(Publisher):
"""Publishes events that have ended."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("finalized")
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]
) -> None:
super().publish(payload)
class EventEndSubscriber(Subscriber):
"""Receives events that have ended."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("finalized")