blakeblackshear.frigate/frigate/detectors/plugins/openvino.py
Martin Weinelt ab50d0b006
Add isort and ruff linter (#6575)
* Add isort and ruff linter

Both linters are pretty common among modern python code bases.

The isort tool provides stable sorting and grouping, as well as pruning
of unused imports.

Ruff is a modern linter, that is very fast due to being written in rust.
It can detect many common issues in a python codebase.

Removes the pylint dev requirement, since ruff replaces it.

* treewide: fix issues detected by ruff

* treewide: fix bare except clauses

* .devcontainer: Set up isort

* treewide: optimize imports

* treewide: apply black

* treewide: make regex patterns raw strings

This is necessary for escape sequences to be properly recognized.
2023-05-29 05:31:17 -05:00

175 lines
7.0 KiB
Python

import logging
import numpy as np
import openvino.runtime as ov
from pydantic import Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
logger = logging.getLogger(__name__)
DETECTOR_KEY = "openvino"
class OvDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
device: str = Field(default=None, title="Device Type")
class OvDetector(DetectionApi):
type_key = DETECTOR_KEY
def __init__(self, detector_config: OvDetectorConfig):
self.ov_core = ov.Core()
self.ov_model = self.ov_core.read_model(detector_config.model.path)
self.ov_model_type = detector_config.model.model_type
self.h = detector_config.model.height
self.w = detector_config.model.width
self.interpreter = self.ov_core.compile_model(
model=self.ov_model, device_name=detector_config.device
)
logger.info(f"Model Input Shape: {self.interpreter.input(0).shape}")
self.output_indexes = 0
while True:
try:
tensor_shape = self.interpreter.output(self.output_indexes).shape
logger.info(f"Model Output-{self.output_indexes} Shape: {tensor_shape}")
self.output_indexes += 1
except Exception:
logger.info(f"Model has {self.output_indexes} Output Tensors")
break
if self.ov_model_type == ModelTypeEnum.yolox:
self.num_classes = tensor_shape[2] - 5
logger.info(f"YOLOX model has {self.num_classes} classes")
self.set_strides_grids()
def set_strides_grids(self):
grids = []
expanded_strides = []
strides = [8, 16, 32]
hsizes = [self.h // stride for stride in strides]
wsizes = [self.w // stride for stride in strides]
for hsize, wsize, stride in zip(hsizes, wsizes, strides):
xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
grids.append(grid)
shape = grid.shape[:2]
expanded_strides.append(np.full((*shape, 1), stride))
self.grids = np.concatenate(grids, 1)
self.expanded_strides = np.concatenate(expanded_strides, 1)
## Takes in class ID, confidence score, and array of [x, y, w, h] that describes detection position,
## returns an array that's easily passable back to Frigate.
def process_yolo(self, class_id, conf, pos):
return [
class_id, # class ID
conf, # confidence score
(pos[1] - (pos[3] / 2)) / self.h, # y_min
(pos[0] - (pos[2] / 2)) / self.w, # x_min
(pos[1] + (pos[3] / 2)) / self.h, # y_max
(pos[0] + (pos[2] / 2)) / self.w, # x_max
]
def detect_raw(self, tensor_input):
infer_request = self.interpreter.create_infer_request()
infer_request.infer([tensor_input])
if self.ov_model_type == ModelTypeEnum.ssd:
results = infer_request.get_output_tensor()
detections = np.zeros((20, 6), np.float32)
i = 0
for object_detected in results.data[0, 0, :]:
if object_detected[0] != -1:
logger.debug(object_detected)
if object_detected[2] < 0.1 or i == 20:
break
detections[i] = [
object_detected[1], # Label ID
float(object_detected[2]), # Confidence
object_detected[4], # y_min
object_detected[3], # x_min
object_detected[6], # y_max
object_detected[5], # x_max
]
i += 1
return detections
elif self.ov_model_type == ModelTypeEnum.yolox:
out_tensor = infer_request.get_output_tensor()
# [x, y, h, w, box_score, class_no_1, ..., class_no_80],
results = out_tensor.data
results[..., :2] = (results[..., :2] + self.grids) * self.expanded_strides
results[..., 2:4] = np.exp(results[..., 2:4]) * self.expanded_strides
image_pred = results[0, ...]
class_conf = np.max(
image_pred[:, 5 : 5 + self.num_classes], axis=1, keepdims=True
)
class_pred = np.argmax(image_pred[:, 5 : 5 + self.num_classes], axis=1)
class_pred = np.expand_dims(class_pred, axis=1)
conf_mask = (image_pred[:, 4] * class_conf.squeeze() >= 0.3).squeeze()
# Detections ordered as (x1, y1, x2, y2, obj_conf, class_conf, class_pred)
dets = np.concatenate((image_pred[:, :5], class_conf, class_pred), axis=1)
dets = dets[conf_mask]
ordered = dets[dets[:, 5].argsort()[::-1]][:20]
detections = np.zeros((20, 6), np.float32)
for i, object_detected in enumerate(ordered):
detections[i] = self.process_yolo(
object_detected[6], object_detected[5], object_detected[:4]
)
return detections
elif self.ov_model_type == ModelTypeEnum.yolov8:
out_tensor = infer_request.get_output_tensor()
results = out_tensor.data[0]
output_data = np.transpose(results)
scores = np.max(output_data[:, 4:], axis=1)
if len(scores) == 0:
return np.zeros((20, 6), np.float32)
scores = np.expand_dims(scores, axis=1)
# add scores to the last column
dets = np.concatenate((output_data, scores), axis=1)
# filter out lines with scores below threshold
dets = dets[dets[:, -1] > 0.5, :]
# limit to top 20 scores, descending order
ordered = dets[dets[:, -1].argsort()[::-1]][:20]
detections = np.zeros((20, 6), np.float32)
for i, object_detected in enumerate(ordered):
detections[i] = self.process_yolo(
np.argmax(object_detected[4:-1]),
object_detected[-1],
object_detected[:4],
)
return detections
elif self.ov_model_type == ModelTypeEnum.yolov5:
out_tensor = infer_request.get_output_tensor()
output_data = out_tensor.data[0]
# filter out lines with scores below threshold
conf_mask = (output_data[:, 4] >= 0.5).squeeze()
output_data = output_data[conf_mask]
# limit to top 20 scores, descending order
ordered = output_data[output_data[:, 4].argsort()[::-1]][:20]
detections = np.zeros((20, 6), np.float32)
for i, object_detected in enumerate(ordered):
detections[i] = self.process_yolo(
np.argmax(object_detected[5:]),
object_detected[4],
object_detected[:4],
)
return detections