mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-07-30 13:48:07 +02:00
* Add other rockchip download models * Specify newer release version * Specify newer release version * Update docs for rknn downloads * Update hardware docs
305 lines
10 KiB
Python
305 lines
10 KiB
Python
import logging
|
|
import os.path
|
|
import re
|
|
import urllib.request
|
|
from typing import Literal
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from pydantic import Field
|
|
|
|
from frigate.const import MODEL_CACHE_DIR
|
|
from frigate.detectors.detection_api import DetectionApi
|
|
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
|
|
from frigate.util.model import post_process_yolo
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Identifier of this detector type; used as the config `type` literal and as
# the detector's `type_key`.
DETECTOR_KEY = "rknn"

# SoC names that the detector accepts; any other SoC is rejected at start-up.
supported_socs = [
    "rk3562",
    "rk3566",
    "rk3568",
    "rk3576",
    "rk3588",
]

# Preset model names, expressed as anchored regular expressions and keyed by
# the model type that a matching preset is assigned.
supported_models = {
    ModelTypeEnum.yologeneric: "^frigate-fp16-yolov9-[cemst]$",
    ModelTypeEnum.yolonas: "^deci-fp16-yolonas_[sml]$",
    ModelTypeEnum.yolox: "^rock-(fp16|i8)-yolox_(nano|tiny)$",
}

# Directory in which downloaded preset models are stored. The trailing slash
# is part of the value.
model_cache_dir = os.path.join(MODEL_CACHE_DIR, "rknn_cache/")
|
|
|
|
|
|
class RknnDetectorConfig(BaseDetectorConfig):
    """Configuration schema for the ``rknn`` detector type."""

    # Discriminator: only the value of DETECTOR_KEY is accepted.
    type: Literal[DETECTOR_KEY]
    # Validated to the inclusive range 0..3; defaults to 0.
    num_cores: int = Field(
        default=0,
        ge=0,
        le=3,
        title="Number of NPU cores to use.",
    )
|
|
|
|
|
|
class Rknn(DetectionApi):
    """Detector that runs object-detection models through RKNNLite.

    The model is either a user-supplied file (any ``model.path`` containing a
    ``/``) or one of the presets listed in ``supported_models``, which is
    downloaded into ``model_cache_dir`` on first use.
    """

    type_key = DETECTOR_KEY

    # Release tag of the pre-converted model bundle. It appears both in the
    # cached file name and in the download URL, so it is kept in one place.
    _MODEL_RELEASE = "v2.3.2"

    # Every post-processing routine returns exactly this many rows.
    _MAX_DETECTIONS = 20

    def __init__(self, config: RknnDetectorConfig):
        """Resolve the model, load it and initialise the RKNNLite runtime.

        Raises:
            Exception: propagated from ``get_soc`` / ``parse_model_input`` when
                the SoC or the requested preset model is not supported.
        """
        super().__init__(config)
        self.height = config.model.height
        self.width = config.model.width
        # Bitmask with the lowest ``num_cores`` bits set (0 -> 0, 3 -> 0b111).
        core_mask = 2**config.num_cores - 1
        soc = self.get_soc()

        model_path = config.model.path or "deci-fp16-yolonas_s"
        model_props = self.parse_model_input(model_path, soc)

        if model_props["preset"]:
            # For presets the model type is derived from the preset name.
            config.model.model_type = model_props["model_type"]

            if model_props["model_type"] == ModelTypeEnum.yolonas:
                logger.info(
                    "You are using yolo-nas with weights from DeciAI. "
                    "These weights are subject to their license and can't be used commercially. "
                    "For more information, see: https://docs.deci.ai/super-gradients/latest/LICENSE.YOLONAS.html"
                )

        # Checked only after the preset has had a chance to set the model
        # type, so a preset YOLOX model gets its grids even when model_type
        # was not configured explicitly.
        # NOTE(review): assumes self.detector_config (set by the base class,
        # not visible here) is the same object as `config` - confirm.
        if self.detector_config.model.model_type == ModelTypeEnum.yolox:
            self.calculate_grids_strides(expanded=False)

        # Imported lazily so the module can be imported without rknnlite.
        from rknnlite.api import RKNNLite

        self.rknn = RKNNLite(verbose=False)
        if self.rknn.load_rknn(model_props["path"]) != 0:
            logger.error("Error initializing rknn model.")
        if self.rknn.init_runtime(core_mask=core_mask) != 0:
            logger.error(
                "Error initializing rknn runtime. Do you run docker in privileged mode?"
            )

    def __del__(self):
        """Release the RKNNLite handle, if one was ever created."""
        # __init__ may raise before self.rknn is assigned; in that case there
        # is nothing to release and we must not raise from the finalizer.
        rknn = getattr(self, "rknn", None)
        if rknn is not None:
            rknn.release()

    def get_soc(self):
        """Return the SoC name read from ``/proc/device-tree/compatible``.

        The name is the text after the last comma in that file, with NUL
        characters stripped from both ends.

        Raises:
            Exception: if the file does not exist, or the SoC is not listed
                in ``supported_socs``.
        """
        try:
            with open("/proc/device-tree/compatible") as file:
                soc = file.read().split(",")[-1].strip("\x00")
        except FileNotFoundError as err:
            raise Exception("Make sure to run docker in privileged mode.") from err

        if soc not in supported_socs:
            raise Exception(
                f"Your SoC is not supported. Your SoC is: {soc}. Currently these SoCs are supported: {supported_socs}."
            )

        return soc

    def parse_model_input(self, model_path, soc):
        """Turn the configured model path into a dict describing the model.

        Args:
            model_path: either a filesystem path (must contain ``/``) or the
                name of a preset matching one of ``supported_models``.
            soc: SoC name used to build the preset file name.

        Returns:
            dict with ``preset`` (bool) and ``path``; presets additionally
            carry ``model_type`` and ``filename``.

        Raises:
            Exception: if a preset name matches none of the supported patterns.
        """
        model_props = {}

        # User-provided models are given as a path and therefore contain "/".
        if "/" in model_path:
            model_props["preset"] = False
            model_props["path"] = model_path
            return model_props

        model_props["preset"] = True

        # Preset file names follow this pattern:
        #   origin-quant-basename-soc-tk_version-rev.rknn
        #     origin:     source of the model (leading part of the preset name)
        #     quant:      i8 or fp16
        #     basename:   e.g. yolonas_s
        #     soc:        e.g. rk3588
        #     tk_version: e.g. v2.3.2
        #     rev:        e.g. 1
        # A full name could be: deci-fp16-yolonas_s-rk3588-v2.3.2-1.rknn

        model_matched = False
        for model_type, pattern in supported_models.items():
            if re.match(pattern, model_path):
                model_matched = True
                model_props["model_type"] = model_type

        if not model_matched:
            # Strip the "^" / "$" anchors from the patterns (the dict values,
            # not its keys) so the message lists the accepted names.
            supported_models_str = ", ".join(
                pattern[1:-1] for pattern in supported_models.values()
            )
            raise Exception(
                f"Model {model_path} is unsupported. Provide your own model or choose one of the following: {supported_models_str}"
            )

        model_props["filename"] = model_path + f"-{soc}-{self._MODEL_RELEASE}-1.rknn"
        model_props["path"] = os.path.join(model_cache_dir, model_props["filename"])

        if not os.path.isfile(model_props["path"]):
            self.download_model(model_props["filename"])

        return model_props

    def download_model(self, filename):
        """Download a preset model file into ``model_cache_dir``.

        The file is fetched to a temporary name and moved into place only
        after the download succeeded, so an interrupted download never leaves
        a truncated file that later passes the "already cached" check.
        """
        # Also creates missing parents and tolerates a concurrent creation.
        os.makedirs(model_cache_dir, exist_ok=True)

        target = os.path.join(model_cache_dir, filename)
        # Per-process temporary name so parallel downloads cannot collide.
        partial = f"{target}.{os.getpid()}.part"
        url = (
            "https://github.com/MarcA711/rknn-models/releases/download/"
            f"{self._MODEL_RELEASE}/{filename}"
        )

        try:
            urllib.request.urlretrieve(url, partial)
            os.replace(partial, target)
        finally:
            # Only present when the download or the rename failed.
            if os.path.exists(partial):
                os.remove(partial)

    def check_config(self, config):
        """Validate the model settings this detector relies on.

        Raises:
            Exception: if width/height are not 320, ``input_pixel_format`` is
                not ``"bgr"`` or ``input_tensor`` is not ``"nhwc"``.
        """
        if (config.model.width != 320) or (config.model.height != 320):
            raise Exception(
                "Make sure to set the model width and height to 320 in your config."
            )

        if config.model.input_pixel_format != "bgr":
            raise Exception(
                'Make sure to set the model input_pixel_format to "bgr" in your config.'
            )

        if config.model.input_tensor != "nhwc":
            raise Exception(
                'Make sure to set the model input_tensor to "nhwc" in your config.'
            )

    def post_process_yolonas(self, output: list[np.ndarray]):
        """Convert raw YOLO-NAS output into the fixed-size detection array.

        @param output: output of inference
        expected shape: [np.array(1, N, 4), np.array(1, N, C)]
        where N depends on the input size e.g. N=2100 for 320x320 images
        and C is the number of classes

        @return: np.array(20, 6) of float32 where each row is
        in this order (class_id, score, y1/height, x1/width, y2/height, x2/width).
        Rows are sorted by descending score; unused rows are all zeros.
        """
        limit = self._MAX_DETECTIONS
        num_candidates = output[0].shape[1]

        boxes = output[0].reshape(num_candidates, 4)
        # Take the class count from the tensor instead of assuming 80.
        scores = output[1].reshape(num_candidates, output[1].shape[-1])

        class_ids = np.argmax(scores, axis=1)
        scores = scores[np.arange(num_candidates), class_ids]

        results = np.zeros((limit, 6), np.float32)

        # self.thresh is not assigned in this class.
        # NOTE(review): presumably provided by DetectionApi - confirm.
        keep = np.flatnonzero(scores > self.thresh)
        if keep.size == 0:
            return results
        if keep.size > limit:
            keep = keep[np.argpartition(scores[keep], -limit)[-limit:]]
        # Best detections first.
        keep = keep[np.argsort(-scores[keep], kind="stable")]

        boxes = boxes[keep]
        count = keep.size
        # Fill only the used rows. np.resize would pad by repeating rows,
        # which duplicates detections instead of leaving zeros.
        results[:count, 0] = class_ids[keep]
        results[:count, 1] = scores[keep]
        results[:count, 2] = boxes[:, 1] / self.height
        results[:count, 3] = boxes[:, 0] / self.width
        results[:count, 4] = boxes[:, 3] / self.height
        results[:count, 5] = boxes[:, 2] / self.width

        return results

    def post_process_yolox(
        self,
        predictions: list[np.ndarray],
        grids: np.ndarray,
        expanded_strides: np.ndarray,
    ) -> np.ndarray:
        """Decode YOLOX output, run NMS and return the detection array.

        Args:
            predictions: one array per output; each is reshaped to
                ``(1, channels, H, W)`` where channels 0-3 are the box,
                channel 4 the box confidence and the rest class scores.
            grids: per-output grid offsets, indexed like ``predictions``.
            expanded_strides: per-output strides, indexed like ``predictions``.

        Returns:
            np.array(20, 6) of float32; each used row is (class_id, score,
            y1/height, x1/width, y2/height, x2/width), unused rows are zeros.
        """

        def sp_flatten(_in: np.ndarray):
            # (1, C, H, W) -> (H*W, C)
            ch = _in.shape[1]
            _in = _in.transpose(0, 2, 3, 1)
            return _in.reshape(-1, ch)

        limit = self._MAX_DETECTIONS
        boxes, scores, classes_conf = [], [], []

        input_data = [
            _in.reshape([1, -1] + list(_in.shape[-2:])) for _in in predictions
        ]

        for i, level in enumerate(input_data):
            unprocessed_box = level[:, :4, :, :]
            box_xy = unprocessed_box[:, :2, :, :]
            box_wh = np.exp(unprocessed_box[:, 2:4, :, :]) * expanded_strides[i]

            box_xy += grids[i]
            box_xy *= expanded_strides[i]
            box = np.concatenate((box_xy, box_wh), axis=1)

            # Convert [c_x, c_y, w, h] to [x1, y1, x2, y2]
            xyxy = np.copy(box)
            xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2  # top left x
            xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2  # top left y
            xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2  # bottom right x
            xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2  # bottom right y

            boxes.append(xyxy)
            scores.append(level[:, 4:5, :, :])
            classes_conf.append(level[:, 5:, :, :])

        # flatten data
        boxes = np.concatenate([sp_flatten(_v) for _v in boxes])
        classes_conf = np.concatenate([sp_flatten(_v) for _v in classes_conf])
        scores = np.concatenate([sp_flatten(_v) for _v in scores])

        # combine box confidence with the best class score and filter
        box_confidences = scores.reshape(-1)
        class_max_score = np.max(classes_conf, axis=-1)
        classes = np.argmax(classes_conf, axis=-1)
        combined = class_max_score * box_confidences
        _class_pos = np.where(combined >= 0.4)
        scores = combined[_class_pos]
        boxes = boxes[_class_pos]
        classes = classes[_class_pos]

        results = np.zeros((limit, 6), np.float32)
        if scores.size == 0:
            # Nothing passed the score filter, so there is nothing for NMS.
            return results

        # cv2.dnn.NMSBoxes interprets each box as (x, y, width, height), so
        # the corner boxes must be converted before computing overlaps.
        nms_rects = np.column_stack(
            (
                boxes[:, 0],
                boxes[:, 1],
                boxes[:, 2] - boxes[:, 0],
                boxes[:, 3] - boxes[:, 1],
            )
        )

        # run nms
        indices = cv2.dnn.NMSBoxes(
            bboxes=nms_rects,
            scores=scores,
            score_threshold=0.4,
            nms_threshold=0.4,
        )

        if len(indices) > 0:
            keep = np.asarray(indices).flatten()[:limit]
            kept = boxes[keep]
            count = len(keep)
            results[:count, 0] = classes[keep]
            results[:count, 1] = scores[keep]
            results[:count, 2] = kept[:, 1] / self.height
            results[:count, 3] = kept[:, 0] / self.width
            results[:count, 4] = kept[:, 3] / self.height
            results[:count, 5] = kept[:, 2] / self.width

        return results

    def post_process(self, output):
        """Dispatch raw inference output to the decoder for the model type.

        Raises:
            ValueError: if the configured model type has no decoder here.
        """
        model_type = self.detector_config.model.model_type

        if model_type == ModelTypeEnum.yolonas:
            return self.post_process_yolonas(output)
        if model_type == ModelTypeEnum.yologeneric:
            return post_process_yolo(output, self.width, self.height)
        if model_type == ModelTypeEnum.yolox:
            return self.post_process_yolox(output, self.grids, self.expanded_strides)

        raise ValueError(
            f'Model type "{self.detector_config.model.model_type}" is currently not supported.'
        )

    def detect_raw(self, tensor_input):
        """Run inference on one input tensor and return post-processed detections."""
        output = self.rknn.inference([tensor_input])
        return self.post_process(output)
|