blakeblackshear.frigate/frigate/detectors/plugins/memryx.py
Tim Wesley 6cd1d1f205
memryx: fix model download bug when using multiple detectors (#20030)

* Add locking for model download files
* ruff format

Co-authored-by: Abinila Siva <abinila.siva@memryx.com>
2025-09-15 08:48:55 -05:00


import glob
import logging
import os
import shutil
import time
import urllib.request
import zipfile
from queue import Queue
import cv2
import numpy as np
from pydantic import BaseModel, Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import (
BaseDetectorConfig,
ModelTypeEnum,
)
from frigate.util.model import post_process_yolo
logger = logging.getLogger(__name__)
DETECTOR_KEY = "memryx"
# Configuration class for model settings
class ModelConfig(BaseModel):
    path: str = Field(default=None, title="Model Path")  # Path to the DFP file
    labelmap_path: str = Field(default=None, title="Path to Label Map")


class MemryXDetectorConfig(BaseDetectorConfig):
    type: Literal[DETECTOR_KEY]
    device: str = Field(default="PCIe", title="Device Path")
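# Example Frigate config snippet wiring this detector up (illustrative only;
# the detector name, device index, and model values below are placeholders):
#
#   detectors:
#     memx0:
#       type: memryx
#       device: PCIe:0
#   model:
#     model_type: yolonas
#     width: 320
#     height: 320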


class MemryXDetector(DetectionApi):
    type_key = DETECTOR_KEY  # Set the type key
    supported_models = [
        ModelTypeEnum.ssd,
        ModelTypeEnum.yolonas,
        ModelTypeEnum.yologeneric,  # Treated as yolov9 in the MemryX implementation
        ModelTypeEnum.yolox,
    ]

    def __init__(self, detector_config):
        """Initialize MemryX detector with the provided configuration."""
        try:
            # Import MemryX SDK
            from memryx import AsyncAccl
        except ModuleNotFoundError:
            raise ImportError(
                "MemryX SDK is not installed. Install it and set up the MIX environment."
            )

        model_cfg = getattr(detector_config, "model", None)

        # Check if model_type was explicitly set by the user
        if "model_type" in getattr(model_cfg, "__fields_set__", set()):
            detector_config.model.model_type = model_cfg.model_type
        else:
            logger.info(
                "model_type not set in config; defaulting to yolonas for MemryX."
            )
            detector_config.model.model_type = ModelTypeEnum.yolonas

        self.capture_queue = Queue(maxsize=10)
        self.output_queue = Queue(maxsize=10)
        self.capture_id_queue = Queue(maxsize=10)
        self.logger = logger

        self.memx_model_path = detector_config.model.path  # Path to .dfp file
        self.memx_post_model = None  # Path to .post file
        self.expected_post_model = None
        self.memx_device_path = detector_config.device  # Device path

        # Parse the device string "PCIe:<index>"; fall back to device 0 when
        # no index is given (the config default is just "PCIe")
        device_str = self.memx_device_path
        if ":" in device_str:
            self.device_id = [int(device_str.split(":")[1])]
        else:
            self.device_id = [0]

        self.memx_model_height = detector_config.model.height
        self.memx_model_width = detector_config.model.width
        self.memx_model_type = detector_config.model.model_type
        self.cache_dir = "/memryx_models"

        if self.memx_model_type == ModelTypeEnum.yologeneric:
            model_mapping = {
                (640, 640): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolov9_640.zip",
                    "yolov9_640",
                ),
                (320, 320): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolov9_320.zip",
                    "yolov9_320",
                ),
            }
            self.model_url, self.model_folder = model_mapping.get(
                (self.memx_model_height, self.memx_model_width),
                (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolov9_320.zip",
                    "yolov9_320",
                ),
            )
            self.expected_dfp_model = "YOLO_v9_small_onnx.dfp"
        elif self.memx_model_type == ModelTypeEnum.yolonas:
            model_mapping = {
                (640, 640): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolonas_640.zip",
                    "yolonas_640",
                ),
                (320, 320): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolonas_320.zip",
                    "yolonas_320",
                ),
            }
            self.model_url, self.model_folder = model_mapping.get(
                (self.memx_model_height, self.memx_model_width),
                (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolonas_320.zip",
                    "yolonas_320",
                ),
            )
            self.expected_dfp_model = "yolo_nas_s.dfp"
            self.expected_post_model = "yolo_nas_s_post.onnx"
        elif self.memx_model_type == ModelTypeEnum.yolox:
            self.model_folder = "yolox"
            self.model_url = (
                "https://developer.memryx.com/example_files/2p0_frigate/yolox.zip"
            )
            self.expected_dfp_model = "YOLOX_640_640_3_onnx.dfp"
            self.set_strides_grids()
        elif self.memx_model_type == ModelTypeEnum.ssd:
            self.model_folder = "ssd"
            self.model_url = (
                "https://developer.memryx.com/example_files/2p0_frigate/ssd.zip"
            )
            self.expected_dfp_model = "SSDlite_MobileNet_v2_320_320_3_onnx.dfp"
            self.expected_post_model = "SSDlite_MobileNet_v2_320_320_3_onnx_post.onnx"

        self.check_and_prepare_model()

        logger.info(
            f"Initializing MemryX with model: {self.memx_model_path} on device {self.memx_device_path}"
        )

        try:
            # Load MemryX Model with a device target
            logger.info(f"dfp path: {self.memx_model_path}")
            self.accl = AsyncAccl(
                self.memx_model_path,
                device_ids=self.device_id,  # AsyncAccl device ids
                local_mode=True,
            )

            # Models that use cropped post-processing sections (YOLO-NAS and SSD)
            # --> These will be moved to pure numpy in the future to improve
            #     performance on low-end CPUs
            if self.memx_post_model:
                self.accl.set_postprocessing_model(self.memx_post_model, model_idx=0)

            self.accl.connect_input(self.process_input)
            self.accl.connect_output(self.process_output)
            logger.info(
                f"Loaded MemryX model from {self.memx_model_path} and {self.memx_post_model}"
            )
        except Exception as e:
            logger.error(f"Failed to initialize MemryX model: {e}")
            raise
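
    # ------------------------------------------------------------------
    # Cross-process locking for model downloads (#20030).
    #
    # When several detectors start at once, each instance runs
    # check_and_prepare_model() against the same cache directory, and two
    # processes could download or extract the same archive concurrently.
    # The helpers below implement a simple cross-process mutex with an
    # O_CREAT | O_EXCL lock file: creation is atomic, so exactly one
    # process wins, and the others poll until the file disappears or the
    # timeout expires.
    # ------------------------------------------------------------------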

    def _acquire_file_lock(self, lock_path: str, timeout: int = 60, poll: float = 0.2):
        """
        Create an exclusive lock file. Blocks (with polling) until it can acquire,
        or raises TimeoutError. Uses only stdlib (os.O_EXCL).
        """
        start = time.time()
        while True:
            try:
                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                os.close(fd)
                return
            except FileExistsError:
                if time.time() - start > timeout:
                    raise TimeoutError(f"Timeout waiting for lock: {lock_path}")
                time.sleep(poll)

    def _release_file_lock(self, lock_path: str):
        """Best-effort removal of the lock file."""
        try:
            os.remove(lock_path)
        except FileNotFoundError:
            pass

    def load_yolo_constants(self):
        base = f"{self.cache_dir}/{self.model_folder}"
        # Constants for yolov9 post-processing
        self.const_A = np.load(f"{base}/_model_22_Constant_9_output_0.npy")
        self.const_B = np.load(f"{base}/_model_22_Constant_10_output_0.npy")
        self.const_C = np.load(f"{base}/_model_22_Constant_12_output_0.npy")

    def check_and_prepare_model(self):
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir, exist_ok=True)

        lock_path = os.path.join(self.cache_dir, f".{self.model_folder}.lock")
        self._acquire_file_lock(lock_path)
        try:
            # ---------- CASE 1: user provided a custom model path ----------
            if self.memx_model_path:
                if not self.memx_model_path.endswith(".zip"):
                    raise ValueError(
                        f"Invalid model path: {self.memx_model_path}. "
                        "Only .zip files are supported. Please provide a .zip model archive."
                    )

                if not os.path.exists(self.memx_model_path):
                    raise FileNotFoundError(
                        f"Custom model zip not found: {self.memx_model_path}"
                    )

                logger.info(f"User provided zip model: {self.memx_model_path}")

                # Extract the custom zip into a separate area so it never
                # clashes with the MemryX cache
                custom_dir = os.path.join(
                    self.cache_dir, "custom_models", self.model_folder
                )
                if os.path.isdir(custom_dir):
                    shutil.rmtree(custom_dir)
                os.makedirs(custom_dir, exist_ok=True)

                with zipfile.ZipFile(self.memx_model_path, "r") as zip_ref:
                    zip_ref.extractall(custom_dir)
                logger.info(f"Custom model extracted to {custom_dir}.")

                # Find .dfp and optional *_post.onnx recursively
                dfp_candidates = glob.glob(
                    os.path.join(custom_dir, "**", "*.dfp"), recursive=True
                )
                post_candidates = glob.glob(
                    os.path.join(custom_dir, "**", "*_post.onnx"), recursive=True
                )

                if not dfp_candidates:
                    raise FileNotFoundError(
                        "No .dfp file found in custom model zip after extraction."
                    )
                self.memx_model_path = dfp_candidates[0]

                # Handle post-model requirements by model type
                if self.memx_model_type in [
                    ModelTypeEnum.yologeneric,
                    ModelTypeEnum.yolonas,
                    ModelTypeEnum.ssd,
                ]:
                    if not post_candidates:
                        raise FileNotFoundError(
                            f"No *_post.onnx file found in custom model zip for {self.memx_model_type.name}."
                        )
                    self.memx_post_model = post_candidates[0]
                elif self.memx_model_type == ModelTypeEnum.yolox:
                    # Explicitly ignore any post model even if present
                    self.memx_post_model = None
                else:
                    # Future model types can optionally use a post model if present
                    self.memx_post_model = (
                        post_candidates[0] if post_candidates else None
                    )

                logger.info(f"Using custom model: {self.memx_model_path}")
                return

            # ---------- CASE 2: no custom model path -> use MemryX cached models ----------
            model_subdir = os.path.join(self.cache_dir, self.model_folder)
            dfp_path = os.path.join(model_subdir, self.expected_dfp_model)
            post_path = (
                os.path.join(model_subdir, self.expected_post_model)
                if self.expected_post_model
                else None
            )

            dfp_exists = os.path.exists(dfp_path)
            post_exists = os.path.exists(post_path) if post_path else True

            if dfp_exists and post_exists:
                logger.info("Using cached models.")
                self.memx_model_path = dfp_path
                self.memx_post_model = post_path
                if self.memx_model_type == ModelTypeEnum.yologeneric:
                    self.load_yolo_constants()
                return

            # ---------- CASE 3: download MemryX model (no cache) ----------
            logger.info(
                f"Model files not found locally. Downloading from {self.model_url}..."
            )
            zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip")
            try:
                if not os.path.exists(zip_path):
                    urllib.request.urlretrieve(self.model_url, zip_path)
                    logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...")

                if not os.path.exists(model_subdir):
                    with zipfile.ZipFile(zip_path, "r") as zip_ref:
                        zip_ref.extractall(self.cache_dir)
                    logger.info(f"Model extracted to {self.cache_dir}.")

                # Re-assign model paths after extraction
                self.memx_model_path = os.path.join(
                    model_subdir, self.expected_dfp_model
                )
                self.memx_post_model = (
                    os.path.join(model_subdir, self.expected_post_model)
                    if self.expected_post_model
                    else None
                )
                if self.memx_model_type == ModelTypeEnum.yologeneric:
                    self.load_yolo_constants()
            finally:
                if os.path.exists(zip_path):
                    try:
                        os.remove(zip_path)
                        logger.info("Cleaned up ZIP file after extraction.")
                    except Exception as e:
                        logger.warning(
                            f"Failed to remove downloaded zip {zip_path}: {e}"
                        )
        finally:
            self._release_file_lock(lock_path)
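
    # With the stock download path, the cache directory ends up laid out
    # roughly like this (yolov9_320 shown as an example):
    #
    #   /memryx_models/
    #     .yolov9_320.lock            (transient lock file)
    #     yolov9_320/
    #       YOLO_v9_small_onnx.dfp
    #       _model_22_Constant_*.npy  (yolov9 post-processing constants)
    #     custom_models/<folder>/     (user-supplied zips extracted here)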

    def send_input(self, connection_id, tensor_input: np.ndarray):
        """Pre-process (if needed) and send a frame to the MemryX input queue"""
        if tensor_input is None:
            raise ValueError("[send_input] No image data provided for inference")

        if self.memx_model_type == ModelTypeEnum.yolonas:
            # Accept NHWC input matching the configured model size
            if tensor_input.ndim == 4 and tensor_input.shape[1:] == (
                self.memx_model_height,
                self.memx_model_width,
                3,
            ):
                logger.debug("Transposing tensor from NHWC to NCHW for YOLO-NAS")
                tensor_input = np.transpose(
                    tensor_input, (0, 3, 1, 2)
                )  # (1, H, W, C) -> (1, C, H, W)
            tensor_input = tensor_input.astype(np.float32)
            tensor_input /= 255
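
        # The YOLOX branch below reproduces the network's "Focus"
        # (space-to-depth) stem on the CPU: the frame is letterboxed onto a
        # gray (114) 640x640 canvas, then each 2x2 pixel block is split
        # across the channel dimension, turning (640, 640, 3) into the
        # (1, 12, 320, 320) tensor the accelerator expects.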
        if self.memx_model_type == ModelTypeEnum.yolox:
            # Remove batch dim -> (3, 640, 640)
            tensor_input = tensor_input.squeeze(0)

            # Convert CHW to HWC for OpenCV
            tensor_input = np.transpose(tensor_input, (1, 2, 0))  # (640, 640, 3)

            # Letterbox onto a gray canvas, preserving aspect ratio
            padded_img = np.ones((640, 640, 3), dtype=np.uint8) * 114
            scale = min(
                640 / float(tensor_input.shape[0]), 640 / float(tensor_input.shape[1])
            )
            sx, sy = (
                int(tensor_input.shape[1] * scale),
                int(tensor_input.shape[0] * scale),
            )
            resized_img = cv2.resize(
                tensor_input, (sx, sy), interpolation=cv2.INTER_LINEAR
            )
            padded_img[:sy, :sx] = resized_img.astype(np.uint8)

            # Slice the padded image into 4 pixel-offset views
            x0 = padded_img[0::2, 0::2, :]  # Top-left
            x1 = padded_img[1::2, 0::2, :]  # Bottom-left
            x2 = padded_img[0::2, 1::2, :]  # Top-right
            x3 = padded_img[1::2, 1::2, :]  # Bottom-right

            # Concatenate along the channel dimension (axis 2)
            concatenated_img = np.concatenate([x0, x1, x2, x3], axis=2)

            tensor_input = concatenated_img.astype(np.float32)

            # Convert to CHW format (12, 320, 320)
            tensor_input = np.transpose(tensor_input, (2, 0, 1))

            # Add batch dimension -> (1, 12, 320, 320)
            tensor_input = np.expand_dims(tensor_input, axis=0)

        # Send the frame to MemryX for processing
        self.capture_queue.put(tensor_input)
        self.capture_id_queue.put(connection_id)
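
    # Async dataflow: Frigate calls send_input() with a preprocessed frame,
    # which lands in capture_queue (tensor) and capture_id_queue (caller id).
    # AsyncAccl pulls tensors through the process_input() callback, runs them
    # on the MX3, and hands raw outputs to process_output(), which posts
    # final detections to output_queue. receive_output() then re-pairs each
    # result with its connection id; FIFO ordering keeps the two queues
    # aligned.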

    def process_input(self):
        """Input callback: wait for a frame in the input queue and return it to the MX3"""
        while True:
            try:
                # Wait for a frame from the queue (blocking call)
                frame = self.capture_queue.get(block=True)
                return frame
            except Exception as e:
                logger.info(f"[process_input] Error processing input: {e}")
                time.sleep(0.1)  # Prevent busy waiting in case of error

    def receive_output(self):
        """Retrieve processed results from the MemryX output queue, paired with the caller's connection id"""
        connection_id = self.capture_id_queue.get()  # Get the corresponding connection ID
        detections = self.output_queue.get()  # Get detections from MemryX
        return connection_id, detections

    def post_process_yolonas(self, output):
        predictions = output[0]

        detections = np.zeros((20, 6), np.float32)

        for i, prediction in enumerate(predictions):
            if i == 20:
                break
            (_, x_min, y_min, x_max, y_max, confidence, class_id) = prediction
            if class_id < 0:
                break
            detections[i] = [
                class_id,
                confidence,
                y_min / self.memx_model_height,
                x_min / self.memx_model_width,
                y_max / self.memx_model_height,
                x_max / self.memx_model_width,
            ]

        # Queue the final detections
        self.output_queue.put(detections)

    def process_yolo(self, class_id, conf, pos):
        """
        Takes a class ID, a confidence score, and an array [x, y, w, h]
        describing the detection position, and returns an array that is
        easily passed back to Frigate.
        """
        return [
            class_id,  # class ID
            conf,  # confidence score
            (pos[1] - (pos[3] / 2)) / self.memx_model_height,  # y_min
            (pos[0] - (pos[2] / 2)) / self.memx_model_width,  # x_min
            (pos[1] + (pos[3] / 2)) / self.memx_model_height,  # y_max
            (pos[0] + (pos[2] / 2)) / self.memx_model_width,  # x_max
        ]
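
    # Worked example at 320x320: pos = [x=160, y=160, w=80, h=40] maps to
    # [y_min, x_min, y_max, x_max]
    #   = [(160-20)/320, (160-40)/320, (160+20)/320, (160+40)/320]
    #   = [0.4375, 0.375, 0.5625, 0.625],
    # i.e. the normalized corner format Frigate expects.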

    def set_strides_grids(self):
        grids = []
        expanded_strides = []

        strides = [8, 16, 32]

        hsize_list = [self.memx_model_height // stride for stride in strides]
        wsize_list = [self.memx_model_width // stride for stride in strides]

        for hsize, wsize, stride in zip(hsize_list, wsize_list, strides):
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            shape = grid.shape[:2]
            expanded_strides.append(np.full((*shape, 1), stride))

        self.grids = np.concatenate(grids, 1)
        self.expanded_strides = np.concatenate(expanded_strides, 1)
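
    # For a 640x640 input, strides [8, 16, 32] give 80*80 + 40*40 + 20*20
    # = 8400 grid cells (2100 at 320x320), so self.grids has shape
    # (1, 8400, 2) and self.expanded_strides (1, 8400, 1). They decode the
    # raw YOLOX outputs in post_process_yolox: centers are
    # (pred_xy + grid) * stride and sizes are exp(pred_wh) * stride.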

    def sigmoid(self, x: np.ndarray) -> np.ndarray:
        return 1 / (1 + np.exp(-x))

    def onnx_concat(self, inputs: list, axis: int) -> np.ndarray:
        # Ensure all inputs are numpy arrays
        if not all(isinstance(x, np.ndarray) for x in inputs):
            raise TypeError("All inputs must be numpy arrays.")

        # Ensure shapes match on non-concat axes
        ref_shape = list(inputs[0].shape)
        for i, tensor in enumerate(inputs[1:], start=1):
            for ax in range(len(ref_shape)):
                if ax == axis:
                    continue
                if tensor.shape[ax] != ref_shape[ax]:
                    raise ValueError(
                        f"Shape mismatch at axis {ax} between input[0] and input[{i}]"
                    )

        return np.concatenate(inputs, axis=axis)

    def onnx_reshape(self, data: np.ndarray, shape: np.ndarray) -> np.ndarray:
        # Ensure shape is a 1D array of integers
        target_shape = shape.astype(int).tolist()

        # Use NumPy reshape with dynamic handling of -1
        reshaped = np.reshape(data, target_shape)
        return reshaped
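
    # In post_process_yolox below, the nine raw accelerator outputs are the
    # three YOLOX detection heads at strides 8/16/32. Per scale, the three
    # tensors appear to be box regression (4 ch, left raw), objectness
    # (1 ch), and class scores (80 ch); the latter two pass through sigmoid
    # before being concatenated into 4 + 1 + 80 = 85 channels to match the
    # original ONNX graph.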

    def post_process_yolox(self, output):
        output_785 = output[0]  # 785
        output_794 = output[1]  # 794
        output_795 = output[2]  # 795
        output_811 = output[3]  # 811
        output_820 = output[4]  # 820
        output_821 = output[5]  # 821
        output_837 = output[6]  # 837
        output_846 = output[7]  # 846
        output_847 = output[8]  # 847

        # Apply sigmoid to the objectness and class-score tensors
        output_795 = self.sigmoid(output_795)
        output_785 = self.sigmoid(output_785)
        output_821 = self.sigmoid(output_821)
        output_811 = self.sigmoid(output_811)
        output_847 = self.sigmoid(output_847)
        output_837 = self.sigmoid(output_837)

        concat_1 = self.onnx_concat([output_794, output_795, output_785], axis=1)
        concat_2 = self.onnx_concat([output_820, output_821, output_811], axis=1)
        concat_3 = self.onnx_concat([output_846, output_847, output_837], axis=1)

        shape = np.array([1, 85, -1], dtype=np.int64)
        reshape_1 = self.onnx_reshape(concat_1, shape)
        reshape_2 = self.onnx_reshape(concat_2, shape)
        reshape_3 = self.onnx_reshape(concat_3, shape)

        concat_out = self.onnx_concat([reshape_1, reshape_2, reshape_3], axis=2)
        output = concat_out.transpose(0, 2, 1)  # (1, 8400, 85) for a 640x640 input

        self.num_classes = output.shape[2] - 5

        # Each row: [x, y, w, h, box_score, class_no_1, ..., class_no_80]
        results = output
        results[..., :2] = (results[..., :2] + self.grids) * self.expanded_strides
        results[..., 2:4] = np.exp(results[..., 2:4]) * self.expanded_strides

        image_pred = results[0, ...]

        class_conf = np.max(
            image_pred[:, 5 : 5 + self.num_classes], axis=1, keepdims=True
        )
        class_pred = np.argmax(image_pred[:, 5 : 5 + self.num_classes], axis=1)
        class_pred = np.expand_dims(class_pred, axis=1)

        conf_mask = (image_pred[:, 4] * class_conf.squeeze() >= 0.3).squeeze()

        # Detections ordered as (x, y, w, h, obj_conf, class_conf, class_pred)
        detections = np.concatenate((image_pred[:, :5], class_conf, class_pred), axis=1)
        detections = detections[conf_mask]

        # Sort by class confidence (index 5) and keep the top 20 detections
        ordered = detections[detections[:, 5].argsort()[::-1]][:20]

        # Prepare a final detections array of shape (20, 6)
        final_detections = np.zeros((20, 6), np.float32)
        for i, object_detected in enumerate(ordered):
            final_detections[i] = self.process_yolo(
                object_detected[6], object_detected[5], object_detected[:4]
            )

        self.output_queue.put(final_detections)

    def post_process_ssdlite(self, outputs):
        dets = outputs[0].squeeze(0)  # Shape after squeeze: (num_dets, 5)
        labels = outputs[1].squeeze(0)

        detections = []
        for i in range(dets.shape[0]):
            x_min, y_min, x_max, y_max, confidence = dets[i]
            class_id = int(labels[i])  # Convert label to integer

            if confidence < 0.45:
                continue  # Skip detections below threshold

            # Convert coordinates to integers
            x_min, y_min, x_max, y_max = map(int, [x_min, y_min, x_max, y_max])

            # Append valid detections [class_id, confidence, x_min, y_min, x_max, y_max]
            detections.append([class_id, confidence, x_min, y_min, x_max, y_max])

        final_detections = np.zeros((20, 6), np.float32)

        if len(detections) == 0:
            self.output_queue.put(final_detections)
            return

        # Convert to a NumPy array
        detections = np.array(detections, dtype=np.float32)

        # Apply Non-Maximum Suppression (NMS). cv2.dnn.NMSBoxes expects boxes
        # as (x, y, w, h), so convert from the corner format stored above.
        boxes_xywh = detections[:, 2:6].copy()
        boxes_xywh[:, 2] -= boxes_xywh[:, 0]  # width = x_max - x_min
        boxes_xywh[:, 3] -= boxes_xywh[:, 1]  # height = y_max - y_min
        scores = detections[:, 1].tolist()  # Confidence scores
        indices = cv2.dnn.NMSBoxes(boxes_xywh.tolist(), scores, 0.45, 0.5)

        if len(indices) > 0:
            indices = indices.flatten()[:20]  # Keep only the top 20 detections
            selected_detections = detections[indices]

            # Normalize coordinates AFTER NMS
            for i, det in enumerate(selected_detections):
                class_id, confidence, x_min, y_min, x_max, y_max = det
                x_min /= self.memx_model_width
                y_min /= self.memx_model_height
                x_max /= self.memx_model_width
                y_max /= self.memx_model_height
                final_detections[i] = [class_id, confidence, y_min, x_min, y_max, x_max]

        self.output_queue.put(final_detections)

    def onnx_reshape_with_allowzero(
        self, data: np.ndarray, shape: np.ndarray, allowzero: int = 0
    ) -> np.ndarray:
        shape = shape.astype(int)
        input_shape = data.shape

        output_shape = []
        for i, dim in enumerate(shape):
            if dim == 0 and allowzero == 0:
                output_shape.append(input_shape[i])  # Copy dimension from input
            else:
                output_shape.append(dim)

        # Let NumPy infer any -1 if needed
        reshaped = np.reshape(data, output_shape)
        return reshaped

    def process_output(self, *outputs):
        """Output callback -- receives raw results from the MX3 and triggers post-processing"""
        if self.memx_model_type == ModelTypeEnum.yologeneric:
            if not self.memx_post_model:
                conv_out1 = outputs[0]
                conv_out2 = outputs[1]
                conv_out3 = outputs[2]
                conv_out4 = outputs[3]
                conv_out5 = outputs[4]
                conv_out6 = outputs[5]

                concat_1 = self.onnx_concat([conv_out1, conv_out2], axis=1)
                concat_2 = self.onnx_concat([conv_out3, conv_out4], axis=1)
                concat_3 = self.onnx_concat([conv_out5, conv_out6], axis=1)

                shape = np.array([1, 144, -1], dtype=np.int64)
                reshaped_1 = self.onnx_reshape_with_allowzero(
                    concat_1, shape, allowzero=0
                )
                reshaped_2 = self.onnx_reshape_with_allowzero(
                    concat_2, shape, allowzero=0
                )
                reshaped_3 = self.onnx_reshape_with_allowzero(
                    concat_3, shape, allowzero=0
                )

                concat_4 = self.onnx_concat([reshaped_1, reshaped_2, reshaped_3], 2)
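
                # What follows is a NumPy re-implementation of the
                # Distribution Focal Loss (DFL) decode used by YOLOv9-style
                # heads: of the 144 channels, the first 64 hold 4 box sides
                # x 16 bins and the remaining 80 are class logits. Softmax
                # over the 16 bins followed by a weighted sum with weights
                # 0..15 (the 1x1 "convolution" below) yields the expected
                # value of each side's distribution.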
                axis = 1
                split_sizes = [64, 80]

                # Calculate the indices at which to split
                indices = np.cumsum(split_sizes)[:-1]  # [64] -- split before the second chunk

                # Perform the split along axis 1
                split_0, split_1 = np.split(concat_4, indices, axis=axis)

                num_boxes = 2100 if self.memx_model_height == 320 else 8400
                shape1 = np.array([1, 4, 16, num_boxes])
                reshape_4 = self.onnx_reshape_with_allowzero(
                    split_0, shape1, allowzero=0
                )

                transpose_1 = reshape_4.transpose(0, 2, 1, 3)

                axis = 1  # As per the ONNX softmax node
                # Subtract the max for numerical stability
                x_max = np.max(transpose_1, axis=axis, keepdims=True)
                x_exp = np.exp(transpose_1 - x_max)
                x_sum = np.sum(x_exp, axis=axis, keepdims=True)
                softmax_output = x_exp / x_sum

                # Weight W from the ONNX initializer (1, 16, 1, 1) with values 0 to 15
                W = np.arange(16, dtype=np.float32).reshape(1, 16, 1, 1)

                # Apply a 1x1 convolution: a weighted sum over channels
                conv_output = np.sum(
                    softmax_output * W, axis=1, keepdims=True
                )  # shape: (1, 1, 4, num_boxes)

                shape2 = np.array([1, 4, num_boxes])
                reshape_5 = self.onnx_reshape_with_allowzero(
                    conv_output, shape2, allowzero=0
                )

                # ONNX Slice -- first 2 channels [0:2] along axis 1
                slice_output1 = reshape_5[:, 0:2, :]

                # Slice channels 2 to 4 along axis 1
                slice_output2 = reshape_5[:, 2:4, :]

                # ONNX-style Sub and Add
                sub_output = self.const_A - slice_output1
                add_output = self.const_B + slice_output2

                sub1 = add_output - sub_output
                add1 = sub_output + add_output
                div_output = add1 / 2.0
                concat_5 = self.onnx_concat([div_output, sub1], axis=1)

                # Expand const_C to (1, 1, num_boxes) so it can broadcast
                # across axis=1 (4 channels)
                const_C_expanded = self.const_C[:, np.newaxis, :]

                # ONNX-style element-wise multiplication -> (1, 4, num_boxes)
                mul_output = concat_5 * const_C_expanded

                sigmoid_output = self.sigmoid(split_1)

                outputs = self.onnx_concat([mul_output, sigmoid_output], axis=1)

                final_detections = post_process_yolo(
                    outputs, self.memx_model_width, self.memx_model_height
                )
                self.output_queue.put(final_detections)
        elif self.memx_model_type == ModelTypeEnum.yolonas:
            return self.post_process_yolonas(outputs)
        elif self.memx_model_type == ModelTypeEnum.yolox:
            return self.post_process_yolox(outputs)
        elif self.memx_model_type == ModelTypeEnum.ssd:
            return self.post_process_ssdlite(outputs)
        else:
            raise Exception(
                f"{self.memx_model_type} is currently not supported for memryx. See the docs for more info on supported models."
            )

    def detect_raw(self, tensor_input: np.ndarray):
        """Synchronous detection is not used; this detector runs asynchronously."""
        return 0