mirror of
https://github.com/blakeblackshear/frigate.git
synced 2024-11-21 19:07:46 +01:00
ab50d0b006
* Add isort and ruff linter Both linters are pretty common among modern python code bases. The isort tool provides stable sorting and grouping, as well as pruning of unused imports. Ruff is a modern linter, that is very fast due to being written in rust. It can detect many common issues in a python codebase. Removes the pylint dev requirement, since ruff replaces it. * treewide: fix issues detected by ruff * treewide: fix bare except clauses * .devcontainer: Set up isort * treewide: optimize imports * treewide: apply black * treewide: make regex patterns raw strings This is necessary for escape sequences to be properly recognized.
213 lines
6.5 KiB
Python
213 lines
6.5 KiB
Python
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from typing import Any, List
|
|
|
|
import cv2
|
|
import requests
|
|
from numpy import ndarray
|
|
from requests.models import Response
|
|
|
|
from frigate.const import PLUS_API_HOST, PLUS_ENV_VAR
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes:
|
|
if image.shape[1] >= image.shape[0]:
|
|
width = min(max_dim, image.shape[1])
|
|
height = int(width * image.shape[0] / image.shape[1])
|
|
else:
|
|
height = min(max_dim, image.shape[0])
|
|
width = int(height * image.shape[1] / image.shape[0])
|
|
|
|
original = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA)
|
|
|
|
ret, jpg = cv2.imencode(".jpg", original, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
|
|
jpg_bytes = jpg.tobytes()
|
|
return jpg_bytes if type(jpg_bytes) is bytes else b""
|
|
|
|
|
|
class PlusApi:
|
|
def __init__(self) -> None:
|
|
self.host = PLUS_API_HOST
|
|
self.key = None
|
|
if PLUS_ENV_VAR in os.environ:
|
|
self.key = os.environ.get(PLUS_ENV_VAR)
|
|
# check for the addon options file
|
|
elif os.path.isfile("/data/options.json"):
|
|
with open("/data/options.json") as f:
|
|
raw_options = f.read()
|
|
options = json.loads(raw_options)
|
|
self.key = options.get("plus_api_key")
|
|
|
|
if self.key is not None and not re.match(
|
|
r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}:[a-z0-9]{40}",
|
|
self.key,
|
|
):
|
|
logger.error("Plus API Key is not formatted correctly.")
|
|
self.key = None
|
|
|
|
self._is_active: bool = self.key is not None
|
|
self._token_data: dict = {}
|
|
|
|
def _refresh_token_if_needed(self) -> None:
|
|
if (
|
|
self._token_data.get("expires") is None
|
|
or self._token_data["expires"] - datetime.datetime.now().timestamp() < 60
|
|
):
|
|
if self.key is None:
|
|
raise Exception("Plus API not activated")
|
|
parts = self.key.split(":")
|
|
r = requests.get(f"{self.host}/v1/auth/token", auth=(parts[0], parts[1]))
|
|
if not r.ok:
|
|
raise Exception("Unable to refresh API token")
|
|
self._token_data = r.json()
|
|
|
|
def _get_authorization_header(self) -> dict:
|
|
self._refresh_token_if_needed()
|
|
return {"authorization": f"Bearer {self._token_data.get('accessToken')}"}
|
|
|
|
def _get(self, path: str) -> Response:
|
|
return requests.get(
|
|
f"{self.host}/v1/{path}", headers=self._get_authorization_header()
|
|
)
|
|
|
|
def _post(self, path: str, data: dict) -> Response:
|
|
return requests.post(
|
|
f"{self.host}/v1/{path}",
|
|
headers=self._get_authorization_header(),
|
|
json=data,
|
|
)
|
|
|
|
def _put(self, path: str, data: dict) -> Response:
|
|
return requests.put(
|
|
f"{self.host}/v1/{path}",
|
|
headers=self._get_authorization_header(),
|
|
json=data,
|
|
)
|
|
|
|
def is_active(self) -> bool:
|
|
return self._is_active
|
|
|
|
def upload_image(self, image: ndarray, camera: str) -> str:
|
|
r = self._get("image/signed_urls")
|
|
presigned_urls = r.json()
|
|
if not r.ok:
|
|
raise Exception("Unable to get signed urls")
|
|
|
|
# resize and submit original
|
|
files = {"file": get_jpg_bytes(image, 1920, 85)}
|
|
data = presigned_urls["original"]["fields"]
|
|
data["content-type"] = "image/jpeg"
|
|
r = requests.post(presigned_urls["original"]["url"], files=files, data=data)
|
|
if not r.ok:
|
|
logger.error(f"Failed to upload original: {r.status_code} {r.text}")
|
|
raise Exception(r.text)
|
|
|
|
# resize and submit annotate
|
|
files = {"file": get_jpg_bytes(image, 640, 70)}
|
|
data = presigned_urls["annotate"]["fields"]
|
|
data["content-type"] = "image/jpeg"
|
|
r = requests.post(presigned_urls["annotate"]["url"], files=files, data=data)
|
|
if not r.ok:
|
|
logger.error(f"Failed to upload annotate: {r.status_code} {r.text}")
|
|
raise Exception(r.text)
|
|
|
|
# resize and submit thumbnail
|
|
files = {"file": get_jpg_bytes(image, 200, 70)}
|
|
data = presigned_urls["thumbnail"]["fields"]
|
|
data["content-type"] = "image/jpeg"
|
|
r = requests.post(presigned_urls["thumbnail"]["url"], files=files, data=data)
|
|
if not r.ok:
|
|
logger.error(f"Failed to upload thumbnail: {r.status_code} {r.text}")
|
|
raise Exception(r.text)
|
|
|
|
# create image
|
|
r = self._post(
|
|
"image/create", {"id": presigned_urls["imageId"], "camera": camera}
|
|
)
|
|
if not r.ok:
|
|
raise Exception(r.text)
|
|
|
|
# return image id
|
|
return str(presigned_urls.get("imageId"))
|
|
|
|
def add_false_positive(
|
|
self,
|
|
plus_id: str,
|
|
region: List[float],
|
|
bbox: List[float],
|
|
score: float,
|
|
label: str,
|
|
model_hash: str,
|
|
model_type: str,
|
|
detector_type: str,
|
|
) -> None:
|
|
r = self._put(
|
|
f"image/{plus_id}/false_positive",
|
|
{
|
|
"label": label,
|
|
"x": bbox[0],
|
|
"y": bbox[1],
|
|
"w": bbox[2],
|
|
"h": bbox[3],
|
|
"regionX": region[0],
|
|
"regionY": region[1],
|
|
"regionW": region[2],
|
|
"regionH": region[3],
|
|
"score": score,
|
|
"model_hash": model_hash,
|
|
"model_type": model_type,
|
|
"detector_type": detector_type,
|
|
},
|
|
)
|
|
|
|
if not r.ok:
|
|
raise Exception(r.text)
|
|
|
|
def add_annotation(
|
|
self,
|
|
plus_id: str,
|
|
bbox: List[float],
|
|
label: str,
|
|
difficult: bool = False,
|
|
) -> None:
|
|
r = self._put(
|
|
f"image/{plus_id}/annotation",
|
|
{
|
|
"label": label,
|
|
"x": bbox[0],
|
|
"y": bbox[1],
|
|
"w": bbox[2],
|
|
"h": bbox[3],
|
|
"difficult": difficult,
|
|
},
|
|
)
|
|
|
|
if not r.ok:
|
|
raise Exception(r.text)
|
|
|
|
def get_model_download_url(
|
|
self,
|
|
model_id: str,
|
|
) -> str:
|
|
r = self._get(f"model/{model_id}/signed_url")
|
|
|
|
if not r.ok:
|
|
raise Exception(r.text)
|
|
|
|
presigned_url = r.json()
|
|
|
|
return str(presigned_url.get("url"))
|
|
|
|
def get_model_info(self, model_id: str) -> Any:
|
|
r = self._get(f"model/{model_id}")
|
|
|
|
if not r.ok:
|
|
raise Exception(r.text)
|
|
|
|
return r.json()
|