# frigate/detectors/plugins/rknn.py

import logging
import os.path
import re
import urllib.request
from typing import Literal

import cv2
import numpy as np
from pydantic import Field

from frigate.const import MODEL_CACHE_DIR
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
from frigate.util.model import post_process_yolo

logger = logging.getLogger(__name__)

DETECTOR_KEY = "rknn"

supported_socs = ["rk3562", "rk3566", "rk3568", "rk3576", "rk3588"]

supported_models = {
    ModelTypeEnum.yologeneric: "^frigate-fp16-yolov9-[cemst]$",
    ModelTypeEnum.yolonas: "^deci-fp16-yolonas_[sml]$",
    ModelTypeEnum.yolox: "^rock-(fp16|i8)-yolox_(nano|tiny)$",
}

model_cache_dir = os.path.join(MODEL_CACHE_DIR, "rknn_cache/")


class RknnDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
num_cores: int = Field(default=0, ge=0, le=3, title="Number of NPU cores to use.")
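

# Example Frigate config for this detector (illustrative sketch; the preset name
# and core count below are assumptions -- adjust to your setup). check_config()
# requires a 320x320 model with "bgr" pixel format and an "nhwc" input tensor:
#
#   detectors:
#     rknn:
#       type: rknn
#       num_cores: 3
#
#   model:
#     path: deci-fp16-yolonas_s
#     width: 320
#     height: 320
#     input_pixel_format: bgr
#     input_tensor: nhwc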


class Rknn(DetectionApi):
type_key = DETECTOR_KEY

    def __init__(self, config: RknnDetectorConfig):
super().__init__(config)
self.height = config.model.height
self.width = config.model.width
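        # num_cores selects how many NPU cores to use; 2**n - 1 builds the RKNN core
        # mask (0 = let the runtime pick, 1 = core 0, 3 = cores 0-1, 7 = cores 0-2).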
core_mask = 2**config.num_cores - 1
soc = self.get_soc()
model_path = config.model.path or "deci-fp16-yolonas_s"
model_props = self.parse_model_input(model_path, soc)
        # yolox decoding needs the grids and strides precomputed; also check the
        # preset's model type here, since config.model.model_type is only updated
        # from the preset below.
        if (
            self.detector_config.model.model_type == ModelTypeEnum.yolox
            or model_props.get("model_type") == ModelTypeEnum.yolox
        ):
            self.calculate_grids_strides(expanded=False)

if model_props["preset"]:
config.model.model_type = model_props["model_type"]
if model_props["model_type"] == ModelTypeEnum.yolonas:
logger.info(
"You are using yolo-nas with weights from DeciAI. "
"These weights are subject to their license and can't be used commercially. "
"For more information, see: https://docs.deci.ai/super-gradients/latest/LICENSE.YOLONAS.html"
)
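
        # Imported lazily so this module can be loaded on systems without the
        # Rockchip NPU runtime installed.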
from rknnlite.api import RKNNLite
self.rknn = RKNNLite(verbose=False)
if self.rknn.load_rknn(model_props["path"]) != 0:
logger.error("Error initializing rknn model.")
if self.rknn.init_runtime(core_mask=core_mask) != 0:
logger.error(
"Error initializing rknn runtime. Do you run docker in privileged mode?"
)

    def __del__(self):
self.rknn.release()

    def get_soc(self):
try:
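            # /proc/device-tree/compatible holds NUL-separated "vendor,name" strings
            # (e.g. "radxa,rock-5b\0rockchip,rk3588\0"); the last comma-separated
            # field is the SoC name.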
with open("/proc/device-tree/compatible") as file:
soc = file.read().split(",")[-1].strip("\x00")
except FileNotFoundError:
raise Exception("Make sure to run docker in privileged mode.")
if soc not in supported_socs:
raise Exception(
f"Your SoC is not supported. Your SoC is: {soc}. Currently these SoCs are supported: {supported_socs}."
)
return soc

    def parse_model_input(self, model_path, soc):
model_props = {}
        # determine whether the user provided their own model;
        # user-provided models should be a path and contain a "/"
if "/" in model_path:
model_props["preset"] = False
model_props["path"] = model_path
else:
model_props["preset"] = True
"""
Filenames follow this pattern:
origin-quant-basename-soc-tk_version-rev.rknn
        origin: where the model comes from; "default" = upstream repo, "rknn" = modified by airockchip
quant: i8 or fp16
basename: e.g. yolonas_s
soc: e.g. rk3588
tk_version: e.g. v2.0.0
rev: e.g. 1
Full name could be: default-fp16-yolonas_s-rk3588-v2.0.0-1.rknn
"""
model_matched = False
for model_type, pattern in supported_models.items():
if re.match(pattern, model_path):
model_matched = True
model_props["model_type"] = model_type
if model_matched:
model_props["filename"] = model_path + f"-{soc}-v2.3.2-1.rknn"
model_props["path"] = model_cache_dir + model_props["filename"]
if not os.path.isfile(model_props["path"]):
self.download_model(model_props["filename"])
else:
                supported_models_str = ", ".join(
                    pattern[1:-1] for pattern in supported_models.values()
                )
raise Exception(
f"Model {model_path} is unsupported. Provide your own model or choose one of the following: {supported_models_str}"
)
return model_props

    def download_model(self, filename):
if not os.path.isdir(model_cache_dir):
os.mkdir(model_cache_dir)
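        # Preset models are fetched as release assets from the MarcA711/rknn-models
        # repository; the release tag must match the version suffix appended in
        # parse_model_input.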
urllib.request.urlretrieve(
f"https://github.com/MarcA711/rknn-models/releases/download/v2.3.2/{filename}",
model_cache_dir + filename,
)

    def check_config(self, config):
if (config.model.width != 320) or (config.model.height != 320):
raise Exception(
"Make sure to set the model width and height to 320 in your config."
)
if config.model.input_pixel_format != "bgr":
raise Exception(
'Make sure to set the model input_pixel_format to "bgr" in your config.'
)
if config.model.input_tensor != "nhwc":
raise Exception(
'Make sure to set the model input_tensor to "nhwc" in your config.'
)

    def post_process_yolonas(self, output: list[np.ndarray]):
"""
@param output: output of inference
expected shape: [np.array(1, N, 4), np.array(1, N, 80)]
where N depends on the input size e.g. N=2100 for 320x320 images
@return: best results: np.array(20, 6) where each row is
in this order (class_id, score, y1/height, x1/width, y2/height, x2/width)
"""
N = output[0].shape[1]
boxes = output[0].reshape(N, 4)
scores = output[1].reshape(N, 80)
class_ids = np.argmax(scores, axis=1)
scores = scores[np.arange(N), class_ids]
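        # keep only detections above the score threshold, capped at the 20 best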
args_best = np.argwhere(scores > self.thresh)[:, 0]
num_matches = len(args_best)
if num_matches == 0:
return np.zeros((20, 6), np.float32)
elif num_matches > 20:
args_best20 = np.argpartition(scores[args_best], -20)[-20:]
args_best = args_best[args_best20]
boxes = boxes[args_best]
class_ids = class_ids[args_best]
scores = scores[args_best]
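        # convert to normalized (y1, x1, y2, x2) as described in the docstring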
boxes = np.transpose(
np.vstack(
(
boxes[:, 1] / self.height,
boxes[:, 0] / self.width,
boxes[:, 3] / self.height,
boxes[:, 2] / self.width,
)
)
)
results = np.hstack(
(class_ids[..., np.newaxis], scores[..., np.newaxis], boxes)
)
return np.resize(results, (20, 6))

    def post_process_yolox(
self,
predictions: list[np.ndarray],
grids: np.ndarray,
expanded_strides: np.ndarray,
) -> np.ndarray:
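        # Flatten a (1, C, H, W) output head to (H*W, C) so each row is one anchor.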
def sp_flatten(_in: np.ndarray):
ch = _in.shape[1]
_in = _in.transpose(0, 2, 3, 1)
return _in.reshape(-1, ch)
boxes, scores, classes_conf = [], [], []
input_data = [
_in.reshape([1, -1] + list(_in.shape[-2:])) for _in in predictions
]
for i in range(len(input_data)):
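            # standard YOLOX decode: xy is an offset within the grid cell scaled by
            # the stride, and wh is predicted in log space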
unprocessed_box = input_data[i][:, :4, :, :]
box_xy = unprocessed_box[:, :2, :, :]
box_wh = np.exp(unprocessed_box[:, 2:4, :, :]) * expanded_strides[i]
box_xy += grids[i]
box_xy *= expanded_strides[i]
box = np.concatenate((box_xy, box_wh), axis=1)
# Convert [c_x, c_y, w, h] to [x1, y1, x2, y2]
xyxy = np.copy(box)
xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2 # top left x
xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2 # top left y
xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2 # bottom right x
xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2 # bottom right y
boxes.append(xyxy)
scores.append(input_data[i][:, 4:5, :, :])
classes_conf.append(input_data[i][:, 5:, :, :])
# flatten data
boxes = np.concatenate([sp_flatten(_v) for _v in boxes])
classes_conf = np.concatenate([sp_flatten(_v) for _v in classes_conf])
scores = np.concatenate([sp_flatten(_v) for _v in scores])
# reshape and filter boxes
box_confidences = scores.reshape(-1)
class_max_score = np.max(classes_conf, axis=-1)
classes = np.argmax(classes_conf, axis=-1)
_class_pos = np.where(class_max_score * box_confidences >= 0.4)
scores = (class_max_score * box_confidences)[_class_pos]
boxes = boxes[_class_pos]
classes = classes[_class_pos]
# run nms
indices = cv2.dnn.NMSBoxes(
bboxes=boxes,
scores=scores,
score_threshold=0.4,
nms_threshold=0.4,
)
results = np.zeros((20, 6), np.float32)
if len(indices) > 0:
for i, idx in enumerate(indices.flatten()[:20]):
box = boxes[idx]
results[i] = [
classes[idx],
scores[idx],
box[1] / self.height,
box[0] / self.width,
box[3] / self.height,
box[2] / self.width,
]
return results

    def post_process(self, output):
if self.detector_config.model.model_type == ModelTypeEnum.yolonas:
return self.post_process_yolonas(output)
elif self.detector_config.model.model_type == ModelTypeEnum.yologeneric:
return post_process_yolo(output, self.width, self.height)
elif self.detector_config.model.model_type == ModelTypeEnum.yolox:
return self.post_process_yolox(output, self.grids, self.expanded_strides)
else:
raise ValueError(
f'Model type "{self.detector_config.model.model_type}" is currently not supported.'
)

    def detect_raw(self, tensor_input):
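        # RKNNLite takes a list of input tensors and returns a list of output
        # arrays, which post_process() decodes into Frigate's detection format.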
output = self.rknn.inference(
[
tensor_input,
]
)
return self.post_process(output)