blakeblackshear.frigate/frigate/comms/event_metadata_updater.py
Josh Hawkins fa4643fddf
LPR improvements (#17289)
* config options

* processing in maintainer

* detect and process dedicated lpr plates

* create camera type, add manual event and save snapshot

* use const

* ensure lpr events are always detections, typing fixes

* docs

* docs tweaks

* add preprocessing and penalization for low confidence chars
2025-03-23 13:30:48 -06:00

51 lines
1.4 KiB
Python

"""Facilitates communication between processes."""
import logging
from enum import Enum
from .zmq_proxy import Publisher, Subscriber
logger = logging.getLogger(__name__)
class EventMetadataTypeEnum(str, Enum):
all = ""
manual_event_create = "manual_event_create"
manual_event_end = "manual_event_end"
regenerate_description = "regenerate_description"
sub_label = "sub_label"
recognized_license_plate = "recognized_license_plate"
lpr_event_create = "lpr_event_create"
save_lpr_snapshot = "save_lpr_snapshot"
class EventMetadataPublisher(Publisher):
"""Simplifies receiving event metadata."""
topic_base = "event_metadata/"
def __init__(self) -> None:
super().__init__()
def publish(self, topic: EventMetadataTypeEnum, payload: any) -> None:
super().publish(payload, topic.value)
class EventMetadataSubscriber(Subscriber):
"""Simplifies receiving event metadata."""
topic_base = "event_metadata/"
def __init__(self, topic: EventMetadataTypeEnum) -> None:
super().__init__(topic.value)
def check_for_update(self, timeout: float = 1) -> tuple | None:
return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: tuple) -> tuple:
if payload is None:
return (None, None)
topic = EventMetadataTypeEnum[topic[len(self.topic_base) :]]
return (topic, payload)