mirror of
				https://github.com/blakeblackshear/frigate.git
				synced 2025-10-27 10:52:11 +01:00 
			
		
		
		
	* Add other rockchip download models * Specify newer release version * Specify newer release version * Update docs for rknn downloads * Update hardware docs
		
			
				
	
	
		
			305 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			305 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import os.path
 | |
| import re
 | |
| import urllib.request
 | |
| from typing import Literal
 | |
| 
 | |
| import cv2
 | |
| import numpy as np
 | |
| from pydantic import Field
 | |
| 
 | |
| from frigate.const import MODEL_CACHE_DIR
 | |
| from frigate.detectors.detection_api import DetectionApi
 | |
| from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
 | |
| from frigate.util.model import post_process_yolo
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| DETECTOR_KEY = "rknn"
 | |
| 
 | |
| supported_socs = ["rk3562", "rk3566", "rk3568", "rk3576", "rk3588"]
 | |
| 
 | |
| supported_models = {
 | |
|     ModelTypeEnum.yologeneric: "^frigate-fp16-yolov9-[cemst]$",
 | |
|     ModelTypeEnum.yolonas: "^deci-fp16-yolonas_[sml]$",
 | |
|     ModelTypeEnum.yolox: "^rock-(fp16|i8)-yolox_(nano|tiny)$",
 | |
| }
 | |
| 
 | |
| model_cache_dir = os.path.join(MODEL_CACHE_DIR, "rknn_cache/")
 | |
| 
 | |
| 
 | |
| class RknnDetectorConfig(BaseDetectorConfig):
 | |
|     type: Literal[DETECTOR_KEY]
 | |
|     num_cores: int = Field(default=0, ge=0, le=3, title="Number of NPU cores to use.")
 | |
| 
 | |
| 
 | |
| class Rknn(DetectionApi):
 | |
|     type_key = DETECTOR_KEY
 | |
| 
 | |
|     def __init__(self, config: RknnDetectorConfig):
 | |
|         super().__init__(config)
 | |
|         self.height = config.model.height
 | |
|         self.width = config.model.width
 | |
|         core_mask = 2**config.num_cores - 1
 | |
|         soc = self.get_soc()
 | |
| 
 | |
|         model_path = config.model.path or "deci-fp16-yolonas_s"
 | |
| 
 | |
|         model_props = self.parse_model_input(model_path, soc)
 | |
| 
 | |
|         if self.detector_config.model.model_type == ModelTypeEnum.yolox:
 | |
|             self.calculate_grids_strides(expanded=False)
 | |
| 
 | |
|         if model_props["preset"]:
 | |
|             config.model.model_type = model_props["model_type"]
 | |
| 
 | |
|             if model_props["model_type"] == ModelTypeEnum.yolonas:
 | |
|                 logger.info(
 | |
|                     "You are using yolo-nas with weights from DeciAI. "
 | |
|                     "These weights are subject to their license and can't be used commercially. "
 | |
|                     "For more information, see: https://docs.deci.ai/super-gradients/latest/LICENSE.YOLONAS.html"
 | |
|                 )
 | |
| 
 | |
|         from rknnlite.api import RKNNLite
 | |
| 
 | |
|         self.rknn = RKNNLite(verbose=False)
 | |
|         if self.rknn.load_rknn(model_props["path"]) != 0:
 | |
|             logger.error("Error initializing rknn model.")
 | |
|         if self.rknn.init_runtime(core_mask=core_mask) != 0:
 | |
|             logger.error(
 | |
|                 "Error initializing rknn runtime. Do you run docker in privileged mode?"
 | |
|             )
 | |
| 
 | |
|     def __del__(self):
 | |
|         self.rknn.release()
 | |
| 
 | |
|     def get_soc(self):
 | |
|         try:
 | |
|             with open("/proc/device-tree/compatible") as file:
 | |
|                 soc = file.read().split(",")[-1].strip("\x00")
 | |
|         except FileNotFoundError:
 | |
|             raise Exception("Make sure to run docker in privileged mode.")
 | |
| 
 | |
|         if soc not in supported_socs:
 | |
|             raise Exception(
 | |
|                 f"Your SoC is not supported. Your SoC is: {soc}. Currently these SoCs are supported: {supported_socs}."
 | |
|             )
 | |
| 
 | |
|         return soc
 | |
| 
 | |
|     def parse_model_input(self, model_path, soc):
 | |
|         model_props = {}
 | |
| 
 | |
|         # find out if user provides his own model
 | |
|         # user provided models should be a path and contain a "/"
 | |
|         if "/" in model_path:
 | |
|             model_props["preset"] = False
 | |
|             model_props["path"] = model_path
 | |
|         else:
 | |
|             model_props["preset"] = True
 | |
| 
 | |
|             """
 | |
|             Filenames follow this pattern:
 | |
|             origin-quant-basename-soc-tk_version-rev.rknn
 | |
|             origin: From where comes the model? default: upstream repo; rknn: modifications from airockchip
 | |
|             quant: i8 or fp16
 | |
|             basename: e.g. yolonas_s
 | |
|             soc: e.g. rk3588
 | |
|             tk_version: e.g. v2.0.0
 | |
|             rev: e.g. 1
 | |
| 
 | |
|             Full name could be: default-fp16-yolonas_s-rk3588-v2.0.0-1.rknn
 | |
|             """
 | |
| 
 | |
|             model_matched = False
 | |
| 
 | |
|             for model_type, pattern in supported_models.items():
 | |
|                 if re.match(pattern, model_path):
 | |
|                     model_matched = True
 | |
|                     model_props["model_type"] = model_type
 | |
| 
 | |
|             if model_matched:
 | |
|                 model_props["filename"] = model_path + f"-{soc}-v2.3.2-1.rknn"
 | |
| 
 | |
|                 model_props["path"] = model_cache_dir + model_props["filename"]
 | |
| 
 | |
|                 if not os.path.isfile(model_props["path"]):
 | |
|                     self.download_model(model_props["filename"])
 | |
|             else:
 | |
|                 supported_models_str = ", ".join(
 | |
|                     model[1:-1] for model in supported_models
 | |
|                 )
 | |
|                 raise Exception(
 | |
|                     f"Model {model_path} is unsupported. Provide your own model or choose one of the following: {supported_models_str}"
 | |
|                 )
 | |
| 
 | |
|         return model_props
 | |
| 
 | |
|     def download_model(self, filename):
 | |
|         if not os.path.isdir(model_cache_dir):
 | |
|             os.mkdir(model_cache_dir)
 | |
| 
 | |
|         urllib.request.urlretrieve(
 | |
|             f"https://github.com/MarcA711/rknn-models/releases/download/v2.3.2/{filename}",
 | |
|             model_cache_dir + filename,
 | |
|         )
 | |
| 
 | |
|     def check_config(self, config):
 | |
|         if (config.model.width != 320) or (config.model.height != 320):
 | |
|             raise Exception(
 | |
|                 "Make sure to set the model width and height to 320 in your config."
 | |
|             )
 | |
| 
 | |
|         if config.model.input_pixel_format != "bgr":
 | |
|             raise Exception(
 | |
|                 'Make sure to set the model input_pixel_format to "bgr" in your config.'
 | |
|             )
 | |
| 
 | |
|         if config.model.input_tensor != "nhwc":
 | |
|             raise Exception(
 | |
|                 'Make sure to set the model input_tensor to "nhwc" in your config.'
 | |
|             )
 | |
| 
 | |
|     def post_process_yolonas(self, output: list[np.ndarray]):
 | |
|         """
 | |
|         @param output: output of inference
 | |
|         expected shape: [np.array(1, N, 4), np.array(1, N, 80)]
 | |
|         where N depends on the input size e.g. N=2100 for 320x320 images
 | |
| 
 | |
|         @return: best results: np.array(20, 6) where each row is
 | |
|         in this order (class_id, score, y1/height, x1/width, y2/height, x2/width)
 | |
|         """
 | |
| 
 | |
|         N = output[0].shape[1]
 | |
| 
 | |
|         boxes = output[0].reshape(N, 4)
 | |
|         scores = output[1].reshape(N, 80)
 | |
| 
 | |
|         class_ids = np.argmax(scores, axis=1)
 | |
|         scores = scores[np.arange(N), class_ids]
 | |
| 
 | |
|         args_best = np.argwhere(scores > self.thresh)[:, 0]
 | |
| 
 | |
|         num_matches = len(args_best)
 | |
|         if num_matches == 0:
 | |
|             return np.zeros((20, 6), np.float32)
 | |
|         elif num_matches > 20:
 | |
|             args_best20 = np.argpartition(scores[args_best], -20)[-20:]
 | |
|             args_best = args_best[args_best20]
 | |
| 
 | |
|         boxes = boxes[args_best]
 | |
|         class_ids = class_ids[args_best]
 | |
|         scores = scores[args_best]
 | |
| 
 | |
|         boxes = np.transpose(
 | |
|             np.vstack(
 | |
|                 (
 | |
|                     boxes[:, 1] / self.height,
 | |
|                     boxes[:, 0] / self.width,
 | |
|                     boxes[:, 3] / self.height,
 | |
|                     boxes[:, 2] / self.width,
 | |
|                 )
 | |
|             )
 | |
|         )
 | |
| 
 | |
|         results = np.hstack(
 | |
|             (class_ids[..., np.newaxis], scores[..., np.newaxis], boxes)
 | |
|         )
 | |
| 
 | |
|         return np.resize(results, (20, 6))
 | |
| 
 | |
|     def post_process_yolox(
 | |
|         self,
 | |
|         predictions: list[np.ndarray],
 | |
|         grids: np.ndarray,
 | |
|         expanded_strides: np.ndarray,
 | |
|     ) -> np.ndarray:
 | |
|         def sp_flatten(_in: np.ndarray):
 | |
|             ch = _in.shape[1]
 | |
|             _in = _in.transpose(0, 2, 3, 1)
 | |
|             return _in.reshape(-1, ch)
 | |
| 
 | |
|         boxes, scores, classes_conf = [], [], []
 | |
| 
 | |
|         input_data = [
 | |
|             _in.reshape([1, -1] + list(_in.shape[-2:])) for _in in predictions
 | |
|         ]
 | |
| 
 | |
|         for i in range(len(input_data)):
 | |
|             unprocessed_box = input_data[i][:, :4, :, :]
 | |
|             box_xy = unprocessed_box[:, :2, :, :]
 | |
|             box_wh = np.exp(unprocessed_box[:, 2:4, :, :]) * expanded_strides[i]
 | |
| 
 | |
|             box_xy += grids[i]
 | |
|             box_xy *= expanded_strides[i]
 | |
|             box = np.concatenate((box_xy, box_wh), axis=1)
 | |
| 
 | |
|             # Convert [c_x, c_y, w, h] to [x1, y1, x2, y2]
 | |
|             xyxy = np.copy(box)
 | |
|             xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2  # top left x
 | |
|             xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2  # top left y
 | |
|             xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2  # bottom right x
 | |
|             xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2  # bottom right y
 | |
| 
 | |
|             boxes.append(xyxy)
 | |
|             scores.append(input_data[i][:, 4:5, :, :])
 | |
|             classes_conf.append(input_data[i][:, 5:, :, :])
 | |
| 
 | |
|         # flatten data
 | |
|         boxes = np.concatenate([sp_flatten(_v) for _v in boxes])
 | |
|         classes_conf = np.concatenate([sp_flatten(_v) for _v in classes_conf])
 | |
|         scores = np.concatenate([sp_flatten(_v) for _v in scores])
 | |
| 
 | |
|         # reshape and filter boxes
 | |
|         box_confidences = scores.reshape(-1)
 | |
|         class_max_score = np.max(classes_conf, axis=-1)
 | |
|         classes = np.argmax(classes_conf, axis=-1)
 | |
|         _class_pos = np.where(class_max_score * box_confidences >= 0.4)
 | |
|         scores = (class_max_score * box_confidences)[_class_pos]
 | |
|         boxes = boxes[_class_pos]
 | |
|         classes = classes[_class_pos]
 | |
| 
 | |
|         # run nms
 | |
|         indices = cv2.dnn.NMSBoxes(
 | |
|             bboxes=boxes,
 | |
|             scores=scores,
 | |
|             score_threshold=0.4,
 | |
|             nms_threshold=0.4,
 | |
|         )
 | |
| 
 | |
|         results = np.zeros((20, 6), np.float32)
 | |
| 
 | |
|         if len(indices) > 0:
 | |
|             for i, idx in enumerate(indices.flatten()[:20]):
 | |
|                 box = boxes[idx]
 | |
|                 results[i] = [
 | |
|                     classes[idx],
 | |
|                     scores[idx],
 | |
|                     box[1] / self.height,
 | |
|                     box[0] / self.width,
 | |
|                     box[3] / self.height,
 | |
|                     box[2] / self.width,
 | |
|                 ]
 | |
| 
 | |
|         return results
 | |
| 
 | |
|     def post_process(self, output):
 | |
|         if self.detector_config.model.model_type == ModelTypeEnum.yolonas:
 | |
|             return self.post_process_yolonas(output)
 | |
|         elif self.detector_config.model.model_type == ModelTypeEnum.yologeneric:
 | |
|             return post_process_yolo(output, self.width, self.height)
 | |
|         elif self.detector_config.model.model_type == ModelTypeEnum.yolox:
 | |
|             return self.post_process_yolox(output, self.grids, self.expanded_strides)
 | |
|         else:
 | |
|             raise ValueError(
 | |
|                 f'Model type "{self.detector_config.model.model_type}" is currently not supported.'
 | |
|             )
 | |
| 
 | |
|     def detect_raw(self, tensor_input):
 | |
|         output = self.rknn.inference(
 | |
|             [
 | |
|                 tensor_input,
 | |
|             ]
 | |
|         )
 | |
|         return self.post_process(output)
 |