blakeblackshear.frigate/frigate/comms/events_updater.py
Martin Weinelt 4d4d54d030
Fix various typing issues (#18187)
* Fix the `Any` typing hint treewide

There has been confusion between the Any type[1] and the any function[2]
in typing hints.

[1] https://docs.python.org/3/library/typing.html#typing.Any
[2] https://docs.python.org/3/library/functions.html#any

* Fix typing for various frame_shape members

Frame shapes are most likely defined by height and width, so a single int
cannot express that.

* Wrap gpu stats functions in Optional[]

These can return `None`, so they need to be `Type | None`, which is what
`Optional` expresses very nicely.

* Fix return type in get_latest_segment_datetime

Returns a datetime object, not an integer.

* Make the return type of FrameManager.write optional

This is necessary since the SharedMemoryFrameManager.write function can
return None.

* Fix total_seconds() return type in get_tz_modifiers

The function returns a float, not an int.

https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds

* Account for floating point results in to_relative_box

Because the function uses division the return types may either be int or
float.

* Resolve ruff deprecation warning

The config has been split into formatter and linter, and the global
options are deprecated.
2025-05-13 08:27:20 -06:00

54 lines
1.2 KiB
Python

"""Facilitates communication between processes."""
from typing import Any
from frigate.events.types import EventStateEnum, EventTypeEnum
from .zmq_proxy import Publisher, Subscriber
class EventUpdatePublisher(Publisher):
"""Publishes events (objects, audio, manual)."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("update")
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]
) -> None:
super().publish(payload)
class EventUpdateSubscriber(Subscriber):
"""Receives event updates."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("update")
class EventEndPublisher(Publisher):
"""Publishes events that have ended."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("finalized")
def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]]
) -> None:
super().publish(payload)
class EventEndSubscriber(Subscriber):
"""Receives events that have ended."""
topic_base = "event/"
def __init__(self) -> None:
super().__init__("finalized")