blakeblackshear.frigate/frigate/comms/detections_updater.py
Martin Weinelt 4d4d54d030
Fix various typing issues (#18187)
* Fix the `Any` typing hint treewide

There has been confusion between the Any type[1] and the any function[2]
in typing hints.

[1] https://docs.python.org/3/library/typing.html#typing.Any
[2] https://docs.python.org/3/library/functions.html#any

* Fix typing for various frame_shape members

Frame shapes are most likely defined by height and width, so a single int
cannot express that.

* Wrap gpu stats functions in Optional[]

These can return `None`, so they need to be `Type | None`, which is what
`Optional` expresses very nicely.

* Fix return type in get_latest_segment_datetime

Returns a datetime object, not an integer.

* Make the return type of FrameManager.write optional

This is necessary since the SharedMemoryFrameManager.write function can
return None.

* Fix total_seconds() return type in get_tz_modifiers

The function returns a float, not an int.

https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds

* Account for floating point results in to_relative_box

Because the function uses division, the return types may be either int or
float.

* Resolve ruff deprecation warning

The config has been split into formatter and linter, and the global
options are deprecated.
2025-05-13 08:27:20 -06:00
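
The typing points raised in the commit message above can be illustrated with a short, standalone sketch. It is not code from the Frigate repository: `describe`, `gpu_usage`, and `relative_x` are hypothetical helpers invented here, and the `(height, width)` ordering of `frame_shape` is an assumption taken from the commit message's wording.

```python
from datetime import timedelta
from typing import Any, Optional


def describe(value: Any) -> str:
    # typing.Any is a type annotation: every value is accepted here.
    return type(value).__name__


def gpu_usage(raw: str) -> Optional[float]:
    # Hypothetical parser: returns None when the tool produced no output,
    # so the return type has to be Optional[float] rather than float.
    if not raw.strip():
        return None
    return float(raw.strip().rstrip("%"))


def relative_x(x: int, frame_shape: tuple[int, int]) -> float:
    # Assumed layout: frame_shape is (height, width), so one int is not enough.
    height, width = frame_shape
    # True division yields a float even when both operands are ints.
    return x / width


# The built-in any() is a function that tests an iterable, not a type.
has_truthy = any([0, "", 3])

# timedelta.total_seconds() returns a float, not an int.
utc_offset = timedelta(hours=-6).total_seconds()
```

Annotating a parameter with the built-in `any` instead of `typing.Any` does not fail at runtime, which is likely why the mix-up went unnoticed until a type checker or reviewer flagged it.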

"""Facilitates communication between processes."""

from enum import Enum
from typing import Any, Optional

from .zmq_proxy import Publisher, Subscriber


class DetectionTypeEnum(str, Enum):
    all = ""
    api = "api"
    video = "video"
    audio = "audio"
    lpr = "lpr"


class DetectionPublisher(Publisher):
    """Simplifies publishing video and audio detections."""

    topic_base = "detection/"

    def __init__(self, topic: DetectionTypeEnum) -> None:
        # Pass the enum's string value on as the topic suffix.
        super().__init__(topic.value)


class DetectionSubscriber(Subscriber):
    """Simplifies receiving video and audio detections."""

    topic_base = "detection/"

    def __init__(self, topic: DetectionTypeEnum) -> None:
        # Pass the enum's string value on as the topic suffix.
        super().__init__(topic.value)

    def check_for_update(
        self, timeout: Optional[float] = None
    ) -> Optional[tuple[DetectionTypeEnum, Any]]:
        return super().check_for_update(timeout)

    def _return_object(self, topic: str, payload: Any) -> Any:
        if payload is None:
            return (None, None)

        # Strip the topic_base prefix and look up the enum member by name.
        return (DetectionTypeEnum[topic[len(self.topic_base) :]], payload)
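
The topic parsing in `_return_object` can be tried in isolation with the sketch below. It copies the enum from this file but leaves out the ZMQ `Publisher` and `Subscriber` base classes, so `TOPIC_BASE` and the free function `return_object` are stand-ins invented for illustration, and the payload dictionary is made up.

```python
from enum import Enum
from typing import Any, Optional


class DetectionTypeEnum(str, Enum):
    all = ""
    api = "api"
    video = "video"
    audio = "audio"
    lpr = "lpr"


# Stand-in for the class attribute topic_base used in this module.
TOPIC_BASE = "detection/"


def return_object(
    topic: str, payload: Any
) -> tuple[Optional[DetectionTypeEnum], Any]:
    # Mirrors _return_object: an empty payload maps to (None, None).
    if payload is None:
        return (None, None)
    # Square brackets look the member up by *name*, not by value.
    return (DetectionTypeEnum[topic[len(TOPIC_BASE) :]], payload)


result = return_object("detection/video", {"camera": "front"})
empty = return_object("detection/audio", None)
```

Because the lookup is by member name, a bare `"detection/"` topic would raise `KeyError`: the empty string is the value of `DetectionTypeEnum.all`, not its name. A value lookup such as `DetectionTypeEnum("")` would return `all` instead.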