Add automatic RKNN conversion and support for semantic search model (#19676)

* Create RKNN model runner and use it for Jina v1 CLIP

* Formatting

* Handle model type inference

* Properly provide input to RKNN

* Adjust rknn conversion

* Update docs

* Formatting

* Fix path handling

* Handle inputs

* Cleanup

* Change normalization for better accuracy

* Clarify supported models

* Remove testing
Nicolas Mowen
2025-08-21 05:30:14 -06:00
committed by GitHub
parent efeb089ff8
commit 1be84d6833
4 changed files with 233 additions and 23 deletions
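
At a high level, the change teaches the existing ONNX runner to prefer an automatically converted RKNN model on Rockchip hardware and to fall back to ONNX Runtime or OpenVINO everywhere else. A minimal sketch of that flow from a caller's point of view, assuming a supported Rockchip SoC and an illustrative model path (not one pinned down by this commit):

from frigate.util.rknn_converter import auto_convert_model, is_rknn_compatible

# Illustrative path; any ONNX model with an entry in MODEL_TYPE_CONFIGS qualifies.
model_path = "/config/model_cache/jina-clip-v1/vision_model_fp16.onnx"

if is_rknn_compatible(model_path):
    # Converts once; later runs reuse the already-converted .rknn file.
    rknn_path = auto_convert_model(model_path)
    if rknn_path:
        runner = RKNNModelRunner(rknn_path, device="AUTO")  # class added below

ONNXModelRunner performs exactly this dance in its constructor, so callers never need to do it by hand.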

frigate/embeddings/onnx/runner.py

@@ -4,10 +4,12 @@ import logging
import os.path
from typing import Any

+import numpy as np
import onnxruntime as ort

from frigate.const import MODEL_CACHE_DIR
from frigate.util.model import get_ort_providers
+from frigate.util.rknn_converter import auto_convert_model, is_rknn_compatible

try:
    import openvino as ov
@@ -25,7 +27,33 @@ class ONNXModelRunner:
        self.model_path = model_path
        self.ort: ort.InferenceSession = None
        self.ov: ov.Core = None
-        providers, options = get_ort_providers(device == "CPU", device, requires_fp16)
+        self.rknn = None
+        self.type = "ort"
+
+        try:
+            if device != "CPU" and is_rknn_compatible(model_path):
+                # Try to auto-convert to RKNN format
+                rknn_path = auto_convert_model(model_path)
+                if rknn_path:
+                    try:
+                        self.rknn = RKNNModelRunner(rknn_path, device)
+                        self.type = "rknn"
+                        logger.info(f"Using RKNN model: {rknn_path}")
+                        return
+                    except Exception as e:
+                        logger.debug(
+                            f"Failed to load RKNN model, falling back to ONNX: {e}"
+                        )
+                        self.rknn = None
+        except ImportError:
+            pass
+
+        # Fall back to standard ONNX providers
+        providers, options = get_ort_providers(
+            device == "CPU",
+            device,
+            requires_fp16,
+        )

        self.interpreter = None
        if "OpenVINOExecutionProvider" in providers:
@@ -55,7 +83,9 @@ class ONNXModelRunner:
            )

    def get_input_names(self) -> list[str]:
-        if self.type == "ov":
+        if self.type == "rknn":
+            return self.rknn.get_input_names()
+        elif self.type == "ov":
            input_names = []
            for input in self.interpreter.inputs:
@@ -67,7 +97,9 @@ class ONNXModelRunner:
    def get_input_width(self):
        """Get the input width of the model regardless of backend."""
-        if self.type == "ort":
+        if self.type == "rknn":
+            return self.rknn.get_input_width()
+        elif self.type == "ort":
            return self.ort.get_inputs()[0].shape[3]
        elif self.type == "ov":
            input_info = self.interpreter.inputs
@@ -90,8 +122,10 @@ class ONNXModelRunner:
                return -1
        return -1

-    def run(self, input: dict[str, Any]) -> Any:
-        if self.type == "ov":
+    def run(self, input: dict[str, Any]) -> Any | None:
+        if self.type == "rknn":
+            return self.rknn.run(input)
+        elif self.type == "ov":
            infer_request = self.interpreter.create_infer_request()

            try:
@@ -107,3 +141,121 @@ class ONNXModelRunner:
            return outputs
        elif self.type == "ort":
            return self.ort.run(None, input)
+
+
+class RKNNModelRunner:
+    """Run RKNN models for embeddings."""
+
+    def __init__(self, model_path: str, device: str = "AUTO", model_type: str = None):
+        self.model_path = model_path
+        self.device = device
+        self.model_type = model_type
+        self.rknn = None
+        self._load_model()
+
+    def _load_model(self):
+        """Load the RKNN model."""
+        try:
+            from rknnlite.api import RKNNLite
+
+            self.rknn = RKNNLite(verbose=False)
+
+            if self.rknn.load_rknn(self.model_path) != 0:
+                logger.error(f"Failed to load RKNN model: {self.model_path}")
+                raise RuntimeError("Failed to load RKNN model")
+
+            if self.rknn.init_runtime() != 0:
+                logger.error("Failed to initialize RKNN runtime")
+                raise RuntimeError("Failed to initialize RKNN runtime")
+
+            logger.info(f"Successfully loaded RKNN model: {self.model_path}")
+        except ImportError:
+            logger.error("RKNN Lite not available")
+            raise ImportError("RKNN Lite not available")
+        except Exception as e:
+            logger.error(f"Error loading RKNN model: {e}")
+            raise
+
+    def get_input_names(self) -> list[str]:
+        """Get input names for the model."""
+        # For CLIP models, we need to determine the model type from the path
+        model_name = os.path.basename(self.model_path).lower()
+
+        if "vision" in model_name:
+            return ["pixel_values"]
+        else:
+            # Default fallback - try to infer from model type
+            if self.model_type and "jina-clip" in self.model_type:
+                if "vision" in self.model_type:
+                    return ["pixel_values"]
+
+            # Generic fallback
+            return ["input"]
+
+    def get_input_width(self) -> int:
+        """Get the input width of the model."""
+        # For CLIP vision models, this is typically 224
+        model_name = os.path.basename(self.model_path).lower()
+        if "vision" in model_name:
+            return 224  # CLIP V1 uses 224x224
+        return -1
+
+    def run(self, inputs: dict[str, Any]) -> Any:
+        """Run inference with the RKNN model."""
+        if not self.rknn:
+            raise RuntimeError("RKNN model not loaded")
+
+        try:
+            input_names = self.get_input_names()
+            rknn_inputs = []
+
+            for name in input_names:
+                if name in inputs:
+                    if name == "pixel_values":
+                        # RKNN expects NHWC format, but ONNX typically provides NCHW
+                        # Transpose from [batch, channels, height, width] to [batch, height, width, channels]
+                        pixel_data = inputs[name]
+                        if len(pixel_data.shape) == 4 and pixel_data.shape[1] == 3:
+                            # Transpose from NCHW to NHWC
+                            pixel_data = np.transpose(pixel_data, (0, 2, 3, 1))
+                        rknn_inputs.append(pixel_data)
+                    else:
+                        rknn_inputs.append(inputs[name])
+                else:
+                    logger.warning(f"Input '{name}' not found in inputs, using default")
+                    if name == "pixel_values":
+                        batch_size = 1
+                        if inputs:
+                            for val in inputs.values():
+                                if hasattr(val, "shape") and len(val.shape) > 0:
+                                    batch_size = val.shape[0]
+                                    break
+                        # Create default in NHWC format as expected by RKNN
+                        rknn_inputs.append(
+                            np.zeros((batch_size, 224, 224, 3), dtype=np.float32)
+                        )
+                    else:
+                        batch_size = 1
+                        if inputs:
+                            for val in inputs.values():
+                                if hasattr(val, "shape") and len(val.shape) > 0:
+                                    batch_size = val.shape[0]
+                                    break
+                        rknn_inputs.append(np.zeros((batch_size, 1), dtype=np.float32))
+
+            outputs = self.rknn.inference(inputs=rknn_inputs)
+            return outputs
+        except Exception as e:
+            logger.error(f"Error during RKNN inference: {e}")
+            raise
+
+    def __del__(self):
+        """Cleanup when the runner is destroyed."""
+        if self.rknn:
+            try:
+                self.rknn.release()
+            except Exception:
+                pass
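
As a usage note, RKNNModelRunner.run accepts the same input dict an ONNX InferenceSession would receive, and transposes a 4-D pixel_values tensor from NCHW to NHWC because the RKNN runtime expects channel-last input. A hypothetical call (the .rknn path is illustrative):

import numpy as np

# A preprocessed CLIP batch in ONNX's NCHW layout: [batch, channels, height, width]
pixel_values = np.zeros((1, 3, 224, 224), dtype=np.float32)

runner = RKNNModelRunner("/config/model_cache/jina-clip-v1/vision_model.rknn")
outputs = runner.run({"pixel_values": pixel_values})  # transposed to NHWC internally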

frigate/util/rknn_converter.py

@@ -27,9 +27,50 @@ MODEL_TYPE_CONFIGS = {
"std_values": [[255, 255, 255]],
"target_platform": None, # Will be set dynamically
},
"jina-clip-v1-vision": {
"mean_values": [[0.48145466 * 255, 0.4578275 * 255, 0.40821073 * 255]],
"std_values": [[0.26862954 * 255, 0.26130258 * 255, 0.27577711 * 255]],
"target_platform": None, # Will be set dynamically
},
}
def get_rknn_model_type(model_path: str) -> str | None:
if all(keyword in str(model_path) for keyword in ["jina-clip-v1", "vision"]):
return "jina-clip-v1-vision"
model_name = os.path.basename(str(model_path)).lower()
if any(keyword in model_name for keyword in ["yolo", "yolox", "yolonas"]):
return model_name
return None
def is_rknn_compatible(model_path: str, model_type: str | None = None) -> bool:
"""
Check if a model is compatible with RKNN conversion.
Args:
model_path: Path to the model file
model_type: Type of the model (if known)
Returns:
True if the model is RKNN-compatible, False otherwise
"""
soc = get_soc_type()
if soc is None:
return False
if not model_type:
model_type = get_rknn_model_type(model_path)
if model_type and model_type in MODEL_TYPE_CONFIGS:
return True
return False
def ensure_torch_dependencies() -> bool:
"""Dynamically install torch dependencies if not available."""
try:
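
The jina-clip-v1-vision entry above folds CLIP's published normalization statistics, which are defined for pixels scaled to [0, 1], into RKNN preprocessing, which subtracts mean_values from and divides by std_values on raw 0-255 input. Multiplying both statistics by 255 makes the two formulations equal, which is presumably the "better accuracy" normalization change the commit message mentions. A quick NumPy check of the equivalence:

import numpy as np

x = np.array([0.0, 127.0, 255.0])       # raw 8-bit pixel values
mean, std = 0.48145466, 0.26862954      # CLIP red-channel stats for [0, 1] inputs

a = (x / 255.0 - mean) / std            # rescale to [0, 1], then normalize
b = (x - mean * 255.0) / (std * 255.0)  # fold the 1/255 into mean and std
assert np.allclose(a, b)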
@@ -67,13 +108,12 @@ def ensure_torch_dependencies() -> bool:
def ensure_rknn_toolkit() -> bool:
    """Ensure RKNN toolkit is available."""
    try:
-        import rknn  # type: ignore # noqa: F401
        from rknn.api import RKNN  # type: ignore # noqa: F401

        logger.debug("RKNN toolkit is already available")
        return True
-    except ImportError:
-        logger.error("RKNN toolkit not found. Please ensure it's installed.")
+    except ImportError as e:
+        logger.error(f"RKNN toolkit not found. Please ensure it's installed. {e}")
        return False
@@ -109,11 +149,11 @@ def convert_onnx_to_rknn(
        True if conversion successful, False otherwise
    """
    if not ensure_torch_dependencies():
-        logger.error("PyTorch dependencies not available")
+        logger.debug("PyTorch dependencies not available")
        return False

    if not ensure_rknn_toolkit():
-        logger.error("RKNN toolkit not available")
+        logger.debug("RKNN toolkit not available")
        return False

    # Get SoC type if not provided
@@ -125,7 +165,7 @@ def convert_onnx_to_rknn(
    # Get model config for the specified type
    if model_type not in MODEL_TYPE_CONFIGS:
-        logger.error(f"Unsupported model type: {model_type}")
+        logger.debug(f"Unsupported model type: {model_type}")
        return False

    config = MODEL_TYPE_CONFIGS[model_type].copy()
@@ -138,7 +178,16 @@ def convert_onnx_to_rknn(
        rknn = RKNN(verbose=True)
        rknn.config(**config)

-        if rknn.load_onnx(model=onnx_path) != 0:
+        if model_type == "jina-clip-v1-vision":
+            load_output = rknn.load_onnx(
+                model=onnx_path,
+                inputs=["pixel_values"],
+                input_size_list=[[1, 3, 224, 224]],
+            )
+        else:
+            load_output = rknn.load_onnx(model=onnx_path)
+
+        if load_output != 0:
            logger.error("Failed to load ONNX model")
            return False
@@ -265,7 +314,7 @@ def is_lock_stale(lock_file_path: Path, max_age: int = 600) -> bool:
def wait_for_conversion_completion(
-    rknn_path: Path, lock_file_path: Path, timeout: int = 300
+    model_type: str, rknn_path: Path, lock_file_path: Path, timeout: int = 300
) -> bool:
    """
    Wait for another process to complete the conversion.
@@ -307,7 +356,7 @@ def wait_for_conversion_completion(
                # Check if RKNN file appeared while waiting
                if rknn_path.exists():
                    logger.info(f"RKNN model appeared while waiting: {rknn_path}")
-                    return str(rknn_path)
+                    return True

                # Convert ONNX to RKNN
                logger.info(
@@ -320,12 +369,12 @@ def wait_for_conversion_completion(
                if onnx_path.exists():
                    if convert_onnx_to_rknn(
-                        str(onnx_path), str(rknn_path), "yolo-generic", False
+                        str(onnx_path), str(rknn_path), model_type, False
                    ):
-                        return str(rknn_path)
+                        return True

                logger.error("Failed to convert model after stale lock cleanup")
-                return None
+                return False
            finally:
                release_conversion_lock(lock_file_path)
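
Threading model_type through wait_for_conversion_completion matters for the stale-lock path above: before this change, a process that reclaimed a stale lock re-ran the conversion with a hard-coded "yolo-generic" type, which would pick the wrong preprocessing config for a CLIP model. A hypothetical call under the new signature (paths and lock-file naming are illustrative, not taken from this diff):

from pathlib import Path

rknn_path = Path("/config/model_cache/jina-clip-v1/vision_model.rknn")
lock_file_path = Path("/config/model_cache/jina-clip-v1/.vision_model.lock")

if wait_for_conversion_completion("jina-clip-v1-vision", rknn_path, lock_file_path, timeout=300):
    print(f"RKNN model ready: {rknn_path}")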
@@ -338,7 +387,7 @@ def wait_for_conversion_completion(
def auto_convert_model(
-    model_path: str, model_type: str, quantization: bool = False
+    model_path: str, model_type: str | None = None, quantization: bool = False
) -> Optional[str]:
    """
    Automatically convert a model to RKNN format if needed.
@@ -377,6 +426,9 @@ def auto_convert_model(
logger.info(f"Converting {model_path} to RKNN format...")
rknn_path.parent.mkdir(parents=True, exist_ok=True)
if not model_type:
model_type = get_rknn_model_type(base_path)
if convert_onnx_to_rknn(
str(base_path), str(rknn_path), model_type, quantization
):
@@ -392,7 +444,10 @@ def auto_convert_model(
f"Another process is converting {model_path}, waiting for completion..."
)
if wait_for_conversion_completion(rknn_path, lock_file_path):
if not model_type:
model_type = get_rknn_model_type(base_path)
if wait_for_conversion_completion(model_type, rknn_path, lock_file_path):
return str(rknn_path)
else:
logger.error(f"Timeout waiting for conversion of {model_path}")