Cleanup event metadata updater

This commit is contained in:
Nicolas Mowen 2025-06-06 08:47:37 -06:00
parent 87efebde19
commit 107007a636
2 changed files with 7 additions and 5 deletions

View File

@ -28,7 +28,7 @@ class EventMetadataPublisher(Publisher):
def __init__(self) -> None:
super().__init__()
def publish(self, payload: Any, sub_topic: str) -> None:
def publish(self, payload: Any, sub_topic: str = "") -> None:
super().publish(payload, sub_topic)
@ -40,7 +40,9 @@ class EventMetadataSubscriber(Subscriber):
def __init__(self, topic: EventMetadataTypeEnum) -> None:
super().__init__(topic.value)
def _return_object(self, topic: str, payload: tuple) -> tuple:
def _return_object(
self, topic: str, payload: tuple | None
) -> tuple[str, Any] | tuple[None, None]:
if payload is None:
return (None, None)

View File

@ -84,7 +84,7 @@ class Subscriber:
def check_for_update(
self, timeout: float = FAST_QUEUE_TIMEOUT
) -> Optional[tuple[str, Any]]:
) -> tuple[str, Any] | tuple[None, None]:
"""Returns message or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
@ -103,5 +103,5 @@ class Subscriber:
def _return_object(
self, topic: str, payload: Optional[tuple[str, Any]]
) -> Optional[tuple[str, Any]]:
return payload
) -> tuple[str, Any] | tuple[None, None]:
return payload or (None, None)