blakeblackshear.frigate/frigate/plus.py
Felipe Santos f8b0329b37
Move database and config from homeassistant /config to addon /config (#16337)
* Move database and config from homeassistant /config to addon /config

* Re-implement config migration for the add-on

* Align some terms

* Improve function name

* Use local variables

* Add model.path migration

* Fix homeassistant config path

* Ensure migration scripts run before go2rtc and frigate

* Migrate all files I know

* Add ffmpeg.path migration

* Update docker/main/rootfs/etc/s6-overlay/s6-rc.d/prepare/run

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>

* Improve some variable names and organization

* Update docs to reflect addon config dir

* Update live.md with /addon_configs

* Move addon config section to configuration doc

* Align several terminologies and improve text

* Fix webrtc example config title

* Capitalize Add-on in more places

* Improve specific add-on config dir docs

* Align bash and python scripts to prefer config.yml over config.yaml

* Support config.json in migration shell scripts

* Change docs to reflect config.yml is preferred over config.yaml

* If previous config was yaml, migrate to yaml

* Fix typo in edgetpu.md

* Fix formatting of Python files

* Remove HailoRT Beta add-on variant from docs

* Add migration for labelmap and certs

* Fix variable name

* Fix new_config_file var unset

* Fix addon config directories table

* Improve db migration to avoid migrating files like .db.bak

* Fix echo location

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
2025-03-24 09:05:59 -05:00

237 lines
7.5 KiB
Python

import datetime
import json
import logging
import os
import re
from pathlib import Path
from typing import Any, List
import cv2
import requests
from numpy import ndarray
from requests.models import Response
from frigate.const import PLUS_API_HOST, PLUS_ENV_VAR
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes:
    """Shrink ``image`` to fit within ``max_dim`` and encode it as JPEG.

    The longer side is capped at ``max_dim`` (the image is never enlarged)
    and the other side is scaled by the same ratio.

    Args:
        image: Image array; ``shape[0]`` is used as the height and
            ``shape[1]`` as the width.
        max_dim: Maximum size, in pixels, of the longer side of the output.
        quality: Value passed to OpenCV as ``IMWRITE_JPEG_QUALITY``.

    Returns:
        The encoded JPEG bytes, or ``b""`` when OpenCV reports that the
        encode did not succeed.
    """
    src_height, src_width = image.shape[0], image.shape[1]
    if src_width >= src_height:
        width = min(max_dim, src_width)
        # int() truncates, so a very elongated image could otherwise end up
        # with a 0 pixel side, which cv2.resize rejects.
        height = max(1, int(width * src_height / src_width))
    else:
        height = min(max_dim, src_height)
        width = max(1, int(height * src_width / src_height))

    resized = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA)
    encoded_ok, jpg = cv2.imencode(
        ".jpg", resized, [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    )
    if not encoded_ok:
        return b""

    # Annotated so the declared ``bytes`` return type holds for type checkers
    # even though the OpenCV result is untyped.
    jpg_bytes: bytes = jpg.tobytes()
    return jpg_bytes
class PlusApi:
    """Client for the Frigate+ HTTP API.

    The API key is looked up once, at construction time, from the first of
    these sources that is present:

    1. the ``PLUS_ENV_VAR`` environment variable,
    2. a file named ``PLUS_ENV_VAR`` inside ``/run/secrets``,
    3. the ``plus_api_key`` entry of the add-on options file
       ``/data/options.json``.

    A key that does not look like ``<uuid>:<40 chars>`` is discarded and the
    client reports itself as inactive. Access tokens are fetched lazily and
    refreshed shortly before they expire.
    """

    # (connect, read) timeout in seconds applied to every HTTP request so that
    # a stalled connection cannot block the caller indefinitely.
    REQUEST_TIMEOUT = (10, 60)

    def __init__(self) -> None:
        """Locate and validate the API key; performs no network access."""
        self.host = PLUS_API_HOST
        self.key = None
        if PLUS_ENV_VAR in os.environ:
            self.key = os.environ.get(PLUS_ENV_VAR)
        elif (
            os.path.isdir("/run/secrets")
            and os.access("/run/secrets", os.R_OK)
            and PLUS_ENV_VAR in os.listdir("/run/secrets")
        ):
            self.key = (
                Path(os.path.join("/run/secrets", PLUS_ENV_VAR)).read_text().strip()
            )
        # check for the add-on options file
        elif os.path.isfile("/data/options.json"):
            with open("/data/options.json") as f:
                options = json.load(f)
            self.key = options.get("plus_api_key")

        # A non-string value (e.g. a number in options.json) is treated as a
        # malformed key instead of crashing inside re.match.
        if self.key is not None and not (
            isinstance(self.key, str)
            and re.match(
                r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}:[a-z0-9]{40}",
                self.key,
            )
        ):
            logger.error("Plus API Key is not formatted correctly.")
            self.key = None

        self._is_active: bool = self.key is not None
        self._token_data: dict = {}

    def _refresh_token_if_needed(self) -> None:
        """Fetch a new access token unless the cached one is valid for 60s+.

        Raises:
            Exception: If no API key is configured or the token request
                returns a non-OK response.
        """
        expires = self._token_data.get("expires")
        if (
            expires is not None
            and expires - datetime.datetime.now().timestamp() >= 60
        ):
            return

        if self.key is None:
            raise Exception(
                "Plus API key not set. See https://docs.frigate.video/integrations/plus#set-your-api-key"
            )
        # The key has the form "<id>:<secret>"; both halves are sent as HTTP
        # basic auth credentials.
        parts = self.key.split(":")
        r = requests.get(
            f"{self.host}/v1/auth/token",
            auth=(parts[0], parts[1]),
            timeout=self.REQUEST_TIMEOUT,
        )
        if not r.ok:
            raise Exception(f"Unable to refresh API token: {r.text}")
        self._token_data = r.json()

    def _get_authorization_header(self) -> dict:
        """Return the bearer ``authorization`` header, refreshing the token first."""
        self._refresh_token_if_needed()
        return {"authorization": f"Bearer {self._token_data.get('accessToken')}"}

    def _get(self, path: str) -> Response:
        """Send an authenticated GET to ``{host}/v1/{path}``."""
        return requests.get(
            f"{self.host}/v1/{path}",
            headers=self._get_authorization_header(),
            timeout=self.REQUEST_TIMEOUT,
        )

    def _post(self, path: str, data: dict) -> Response:
        """Send an authenticated POST with ``data`` as the JSON body."""
        return requests.post(
            f"{self.host}/v1/{path}",
            headers=self._get_authorization_header(),
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )

    def _put(self, path: str, data: dict) -> Response:
        """Send an authenticated PUT with ``data`` as the JSON body."""
        return requests.put(
            f"{self.host}/v1/{path}",
            headers=self._get_authorization_header(),
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )

    def _upload_jpg(self, target: dict, jpg_bytes: bytes, description: str) -> None:
        """POST ``jpg_bytes`` to one presigned upload target.

        Args:
            target: Mapping with the upload ``url`` and the form ``fields``
                that must accompany the file.
            jpg_bytes: Encoded JPEG to upload.
            description: Name of the variant, used only in the error log.

        Raises:
            Exception: If the upload returns a non-OK response.
        """
        data = target["fields"]
        data["content-type"] = "image/jpeg"
        r = requests.post(
            target["url"],
            files={"file": jpg_bytes},
            data=data,
            timeout=self.REQUEST_TIMEOUT,
        )
        if not r.ok:
            logger.error(f"Failed to upload {description}: {r.status_code} {r.text}")
            raise Exception(r.text)

    @staticmethod
    def _raise_for_label_error(r: Response, label: str) -> None:
        """Raise the appropriate exception for a failed label request.

        Always raises; call only when ``r.ok`` is false.

        Raises:
            ValueError: If the error body reports ``label`` as an invalid
                enum value.
            Exception: For every other failure, carrying the response text.
        """
        try:
            error_response = r.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses. A non-JSON error
            # body must not be mistaken for the "unsupported label" signal,
            # so fall through to the generic exception below.
            error_response = None

        errors = (
            error_response.get("errors")
            if isinstance(error_response, dict)
            else None
        )
        if isinstance(errors, list):
            for error in errors:
                if (
                    isinstance(error, dict)
                    and error.get("param") == "label"
                    and error.get("type") == "invalid_enum_value"
                ):
                    raise ValueError(f"Unsupported label value provided: {label}")

        raise Exception(r.text)

    def is_active(self) -> bool:
        """Return whether a correctly formatted API key was found at startup."""
        return self._is_active

    def upload_image(self, image: ndarray, camera: str) -> str:
        """Upload ``image`` (original and thumbnail) and register it.

        Args:
            image: Image to upload; it is JPEG encoded at up to 1920 px
                (quality 85) and again at up to 200 px (quality 70).
            camera: Camera name stored alongside the image.

        Returns:
            The image id assigned by the API, as a string.

        Raises:
            Exception: If any of the requests returns a non-OK response.
        """
        r = self._get("image/signed_urls")
        # Check the status before decoding: an error response is not
        # guaranteed to carry a JSON body.
        if not r.ok:
            raise Exception("Unable to get signed urls")
        presigned_urls = r.json()

        # resize and submit original
        self._upload_jpg(
            presigned_urls["original"], get_jpg_bytes(image, 1920, 85), "original"
        )

        # resize and submit thumbnail
        self._upload_jpg(
            presigned_urls["thumbnail"], get_jpg_bytes(image, 200, 70), "thumbnail"
        )

        # create image
        r = self._post(
            "image/create", {"id": presigned_urls["imageId"], "camera": camera}
        )
        if not r.ok:
            raise Exception(r.text)

        # return image id
        return str(presigned_urls.get("imageId"))

    def add_false_positive(
        self,
        plus_id: str,
        region: List[float],
        bbox: List[float],
        score: float,
        label: str,
        model_hash: str,
        model_type: str,
        detector_type: str,
    ) -> None:
        """Report a detection on image ``plus_id`` as a false positive.

        Args:
            plus_id: Id of the previously uploaded image.
            region: Four values sent as ``regionX``, ``regionY``,
                ``regionW``, ``regionH``.
            bbox: Four values sent as ``x``, ``y``, ``w``, ``h``.
            score: Detection score.
            label: Detected label.
            model_hash: Sent as ``model_hash``.
            model_type: Sent as ``model_type``.
            detector_type: Sent as ``detector_type``.

        Raises:
            ValueError: If the API rejects ``label`` as unsupported.
            Exception: For any other non-OK response.
        """
        r = self._put(
            f"image/{plus_id}/false_positive",
            {
                "label": label,
                "x": bbox[0],
                "y": bbox[1],
                "w": bbox[2],
                "h": bbox[3],
                "regionX": region[0],
                "regionY": region[1],
                "regionW": region[2],
                "regionH": region[3],
                "score": score,
                "model_hash": model_hash,
                "model_type": model_type,
                "detector_type": detector_type,
            },
        )
        if not r.ok:
            self._raise_for_label_error(r, label)

    def add_annotation(
        self,
        plus_id: str,
        bbox: List[float],
        label: str,
        difficult: bool = False,
    ) -> None:
        """Add a labelled bounding box to image ``plus_id``.

        Args:
            plus_id: Id of the previously uploaded image.
            bbox: Four values sent as ``x``, ``y``, ``w``, ``h``.
            label: Label for the box.
            difficult: Sent as the ``difficult`` flag of the annotation.

        Raises:
            ValueError: If the API rejects ``label`` as unsupported.
            Exception: For any other non-OK response.
        """
        r = self._put(
            f"image/{plus_id}/annotation",
            {
                "label": label,
                "x": bbox[0],
                "y": bbox[1],
                "w": bbox[2],
                "h": bbox[3],
                "difficult": difficult,
            },
        )
        if not r.ok:
            self._raise_for_label_error(r, label)

    def get_model_download_url(
        self,
        model_id: str,
    ) -> str:
        """Return the ``url`` field of the signed-url response for ``model_id``.

        Raises:
            Exception: If the request returns a non-OK response.
        """
        r = self._get(f"model/{model_id}/signed_url")
        if not r.ok:
            raise Exception(r.text)
        presigned_url = r.json()
        return str(presigned_url.get("url"))

    def get_model_info(self, model_id: str) -> Any:
        """Return the decoded JSON description of model ``model_id``.

        Raises:
            Exception: If the request returns a non-OK response.
        """
        r = self._get(f"model/{model_id}")
        if not r.ok:
            raise Exception(r.text)
        return r.json()