blakeblackshear.frigate/frigate/detectors/plugins/openvino.py


import logging
import os

import numpy as np
import openvino as ov
from pydantic import Field
from typing_extensions import Literal

from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
from frigate.util.model import (
    post_process_dfine,
    post_process_rfdetr,
    post_process_yolo,
)

logger = logging.getLogger(__name__)
DETECTOR_KEY = "openvino"
class OvDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
device: str = Field(default=None, title="Device Type")
class OvDetector(DetectionApi):
type_key = DETECTOR_KEY
supported_models = [
ModelTypeEnum.dfine,
ModelTypeEnum.rfdetr,
ModelTypeEnum.ssd,
ModelTypeEnum.yolonas,
ModelTypeEnum.yologeneric,
ModelTypeEnum.yolox,
]

    def __init__(self, detector_config: OvDetectorConfig):
        super().__init__(detector_config)
        self.ov_core = ov.Core()
        self.ov_model_type = detector_config.model.model_type

        self.h = detector_config.model.height
        self.w = detector_config.model.width

        if not os.path.isfile(detector_config.model.path):
            logger.error(f"OpenVino model file {detector_config.model.path} not found.")
            raise FileNotFoundError

        self.interpreter = self.ov_core.compile_model(
            model=detector_config.model.path, device_name=detector_config.device
        )
        self.model_invalid = False

        if self.ov_model_type not in self.supported_models:
            logger.error(
                f"OpenVino detector does not support {self.ov_model_type} models."
            )
            self.model_invalid = True
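
        # OpenVINO SSD-style detection models emit a single (1, 1, N, 7) tensor where
        # each row is [image_id, class_id, score, x_min, y_min, x_max, y_max].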
        if self.ov_model_type == ModelTypeEnum.ssd:
            model_inputs = self.interpreter.inputs
            model_outputs = self.interpreter.outputs

            if len(model_inputs) != 1:
                logger.error(
                    f"SSD models must only have 1 input. Found {len(model_inputs)}."
                )
                self.model_invalid = True

            if len(model_outputs) != 1:
                logger.error(
                    f"SSD models must only have 1 output. Found {len(model_outputs)}."
                )
                self.model_invalid = True

            output_shape = model_outputs[0].get_shape()
            if output_shape[0] != 1 or output_shape[1] != 1 or output_shape[3] != 7:
                logger.error(f"SSD model output doesn't match. Found {output_shape}.")
                self.model_invalid = True
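
        # YOLO-NAS models exported in "flat" format emit one output whose rows are
        # [batch_idx, x_min, y_min, x_max, y_max, confidence, class_id].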
        if self.ov_model_type == ModelTypeEnum.yolonas:
            model_inputs = self.interpreter.inputs
            model_outputs = self.interpreter.outputs

            if len(model_inputs) != 1:
                logger.error(
                    f"YoloNAS models must only have 1 input. Found {len(model_inputs)}."
                )
                self.model_invalid = True

            if len(model_outputs) != 1:
                logger.error(
                    f"YoloNAS models must be exported in flat format and only have 1 output. Found {len(model_outputs)}."
                )
                self.model_invalid = True

            output_shape = model_outputs[0].partial_shape
            if output_shape[-1] != 7:
                logger.error(
                    f"YoloNAS models must be exported in flat format. Model output doesn't match. Found {output_shape}."
                )
                self.model_invalid = True
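
        # YOLOX models are probed for their output tensors; the class count comes from
        # the output width (x, y, w, h, objectness plus one score per class), and the
        # grid/stride tables used to decode the raw predictions are precomputed.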
        if self.ov_model_type == ModelTypeEnum.yolox:
            self.output_indexes = 0
            while True:
                try:
                    tensor_shape = self.interpreter.output(self.output_indexes).shape
                    logger.info(
                        f"Model Output-{self.output_indexes} Shape: {tensor_shape}"
                    )
                    self.output_indexes += 1
                except Exception:
                    logger.info(f"Model has {self.output_indexes} Output Tensors")
                    break
            self.num_classes = tensor_shape[2] - 5
            logger.info(f"YOLOX model has {self.num_classes} classes")
            self.calculate_grids_strides()

    ## Takes a class ID, a confidence score, and an [x, y, w, h] box in model-input pixels;
    ## returns a row in Frigate's normalized [class_id, conf, y_min, x_min, y_max, x_max] format.
    def process_yolo(self, class_id, conf, pos):
        return [
            class_id,  # class ID
            conf,  # confidence score
            (pos[1] - (pos[3] / 2)) / self.h,  # y_min
            (pos[0] - (pos[2] / 2)) / self.w,  # x_min
            (pos[1] + (pos[3] / 2)) / self.h,  # y_max
            (pos[0] + (pos[2] / 2)) / self.w,  # x_max
        ]

    def detect_raw(self, tensor_input):
        infer_request = self.interpreter.create_infer_request()
        # TODO: see if we can use shared_memory=True
        input_tensor = ov.Tensor(array=tensor_input)
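
        # D-FINE models take two inputs (the image tensor and the original target size)
        # and produce three outputs that are decoded by the shared post_process_dfine helper.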
        if self.ov_model_type == ModelTypeEnum.dfine:
            infer_request.set_tensor("images", input_tensor)
            target_sizes_tensor = ov.Tensor(
                np.array([[self.h, self.w]], dtype=np.int64)
            )
            infer_request.set_tensor("orig_target_sizes", target_sizes_tensor)
            infer_request.infer()
            tensor_output = (
                infer_request.get_output_tensor(0).data,
                infer_request.get_output_tensor(1).data,
                infer_request.get_output_tensor(2).data,
            )
            return post_process_dfine(tensor_output, self.w, self.h)

        infer_request.infer(input_tensor)
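
        # Frigate expects at most 20 detections, each formatted as
        # [class_id, confidence, y_min, x_min, y_max, x_max] with coordinates normalized to 0-1.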
        detections = np.zeros((20, 6), np.float32)
        if self.model_invalid:
            return detections
        elif self.ov_model_type == ModelTypeEnum.rfdetr:
            return post_process_rfdetr(
                [
                    infer_request.get_output_tensor(0).data,
                    infer_request.get_output_tensor(1).data,
                ]
            )
        elif self.ov_model_type == ModelTypeEnum.ssd:
            results = infer_request.get_output_tensor(0).data[0][0]

            for i, (_, class_id, score, xmin, ymin, xmax, ymax) in enumerate(results):
                if i == 20:
                    break
                detections[i] = [
                    class_id,
                    float(score),
                    ymin,
                    xmin,
                    ymax,
                    xmax,
                ]
            return detections
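        # YOLO-NAS boxes arrive in input-pixel coordinates, so they are normalized by the
        # model width/height before being written into the detections buffer.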
        elif self.ov_model_type == ModelTypeEnum.yolonas:
            predictions = infer_request.get_output_tensor(0).data

            for i, prediction in enumerate(predictions):
                if i == 20:
                    break
                (_, x_min, y_min, x_max, y_max, confidence, class_id) = prediction
                # when running in GPU mode, empty predictions in the output have class_id of -1
                if class_id < 0:
                    break
                detections[i] = [
                    class_id,
                    confidence,
                    y_min / self.h,
                    x_min / self.w,
                    y_max / self.h,
                    x_max / self.w,
                ]
            return detections
        elif self.ov_model_type == ModelTypeEnum.yologeneric:
            out_tensor = []

            for item in infer_request.output_tensors:
                out_tensor.append(item.data)

            return post_process_yolo(out_tensor, self.w, self.h)
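        # YOLOX outputs raw per-grid-cell predictions: x/y are offsets within each cell and
        # w/h are log-scaled, so they are decoded with the precomputed grids and strides
        # before thresholding and sorting by class confidence.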
        elif self.ov_model_type == ModelTypeEnum.yolox:
            out_tensor = infer_request.get_output_tensor()
            # [x, y, w, h, box_score, class_no_1, ..., class_no_80]
            results = out_tensor.data
            results[..., :2] = (results[..., :2] + self.grids) * self.expanded_strides
            results[..., 2:4] = np.exp(results[..., 2:4]) * self.expanded_strides
            image_pred = results[0, ...]

            class_conf = np.max(
                image_pred[:, 5 : 5 + self.num_classes], axis=1, keepdims=True
            )
            class_pred = np.argmax(image_pred[:, 5 : 5 + self.num_classes], axis=1)
            class_pred = np.expand_dims(class_pred, axis=1)

            conf_mask = (image_pred[:, 4] * class_conf.squeeze() >= 0.3).squeeze()

            # Candidates ordered as (x, y, w, h, obj_conf, class_conf, class_pred);
            # kept in a separate array so the preallocated 20x6 detections buffer is
            # what gets filled and returned.
            dets = np.concatenate((image_pred[:, :5], class_conf, class_pred), axis=1)
            dets = dets[conf_mask]

            ordered = dets[dets[:, 5].argsort()[::-1]][:20]
            for i, object_detected in enumerate(ordered):
                detections[i] = self.process_yolo(
                    object_detected[6], object_detected[5], object_detected[:4]
                )
            return detections