import datetime import json import logging import os import re from pathlib import Path from typing import Any, List import cv2 import requests from numpy import ndarray from requests.models import Response from frigate.const import PLUS_API_HOST, PLUS_ENV_VAR logger = logging.getLogger(__name__) def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes: if image.shape[1] >= image.shape[0]: width = min(max_dim, image.shape[1]) height = int(width * image.shape[0] / image.shape[1]) else: height = min(max_dim, image.shape[0]) width = int(height * image.shape[1] / image.shape[0]) original = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode(".jpg", original, [int(cv2.IMWRITE_JPEG_QUALITY), quality]) jpg_bytes = jpg.tobytes() return jpg_bytes if isinstance(jpg_bytes, bytes) else b"" class PlusApi: def __init__(self) -> None: self.host = PLUS_API_HOST self.key = None if PLUS_ENV_VAR in os.environ: self.key = os.environ.get(PLUS_ENV_VAR) elif os.path.isdir("/run/secrets") and PLUS_ENV_VAR in os.listdir( "/run/secrets" ): self.key = Path(os.path.join("/run/secrets", PLUS_ENV_VAR)).read_text() # check for the addon options file elif os.path.isfile("/data/options.json"): with open("/data/options.json") as f: raw_options = f.read() options = json.loads(raw_options) self.key = options.get("plus_api_key") if self.key is not None and not re.match( r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}:[a-z0-9]{40}", self.key, ): logger.error("Plus API Key is not formatted correctly.") self.key = None self._is_active: bool = self.key is not None self._token_data: dict = {} def _refresh_token_if_needed(self) -> None: if ( self._token_data.get("expires") is None or self._token_data["expires"] - datetime.datetime.now().timestamp() < 60 ): if self.key is None: raise Exception("Plus API not activated") parts = self.key.split(":") r = requests.get(f"{self.host}/v1/auth/token", auth=(parts[0], parts[1])) if not r.ok: raise Exception("Unable to refresh API token") self._token_data = r.json() def _get_authorization_header(self) -> dict: self._refresh_token_if_needed() return {"authorization": f"Bearer {self._token_data.get('accessToken')}"} def _get(self, path: str) -> Response: return requests.get( f"{self.host}/v1/{path}", headers=self._get_authorization_header() ) def _post(self, path: str, data: dict) -> Response: return requests.post( f"{self.host}/v1/{path}", headers=self._get_authorization_header(), json=data, ) def _put(self, path: str, data: dict) -> Response: return requests.put( f"{self.host}/v1/{path}", headers=self._get_authorization_header(), json=data, ) def is_active(self) -> bool: return self._is_active def upload_image(self, image: ndarray, camera: str) -> str: r = self._get("image/signed_urls") presigned_urls = r.json() if not r.ok: raise Exception("Unable to get signed urls") # resize and submit original files = {"file": get_jpg_bytes(image, 1920, 85)} data = presigned_urls["original"]["fields"] data["content-type"] = "image/jpeg" r = requests.post(presigned_urls["original"]["url"], files=files, data=data) if not r.ok: logger.error(f"Failed to upload original: {r.status_code} {r.text}") raise Exception(r.text) # resize and submit annotate files = {"file": get_jpg_bytes(image, 640, 70)} data = presigned_urls["annotate"]["fields"] data["content-type"] = "image/jpeg" r = requests.post(presigned_urls["annotate"]["url"], files=files, data=data) if not r.ok: logger.error(f"Failed to upload annotate: {r.status_code} {r.text}") raise Exception(r.text) # resize and submit thumbnail files = {"file": get_jpg_bytes(image, 200, 70)} data = presigned_urls["thumbnail"]["fields"] data["content-type"] = "image/jpeg" r = requests.post(presigned_urls["thumbnail"]["url"], files=files, data=data) if not r.ok: logger.error(f"Failed to upload thumbnail: {r.status_code} {r.text}") raise Exception(r.text) # create image r = self._post( "image/create", {"id": presigned_urls["imageId"], "camera": camera} ) if not r.ok: raise Exception(r.text) # return image id return str(presigned_urls.get("imageId")) def add_false_positive( self, plus_id: str, region: List[float], bbox: List[float], score: float, label: str, model_hash: str, model_type: str, detector_type: str, ) -> None: r = self._put( f"image/{plus_id}/false_positive", { "label": label, "x": bbox[0], "y": bbox[1], "w": bbox[2], "h": bbox[3], "regionX": region[0], "regionY": region[1], "regionW": region[2], "regionH": region[3], "score": score, "model_hash": model_hash, "model_type": model_type, "detector_type": detector_type, }, ) if not r.ok: try: error_response = r.json() errors = error_response.get("errors", []) for error in errors: if ( error.get("param") == "label" and error.get("type") == "invalid_enum_value" ): raise ValueError(f"Unsupported label value provided: {label}") except ValueError as e: raise e raise Exception(r.text) def add_annotation( self, plus_id: str, bbox: List[float], label: str, difficult: bool = False, ) -> None: r = self._put( f"image/{plus_id}/annotation", { "label": label, "x": bbox[0], "y": bbox[1], "w": bbox[2], "h": bbox[3], "difficult": difficult, }, ) if not r.ok: try: error_response = r.json() errors = error_response.get("errors", []) for error in errors: if ( error.get("param") == "label" and error.get("type") == "invalid_enum_value" ): raise ValueError(f"Unsupported label value provided: {label}") except ValueError as e: raise e raise Exception(r.text) def get_model_download_url( self, model_id: str, ) -> str: r = self._get(f"model/{model_id}/signed_url") if not r.ok: raise Exception(r.text) presigned_url = r.json() return str(presigned_url.get("url")) def get_model_info(self, model_id: str) -> Any: r = self._get(f"model/{model_id}") if not r.ok: raise Exception(r.text) return r.json()