mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			237 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			237 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import datetime
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
from pathlib import Path
 | 
						|
from typing import Any, List
 | 
						|
 | 
						|
import cv2
 | 
						|
import requests
 | 
						|
from numpy import ndarray
 | 
						|
from requests.models import Response
 | 
						|
 | 
						|
from frigate.const import PLUS_API_HOST, PLUS_ENV_VAR
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes:
 | 
						|
    if image.shape[1] >= image.shape[0]:
 | 
						|
        width = min(max_dim, image.shape[1])
 | 
						|
        height = int(width * image.shape[0] / image.shape[1])
 | 
						|
    else:
 | 
						|
        height = min(max_dim, image.shape[0])
 | 
						|
        width = int(height * image.shape[1] / image.shape[0])
 | 
						|
 | 
						|
    original = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA)
 | 
						|
 | 
						|
    ret, jpg = cv2.imencode(".jpg", original, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
 | 
						|
    jpg_bytes = jpg.tobytes()
 | 
						|
    return jpg_bytes if isinstance(jpg_bytes, bytes) else b""
 | 
						|
 | 
						|
 | 
						|
class PlusApi:
 | 
						|
    def __init__(self) -> None:
 | 
						|
        self.host = PLUS_API_HOST
 | 
						|
        self.key = None
 | 
						|
        if PLUS_ENV_VAR in os.environ:
 | 
						|
            self.key = os.environ.get(PLUS_ENV_VAR)
 | 
						|
        elif (
 | 
						|
            os.path.isdir("/run/secrets")
 | 
						|
            and os.access("/run/secrets", os.R_OK)
 | 
						|
            and PLUS_ENV_VAR in os.listdir("/run/secrets")
 | 
						|
        ):
 | 
						|
            self.key = (
 | 
						|
                Path(os.path.join("/run/secrets", PLUS_ENV_VAR)).read_text().strip()
 | 
						|
            )
 | 
						|
        # check for the addon options file
 | 
						|
        elif os.path.isfile("/data/options.json"):
 | 
						|
            with open("/data/options.json") as f:
 | 
						|
                raw_options = f.read()
 | 
						|
            options = json.loads(raw_options)
 | 
						|
            self.key = options.get("plus_api_key")
 | 
						|
 | 
						|
        if self.key is not None and not re.match(
 | 
						|
            r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}:[a-z0-9]{40}",
 | 
						|
            self.key,
 | 
						|
        ):
 | 
						|
            logger.error("Plus API Key is not formatted correctly.")
 | 
						|
            self.key = None
 | 
						|
 | 
						|
        self._is_active: bool = self.key is not None
 | 
						|
        self._token_data: dict = {}
 | 
						|
 | 
						|
    def _refresh_token_if_needed(self) -> None:
 | 
						|
        if (
 | 
						|
            self._token_data.get("expires") is None
 | 
						|
            or self._token_data["expires"] - datetime.datetime.now().timestamp() < 60
 | 
						|
        ):
 | 
						|
            if self.key is None:
 | 
						|
                raise Exception(
 | 
						|
                    "Plus API key not set. See https://docs.frigate.video/integrations/plus#set-your-api-key"
 | 
						|
                )
 | 
						|
            parts = self.key.split(":")
 | 
						|
            r = requests.get(f"{self.host}/v1/auth/token", auth=(parts[0], parts[1]))
 | 
						|
            if not r.ok:
 | 
						|
                raise Exception(f"Unable to refresh API token: {r.text}")
 | 
						|
            self._token_data = r.json()
 | 
						|
 | 
						|
    def _get_authorization_header(self) -> dict:
 | 
						|
        self._refresh_token_if_needed()
 | 
						|
        return {"authorization": f"Bearer {self._token_data.get('accessToken')}"}
 | 
						|
 | 
						|
    def _get(self, path: str) -> Response:
 | 
						|
        return requests.get(
 | 
						|
            f"{self.host}/v1/{path}", headers=self._get_authorization_header()
 | 
						|
        )
 | 
						|
 | 
						|
    def _post(self, path: str, data: dict) -> Response:
 | 
						|
        return requests.post(
 | 
						|
            f"{self.host}/v1/{path}",
 | 
						|
            headers=self._get_authorization_header(),
 | 
						|
            json=data,
 | 
						|
        )
 | 
						|
 | 
						|
    def _put(self, path: str, data: dict) -> Response:
 | 
						|
        return requests.put(
 | 
						|
            f"{self.host}/v1/{path}",
 | 
						|
            headers=self._get_authorization_header(),
 | 
						|
            json=data,
 | 
						|
        )
 | 
						|
 | 
						|
    def is_active(self) -> bool:
 | 
						|
        return self._is_active
 | 
						|
 | 
						|
    def upload_image(self, image: ndarray, camera: str) -> str:
 | 
						|
        r = self._get("image/signed_urls")
 | 
						|
        presigned_urls = r.json()
 | 
						|
        if not r.ok:
 | 
						|
            raise Exception("Unable to get signed urls")
 | 
						|
 | 
						|
        # resize and submit original
 | 
						|
        files = {"file": get_jpg_bytes(image, 1920, 85)}
 | 
						|
        data = presigned_urls["original"]["fields"]
 | 
						|
        data["content-type"] = "image/jpeg"
 | 
						|
        r = requests.post(presigned_urls["original"]["url"], files=files, data=data)
 | 
						|
        if not r.ok:
 | 
						|
            logger.error(f"Failed to upload original: {r.status_code} {r.text}")
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
        # resize and submit thumbnail
 | 
						|
        files = {"file": get_jpg_bytes(image, 200, 70)}
 | 
						|
        data = presigned_urls["thumbnail"]["fields"]
 | 
						|
        data["content-type"] = "image/jpeg"
 | 
						|
        r = requests.post(presigned_urls["thumbnail"]["url"], files=files, data=data)
 | 
						|
        if not r.ok:
 | 
						|
            logger.error(f"Failed to upload thumbnail: {r.status_code} {r.text}")
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
        # create image
 | 
						|
        r = self._post(
 | 
						|
            "image/create", {"id": presigned_urls["imageId"], "camera": camera}
 | 
						|
        )
 | 
						|
        if not r.ok:
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
        # return image id
 | 
						|
        return str(presigned_urls.get("imageId"))
 | 
						|
 | 
						|
    def add_false_positive(
 | 
						|
        self,
 | 
						|
        plus_id: str,
 | 
						|
        region: List[float],
 | 
						|
        bbox: List[float],
 | 
						|
        score: float,
 | 
						|
        label: str,
 | 
						|
        model_hash: str,
 | 
						|
        model_type: str,
 | 
						|
        detector_type: str,
 | 
						|
    ) -> None:
 | 
						|
        r = self._put(
 | 
						|
            f"image/{plus_id}/false_positive",
 | 
						|
            {
 | 
						|
                "label": label,
 | 
						|
                "x": bbox[0],
 | 
						|
                "y": bbox[1],
 | 
						|
                "w": bbox[2],
 | 
						|
                "h": bbox[3],
 | 
						|
                "regionX": region[0],
 | 
						|
                "regionY": region[1],
 | 
						|
                "regionW": region[2],
 | 
						|
                "regionH": region[3],
 | 
						|
                "score": score,
 | 
						|
                "model_hash": model_hash,
 | 
						|
                "model_type": model_type,
 | 
						|
                "detector_type": detector_type,
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
        if not r.ok:
 | 
						|
            try:
 | 
						|
                error_response = r.json()
 | 
						|
                errors = error_response.get("errors", [])
 | 
						|
                for error in errors:
 | 
						|
                    if (
 | 
						|
                        error.get("param") == "label"
 | 
						|
                        and error.get("type") == "invalid_enum_value"
 | 
						|
                    ):
 | 
						|
                        raise ValueError(f"Unsupported label value provided: {label}")
 | 
						|
            except ValueError as e:
 | 
						|
                raise e
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
    def add_annotation(
 | 
						|
        self,
 | 
						|
        plus_id: str,
 | 
						|
        bbox: List[float],
 | 
						|
        label: str,
 | 
						|
        difficult: bool = False,
 | 
						|
    ) -> None:
 | 
						|
        r = self._put(
 | 
						|
            f"image/{plus_id}/annotation",
 | 
						|
            {
 | 
						|
                "label": label,
 | 
						|
                "x": bbox[0],
 | 
						|
                "y": bbox[1],
 | 
						|
                "w": bbox[2],
 | 
						|
                "h": bbox[3],
 | 
						|
                "difficult": difficult,
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
        if not r.ok:
 | 
						|
            try:
 | 
						|
                error_response = r.json()
 | 
						|
                errors = error_response.get("errors", [])
 | 
						|
                for error in errors:
 | 
						|
                    if (
 | 
						|
                        error.get("param") == "label"
 | 
						|
                        and error.get("type") == "invalid_enum_value"
 | 
						|
                    ):
 | 
						|
                        raise ValueError(f"Unsupported label value provided: {label}")
 | 
						|
            except ValueError as e:
 | 
						|
                raise e
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
    def get_model_download_url(
 | 
						|
        self,
 | 
						|
        model_id: str,
 | 
						|
    ) -> str:
 | 
						|
        r = self._get(f"model/{model_id}/signed_url")
 | 
						|
 | 
						|
        if not r.ok:
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
        presigned_url = r.json()
 | 
						|
 | 
						|
        return str(presigned_url.get("url"))
 | 
						|
 | 
						|
    def get_model_info(self, model_id: str) -> Any:
 | 
						|
        r = self._get(f"model/{model_id}")
 | 
						|
 | 
						|
        if not r.ok:
 | 
						|
            raise Exception(r.text)
 | 
						|
 | 
						|
        return r.json()
 |