# Source: blakeblackshear.frigate/frigate/data_processing/real_time/api.py
# (58 lines, 1.5 KiB, Python)

"""Local only processors for handling real time object processing."""
import logging
from abc import ABC, abstractmethod
from typing import Any

import numpy as np

from frigate.config import FrigateConfig

from ..types import DataProcessorMetrics
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RealTimeProcessorApi(ABC):
    """Abstract interface that every real time processor must implement.

    All four methods, including ``__init__``, are abstract, so a concrete
    subclass has to override each of them before it can be instantiated.
    A subclass ``__init__`` can call ``super().__init__(config, metrics)``
    to have ``self.config`` and ``self.metrics`` assigned.
    """

    @abstractmethod
    def __init__(self, config: FrigateConfig, metrics: DataProcessorMetrics) -> None:
        """Store the configuration and metrics on the instance.

        Args:
            config (FrigateConfig): configuration kept as ``self.config``.
            metrics (DataProcessorMetrics): metrics kept as ``self.metrics``.

        Returns:
            None.
        """
        self.config = config
        self.metrics = metrics

    @abstractmethod
    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
        """Processes the frame with object data.

        Args:
            obj_data (dict): containing data about focused object in frame.
            frame (ndarray): full yuv frame.

        Returns:
            None.
        """

    @abstractmethod
    def handle_request(
        self, topic: str, request_data: dict[str, Any]
    ) -> dict[str, Any] | None:
        """Handle metadata requests.

        Args:
            topic (str): topic that dictates what work is requested.
            request_data (dict): containing data about requested change to process.

        Returns:
            None if request was not handled, otherwise return response.
        """

    @abstractmethod
    def expire_object(self, object_id: str) -> None:
        """Handle objects that are no longer detected.

        Args:
            object_id (str): id of object that is no longer detected.

        Returns:
            None.
        """