Improve comms typing (#18599)

* Enable mypy for comms

* Make zmq data types consistent

* Cleanup inter process typing issues

* Cleanup embeddings typing

* Cleanup config updater

* Cleanup recordings updator

* Make publisher have a generic type

* Cleanup event metadata updater

* Cleanup event metadata updater

* Cleanup detections updater

* Cleanup websocket

* Cleanup mqtt

* Cleanup webpush

* Cleanup dispatcher

* Formatting

* Remove unused

* Add return type

* Fix tests

* Fix semantic triggers config typing

* Cleanup
This commit is contained in:
Nicolas Mowen
2025-08-08 06:08:37 -06:00
committed by Blake Blackshear
parent 1add72884a
commit fcf3824124
29 changed files with 168 additions and 128 deletions

View File

@@ -1172,7 +1172,6 @@ class LicensePlateProcessingMixin:
event_id = f"{now}-{rand_id}"
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.lpr_event_create,
(
now,
camera,
@@ -1183,6 +1182,7 @@ class LicensePlateProcessingMixin:
None,
plate,
),
EventMetadataTypeEnum.lpr_event_create.value,
)
return event_id
@@ -1526,7 +1526,7 @@ class LicensePlateProcessingMixin:
# If it's a known plate, publish to sub_label
if sub_label is not None:
self.sub_label_publisher.publish(
EventMetadataTypeEnum.sub_label, (id, sub_label, avg_confidence)
(id, sub_label, avg_confidence), EventMetadataTypeEnum.sub_label.value
)
# always publish to recognized_license_plate field
@@ -1545,8 +1545,8 @@ class LicensePlateProcessingMixin:
),
)
self.sub_label_publisher.publish(
EventMetadataTypeEnum.attribute,
(id, "recognized_license_plate", top_plate, avg_confidence),
EventMetadataTypeEnum.attribute.value,
)
# save the best snapshot for dedicated lpr cams not using frigate+
@@ -1560,8 +1560,8 @@ class LicensePlateProcessingMixin:
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, encoded_img = cv2.imencode(".jpg", frame_bgr)
self.sub_label_publisher.publish(
EventMetadataTypeEnum.save_lpr_snapshot,
(base64.b64encode(encoded_img).decode("ASCII"), id, camera),
EventMetadataTypeEnum.save_lpr_snapshot.value,
)
if id not in self.detected_license_plates: