blakeblackshear.frigate/frigate/comms/event_metadata_updater.py
Nicolas Mowen dd6276e706
Embeddings fixes (#14269)
* Add debugging logs for more info

* Improve timeout handling

* Fix event cleanup

* Handle zmq error and empty data

* Don't run download

* Remove unneeded embeddings creations

* Update timouts

* Init models immediately

* Fix order of init

* Cleanup
2024-10-10 16:37:43 -05:00

52 lines
1.4 KiB
Python

"""Facilitates communication between processes."""
import logging
from enum import Enum
from typing import Optional
from frigate.events.types import RegenerateDescriptionEnum
from .zmq_proxy import Publisher, Subscriber
logger = logging.getLogger(__name__)
class EventMetadataTypeEnum(str, Enum):
all = ""
regenerate_description = "regenerate_description"
class EventMetadataPublisher(Publisher):
"""Simplifies receiving event metadata."""
topic_base = "event_metadata/"
def __init__(self, topic: EventMetadataTypeEnum) -> None:
topic = topic.value
super().__init__(topic)
def publish(self, payload: tuple[str, RegenerateDescriptionEnum]) -> None:
super().publish(payload)
class EventMetadataSubscriber(Subscriber):
"""Simplifies receiving event metadata."""
topic_base = "event_metadata/"
def __init__(self, topic: EventMetadataTypeEnum) -> None:
topic = topic.value
super().__init__(topic)
def check_for_update(
self, timeout: float = 1
) -> Optional[tuple[EventMetadataTypeEnum, str, RegenerateDescriptionEnum]]:
return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: any) -> any:
if payload is None:
return (None, None, None)
topic = EventMetadataTypeEnum[topic[len(self.topic_base) :]]
event_id, source = payload
return (topic, event_id, RegenerateDescriptionEnum(source))