blakeblackshear.frigate/frigate/comms/detections_updater.py
Nicolas Mowen 687b68246b
Improve comms typing (#18599)
* Enable mypy for comms

* Make zmq data types consistent

* Cleanup inter process typing issues

* Cleanup embeddings typing

* Cleanup config updater

* Cleanup recordings updater

* Make publisher have a generic type

* Cleanup event metadata updater

* Cleanup event metadata updater

* Cleanup detections updater

* Cleanup websocket

* Cleanup mqtt

* Cleanup webpush

* Cleanup dispatcher

* Formatting

* Remove unused

* Add return type

* Fix tests

* Fix semantic triggers config typing

* Cleanup
2025-08-08 07:08:37 -05:00

43 lines
1.0 KiB
Python

"""Facilitates communication between processes."""
from enum import Enum
from typing import Any
from .zmq_proxy import Publisher, Subscriber
class DetectionTypeEnum(str, Enum):
    """String-valued identifiers for the kinds of detection messages.

    Members mix in ``str``, so each one compares equal to its plain string
    value (for example ``DetectionTypeEnum.video == "video"``).
    """

    # Empty value; presumably used as a topic suffix that matches every
    # detection type -- TODO confirm against the zmq_proxy subscription logic.
    all = ""
    api = "api"
    video = "video"
    audio = "audio"
    lpr = "lpr"
class DetectionPublisher(Publisher):
    """Simplifies publishing detections on ``detection/`` topics."""

    # Topic prefix for this publisher; presumably combined with the ``topic``
    # argument by the ``Publisher`` base class -- TODO confirm in zmq_proxy.
    topic_base = "detection/"

    def __init__(self, topic: str) -> None:
        """Create a publisher for ``topic``.

        Args:
            topic: Topic name, forwarded unchanged to ``Publisher.__init__``.
        """
        super().__init__(topic)
class DetectionSubscriber(Subscriber):
    """Subscriber for detection messages on ``detection/`` topics."""

    # Topic prefix; its length is what ``_return_object`` strips from the
    # front of each received topic.
    topic_base = "detection/"

    def __init__(self, topic: str) -> None:
        """Create a subscriber for ``topic``, forwarded to ``Subscriber``."""
        super().__init__(topic)

    def check_for_update(
        self, timeout: float | None = None
    ) -> tuple[str, Any] | tuple[None, None] | None:
        """Delegate to ``Subscriber.check_for_update`` with a narrowed type.

        Args:
            timeout: Passed through unchanged to the base implementation.

        Returns:
            Whatever the base implementation returns; presumably the value
            produced by ``_return_object`` -- TODO confirm in zmq_proxy.
        """
        update = super().check_for_update(timeout)
        return update

    def _return_object(self, topic: str, payload: Any) -> Any:
        """Build the ``(detection_type, payload)`` pair for a message.

        Args:
            topic: Full topic string of the message.
            payload: Message payload, or ``None`` when there is none.

        Returns:
            ``(None, None)`` when ``payload`` is ``None``; otherwise a tuple
            of ``topic`` with its first ``len(topic_base)`` characters
            removed, and ``payload``.
        """
        if payload is not None:
            prefix_len = len(self.topic_base)
            return (topic[prefix_len:], payload)

        return (None, None)