Working async infer

OmriAx 2025-02-27 20:17:20 +02:00
parent ee45f50e09
commit 323e4cec93
2 changed files with 309 additions and 478 deletions


frigate/detectors/detector_config.py

@@ -37,6 +37,7 @@ class ModelTypeEnum(str, Enum):
    yolox = "yolox"
    yolov9 = "yolov9"
    yolonas = "yolonas"
    hailoyolo = "hailo-yolo"

class ModelConfig(BaseModel):
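To make the new value concrete, here is a short illustrative sketch (not part of the commit): because ModelTypeEnum subclasses str, the "hailo-yolo" string from a user's config resolves directly to the member added above.

    from enum import Enum

    class ModelTypeEnum(str, Enum):
        ssd = "ssd"
        yolox = "yolox"
        yolov9 = "yolov9"
        yolonas = "yolonas"
        hailoyolo = "hailo-yolo"

    # Enum lookup by value works for pydantic and plain code alike:
    assert ModelTypeEnum("hailo-yolo") is ModelTypeEnum.hailoyolo
    assert ModelTypeEnum.hailoyolo == "hailo-yolo"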

frigate/detectors/plugins/hailo8l.py Normal file → Executable file

@@ -2,8 +2,11 @@ import logging
import os
import subprocess
import urllib.request
import numpy as np
import queue
import threading
from functools import partial
from typing import Dict, Optional, List, Tuple
try:
    from hailo_platform import (
@@ -12,550 +15,377 @@ try:
        FormatType,
        HailoRTException,
        HailoStreamInterface,
        InputVStreamParams,
        OutputVStreamParams,
        VDevice,
        HailoSchedulingAlgorithm,
    )
except ModuleNotFoundError:
    pass

from pydantic import BaseModel, Field
from typing_extensions import Literal

from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum, InputTensorEnum, PixelFormatEnum, InputDTypeEnum
from PIL import Image, ImageDraw, ImageFont
# ----------------- Inline Utility Functions ----------------- #
def preprocess_image(image: Image.Image, model_w: int, model_h: int) -> Image.Image:
    """
    Resize image with unchanged aspect ratio using padding.
    """
    img_w, img_h = image.size
    scale = min(model_w / img_w, model_h / img_h)
    new_img_w, new_img_h = int(img_w * scale), int(img_h * scale)
    image = image.resize((new_img_w, new_img_h), Image.Resampling.BICUBIC)
    padded_image = Image.new('RGB', (model_w, model_h), (114, 114, 114))
    padded_image.paste(image, ((model_w - new_img_w) // 2, (model_h - new_img_h) // 2))
    return padded_image
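To make the letterbox arithmetic concrete, a hand-checked example with illustrative numbers (not from the commit): a 1920x1080 frame scaled into a 640x640 model input.

    from PIL import Image

    frame = Image.new('RGB', (1920, 1080))
    out = preprocess_image(frame, 640, 640)
    # scale = min(640/1920, 640/1080) = 1/3, so the frame shrinks to 640x360
    # and is pasted at ((640-640)//2, (640-360)//2) = (0, 140), leaving
    # 140-pixel gray bars above and below.
    assert out.size == (640, 640)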
def extract_detections(input_data: list, threshold: float = 0.5) -> dict:
    """
    Extract detections from raw inference output.
    (Legacy extraction function; not used by detect_raw below.)
    """
    boxes, scores, classes = [], [], []
    num_detections = 0
    for i, detection in enumerate(input_data):
        if len(detection) == 0:
            continue
        for det in detection:
            bbox, score = det[:4], det[4]
            if score >= threshold:
                boxes.append(bbox)
                scores.append(score)
                classes.append(i)
                num_detections += 1
    return {
        'detection_boxes': boxes,
        'detection_classes': classes,
        'detection_scores': scores,
        'num_detections': num_detections,
    }
# ----------------- End of Utility Functions ----------------- #
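Although extract_detections is legacy, its output is easiest to see with a tiny input. A minimal illustration (made-up values): one box for class 0 and an empty array for class 1.

    import numpy as np

    raw = [np.array([[0.1, 0.2, 0.5, 0.6, 0.9]]), np.array([])]
    out = extract_detections(raw, threshold=0.5)
    # out == {'detection_boxes': [array([0.1, 0.2, 0.5, 0.6])],
    #         'detection_classes': [0], 'detection_scores': [0.9],
    #         'num_detections': 1}  (the empty class-1 array is skipped)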
# Global constants and default URLs
DETECTOR_KEY = "hailo8l"
ARCH = None
H8_DEFAULT_MODEL = "yolov8s.hef"
H8L_DEFAULT_MODEL = "yolov6n.hef"
H8_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8/yolov8s.hef"
H8L_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8l/yolov6n.hef"
def detect_hailo_arch():
    try:
        result = subprocess.run(['hailortcli', 'fw-control', 'identify'], capture_output=True, text=True)
        if result.returncode != 0:
            print(f"Error running hailortcli: {result.stderr}")
            return None
        for line in result.stdout.split('\n'):
            if "Device Architecture" in line:
                # Check HAILO8L first: "HAILO8" is a substring of "HAILO8L",
                # so testing it first would misidentify 8L devices.
                if "HAILO8L" in line:
                    return "hailo8l"
                elif "HAILO8" in line:
                    return "hailo8"
        print("Could not determine Hailo architecture from device information.")
        return None
    except Exception as e:
        print(f"An error occurred while detecting Hailo architecture: {e}")
        return None
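One subtlety worth noting: the HAILO8L check must run before the HAILO8 one, since the shorter name is a substring of the longer. A two-line illustration (the output line is hypothetical):

    line = "Device Architecture: HAILO8L"  # hypothetical identify output line
    assert "HAILO8" in line and "HAILO8L" in line  # both match, so order decides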
# ----------------- Inline Asynchronous Inference Class ----------------- #
class HailoAsyncInference:
    def __init__(
        self,
        hef_path: str,
        input_queue: queue.Queue,
        output_queue: queue.Queue,
        batch_size: int = 1,
        input_type: Optional[str] = None,
        output_type: Optional[Dict[str, str]] = None,
        send_original_frame: bool = False,
    ) -> None:
        self.input_queue = input_queue
        self.output_queue = output_queue

        # Create VDevice parameters with round-robin scheduling
        params = VDevice.create_params()
        params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN

        # Load HEF and create the infer model
        self.hef = HEF(hef_path)
        self.target = VDevice(params)
        self.infer_model = self.target.create_infer_model(hef_path)
        self.infer_model.set_batch_size(batch_size)
        if input_type is not None:
            self._set_input_type(input_type)
        if output_type is not None:
            self._set_output_type(output_type)
        self.output_type = output_type
        self.send_original_frame = send_original_frame

    def _set_input_type(self, input_type: Optional[str] = None) -> None:
        self.infer_model.input().set_format_type(getattr(FormatType, input_type))

    def _set_output_type(self, output_type_dict: Optional[Dict[str, str]] = None) -> None:
        for output_name, output_type in output_type_dict.items():
            self.infer_model.output(output_name).set_format_type(getattr(FormatType, output_type))
    def callback(self, completion_info, bindings_list: List, input_batch: List):
        if completion_info.exception:
            logging.error(f"Inference error: {completion_info.exception}")
        else:
            for i, bindings in enumerate(bindings_list):
                if len(bindings._output_names) == 1:
                    result = bindings.output().get_buffer()
                else:
                    result = {
                        name: np.expand_dims(bindings.output(name).get_buffer(), axis=0)
                        for name in bindings._output_names
                    }
                self.output_queue.put((input_batch[i], result))
    def _create_bindings(self, configured_infer_model) -> object:
        if self.output_type is None:
            output_buffers = {
                output_info.name: np.empty(
                    self.infer_model.output(output_info.name).shape,
                    dtype=getattr(np, str(output_info.format.type).split(".")[1].lower()),
                )
                for output_info in self.hef.get_output_vstream_infos()
            }
        else:
            output_buffers = {
                name: np.empty(
                    self.infer_model.output(name).shape,
                    dtype=getattr(np, self.output_type[name].lower()),
                )
                for name in self.output_type
            }
        return configured_infer_model.create_bindings(output_buffers=output_buffers)

    def get_input_shape(self) -> Tuple[int, ...]:
        return self.hef.get_input_vstream_infos()[0].shape
    def run(self) -> None:
        # Configure the infer model once and reuse vstream settings via run_async
        with self.infer_model.configure() as configured_infer_model:
            job = None
            while True:
                batch_data = self.input_queue.get()
                if batch_data is None:
                    break  # Sentinel to exit loop
                if self.send_original_frame:
                    original_batch, preprocessed_batch = batch_data
                else:
                    preprocessed_batch = batch_data
                bindings_list = []
                for frame in preprocessed_batch:
                    bindings = self._create_bindings(configured_infer_model)
                    bindings.input().set_buffer(np.array(frame))
                    bindings_list.append(bindings)
                configured_infer_model.wait_for_async_ready(timeout_ms=10000)
                job = configured_infer_model.run_async(
                    bindings_list,
                    partial(
                        self.callback,
                        input_batch=original_batch if self.send_original_frame else preprocessed_batch,
                        bindings_list=bindings_list,
                    ),
                )
            if job is not None:  # Guard against an immediate sentinel with no work
                job.wait(10000)  # Wait for the last job to complete
# ----------------- End of Async Class ----------------- #
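The queue protocol is easiest to see end to end. A minimal usage sketch (hypothetical model path; mirrors how detect_raw drives the class below):

    import queue
    import numpy as np

    input_q, output_q = queue.Queue(), queue.Queue()
    engine = HailoAsyncInference("/config/model_cache/hailo/yolov6n.hef", input_q, output_q)

    # One-frame batch shaped (1, H, W, C) so run() can iterate over frames
    frame_batch = np.zeros((1, *engine.get_input_shape()), dtype=np.uint8)
    input_q.put(frame_batch)   # enqueue work
    input_q.put(None)          # sentinel: run() exits its loop afterwards
    engine.run()               # blocks until the queued batch completes

    original, results = output_q.get()  # callback delivered an (input, output) pair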
# ----------------- HailoDetector Class ----------------- #
class HailoDetector(DetectionApi):
    type_key = DETECTOR_KEY

    def __init__(self, detector_config: 'HailoDetectorConfig'):
        global ARCH
        ARCH = detect_hailo_arch()
        self.cache_dir = "/config/model_cache/hailo"
        self.device_type = detector_config.device
        # Model attributes should be provided in detector_config.model
        self.model_path = detector_config.model.path if hasattr(detector_config.model, "path") else None
        self.model_height = detector_config.model.height if hasattr(detector_config.model, "height") else None
        self.model_width = detector_config.model.width if hasattr(detector_config.model, "width") else None
        self.model_type = detector_config.model.model_type if hasattr(detector_config.model, "model_type") else None
        self.tensor_format = detector_config.model.input_tensor if hasattr(detector_config.model, "input_tensor") else None
        self.pixel_format = detector_config.model.input_pixel_format if hasattr(detector_config.model, "input_pixel_format") else None
        self.input_dtype = detector_config.model.input_dtype if hasattr(detector_config.model, "input_dtype") else None
        self.url = detector_config.url
        self.output_type = "FLOAT32"
        self.working_model_path = self.check_and_prepare()

        # Set up asynchronous inference
        self.batch_size = 1
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        try:
            logging.info(f"[INIT] Loading HEF model from {self.working_model_path}")
            self.inference_engine = HailoAsyncInference(
                self.working_model_path,
                self.input_queue,
                self.output_queue,
                self.batch_size,
            )
            self.input_shape = self.inference_engine.get_input_shape()
            logging.info(f"[INIT] Model input shape: {self.input_shape}")
        except Exception as e:
            logging.error(f"[INIT] Failed to initialize HailoAsyncInference: {e}")
            raise
    @staticmethod
    def extract_model_name(path: str = None, url: str = None) -> str:
        model_name = None
        if path and path.endswith(".hef"):
            model_name = os.path.basename(path)
        elif url and url.endswith(".hef"):
            model_name = os.path.basename(url)
        else:
            print("Model name not found in path or URL. Checking default settings...")
            if ARCH == "hailo8":
                model_name = H8_DEFAULT_MODEL
            else:
                model_name = H8L_DEFAULT_MODEL
            print(f"Using default model: {model_name}")
        return model_name
    @staticmethod
    def download_model(url: str, destination: str):
        if not url.endswith(".hef"):
            raise ValueError("Invalid model URL. Only .hef files are supported.")
        try:
            urllib.request.urlretrieve(url, destination)
            print(f"Downloaded model to {destination}")
        except Exception as e:
            raise RuntimeError(f"Failed to download model from {url}: {str(e)}")
    def check_and_prepare(self) -> str:
        """
        Resolve which .hef file to use, preferring an existing local path,
        then a custom URL, then the architecture default, downloading into
        the cache directory when needed.
        """
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        model_name = self.extract_model_name(self.model_path, self.url)
        model_path = os.path.join(self.cache_dir, model_name)

        if not self.model_path and not self.url:
            if os.path.exists(model_path):
                print(f"Model found in cache: {model_path}")
                return model_path
            else:
                print(f"Downloading default model: {model_name}")
                if ARCH == "hailo8":
                    self.download_model(H8_DEFAULT_URL, model_path)
                else:
                    self.download_model(H8L_DEFAULT_URL, model_path)
        elif self.model_path and self.url:
            if os.path.exists(self.model_path):
                print(f"Model found at path: {self.model_path}")
                return self.model_path
            else:
                print(f"Model not found at path. Downloading from URL: {self.url}")
                self.download_model(self.url, model_path)
        elif self.url:
            print(f"Downloading model from URL: {self.url}")
            self.download_model(self.url, model_path)
        elif self.model_path:
            if os.path.exists(self.model_path):
                print(f"Using existing model at: {self.model_path}")
                return self.model_path
            else:
                raise FileNotFoundError(f"Model file not found at: {self.model_path}")
        return model_path
    def detect_raw(self, tensor_input):
        logging.info("[DETECT_RAW] Starting detection")
        # Ensure tensor_input has a batch dimension
        if isinstance(tensor_input, np.ndarray) and len(tensor_input.shape) == 3:
            tensor_input = np.expand_dims(tensor_input, axis=0)
            logging.info(f"[DETECT_RAW] Expanded input shape to {tensor_input.shape}")

        # Enqueue input and a sentinel value
        self.input_queue.put(tensor_input)
        self.input_queue.put(None)  # Sentinel value

        # Run the inference engine
        self.inference_engine.run()
        result = self.output_queue.get()
        if result is None:
            logging.error("[DETECT_RAW] No inference result received")
            return np.zeros((20, 6), dtype=np.float32)

        original_input, infer_results = result
        logging.info("[DETECT_RAW] Inference completed.")

        # If infer_results is a single-element list, unwrap it.
        if isinstance(infer_results, list) and len(infer_results) == 1:
            infer_results = infer_results[0]

        # Set your threshold (adjust as needed)
        threshold = 0.4

        all_detections = []
        # Loop over the output list (each element corresponds to one output stream)
        for idx, detection_set in enumerate(infer_results):
            # Skip empty arrays
            if not isinstance(detection_set, np.ndarray) or detection_set.size == 0:
                continue
            logging.debug(f"[DETECT_RAW] Processing detection set {idx} with shape {detection_set.shape}")
            # For each detection row in the set:
            for det in detection_set:
                # Expecting at least 5 elements: [ymin, xmin, ymax, xmax, confidence]
                if det.shape[0] < 5:
                    continue
                score = float(det[4])
                if score < threshold:
                    continue
                # If there is a 6th element, assume it's a class id; otherwise use dummy class 0.
                if det.shape[0] >= 6:
                    cls = int(det[5])
                else:
                    cls = 0
                # Append in the order Frigate expects: [class_id, confidence, ymin, xmin, ymax, xmax]
                all_detections.append([cls, score, det[0], det[1], det[2], det[3]])

        # If no valid detections were found, return a zero array.
        if len(all_detections) == 0:
            logging.warning("[DETECT_RAW] No valid detections found.")
            return np.zeros((20, 6), dtype=np.float32)

        detections_array = np.array(all_detections, dtype=np.float32)

        # Pad or truncate to exactly 20 rows
        if detections_array.shape[0] < 20:
            pad = np.zeros((20 - detections_array.shape[0], 6), dtype=np.float32)
            detections_array = np.vstack((detections_array, pad))
        elif detections_array.shape[0] > 20:
            detections_array = detections_array[:20, :]

        logging.info(f"[DETECT_RAW] Processed detections: {detections_array}")
        return detections_array
    # Preprocess method using the inline utility
    def preprocess(self, image):
        return preprocess_image(image, self.input_shape[1], self.input_shape[0])
    def close(self):
        logging.info("[CLOSE] Closing HailoDetector")
        try:
            self.inference_engine.hef.close()
            logging.info("Hailo device closed successfully")
        except Exception as e:
            logging.error(f"Failed to close Hailo device: {e}")
            raise
    # Asynchronous detection wrapper
    def async_detect(self, tensor_input, callback):
        def detection_thread():
            result = self.detect_raw(tensor_input)
            callback(result)
        thread = threading.Thread(target=detection_thread)
        thread.start()
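The (20, 6) contract that detect_raw returns is worth pinning down with a self-contained check (illustrative values only):

    import numpy as np

    detections = np.zeros((20, 6), dtype=np.float32)
    # Row layout Frigate expects: [class_id, confidence, ymin, xmin, ymax, xmax]
    detections[0] = [0, 0.87, 0.1, 0.2, 0.5, 0.6]
    assert detections.shape == (20, 6)  # always padded or truncated to 20 rows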
# ----------------- Configuration Class ----------------- #
class HailoDetectorConfig(BaseDetectorConfig):
    type: Literal[DETECTOR_KEY]
    device: str = Field(default="PCIe", title="Device Type")
    url: Optional[str] = Field(default=None, title="Custom Model URL")
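Finally, a sketch of how this config class might be exercised directly (hypothetical values; assumes BaseDetectorConfig supplies the optional model section that the hasattr guards in __init__ expect):

    config = HailoDetectorConfig(
        type="hailo8l",
        device="PCIe",
        url="https://example.com/models/custom.hef",  # optional custom model
    )
    detector = HailoDetector(config)  # resolves/caches the HEF, then loads it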