import datetime import json import logging import os import re from typing import Any, Dict, List import requests from frigate.const import PLUS_ENV_VAR, PLUS_API_HOST from requests.models import Response import cv2 from numpy import ndarray logger = logging.getLogger(__name__) def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes: if image.shape[1] >= image.shape[0]: width = min(max_dim, image.shape[1]) height = int(width * image.shape[0] / image.shape[1]) else: height = min(max_dim, image.shape[0]) width = int(height * image.shape[1] / image.shape[0]) original = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode(".jpg", original, [int(cv2.IMWRITE_JPEG_QUALITY), quality]) jpg_bytes = jpg.tobytes() return jpg_bytes if type(jpg_bytes) is bytes else b"" class PlusApi: def __init__(self) -> None: self.host = PLUS_API_HOST self.key = None if PLUS_ENV_VAR in os.environ: self.key = os.environ.get(PLUS_ENV_VAR) # check for the addon options file elif os.path.isfile("/data/options.json"): with open("/data/options.json") as f: raw_options = f.read() options = json.loads(raw_options) self.key = options.get("plus_api_key") if self.key is not None and not re.match( r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}:[a-z0-9]{40}", self.key, ): logger.error("Plus API Key is not formatted correctly.") self.key = None self._is_active: bool = self.key is not None self._token_data: dict = {} def _refresh_token_if_needed(self) -> None: if ( self._token_data.get("expires") is None or self._token_data["expires"] - datetime.datetime.now().timestamp() < 60 ): if self.key is None: raise Exception("Plus API not activated") parts = self.key.split(":") r = requests.get(f"{self.host}/v1/auth/token", auth=(parts[0], parts[1])) if not r.ok: raise Exception("Unable to refresh API token") self._token_data = r.json() def _get_authorization_header(self) -> dict: self._refresh_token_if_needed() return {"authorization": f"Bearer {self._token_data.get('accessToken')}"} def _get(self, path: str) -> Response: return requests.get( f"{self.host}/v1/{path}", headers=self._get_authorization_header() ) def _post(self, path: str, data: dict) -> Response: return requests.post( f"{self.host}/v1/{path}", headers=self._get_authorization_header(), json=data, ) def _put(self, path: str, data: dict) -> Response: return requests.put( f"{self.host}/v1/{path}", headers=self._get_authorization_header(), json=data, ) def is_active(self) -> bool: return self._is_active def upload_image(self, image: ndarray, camera: str) -> str: r = self._get("image/signed_urls") presigned_urls = r.json() if not r.ok: raise Exception("Unable to get signed urls") # resize and submit original files = {"file": get_jpg_bytes(image, 1920, 85)} data = presigned_urls["original"]["fields"] data["content-type"] = "image/jpeg" r = requests.post(presigned_urls["original"]["url"], files=files, data=data) if not r.ok: logger.error(f"Failed to upload original: {r.status_code} {r.text}") raise Exception(r.text) # resize and submit annotate files = {"file": get_jpg_bytes(image, 640, 70)} data = presigned_urls["annotate"]["fields"] data["content-type"] = "image/jpeg" r = requests.post(presigned_urls["annotate"]["url"], files=files, data=data) if not r.ok: logger.error(f"Failed to upload annotate: {r.status_code} {r.text}") raise Exception(r.text) # resize and submit thumbnail files = {"file": get_jpg_bytes(image, 200, 70)} data = presigned_urls["thumbnail"]["fields"] data["content-type"] = "image/jpeg" r = requests.post(presigned_urls["thumbnail"]["url"], files=files, data=data) if not r.ok: logger.error(f"Failed to upload thumbnail: {r.status_code} {r.text}") raise Exception(r.text) # create image r = self._post( "image/create", {"id": presigned_urls["imageId"], "camera": camera} ) if not r.ok: raise Exception(r.text) # return image id return str(presigned_urls.get("imageId")) def add_false_positive( self, plus_id: str, region: List[float], bbox: List[float], score: float, label: str, model_hash: str, model_type: str, detector_type: str, ) -> None: r = self._put( f"image/{plus_id}/false_positive", { "label": label, "x": bbox[0], "y": bbox[1], "w": bbox[2], "h": bbox[3], "regionX": region[0], "regionY": region[1], "regionW": region[2], "regionH": region[3], "score": score, "model_hash": model_hash, "model_type": model_type, "detector_type": detector_type, }, ) if not r.ok: raise Exception(r.text) def add_annotation( self, plus_id: str, bbox: List[float], label: str, difficult: bool = False, ) -> None: r = self._put( f"image/{plus_id}/annotation", { "label": label, "x": bbox[0], "y": bbox[1], "w": bbox[2], "h": bbox[3], "difficult": difficult, }, ) if not r.ok: raise Exception(r.text) def get_model_download_url( self, model_id: str, ) -> str: r = self._get(f"model/{model_id}/signed_url") if not r.ok: raise Exception(r.text) presigned_url = r.json() return str(presigned_url.get("url")) def get_model_info(self, model_id: str) -> Any: r = self._get(f"model/{model_id}") if not r.ok: raise Exception(r.text) return r.json()