This commit is contained in:
Nicolas Mowen 2025-07-18 18:35:12 +00:00 committed by GitHub
commit f8e328e028
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 168 additions and 127 deletions

View File

@ -1094,7 +1094,7 @@ def set_sub_label(
new_score = None new_score = None
request.app.event_metadata_updater.publish( request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.sub_label, (event_id, new_sub_label, new_score) (event_id, new_sub_label, new_score), EventMetadataTypeEnum.sub_label.value
) )
return JSONResponse( return JSONResponse(
@ -1148,8 +1148,8 @@ def set_plate(
new_score = None new_score = None
request.app.event_metadata_updater.publish( request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.attribute,
(event_id, "recognized_license_plate", new_plate, new_score), (event_id, "recognized_license_plate", new_plate, new_score),
EventMetadataTypeEnum.attribute,
) )
return JSONResponse( return JSONResponse(
@ -1232,8 +1232,8 @@ def regenerate_description(
if camera_config.genai.enabled or params.force: if camera_config.genai.enabled or params.force:
request.app.event_metadata_updater.publish( request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.regenerate_description,
(event.id, params.source, params.force), (event.id, params.source, params.force),
EventMetadataTypeEnum.regenerate_description.value,
) )
return JSONResponse( return JSONResponse(
@ -1390,7 +1390,6 @@ def create_event(
event_id = f"{now}-{rand_id}" event_id = f"{now}-{rand_id}"
request.app.event_metadata_updater.publish( request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_create,
( (
now, now,
camera_name, camera_name,
@ -1403,6 +1402,7 @@ def create_event(
body.source_type, body.source_type,
body.draw, body.draw,
), ),
EventMetadataTypeEnum.manual_event_create.value,
) )
return JSONResponse( return JSONResponse(
@ -1426,7 +1426,7 @@ def end_event(request: Request, event_id: str, body: EventsEndBody):
try: try:
end_time = body.end_time or datetime.datetime.now().timestamp() end_time = body.end_time or datetime.datetime.now().timestamp()
request.app.event_metadata_updater.publish( request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_end, (event_id, end_time) (event_id, end_time), EventMetadataTypeEnum.manual_event_end.value
) )
except Exception: except Exception:
return JSONResponse( return JSONResponse(

View File

@ -3,7 +3,7 @@
import multiprocessing as mp import multiprocessing as mp
from _pickle import UnpicklingError from _pickle import UnpicklingError
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional from typing import Any
import zmq import zmq
@ -33,7 +33,7 @@ class ConfigPublisher:
class ConfigSubscriber: class ConfigSubscriber:
"""Simplifies receiving an updated config.""" """Simplifies receiving an updated config."""
def __init__(self, topic: str, exact=False) -> None: def __init__(self, topic: str, exact: bool = False) -> None:
self.topic = topic self.topic = topic
self.exact = exact self.exact = exact
self.context = zmq.Context() self.context = zmq.Context()
@ -41,7 +41,7 @@ class ConfigSubscriber:
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic) self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
self.socket.connect(SOCKET_PUB_SUB) self.socket.connect(SOCKET_PUB_SUB)
def check_for_update(self) -> Optional[tuple[str, Any]]: def check_for_update(self) -> tuple[str, Any] | tuple[None, None]:
"""Returns updated config or None if no update.""" """Returns updated config or None if no update."""
try: try:
topic = self.socket.recv_string(flags=zmq.NOBLOCK) topic = self.socket.recv_string(flags=zmq.NOBLOCK)

View File

@ -1,7 +1,7 @@
"""Facilitates communication between processes.""" """Facilitates communication between processes."""
from enum import Enum from enum import Enum
from typing import Any, Optional from typing import Any
from .zmq_proxy import Publisher, Subscriber from .zmq_proxy import Publisher, Subscriber
@ -19,8 +19,7 @@ class DetectionPublisher(Publisher):
topic_base = "detection/" topic_base = "detection/"
def __init__(self, topic: DetectionTypeEnum) -> None: def __init__(self, topic: str) -> None:
topic = topic.value
super().__init__(topic) super().__init__(topic)
@ -29,16 +28,15 @@ class DetectionSubscriber(Subscriber):
topic_base = "detection/" topic_base = "detection/"
def __init__(self, topic: DetectionTypeEnum) -> None: def __init__(self, topic: str) -> None:
topic = topic.value
super().__init__(topic) super().__init__(topic)
def check_for_update( def check_for_update(
self, timeout: float = None self, timeout: float | None = None
) -> Optional[tuple[DetectionTypeEnum, Any]]: ) -> tuple[str, Any] | tuple[None, None] | None:
return super().check_for_update(timeout) return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: Any) -> Any: def _return_object(self, topic: str, payload: Any) -> Any:
if payload is None: if payload is None:
return (None, None) return (None, None)
return (DetectionTypeEnum[topic[len(self.topic_base) :]], payload) return (topic[len(self.topic_base) :], payload)

View File

@ -54,10 +54,9 @@ class Dispatcher:
self.ptz_metrics = ptz_metrics self.ptz_metrics = ptz_metrics
self.comms = communicators self.comms = communicators
self.camera_activity = CameraActivityManager(config, self.publish) self.camera_activity = CameraActivityManager(config, self.publish)
self.model_state = {} self.model_state: dict[str, ModelStatusTypesEnum] = {}
self.embeddings_reindex = {} self.embeddings_reindex: dict[str, Any] = {}
self.birdseye_layout = {} self.birdseye_layout: dict[str, Any] = {}
self._camera_settings_handlers: dict[str, Callable] = { self._camera_settings_handlers: dict[str, Callable] = {
"audio": self._on_audio_command, "audio": self._on_audio_command,
"audio_transcription": self._on_audio_transcription_command, "audio_transcription": self._on_audio_transcription_command,
@ -88,10 +87,12 @@ class Dispatcher:
(comm for comm in communicators if isinstance(comm, WebPushClient)), None (comm for comm in communicators if isinstance(comm, WebPushClient)), None
) )
def _receive(self, topic: str, payload: str) -> Optional[Any]: def _receive(self, topic: str, payload: Any) -> Optional[Any]:
"""Handle receiving of payload from communicators.""" """Handle receiving of payload from communicators."""
def handle_camera_command(command_type, camera_name, command, payload): def handle_camera_command(
command_type: str, camera_name: str, command: str, payload: str
) -> None:
try: try:
if command_type == "set": if command_type == "set":
self._camera_settings_handlers[command](camera_name, payload) self._camera_settings_handlers[command](camera_name, payload)
@ -100,13 +101,13 @@ class Dispatcher:
except KeyError: except KeyError:
logger.error(f"Invalid command type or handler: {command_type}") logger.error(f"Invalid command type or handler: {command_type}")
def handle_restart(): def handle_restart() -> None:
restart_frigate() restart_frigate()
def handle_insert_many_recordings(): def handle_insert_many_recordings() -> None:
Recordings.insert_many(payload).execute() Recordings.insert_many(payload).execute()
def handle_request_region_grid(): def handle_request_region_grid() -> Any:
camera = payload camera = payload
grid = get_camera_regions_grid( grid = get_camera_regions_grid(
camera, camera,
@ -115,24 +116,24 @@ class Dispatcher:
) )
return grid return grid
def handle_insert_preview(): def handle_insert_preview() -> None:
Previews.insert(payload).execute() Previews.insert(payload).execute()
def handle_upsert_review_segment(): def handle_upsert_review_segment() -> None:
ReviewSegment.insert(payload).on_conflict( ReviewSegment.insert(payload).on_conflict(
conflict_target=[ReviewSegment.id], conflict_target=[ReviewSegment.id],
update=payload, update=payload,
).execute() ).execute()
def handle_clear_ongoing_review_segments(): def handle_clear_ongoing_review_segments() -> None:
ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where( ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where(
ReviewSegment.end_time.is_null(True) ReviewSegment.end_time.is_null(True)
).execute() ).execute()
def handle_update_camera_activity(): def handle_update_camera_activity() -> None:
self.camera_activity.update_activity(payload) self.camera_activity.update_activity(payload)
def handle_update_event_description(): def handle_update_event_description() -> None:
event: Event = Event.get(Event.id == payload["id"]) event: Event = Event.get(Event.id == payload["id"])
event.data["description"] = payload["description"] event.data["description"] = payload["description"]
event.save() event.save()
@ -148,38 +149,38 @@ class Dispatcher:
), ),
) )
def handle_update_model_state(): def handle_update_model_state() -> None:
if payload: if payload:
model = payload["model"] model = payload["model"]
state = payload["state"] state = payload["state"]
self.model_state[model] = ModelStatusTypesEnum[state] self.model_state[model] = ModelStatusTypesEnum[state]
self.publish("model_state", json.dumps(self.model_state)) self.publish("model_state", json.dumps(self.model_state))
def handle_model_state(): def handle_model_state() -> None:
self.publish("model_state", json.dumps(self.model_state.copy())) self.publish("model_state", json.dumps(self.model_state.copy()))
def handle_update_embeddings_reindex_progress(): def handle_update_embeddings_reindex_progress() -> None:
self.embeddings_reindex = payload self.embeddings_reindex = payload
self.publish( self.publish(
"embeddings_reindex_progress", "embeddings_reindex_progress",
json.dumps(payload), json.dumps(payload),
) )
def handle_embeddings_reindex_progress(): def handle_embeddings_reindex_progress() -> None:
self.publish( self.publish(
"embeddings_reindex_progress", "embeddings_reindex_progress",
json.dumps(self.embeddings_reindex.copy()), json.dumps(self.embeddings_reindex.copy()),
) )
def handle_update_birdseye_layout(): def handle_update_birdseye_layout() -> None:
if payload: if payload:
self.birdseye_layout = payload self.birdseye_layout = payload
self.publish("birdseye_layout", json.dumps(self.birdseye_layout)) self.publish("birdseye_layout", json.dumps(self.birdseye_layout))
def handle_birdseye_layout(): def handle_birdseye_layout() -> None:
self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy())) self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy()))
def handle_on_connect(): def handle_on_connect() -> None:
camera_status = self.camera_activity.last_camera_activity.copy() camera_status = self.camera_activity.last_camera_activity.copy()
cameras_with_status = camera_status.keys() cameras_with_status = camera_status.keys()
@ -219,7 +220,7 @@ class Dispatcher:
) )
self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy())) self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy()))
def handle_notification_test(): def handle_notification_test() -> None:
self.publish("notification_test", "Test notification") self.publish("notification_test", "Test notification")
# Dictionary mapping topic to handlers # Dictionary mapping topic to handlers
@ -266,11 +267,12 @@ class Dispatcher:
logger.error( logger.error(
f"Received invalid {topic.split('/')[-1]} command: {topic}" f"Received invalid {topic.split('/')[-1]} command: {topic}"
) )
return return None
elif topic in topic_handlers: elif topic in topic_handlers:
return topic_handlers[topic]() return topic_handlers[topic]()
else: else:
self.publish(topic, payload, retain=False) self.publish(topic, payload, retain=False)
return None
def publish(self, topic: str, payload: Any, retain: bool = False) -> None: def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Handle publishing to communicators.""" """Handle publishing to communicators."""
@ -373,11 +375,11 @@ class Dispatcher:
if payload == "ON": if payload == "ON":
if not motion_settings.improve_contrast: if not motion_settings.improve_contrast:
logger.info(f"Turning on improve contrast for {camera_name}") logger.info(f"Turning on improve contrast for {camera_name}")
motion_settings.improve_contrast = True # type: ignore[union-attr] motion_settings.improve_contrast = True
elif payload == "OFF": elif payload == "OFF":
if motion_settings.improve_contrast: if motion_settings.improve_contrast:
logger.info(f"Turning off improve contrast for {camera_name}") logger.info(f"Turning off improve contrast for {camera_name}")
motion_settings.improve_contrast = False # type: ignore[union-attr] motion_settings.improve_contrast = False
self.config_updater.publish_update( self.config_updater.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name),
@ -421,7 +423,7 @@ class Dispatcher:
motion_settings = self.config.cameras[camera_name].motion motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion contour area for {camera_name}: {payload}") logger.info(f"Setting motion contour area for {camera_name}: {payload}")
motion_settings.contour_area = payload # type: ignore[union-attr] motion_settings.contour_area = payload
self.config_updater.publish_update( self.config_updater.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name),
motion_settings, motion_settings,
@ -438,7 +440,7 @@ class Dispatcher:
motion_settings = self.config.cameras[camera_name].motion motion_settings = self.config.cameras[camera_name].motion
logger.info(f"Setting motion threshold for {camera_name}: {payload}") logger.info(f"Setting motion threshold for {camera_name}: {payload}")
motion_settings.threshold = payload # type: ignore[union-attr] motion_settings.threshold = payload
self.config_updater.publish_update( self.config_updater.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name), CameraConfigUpdateTopic(CameraConfigUpdateEnum.motion, camera_name),
motion_settings, motion_settings,
@ -453,7 +455,7 @@ class Dispatcher:
notification_settings = self.config.notifications notification_settings = self.config.notifications
logger.info(f"Setting all notifications: {payload}") logger.info(f"Setting all notifications: {payload}")
notification_settings.enabled = payload == "ON" # type: ignore[union-attr] notification_settings.enabled = payload == "ON"
self.config_updater.publisher.publish( self.config_updater.publisher.publish(
"config/notifications", notification_settings "config/notifications", notification_settings
) )

View File

@ -1,10 +1,14 @@
"""Facilitates communication between processes.""" """Facilitates communication between processes."""
import logging
from enum import Enum from enum import Enum
from typing import Any, Callable from typing import Any, Callable
import zmq import zmq
logger = logging.getLogger(__name__)
SOCKET_REP_REQ = "ipc:///tmp/cache/embeddings" SOCKET_REP_REQ = "ipc:///tmp/cache/embeddings"
@ -41,9 +45,16 @@ class EmbeddingsResponder:
break break
try: try:
(topic, value) = self.socket.recv_json(flags=zmq.NOBLOCK) raw = self.socket.recv_json(flags=zmq.NOBLOCK)
if isinstance(raw, list):
(topic, value) = raw
response = process(topic, value) response = process(topic, value)
else:
logging.warning(
f"Received unexpected data type in ZMQ recv_json: {type(raw)}"
)
response = None
if response is not None: if response is not None:
self.socket.send_json(response) self.socket.send_json(response)
@ -65,7 +76,7 @@ class EmbeddingsRequestor:
self.socket = self.context.socket(zmq.REQ) self.socket = self.context.socket(zmq.REQ)
self.socket.connect(SOCKET_REP_REQ) self.socket.connect(SOCKET_REP_REQ)
def send_data(self, topic: str, data: Any) -> str: def send_data(self, topic: str, data: Any) -> Any:
"""Sends data and then waits for reply.""" """Sends data and then waits for reply."""
try: try:
self.socket.send_json((topic, data)) self.socket.send_json((topic, data))

View File

@ -28,8 +28,8 @@ class EventMetadataPublisher(Publisher):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
def publish(self, topic: EventMetadataTypeEnum, payload: Any) -> None: def publish(self, payload: Any, sub_topic: str = "") -> None:
super().publish(payload, topic.value) super().publish(payload, sub_topic)
class EventMetadataSubscriber(Subscriber): class EventMetadataSubscriber(Subscriber):
@ -40,7 +40,9 @@ class EventMetadataSubscriber(Subscriber):
def __init__(self, topic: EventMetadataTypeEnum) -> None: def __init__(self, topic: EventMetadataTypeEnum) -> None:
super().__init__(topic.value) super().__init__(topic.value)
def _return_object(self, topic: str, payload: tuple) -> tuple: def _return_object(
self, topic: str, payload: tuple | None
) -> tuple[str, Any] | tuple[None, None]:
if payload is None: if payload is None:
return (None, None) return (None, None)

View File

@ -7,7 +7,9 @@ from frigate.events.types import EventStateEnum, EventTypeEnum
from .zmq_proxy import Publisher, Subscriber from .zmq_proxy import Publisher, Subscriber
class EventUpdatePublisher(Publisher): class EventUpdatePublisher(
Publisher[tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]]]
):
"""Publishes events (objects, audio, manual).""" """Publishes events (objects, audio, manual)."""
topic_base = "event/" topic_base = "event/"
@ -16,9 +18,11 @@ class EventUpdatePublisher(Publisher):
super().__init__("update") super().__init__("update")
def publish( def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]] self,
payload: tuple[EventTypeEnum, EventStateEnum, str, str, dict[str, Any]],
sub_topic: str = "",
) -> None: ) -> None:
super().publish(payload) super().publish(payload, sub_topic)
class EventUpdateSubscriber(Subscriber): class EventUpdateSubscriber(Subscriber):
@ -30,7 +34,9 @@ class EventUpdateSubscriber(Subscriber):
super().__init__("update") super().__init__("update")
class EventEndPublisher(Publisher): class EventEndPublisher(
Publisher[tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]]]
):
"""Publishes events that have ended.""" """Publishes events that have ended."""
topic_base = "event/" topic_base = "event/"
@ -39,9 +45,11 @@ class EventEndPublisher(Publisher):
super().__init__("finalized") super().__init__("finalized")
def publish( def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]] self,
payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, Any]],
sub_topic: str = "",
) -> None: ) -> None:
super().publish(payload) super().publish(payload, sub_topic)
class EventEndSubscriber(Subscriber): class EventEndSubscriber(Subscriber):

View File

@ -1,5 +1,6 @@
"""Facilitates communication between processes.""" """Facilitates communication between processes."""
import logging
import multiprocessing as mp import multiprocessing as mp
import threading import threading
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
@ -9,6 +10,8 @@ import zmq
from frigate.comms.base_communicator import Communicator from frigate.comms.base_communicator import Communicator
logger = logging.getLogger(__name__)
SOCKET_REP_REQ = "ipc:///tmp/cache/comms" SOCKET_REP_REQ = "ipc:///tmp/cache/comms"
@ -19,7 +22,7 @@ class InterProcessCommunicator(Communicator):
self.socket.bind(SOCKET_REP_REQ) self.socket.bind(SOCKET_REP_REQ)
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None: def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""There is no communication back to the processes.""" """There is no communication back to the processes."""
pass pass
@ -37,9 +40,16 @@ class InterProcessCommunicator(Communicator):
break break
try: try:
(topic, value) = self.socket.recv_json(flags=zmq.NOBLOCK) raw = self.socket.recv_json(flags=zmq.NOBLOCK)
if isinstance(raw, list):
(topic, value) = raw
response = self._dispatcher(topic, value) response = self._dispatcher(topic, value)
else:
logging.warning(
f"Received unexpected data type in ZMQ recv_json: {type(raw)}"
)
response = None
if response is not None: if response is not None:
self.socket.send_json(response) self.socket.send_json(response)

View File

@ -11,7 +11,7 @@ from frigate.config import FrigateConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class MqttClient(Communicator): # type: ignore[misc] class MqttClient(Communicator):
"""Frigate wrapper for mqtt client.""" """Frigate wrapper for mqtt client."""
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig) -> None:
@ -75,7 +75,7 @@ class MqttClient(Communicator): # type: ignore[misc]
) )
self.publish( self.publish(
f"{camera_name}/improve_contrast/state", f"{camera_name}/improve_contrast/state",
"ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr] "ON" if camera.motion.improve_contrast else "OFF",
retain=True, retain=True,
) )
self.publish( self.publish(
@ -85,12 +85,12 @@ class MqttClient(Communicator): # type: ignore[misc]
) )
self.publish( self.publish(
f"{camera_name}/motion_threshold/state", f"{camera_name}/motion_threshold/state",
camera.motion.threshold, # type: ignore[union-attr] camera.motion.threshold,
retain=True, retain=True,
) )
self.publish( self.publish(
f"{camera_name}/motion_contour_area/state", f"{camera_name}/motion_contour_area/state",
camera.motion.contour_area, # type: ignore[union-attr] camera.motion.contour_area,
retain=True, retain=True,
) )
self.publish( self.publish(
@ -150,7 +150,7 @@ class MqttClient(Communicator): # type: ignore[misc]
client: mqtt.Client, client: mqtt.Client,
userdata: Any, userdata: Any,
flags: Any, flags: Any,
reason_code: mqtt.ReasonCode, reason_code: mqtt.ReasonCode, # type: ignore[name-defined]
properties: Any, properties: Any,
) -> None: ) -> None:
"""Mqtt connection callback.""" """Mqtt connection callback."""
@ -182,7 +182,7 @@ class MqttClient(Communicator): # type: ignore[misc]
client: mqtt.Client, client: mqtt.Client,
userdata: Any, userdata: Any,
flags: Any, flags: Any,
reason_code: mqtt.ReasonCode, reason_code: mqtt.ReasonCode, # type: ignore[name-defined]
properties: Any, properties: Any,
) -> None: ) -> None:
"""Mqtt disconnection callback.""" """Mqtt disconnection callback."""

View File

@ -13,17 +13,16 @@ class RecordingsDataTypeEnum(str, Enum):
recordings_available_through = "recordings_available_through" recordings_available_through = "recordings_available_through"
class RecordingsDataPublisher(Publisher): class RecordingsDataPublisher(Publisher[tuple[str, float]]):
"""Publishes latest recording data.""" """Publishes latest recording data."""
topic_base = "recordings/" topic_base = "recordings/"
def __init__(self, topic: RecordingsDataTypeEnum) -> None: def __init__(self, topic: RecordingsDataTypeEnum) -> None:
topic = topic.value super().__init__(topic.value)
super().__init__(topic)
def publish(self, payload: tuple[str, float]) -> None: def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
super().publish(payload) super().publish(payload, sub_topic)
class RecordingsDataSubscriber(Subscriber): class RecordingsDataSubscriber(Subscriber):
@ -32,5 +31,4 @@ class RecordingsDataSubscriber(Subscriber):
topic_base = "recordings/" topic_base = "recordings/"
def __init__(self, topic: RecordingsDataTypeEnum) -> None: def __init__(self, topic: RecordingsDataTypeEnum) -> None:
topic = topic.value super().__init__(topic.value)
super().__init__(topic)

View File

@ -39,7 +39,7 @@ class PushNotification:
ttl: int = 0 ttl: int = 0
class WebPushClient(Communicator): # type: ignore[misc] class WebPushClient(Communicator):
"""Frigate wrapper for webpush client.""" """Frigate wrapper for webpush client."""
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None: def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
@ -50,10 +50,12 @@ class WebPushClient(Communicator): # type: ignore[misc]
self.web_pushers: dict[str, list[WebPusher]] = {} self.web_pushers: dict[str, list[WebPusher]] = {}
self.expired_subs: dict[str, list[str]] = {} self.expired_subs: dict[str, list[str]] = {}
self.suspended_cameras: dict[str, int] = { self.suspended_cameras: dict[str, int] = {
c.name: 0 for c in self.config.cameras.values() c.name: 0 # type: ignore[misc]
for c in self.config.cameras.values()
} }
self.last_camera_notification_time: dict[str, float] = { self.last_camera_notification_time: dict[str, float] = {
c.name: 0 for c in self.config.cameras.values() c.name: 0 # type: ignore[misc]
for c in self.config.cameras.values()
} }
self.last_notification_time: float = 0 self.last_notification_time: float = 0
self.notification_queue: queue.Queue[PushNotification] = queue.Queue() self.notification_queue: queue.Queue[PushNotification] = queue.Queue()

View File

@ -4,7 +4,7 @@ import errno
import json import json
import logging import logging
import threading import threading
from typing import Callable from typing import Any, Callable
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
from ws4py.server.wsgirefserver import ( from ws4py.server.wsgirefserver import (
@ -21,8 +21,8 @@ from frigate.config import FrigateConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class WebSocket(WebSocket_): class WebSocket(WebSocket_): # type: ignore[misc]
def unhandled_error(self, error): def unhandled_error(self, error: Any) -> None:
""" """
Handles the unfriendly socket closures on the server side Handles the unfriendly socket closures on the server side
without showing a confusing error message without showing a confusing error message
@ -33,12 +33,12 @@ class WebSocket(WebSocket_):
logging.getLogger("ws4py").exception("Failed to receive data") logging.getLogger("ws4py").exception("Failed to receive data")
class WebSocketClient(Communicator): # type: ignore[misc] class WebSocketClient(Communicator):
"""Frigate wrapper for ws client.""" """Frigate wrapper for ws client."""
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig) -> None:
self.config = config self.config = config
self.websocket_server = None self.websocket_server: WSGIServer | None = None
def subscribe(self, receiver: Callable) -> None: def subscribe(self, receiver: Callable) -> None:
self._dispatcher = receiver self._dispatcher = receiver
@ -47,10 +47,10 @@ class WebSocketClient(Communicator): # type: ignore[misc]
def start(self) -> None: def start(self) -> None:
"""Start the websocket client.""" """Start the websocket client."""
class _WebSocketHandler(WebSocket): # type: ignore[misc] class _WebSocketHandler(WebSocket):
receiver = self._dispatcher receiver = self._dispatcher
def received_message(self, message: WebSocket.received_message) -> None: def received_message(self, message: WebSocket.received_message) -> None: # type: ignore[name-defined]
try: try:
json_message = json.loads(message.data.decode("utf-8")) json_message = json.loads(message.data.decode("utf-8"))
json_message = { json_message = {
@ -86,7 +86,7 @@ class WebSocketClient(Communicator): # type: ignore[misc]
) )
self.websocket_thread.start() self.websocket_thread.start()
def publish(self, topic: str, payload: str, _: bool) -> None: def publish(self, topic: str, payload: Any, _: bool = False) -> None:
try: try:
ws_message = json.dumps( ws_message = json.dumps(
{ {
@ -109,9 +109,11 @@ class WebSocketClient(Communicator): # type: ignore[misc]
pass pass
def stop(self) -> None: def stop(self) -> None:
if self.websocket_server is not None:
self.websocket_server.manager.close_all() self.websocket_server.manager.close_all()
self.websocket_server.manager.stop() self.websocket_server.manager.stop()
self.websocket_server.manager.join() self.websocket_server.manager.join()
self.websocket_server.shutdown() self.websocket_server.shutdown()
self.websocket_thread.join() self.websocket_thread.join()
logger.info("Exiting websocket client...") logger.info("Exiting websocket client...")

View File

@ -2,7 +2,7 @@
import json import json
import threading import threading
from typing import Any, Optional from typing import Any, Generic, Optional, TypeVar
import zmq import zmq
@ -47,7 +47,10 @@ class ZmqProxy:
self.runner.join() self.runner.join()
class Publisher: T = TypeVar("T")
class Publisher(Generic[T]):
"""Publishes messages.""" """Publishes messages."""
topic_base: str = "" topic_base: str = ""
@ -58,7 +61,7 @@ class Publisher:
self.socket = self.context.socket(zmq.PUB) self.socket = self.context.socket(zmq.PUB)
self.socket.connect(SOCKET_PUB) self.socket.connect(SOCKET_PUB)
def publish(self, payload: Any, sub_topic: str = "") -> None: def publish(self, payload: T, sub_topic: str = "") -> None:
"""Publish message.""" """Publish message."""
self.socket.send_string(f"{self.topic}{sub_topic} {json.dumps(payload)}") self.socket.send_string(f"{self.topic}{sub_topic} {json.dumps(payload)}")
@ -80,8 +83,8 @@ class Subscriber:
self.socket.connect(SOCKET_SUB) self.socket.connect(SOCKET_SUB)
def check_for_update( def check_for_update(
self, timeout: float = FAST_QUEUE_TIMEOUT self, timeout: float | None = FAST_QUEUE_TIMEOUT
) -> Optional[tuple[str, Any]]: ) -> tuple[str, Any] | tuple[None, None] | None:
"""Returns message or None if no update.""" """Returns message or None if no update."""
try: try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout) has_update, _, _ = zmq.select([self.socket], [], [], timeout)
@ -98,5 +101,7 @@ class Subscriber:
self.socket.close() self.socket.close()
self.context.destroy() self.context.destroy()
def _return_object(self, topic: str, payload: Any) -> Any: def _return_object(
self, topic: str, payload: Optional[tuple[str, Any]]
) -> tuple[str, Any] | tuple[None, None] | None:
return payload return payload

View File

@ -80,9 +80,7 @@ class CameraConfig(FrigateBaseModel):
lpr: CameraLicensePlateRecognitionConfig = Field( lpr: CameraLicensePlateRecognitionConfig = Field(
default_factory=CameraLicensePlateRecognitionConfig, title="LPR config." default_factory=CameraLicensePlateRecognitionConfig, title="LPR config."
) )
motion: Optional[MotionConfig] = Field( motion: MotionConfig = Field(None, title="Motion detection configuration.")
None, title="Motion detection configuration."
)
objects: ObjectConfig = Field( objects: ObjectConfig = Field(
default_factory=ObjectConfig, title="Object configuration." default_factory=ObjectConfig, title="Object configuration."
) )

View File

@ -10,7 +10,7 @@ __all__ = ["NotificationConfig"]
class NotificationConfig(FrigateBaseModel): class NotificationConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable notifications") enabled: bool = Field(default=False, title="Enable notifications")
email: Optional[str] = Field(default=None, title="Email required for push.") email: Optional[str] = Field(default=None, title="Email required for push.")
cooldown: Optional[int] = Field( cooldown: int = Field(
default=0, ge=0, title="Cooldown period for notifications (time in seconds)." default=0, ge=0, title="Cooldown period for notifications (time in seconds)."
) )
enabled_in_config: Optional[bool] = Field( enabled_in_config: Optional[bool] = Field(

View File

@ -142,7 +142,7 @@ class TriggerConfig(FrigateBaseModel):
gt=0.0, gt=0.0,
le=1.0, le=1.0,
) )
actions: Optional[List[TriggerAction]] = Field( actions: List[TriggerAction] = Field(
default=[], title="Actions to perform when trigger is matched" default=[], title="Actions to perform when trigger is matched"
) )
@ -150,8 +150,8 @@ class TriggerConfig(FrigateBaseModel):
class CameraSemanticSearchConfig(FrigateBaseModel): class CameraSemanticSearchConfig(FrigateBaseModel):
triggers: Optional[Dict[str, TriggerConfig]] = Field( triggers: Dict[str, TriggerConfig] = Field(
default=None, default={},
title="Trigger actions on tracked objects that match existing thumbnails or descriptions", title="Trigger actions on tracked objects that match existing thumbnails or descriptions",
) )

View File

@ -30,7 +30,7 @@ class MqttConfig(FrigateBaseModel):
) )
tls_client_key: Optional[str] = Field(default=None, title="MQTT TLS Client Key") tls_client_key: Optional[str] = Field(default=None, title="MQTT TLS Client Key")
tls_insecure: Optional[bool] = Field(default=None, title="MQTT TLS Insecure") tls_insecure: Optional[bool] = Field(default=None, title="MQTT TLS Insecure")
qos: Optional[int] = Field(default=0, title="MQTT QoS") qos: int = Field(default=0, title="MQTT QoS")
@model_validator(mode="after") @model_validator(mode="after")
def user_requires_pass(self, info: ValidationInfo) -> Self: def user_requires_pass(self, info: ValidationInfo) -> Self:

View File

@ -1170,7 +1170,6 @@ class LicensePlateProcessingMixin:
event_id = f"{now}-{rand_id}" event_id = f"{now}-{rand_id}"
self.event_metadata_publisher.publish( self.event_metadata_publisher.publish(
EventMetadataTypeEnum.lpr_event_create,
( (
now, now,
camera, camera,
@ -1181,6 +1180,7 @@ class LicensePlateProcessingMixin:
None, None,
plate, plate,
), ),
EventMetadataTypeEnum.lpr_event_create.value,
) )
return event_id return event_id
@ -1518,7 +1518,7 @@ class LicensePlateProcessingMixin:
# If it's a known plate, publish to sub_label # If it's a known plate, publish to sub_label
if sub_label is not None: if sub_label is not None:
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.sub_label, (id, sub_label, avg_confidence) (id, sub_label, avg_confidence), EventMetadataTypeEnum.sub_label.value
) )
# always publish to recognized_license_plate field # always publish to recognized_license_plate field
@ -1537,8 +1537,8 @@ class LicensePlateProcessingMixin:
), ),
) )
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.attribute,
(id, "recognized_license_plate", top_plate, avg_confidence), (id, "recognized_license_plate", top_plate, avg_confidence),
EventMetadataTypeEnum.attribute,
) )
# save the best snapshot for dedicated lpr cams not using frigate+ # save the best snapshot for dedicated lpr cams not using frigate+
@ -1552,8 +1552,8 @@ class LicensePlateProcessingMixin:
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, encoded_img = cv2.imencode(".jpg", frame_bgr) _, encoded_img = cv2.imencode(".jpg", frame_bgr)
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.save_lpr_snapshot,
(base64.b64encode(encoded_img).decode("ASCII"), id, camera), (base64.b64encode(encoded_img).decode("ASCII"), id, camera),
EventMetadataTypeEnum.save_lpr_snapshot.value,
) )
if id not in self.detected_license_plates: if id not in self.detected_license_plates:

View File

@ -147,8 +147,8 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
return return
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.sub_label,
(obj_data["id"], self.labelmap[best_id], score), (obj_data["id"], self.labelmap[best_id], score),
EventMetadataTypeEnum.sub_label.value,
) )
self.detected_birds[obj_data["id"]] = score self.detected_birds[obj_data["id"]] = score

View File

@ -294,16 +294,16 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
): ):
if sub_label != "none": if sub_label != "none":
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.sub_label,
(obj_data["id"], sub_label, score), (obj_data["id"], sub_label, score),
EventMetadataTypeEnum.sub_label,
) )
elif ( elif (
self.model_config.object_config.classification_type self.model_config.object_config.classification_type
== ObjectClassificationType.attribute == ObjectClassificationType.attribute
): ):
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.attribute,
(obj_data["id"], self.model_config.name, sub_label, score), (obj_data["id"], self.model_config.name, sub_label, score),
EventMetadataTypeEnum.attribute,
) )
def handle_request(self, topic, request_data): def handle_request(self, topic, request_data):

View File

@ -321,8 +321,8 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
if weighted_score >= self.face_config.recognition_threshold: if weighted_score >= self.face_config.recognition_threshold:
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
EventMetadataTypeEnum.sub_label,
(id, weighted_sub_label, weighted_score), (id, weighted_sub_label, weighted_score),
EventMetadataTypeEnum.sub_label.value,
) )
self.__update_metrics(datetime.datetime.now().timestamp() - start) self.__update_metrics(datetime.datetime.now().timestamp() - start)

View File

@ -143,7 +143,7 @@ class EmbeddingMaintainer(threading.Thread):
self.recordings_subscriber = RecordingsDataSubscriber( self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through RecordingsDataTypeEnum.recordings_available_through
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
self.embeddings_responder = EmbeddingsResponder() self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
@ -500,8 +500,8 @@ class EmbeddingMaintainer(threading.Thread):
to_remove.append(id) to_remove.append(id)
for id in to_remove: for id in to_remove:
self.event_metadata_publisher.publish( self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(id, now), (id, now),
EventMetadataTypeEnum.manual_event_end.value,
) )
self.detected_license_plates.pop(id) self.detected_license_plates.pop(id)

View File

@ -183,7 +183,7 @@ class AudioEventMaintainer(threading.Thread):
CameraConfigUpdateEnum.audio_transcription, CameraConfigUpdateEnum.audio_transcription,
], ],
) )
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio.value)
self.event_metadata_publisher = EventMetadataPublisher() self.event_metadata_publisher = EventMetadataPublisher()
if self.camera_config.audio_transcription.enabled_in_config: if self.camera_config.audio_transcription.enabled_in_config:
@ -293,7 +293,6 @@ class AudioEventMaintainer(threading.Thread):
self.requestor.send_data(f"{self.camera_config.name}/audio/{label}", "ON") self.requestor.send_data(f"{self.camera_config.name}/audio/{label}", "ON")
self.event_metadata_publisher.publish( self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_create,
( (
now, now,
self.camera_config.name, self.camera_config.name,
@ -306,6 +305,7 @@ class AudioEventMaintainer(threading.Thread):
"audio", "audio",
{}, {},
), ),
EventMetadataTypeEnum.manual_event_create.value,
) )
self.detections[label] = { self.detections[label] = {
"id": event_id, "id": event_id,
@ -329,8 +329,8 @@ class AudioEventMaintainer(threading.Thread):
) )
self.event_metadata_publisher.publish( self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], detection["last_detection"]), (detection["id"], detection["last_detection"]),
EventMetadataTypeEnum.manual_event_end.value,
) )
self.detections[detection["label"]] = None self.detections[detection["label"]] = None
@ -343,8 +343,8 @@ class AudioEventMaintainer(threading.Thread):
f"{self.camera_config.name}/audio/{label}", "OFF" f"{self.camera_config.name}/audio/{label}", "OFF"
) )
self.event_metadata_publisher.publish( self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], now), (detection["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
) )
self.detections[label] = None self.detections[label] = None

View File

@ -35,6 +35,9 @@ disallow_untyped_calls = false
[mypy-frigate.const] [mypy-frigate.const]
ignore_errors = false ignore_errors = false
[mypy-frigate.comms.*]
ignore_errors = false
[mypy-frigate.events] [mypy-frigate.events]
ignore_errors = false ignore_errors = false

View File

@ -96,7 +96,7 @@ class OutputProcess(FrigateProcess):
websocket_server.initialize_websockets_manager() websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever) websocket_thread = threading.Thread(target=websocket_server.serve_forever)
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
config_subscriber = CameraConfigUpdateSubscriber( config_subscriber = CameraConfigUpdateSubscriber(
self.config, self.config,
self.config.cameras, self.config.cameras,

View File

@ -79,7 +79,7 @@ class RecordingMaintainer(threading.Thread):
self.config.cameras, self.config.cameras,
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record], [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
self.recordings_publisher = RecordingsDataPublisher( self.recordings_publisher = RecordingsDataPublisher(
RecordingsDataTypeEnum.recordings_available_through RecordingsDataTypeEnum.recordings_available_through
) )
@ -545,7 +545,7 @@ class RecordingMaintainer(threading.Thread):
if not topic: if not topic:
break break
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video.value:
( (
camera, camera,
_, _,
@ -564,7 +564,7 @@ class RecordingMaintainer(threading.Thread):
regions, regions,
) )
) )
elif topic == DetectionTypeEnum.audio: elif topic == DetectionTypeEnum.audio.value:
( (
camera, camera,
frame_time, frame_time,
@ -580,7 +580,9 @@ class RecordingMaintainer(threading.Thread):
audio_detections, audio_detections,
) )
) )
elif topic == DetectionTypeEnum.api or DetectionTypeEnum.lpr: elif (
topic == DetectionTypeEnum.api.value or DetectionTypeEnum.lpr.value
):
continue continue
if frame_time < run_start - stale_frame_count_threshold: if frame_time < run_start - stale_frame_count_threshold:

View File

@ -164,7 +164,7 @@ class ReviewSegmentMaintainer(threading.Thread):
CameraConfigUpdateEnum.review, CameraConfigUpdateEnum.review,
], ],
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
# manual events # manual events
self.indefinite_events: dict[str, dict[str, Any]] = {} self.indefinite_events: dict[str, dict[str, Any]] = {}
@ -484,7 +484,7 @@ class ReviewSegmentMaintainer(threading.Thread):
if not topic: if not topic:
continue continue
if topic == DetectionTypeEnum.video: if topic == DetectionTypeEnum.video.value:
( (
camera, camera,
frame_name, frame_name,
@ -493,14 +493,14 @@ class ReviewSegmentMaintainer(threading.Thread):
_, _,
_, _,
) = data ) = data
elif topic == DetectionTypeEnum.audio: elif topic == DetectionTypeEnum.audio.value:
( (
camera, camera,
frame_time, frame_time,
_, _,
audio_detections, audio_detections,
) = data ) = data
elif topic == DetectionTypeEnum.api or DetectionTypeEnum.lpr: elif topic == DetectionTypeEnum.api.value or DetectionTypeEnum.lpr.value:
( (
camera, camera,
frame_time, frame_time,

View File

@ -214,7 +214,7 @@ class TestHttp(unittest.TestCase):
id = "123456.random" id = "123456.random"
sub_label = "sub" sub_label = "sub"
def update_event(topic, payload): def update_event(payload: Any, topic: str):
event = Event.get(id=id) event = Event.get(id=id)
event.sub_label = payload[1] event.sub_label = payload[1]
event.save() event.save()
@ -250,7 +250,7 @@ class TestHttp(unittest.TestCase):
id = "123456.random" id = "123456.random"
sub_label = "sub" sub_label = "sub"
def update_event(topic, payload): def update_event(payload: Any, _: str):
event = Event.get(id=id) event = Event.get(id=id)
event.sub_label = payload[1] event.sub_label = payload[1]
event.save() event.save()

View File

@ -78,7 +78,7 @@ class TrackedObjectProcessor(threading.Thread):
) )
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all.value)
self.event_sender = EventUpdatePublisher() self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber() self.event_end_subscriber = EventEndSubscriber()
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all) self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)