From 323e4cec930a502be5ea2f021efcff8fa2e6c35a Mon Sep 17 00:00:00 2001
From: OmriAx
Date: Thu, 27 Feb 2025 20:17:20 +0200
Subject: [PATCH] Working async infer

---
 frigate/detectors/detector_config.py |   1 +
 frigate/detectors/plugins/hailo8l.py | 786 +++++++++++----------
 2 files changed, 309 insertions(+), 478 deletions(-)
 mode change 100644 => 100755 frigate/detectors/plugins/hailo8l.py

diff --git a/frigate/detectors/detector_config.py b/frigate/detectors/detector_config.py
index c8aea0a1d..7704896f9 100644
--- a/frigate/detectors/detector_config.py
+++ b/frigate/detectors/detector_config.py
@@ -37,6 +37,7 @@ class ModelTypeEnum(str, Enum):
     yolox = "yolox"
     yolov9 = "yolov9"
     yolonas = "yolonas"
+    hailoyolo = "hailo-yolo"
 
 
 class ModelConfig(BaseModel):
diff --git a/frigate/detectors/plugins/hailo8l.py b/frigate/detectors/plugins/hailo8l.py
old mode 100644
new mode 100755
index 72f47769d..b6a74a92a
--- a/frigate/detectors/plugins/hailo8l.py
+++ b/frigate/detectors/plugins/hailo8l.py
@@ -2,8 +2,11 @@ import logging
 import os
 import subprocess
 import urllib.request
-
 import numpy as np
+import queue
+import threading
+from functools import partial
+from typing import Dict, Optional, List, Tuple
 
 try:
     from hailo_platform import (
@@ -12,550 +15,377 @@ try:
         FormatType,
         HailoRTException,
         HailoStreamInterface,
-        InferVStreams,
         InputVStreamParams,
         OutputVStreamParams,
         VDevice,
+        HailoSchedulingAlgorithm,
     )
 except ModuleNotFoundError:
     pass
 
 from pydantic import BaseModel, Field
 from typing_extensions import Literal
-from typing import Dict, Optional, List
-
 from frigate.detectors.detection_api import DetectionApi
 from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum, InputTensorEnum, PixelFormatEnum, InputDTypeEnum
+from PIL import Image
 
-# Setup logging
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-file_handler = logging.FileHandler('hailo_detector_debug.log')
-file_handler.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-file_handler.setFormatter(formatter)
-logger.addHandler(file_handler)
+# ----------------- Inline Utility Functions ----------------- #
+def preprocess_image(image: Image.Image, model_w: int, model_h: int) -> Image.Image:
+    """
+    Resize image with unchanged aspect ratio using padding.
+    """
+    img_w, img_h = image.size
+    scale = min(model_w / img_w, model_h / img_h)
+    new_img_w, new_img_h = int(img_w * scale), int(img_h * scale)
+    image = image.resize((new_img_w, new_img_h), Image.Resampling.BICUBIC)
+    padded_image = Image.new('RGB', (model_w, model_h), (114, 114, 114))
+    padded_image.paste(image, ((model_w - new_img_w) // 2, (model_h - new_img_h) // 2))
+    return padded_image
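# --- sketch (illustrative, not part of the diff): how preprocess_image() behaves ---
img = Image.new('RGB', (1280, 720))          # hypothetical 720p frame
padded = preprocess_image(img, 640, 640)     # scale 0.5 -> content resized to 640x360
assert padded.size == (640, 640)             # gray (114, 114, 114) bars pad the rest
# --- end sketch ---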
+ """ + boxes, scores, classes = [], [], [] + num_detections = 0 + for i, detection in enumerate(input_data): + if len(detection) == 0: + continue + for det in detection: + bbox, score = det[:4], det[4] + if score >= threshold: + boxes.append(bbox) + scores.append(score) + classes.append(i) + num_detections += 1 + return { + 'detection_boxes': boxes, + 'detection_classes': classes, + 'detection_scores': scores, + 'num_detections': num_detections + } +# ----------------- End of Utility Functions ----------------- # + +# Global constants and default URLs DETECTOR_KEY = "hailo8l" ARCH = None +H8_DEFAULT_MODEL = "yolov8s.hef" +H8L_DEFAULT_MODEL = "yolov6n.hef" +H8_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8/yolov8s.hef" +H8L_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8l/yolov6n.hef" def detect_hailo_arch(): try: - # Run the hailortcli command to get device information result = subprocess.run(['hailortcli', 'fw-control', 'identify'], capture_output=True, text=True) - - # Check if the command was successful if result.returncode != 0: print(f"Error running hailortcli: {result.stderr}") return None - - # Search for the "Device Architecture" line in the output for line in result.stdout.split('\n'): if "Device Architecture" in line: if "HAILO8L" in line: return "hailo8l" elif "HAILO8" in line: return "hailo8" - print("Could not determine Hailo architecture from device information.") return None except Exception as e: print(f"An error occurred while detecting Hailo architecture: {e}") return None +# ----------------- Inline Asynchronous Inference Class ----------------- # +class HailoAsyncInference: + def __init__( + self, + hef_path: str, + input_queue: queue.Queue, + output_queue: queue.Queue, + batch_size: int = 1, + input_type: Optional[str] = None, + output_type: Optional[Dict[str, str]] = None, + send_original_frame: bool = False, + ) -> None: + self.input_queue = input_queue + self.output_queue = output_queue -# Configuration class for Hailo detector -class HailoDetectorConfig(BaseDetectorConfig): - type: Literal[DETECTOR_KEY] # Type of the detector - device: str = Field(default="PCIe", title="Device Type") # Device type (e.g., PCIe) - url: Optional[str] = Field(default=None, title="Custom Model URL") + # Create VDevice parameters with round-robin scheduling + params = VDevice.create_params() + params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN -# Hailo detector class implementation + # Load HEF and create the infer model + self.hef = HEF(hef_path) + self.target = VDevice(params) + self.infer_model = self.target.create_infer_model(hef_path) + self.infer_model.set_batch_size(batch_size) + if input_type is not None: + self._set_input_type(input_type) + if output_type is not None: + self._set_output_type(output_type) + self.output_type = output_type + self.send_original_frame = send_original_frame + + def _set_input_type(self, input_type: Optional[str] = None) -> None: + self.infer_model.input().set_format_type(getattr(FormatType, input_type)) + + def _set_output_type(self, output_type_dict: Optional[Dict[str, str]] = None) -> None: + for output_name, output_type in output_type_dict.items(): + self.infer_model.output(output_name).set_format_type(getattr(FormatType, output_type)) + + def callback(self, completion_info, bindings_list: List, input_batch: List): + if completion_info.exception: + logging.error(f"Inference error: {completion_info.exception}") + else: + for i, 
+                if len(bindings._output_names) == 1:
+                    result = bindings.output().get_buffer()
+                else:
+                    result = {
+                        name: np.expand_dims(bindings.output(name).get_buffer(), axis=0)
+                        for name in bindings._output_names
+                    }
+                self.output_queue.put((input_batch[i], result))
+
+    def _create_bindings(self, configured_infer_model) -> object:
+        if self.output_type is None:
+            output_buffers = {
+                output_info.name: np.empty(
+                    self.infer_model.output(output_info.name).shape,
+                    dtype=getattr(np, str(output_info.format.type).split(".")[1].lower())
+                )
+                for output_info in self.hef.get_output_vstream_infos()
+            }
+        else:
+            output_buffers = {
+                name: np.empty(
+                    self.infer_model.output(name).shape,
+                    dtype=getattr(np, self.output_type[name].lower())
+                )
+                for name in self.output_type
+            }
+        return configured_infer_model.create_bindings(output_buffers=output_buffers)
+
+    def get_input_shape(self) -> Tuple[int, ...]:
+        return self.hef.get_input_vstream_infos()[0].shape
+
+    def run(self) -> None:
+        # Configure the infer model once and reuse vstream settings via run_async
+        with self.infer_model.configure() as configured_infer_model:
+            while True:
+                batch_data = self.input_queue.get()
+                if batch_data is None:
+                    break  # Sentinel to exit loop
+                if self.send_original_frame:
+                    original_batch, preprocessed_batch = batch_data
+                else:
+                    preprocessed_batch = batch_data
+                bindings_list = []
+                for frame in preprocessed_batch:
+                    bindings = self._create_bindings(configured_infer_model)
+                    bindings.input().set_buffer(np.array(frame))
+                    bindings_list.append(bindings)
+                configured_infer_model.wait_for_async_ready(timeout_ms=10000)
+                job = configured_infer_model.run_async(
+                    bindings_list,
+                    partial(
+                        self.callback,
+                        input_batch=original_batch if self.send_original_frame else preprocessed_batch,
+                        bindings_list=bindings_list,
+                    )
+                )
+            job.wait(10000)  # Wait for the last job to complete
+# ----------------- End of Async Class ----------------- #
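# --- sketch (illustrative; the .hef path is a placeholder): driving the queue protocol above ---
in_q, out_q = queue.Queue(), queue.Queue()
engine = HailoAsyncInference("/config/model_cache/hailo/yolov8s.hef", in_q, out_q)
frame = np.zeros(engine.get_input_shape(), dtype=np.uint8)
in_q.put([frame])    # run() expects a batch: an iterable of frames
in_q.put(None)       # sentinel: run() returns once the queue is drained
engine.run()         # blocking; the callback pushes (input, result) onto out_q
original, raw_output = out_q.get()
# --- end sketch ---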
+
+# ----------------- HailoDetector Class ----------------- #
 class HailoDetector(DetectionApi):
-    type_key = DETECTOR_KEY  # Set the type key to the Hailo detector key
-
-    def __init__(self, detector_config: HailoDetectorConfig):
-        print(f"[INIT] Starting HailoDetector initialization with config: {detector_config}")
-        logger.info(f"[INIT] Starting HailoDetector initialization with config: {detector_config}")
+    type_key = DETECTOR_KEY
 
-        # Set global ARCH variable
+    def __init__(self, detector_config: 'HailoDetectorConfig'):
         global ARCH
         ARCH = detect_hailo_arch()
-        logger.info(f"[INIT] Detected Hailo architecture: {ARCH}")
+        self.cache_dir = "/config/model_cache/hailo"
+        self.device_type = detector_config.device
+        # Model attributes should be provided in detector_config.model
+        self.model_path = detector_config.model.path if hasattr(detector_config.model, "path") else None
+        self.model_height = detector_config.model.height if hasattr(detector_config.model, "height") else None
+        self.model_width = detector_config.model.width if hasattr(detector_config.model, "width") else None
+        self.model_type = detector_config.model.model_type if hasattr(detector_config.model, "model_type") else None
+        self.tensor_format = detector_config.model.input_tensor if hasattr(detector_config.model, "input_tensor") else None
+        self.pixel_format = detector_config.model.input_pixel_format if hasattr(detector_config.model, "input_pixel_format") else None
+        self.input_dtype = detector_config.model.input_dtype if hasattr(detector_config.model, "input_dtype") else None
+        self.url = detector_config.url
+        self.output_type = "FLOAT32"
+        self.working_model_path = self.check_and_prepare()
 
-        supported_models = [
-            ModelTypeEnum.ssd,
-            ModelTypeEnum.yolov9,
-            ModelTypeEnum.hailoyolo,
-        ]
-
-        # Initialize device type and model path from the configuration
-        self.h8l_device_type = detector_config.device
-        self.h8l_model_path = detector_config.model.path
-        self.h8l_model_type = detector_config.model.model_type
-
-        # Set configuration based on model type
-        self.set_correct_config(self.h8l_model_type)
-
-        # Override with custom URL if provided
-        if hasattr(detector_config, "url") and detector_config.url:
-            self.model_url = detector_config.url
-            self.expected_model_filename = self.model_url.split('/')[-1]
-
-        self.check_and_prepare_model()
+        # Set up asynchronous inference
+        self.batch_size = 1
+        self.input_queue = queue.Queue()
+        self.output_queue = queue.Queue()
 
         try:
-            # Validate device type
-            if self.h8l_device_type not in ["PCIe", "M.2"]:
-                raise ValueError(f"Unsupported device type: {self.h8l_device_type}")
-
-            # Initialize the Hailo device
-            logger.info("[INIT] Creating VDevice instance")
-            self.target = VDevice()
-
-            # Load the HEF (Hailo's binary format for neural networks)
-            logger.info(f"[INIT] Loading HEF from {self.h8l_model_path}")
-            self.hef = HEF(self.h8l_model_path)
-
-            # Create configuration parameters from the HEF
-            logger.info("[INIT] Creating configuration parameters")
-            self.configure_params = ConfigureParams.create_from_hef(
-                hef=self.hef, interface=HailoStreamInterface.PCIe
+            logging.info(f"[INIT] Loading HEF model from {self.working_model_path}")
+            self.inference_engine = HailoAsyncInference(
+                self.working_model_path,
+                self.input_queue,
+                self.output_queue,
+                self.batch_size
             )
-
-            # Configure the device with the HEF
-            logger.info("[INIT] Configuring device with HEF")
-            self.network_groups = self.target.configure(self.hef, self.configure_params)
-            self.network_group = self.network_groups[0]
-            self.network_group_params = self.network_group.create_params()
-
-            # Create input and output virtual stream parameters
-            logger.info("[INIT] Creating input/output stream parameters")
-            self.input_vstream_params = InputVStreamParams.make(
-                self.network_group,
-                format_type=self.hef.get_input_vstream_infos()[0].format.type,
-            )
-            self.output_vstream_params = OutputVStreamParams.make(
-                self.network_group, format_type=getattr(FormatType, self.output_type)
-            )
-
-            # Get input and output stream information from the HEF
-            self.input_vstream_info = self.hef.get_input_vstream_infos()
-            self.output_vstream_info = self.hef.get_output_vstream_infos()
-
-            for i, info in enumerate(self.input_vstream_info):
-                logger.info(f"[INIT] Input Stream {i}: Name={info.name}, Format={info.format}, Shape={info.shape}")
-
-            for i, info in enumerate(self.output_vstream_info):
-                logger.info(f"[INIT] Output Stream {i}: Name={info.name}, Format={info.format}, Shape={info.shape}")
-
-            logger.info("Hailo device initialized successfully")
-        except HailoRTException as e:
-            logger.error(f"HailoRTException during initialization: {e}")
-            raise
+            self.input_shape = self.inference_engine.get_input_shape()
+            logging.info(f"[INIT] Model input shape: {self.input_shape}")
         except Exception as e:
-            logger.error(f"Failed to initialize Hailo device: {e}")
+            logging.error(f"[INIT] Failed to initialize HailoAsyncInference: {e}")
             raise
-    def set_correct_config(self, modelname):
-        if modelname == ModelTypeEnum.ssd:
-            self.h8l_model_height = 300
-            self.h8l_model_width = 300
-            self.h8l_tensor_format = InputTensorEnum.nhwc
-            self.h8l_pixel_format = PixelFormatEnum.rgb
-            self.h8l_input_dtype = InputDTypeEnum.float
-            self.cache_dir = "/config/model_cache/h8l_cache"
-            self.expected_model_filename = "ssd_mobilenet_v1.hef"
-            self.output_type = "FLOAT32"
-
-            if ARCH == "hailo8":
-                self.model_url = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8/ssd_mobilenet_v1.hef"
-            else:
-                self.model_url = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8l/ssd_mobilenet_v1.hef"
+    @staticmethod
+    def extract_model_name(path: str = None, url: str = None) -> str:
+        model_name = None
+        if path and path.endswith(".hef"):
+            model_name = os.path.basename(path)
+        elif url and url.endswith(".hef"):
+            model_name = os.path.basename(url)
         else:
-            self.h8l_model_height = 640
-            self.h8l_model_width = 640
-            self.h8l_tensor_format = InputTensorEnum.nhwc
-            self.h8l_pixel_format = PixelFormatEnum.rgb  # Default to RGB
-            self.h8l_input_dtype = InputDTypeEnum.int
-            self.cache_dir = "/config/model_cache/h8l_cache"
-            self.output_type = "FLOAT32"
-
+            print("Model name not found in path or URL. Checking default settings...")
             if ARCH == "hailo8":
-                self.model_url = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8/yolov8m.hef"
-                self.expected_model_filename = "yolov8m.hef"
-            else:
-                self.model_url = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8l/yolov8s.hef"
-                self.expected_model_filename = "yolov8s.hef"
-
-    def check_and_prepare_model(self):
-        logger.info(f"[CHECK_MODEL] Checking for model at {self.cache_dir}/{self.expected_model_filename}")
-
-        # Ensure cache directory exists
+                model_name = H8_DEFAULT_MODEL
+            else:
+                model_name = H8L_DEFAULT_MODEL
+            print(f"Using default model: {model_name}")
+        return model_name
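# --- sketch (illustrative paths): what extract_model_name() resolves to ---
HailoDetector.extract_model_name(path="/config/custom.hef")   # -> "custom.hef"
HailoDetector.extract_model_name(url=H8L_DEFAULT_URL)         # -> "yolov6n.hef"
HailoDetector.extract_model_name()                            # -> arch default ("yolov8s.hef" on hailo8)
# --- end sketch ---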
+
+    @staticmethod
+    def download_model(url: str, destination: str):
+        if not url.endswith(".hef"):
+            raise ValueError("Invalid model URL. Only .hef files are supported.")
+        try:
+            urllib.request.urlretrieve(url, destination)
+            print(f"Downloaded model to {destination}")
+        except Exception as e:
+            raise RuntimeError(f"Failed to download model from {url}: {str(e)}")
+
+    def check_and_prepare(self) -> str:
         if not os.path.exists(self.cache_dir):
-            logger.info(f"[CHECK_MODEL] Creating cache directory: {self.cache_dir}")
             os.makedirs(self.cache_dir)
-
-        # Check for the expected model file
-        model_file_path = os.path.join(self.cache_dir, self.expected_model_filename)
-        if not os.path.isfile(model_file_path):
-            logger.info(f"[CHECK_MODEL] Model not found at {model_file_path}, downloading from {self.model_url}")
-            urllib.request.urlretrieve(self.model_url, model_file_path)
-            logger.info(f"[CHECK_MODEL] Model downloaded to {model_file_path}")
-        else:
-            logger.info(f"[CHECK_MODEL] Model already exists at {model_file_path}")
-
-        self.h8l_model_path = model_file_path
+        model_name = self.extract_model_name(self.model_path, self.url)
+        model_path = os.path.join(self.cache_dir, model_name)
+        if not self.model_path and not self.url:
+            if os.path.exists(model_path):
+                print(f"Model found in cache: {model_path}")
+                return model_path
+            else:
+                print(f"Downloading default model: {model_name}")
+                if ARCH == "hailo8":
+                    self.download_model(H8_DEFAULT_URL, model_path)
+                else:
+                    self.download_model(H8L_DEFAULT_URL, model_path)
+        elif self.model_path and self.url:
+            if os.path.exists(self.model_path):
+                print(f"Model found at path: {self.model_path}")
+                return self.model_path
+            else:
+                print(f"Model not found at path. Downloading from URL: {self.url}")
+                self.download_model(self.url, model_path)
+        elif self.url:
+            print(f"Downloading model from URL: {self.url}")
+            self.download_model(self.url, model_path)
+        elif self.model_path:
+            if os.path.exists(self.model_path):
+                print(f"Using existing model at: {self.model_path}")
+                return self.model_path
+            else:
+                raise FileNotFoundError(f"Model file not found at: {self.model_path}")
+        return model_path
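# --- summary (condensed restatement of check_and_prepare() above, not new behavior) ---
# path set and file exists           -> use path as-is
# path set but missing, url set     -> download url into the cache dir
# url set only                      -> download url into the cache dir
# neither set                       -> cached default, else download the arch default
# --- end summary ---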
 
     def detect_raw(self, tensor_input):
-        logger.info("[DETECT_RAW] Starting detection")
+        logging.info("[DETECT_RAW] Starting detection")
+        # Ensure tensor_input has a batch dimension
+        if isinstance(tensor_input, np.ndarray) and len(tensor_input.shape) == 3:
+            tensor_input = np.expand_dims(tensor_input, axis=0)
+            logging.info(f"[DETECT_RAW] Expanded input shape to {tensor_input.shape}")
 
-        if tensor_input is None:
-            error_msg = "[DETECT_RAW] The 'tensor_input' argument must be provided"
-            logger.error(error_msg)
-            raise ValueError(error_msg)
+        # Enqueue input and a sentinel value
+        self.input_queue.put(tensor_input)
+        self.input_queue.put(None)  # Sentinel value
 
-        # Log input tensor information
-        logger.info(f"[DETECT_RAW] Input tensor type: {type(tensor_input)}")
+        # Run the inference engine (blocks until the sentinel is consumed)
+        self.inference_engine.run()
+        result = self.output_queue.get()
+        if result is None:
+            logging.error("[DETECT_RAW] No inference result received")
+            return np.zeros((20, 6), dtype=np.float32)
 
-        if isinstance(tensor_input, np.ndarray):
-            logger.info(f"[DETECT_RAW] Input tensor shape: {tensor_input.shape}")
-            logger.info(f"[DETECT_RAW] Input tensor dtype: {tensor_input.dtype}")
-            logger.info(f"[DETECT_RAW] Input tensor min value: {np.min(tensor_input)}")
-            logger.info(f"[DETECT_RAW] Input tensor max value: {np.max(tensor_input)}")
-            logger.info(f"[DETECT_RAW] Input tensor mean value: {np.mean(tensor_input)}")
+        original_input, infer_results = result
+        logging.info("[DETECT_RAW] Inference completed.")
 
-            # Print sample of the tensor (first few elements)
-            flat_sample = tensor_input.flatten()[:10]
-            logger.info(f"[DETECT_RAW] Input tensor sample: {flat_sample}")
-        elif isinstance(tensor_input, list):
-            logger.info(f"[DETECT_RAW] Input is a list with length: {len(tensor_input)}")
-            tensor_input = np.array(tensor_input)
-            logger.info(f"[DETECT_RAW] Converted to array with shape: {tensor_input.shape}, dtype: {tensor_input.dtype}")
-        elif isinstance(tensor_input, dict):
-            logger.info(f"[DETECT_RAW] Input is a dictionary with keys: {tensor_input.keys()}")
+        # If infer_results is a single-element list, unwrap it.
+        if isinstance(infer_results, list) and len(infer_results) == 1:
+            infer_results = infer_results[0]
 
-        input_data = tensor_input
-        logger.debug("[DETECT_RAW] Input data prepared for inference")
-
-        try:
-            logger.info("[DETECT_RAW] Creating inference pipeline")
-            with InferVStreams(
-                self.network_group,
-                self.input_vstream_params,
-                self.output_vstream_params,
-            ) as infer_pipeline:
-                input_dict = {}
-                if isinstance(input_data, dict):
-                    logger.info("[DETECT_RAW] Input is already a dictionary, using as-is")
-                    input_dict = input_data
-                elif isinstance(input_data, (list, tuple)):
-                    logger.info("[DETECT_RAW] Converting list/tuple to dictionary for inference")
-                    for idx, layer_info in enumerate(self.input_vstream_info):
-                        input_dict[layer_info.name] = input_data[idx]
-                        logger.info(f"[DETECT_RAW] Assigned data to input layer '{layer_info.name}'")
-                else:
-                    if len(input_data.shape) == 3:
-                        logger.info(f"[DETECT_RAW] Adding batch dimension to input with shape {input_data.shape}")
-                        input_data = np.expand_dims(input_data, axis=0)
-                        logger.info(f"[DETECT_RAW] New input shape after adding batch dimension: {input_data.shape}")
-                    input_dict[self.input_vstream_info[0].name] = input_data
-                    logger.info(f"[DETECT_RAW] Assigned data to input layer '{self.input_vstream_info[0].name}'")
-
-                logger.info(f"[DETECT_RAW] Final input dictionary keys: {list(input_dict.keys())}")
-
-                # Log details about each input layer
-                for key, value in input_dict.items():
-                    if isinstance(value, np.ndarray):
-                        logger.info(f"[DETECT_RAW] Layer '{key}' has shape: {value.shape}, dtype: {value.dtype}")
-
-                logger.info("[DETECT_RAW] Activating network group")
-                with self.network_group.activate(self.network_group_params):
-                    logger.info("[DETECT_RAW] Running inference")
-                    raw_output = infer_pipeline.infer(input_dict)
-
-                    logger.info(f"[DETECT_RAW] Inference complete, output keys: {list(raw_output.keys())}")
-
-                    # Log details about output structure for debugging
-                    for key, value in raw_output.items():
-                        logger.info(f"[DETECT_RAW] Output layer '{key}' details:")
-                        debug_output_structure(value, prefix="  ")
-
-                    # Process outputs based on model type
-                    if self.h8l_model_type in [ModelTypeEnum.hailoyolo, ModelTypeEnum.yolov9, ModelTypeEnum.yolox, ModelTypeEnum.yolonas]:
-                        logger.info(f"[DETECT_RAW] Processing YOLO-type output for model type: {self.h8l_model_type}")
-                        detections = self.process_yolo_output(raw_output)
-                    else:
-                        # Default to SSD processing
-                        logger.info(f"[DETECT_RAW] Processing SSD output for model type: {self.h8l_model_type}")
-                        expected_output_name = self.output_vstream_info[0].name
-                        if expected_output_name not in raw_output:
-                            error_msg = f"[DETECT_RAW] Missing output stream {expected_output_name} in inference results"
-                            logger.error(error_msg)
-                            return np.zeros((20, 6), np.float32)
-                        detections = self.process_ssd_output(raw_output[expected_output_name])
-
-            logger.info(f"[DETECT_RAW] Processed detections shape: {detections.shape}")
-            return detections
-
-        except HailoRTException as e:
-            logger.error(f"[DETECT_RAW] HailoRTException during inference: {e}")
-            return np.zeros((20, 6), np.float32)
-        except Exception as e:
-            logger.error(f"[DETECT_RAW] Exception during inference: {e}")
-            return np.zeros((20, 6), np.float32)
-        finally:
-            logger.debug("[DETECT_RAW] Exiting function")
-
-    def process_yolo_output(self, raw_output):
-        """
-        Process YOLO outputs to match the expected Frigate detection format.
-        Returns detections in the format [class_id, score, ymin, xmin, ymax, xmax]
-        """
-        logger.info("[PROCESS_YOLO] Processing YOLO output")
-
-        # Initialize empty array for our results - match TFLite format
-        detections = np.zeros((20, 6), np.float32)
-
-        try:
-            # Identify output layers for boxes, classes, and scores
-            boxes_layer = None
-            classes_layer = None
-            scores_layer = None
-            count_layer = None
-
-            # Try to identify layers by name pattern
-            for key in raw_output.keys():
-                key_lower = key.lower()
-                if any(box_term in key_lower for box_term in ['box', 'bbox', 'location']):
-                    boxes_layer = key
-                elif any(class_term in key_lower for class_term in ['class', 'category', 'label']):
-                    classes_layer = key
-                elif any(score_term in key_lower for score_term in ['score', 'confidence', 'prob']):
-                    scores_layer = key
-                elif any(count_term in key_lower for count_term in ['count', 'num', 'detection_count']):
-                    count_layer = key
-
-            logger.info(f"[PROCESS_YOLO] Identified layers - Boxes: {boxes_layer}, Classes: {classes_layer}, "
-                        f"Scores: {scores_layer}, Count: {count_layer}")
-
-            # If we found all necessary layers
-            if boxes_layer and classes_layer and scores_layer:
-                # Extract data from the identified layers
-                boxes = raw_output[boxes_layer]
-                class_ids = raw_output[classes_layer]
-                scores = raw_output[scores_layer]
-
-                # If these are lists, extract the first element (batch)
-                if isinstance(boxes, list) and len(boxes) > 0:
-                    boxes = boxes[0]
-                if isinstance(class_ids, list) and len(class_ids) > 0:
-                    class_ids = class_ids[0]
-                if isinstance(scores, list) and len(scores) > 0:
-                    scores = scores[0]
-
-                # Get detection count (if available)
-                count = 0
-                if count_layer:
-                    count_val = raw_output[count_layer]
-                    if isinstance(count_val, list) and len(count_val) > 0:
-                        count_val = count_val[0]
-                    count = int(count_val[0] if isinstance(count_val, np.ndarray) else count_val)
-                else:
-                    # Use the length of scores as count
-                    count = len(scores) if hasattr(scores, '__len__') else 0
-
-                # Process detections like in the example
-                for i in range(count):
-                    if i >= 20:  # Limit to 20 detections
-                        break
-
-                    if scores[i] < 0.4:  # Use 0.4 threshold as in the example
-                        continue
-
-                    # Add detection in the format [class_id, score, ymin, xmin, ymax, xmax]
-                    detections[i] = [
-                        float(class_ids[i]),
-                        float(scores[i]),
-                        float(boxes[i][0]),  # ymin
-                        float(boxes[i][1]),  # xmin
-                        float(boxes[i][2]),  # ymax
-                        float(boxes[i][3]),  # xmax
-                    ]
-            else:
-                # Fallback: Try to process output as a combined detection array
-                logger.info("[PROCESS_YOLO] Couldn't identify separate output layers, trying unified format")
-
-                # Look for a detection array in the output
-                detection_layer = None
-                for key, value in raw_output.items():
-                    if isinstance(value, list) and len(value) > 0:
-                        if isinstance(value[0], np.ndarray) and value[0].ndim >= 2:
-                            detection_layer = key
-                            break
-
-                if detection_layer:
-                    # Get the detection array
-                    detection_array = raw_output[detection_layer]
-                    if isinstance(detection_array, list):
-                        detection_array = detection_array[0]  # First batch
-
-                    # Process each detection
-                    detection_count = 0
-                    for i, detection in enumerate(detection_array):
-                        if detection_count >= 20:
-                            break
-
-                        # Format depends on YOLO variant but typically includes:
-                        # class_id, score, box coordinates (could be [x,y,w,h] or [xmin,ymin,xmax,ymax])
-
-                        # Extract elements based on shape
-                        if len(detection) >= 6:  # Likely [class_id, score, xmin, ymin, xmax, ymax]
-                            class_id = detection[0]
-                            score = detection[1]
-
-                            # Check if this is actually [x, y, w, h, conf, class_id]
-                            if score > 1.0:  # Score shouldn't be > 1, might be a coordinate
-                                # Reorganize assuming [x, y, w, h, conf, class_id] format
-                                x, y, w, h, confidence, *class_probs = detection
-
-                                # Get class with highest probability
-                                if len(class_probs) > 1:
-                                    class_id = np.argmax(class_probs)
-                                    score = confidence * class_probs[class_id]
-                                else:
-                                    class_id = class_probs[0]
-                                    score = confidence
-
-                                # Convert [x,y,w,h] to [ymin,xmin,ymax,xmax]
-                                xmin = x - w/2
-                                ymin = y - h/2
-                                xmax = x + w/2
-                                ymax = y + h/2
-                            else:
-                                # Use as is, but verify we have box coordinates
-                                xmin, ymin, xmax, ymax = detection[2:6]
-                        elif len(detection) >= 4:  # Might be [class_id, score, xmin, ymin]
-                            class_id = detection[0]
-                            score = detection[1]
-                            # For incomplete boxes, assume zeros
-                            xmin, ymin = detection[2:4]
-                            xmax = xmin + 0.1  # Small default size
-                            ymax = ymin + 0.1
-                        else:
-                            # Skip invalid detections
-                            continue
-
-                        # Skip low confidence detections
-                        if score < 0.4:
-                            continue
-
-                        # Add to detection array
-                        detections[detection_count] = [
-                            float(class_id),
-                            float(score),
-                            float(ymin),
-                            float(xmin),
-                            float(ymax),
-                            float(xmax)
-                        ]
-                        detection_count += 1
-
-            logger.info(f"[PROCESS_YOLO] Processed {np.count_nonzero(detections[:, 1] > 0)} valid detections")
-
-        except Exception as e:
-            logger.error(f"[PROCESS_YOLO] Error processing YOLO output: {e}")
-            # detections already initialized as zeros
-
-        return detections
-
-    def process_ssd_output(self, raw_output):
-        """
-        Process SSD MobileNet v1 output with special handling for jagged arrays
-        """
-        logger.info("[PROCESS_SSD] Processing SSD output")
-
-        # Initialize empty lists for our results
+        # Confidence threshold below which detections are discarded
+        threshold = 0.4
         all_detections = []
 
-        try:
-            if isinstance(raw_output, list) and len(raw_output) > 0:
-                # Handle first level of nesting
-                raw_detections = raw_output[0]
-                logger.debug(f"[PROCESS_SSD] First level output type: {type(raw_detections)}")
+        # Loop over the output list (each element corresponds to one output stream)
+        for idx, detection_set in enumerate(infer_results):
+            # Skip empty arrays
+            if not isinstance(detection_set, np.ndarray) or detection_set.size == 0:
+                continue
 
-                # Process all valid detections
-                for i, detection_group in enumerate(raw_detections):
-                    # Skip empty arrays or invalid data
-                    if not isinstance(detection_group, np.ndarray):
-                        continue
+            logging.debug(f"[DETECT_RAW] Processing detection set {idx} with shape {detection_set.shape}")
+            # For each detection row in the set:
+            for det in detection_set:
+                # Expecting at least 5 elements: [ymin, xmin, ymax, xmax, confidence]
+                if det.shape[0] < 5:
+                    continue
+                score = float(det[4])
+                if score < threshold:
+                    continue
+                # If there is a 6th element, assume it's a class id; otherwise use dummy class 0.
+                if det.shape[0] >= 6:
+                    cls = int(det[5])
+                else:
+                    cls = 0
+                # Append in the order Frigate expects: [class_id, confidence, ymin, xmin, ymax, xmax]
+                all_detections.append([cls, score, det[0], det[1], det[2], det[3]])
 
-                    # Skip empty arrays
-                    if detection_group.size == 0:
-                        continue
+        # If no valid detections were found, return a zero array.
+        if len(all_detections) == 0:
+            logging.warning("[DETECT_RAW] No valid detections found.")
+            return np.zeros((20, 6), dtype=np.float32)
 
-                    # For the arrays with actual detections
-                    if detection_group.shape[0] > 0:
-                        # Extract the detection data - typical format is (ymin, xmin, ymax, xmax, score)
-                        for j in range(detection_group.shape[0]):
-                            detection = detection_group[j]
+        detections_array = np.array(all_detections, dtype=np.float32)
 
-                            # Check if we have 5 values (expected format)
-                            if len(detection) == 5:
-                                ymin, xmin, ymax, xmax, score = detection
-                                class_id = i  # Use index as class ID
+        # Pad or truncate to exactly 20 rows
+        if detections_array.shape[0] < 20:
+            pad = np.zeros((20 - detections_array.shape[0], 6), dtype=np.float32)
+            detections_array = np.vstack((detections_array, pad))
+        elif detections_array.shape[0] > 20:
+            detections_array = detections_array[:20, :]
 
-                                # Add detection if score is reasonable
-                                if 0 <= score <= 1.0 and score > 0.1:  # Basic threshold
-                                    all_detections.append([float(class_id), float(score),
-                                                           float(ymin), float(xmin),
-                                                           float(ymax), float(xmax)])
-
-            # Convert to numpy array if we have valid detections
-            if all_detections:
-                detections_array = np.array(all_detections, dtype=np.float32)
-
-                # Sort by score (descending)
-                sorted_idx = np.argsort(detections_array[:, 1])[::-1]
-                detections_array = detections_array[sorted_idx]
-
-                # Take top 20 (or fewer if less available)
-                detections_array = detections_array[:20]
-            else:
-                detections_array = np.zeros((0, 6), dtype=np.float32)
-
-        except Exception as e:
-            logger.error(f"[PROCESS_SSD] Error processing SSD output: {e}")
-            detections_array = np.zeros((0, 6), dtype=np.float32)
-
-        # Pad to 20 detections if needed
-        if len(detections_array) < 20:
-            padding = np.zeros((20 - len(detections_array), 6), dtype=np.float32)
-            detections_array = np.vstack((detections_array, padding))
-
-        logger.info(f"[PROCESS_SSD] Final output shape: {detections_array.shape}")
+        logging.debug(f"[DETECT_RAW] Processed detections: {detections_array}")
         return detections_array
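# --- sketch (hypothetical values): the (20, 6) contract detect_raw() returns ---
# dets = detector.detect_raw(frame)
# dets[0]  -> [0.0, 0.87, 0.1, 0.2, 0.5, 0.6]   # [class_id, score, ymin, xmin, ymax, xmax]
# dets[1:] -> zero rows padding the array out to 20 detections
# --- end sketch ---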
- """ - logger.info("[PROCESS] Starting to process detections") - logger.info(f"[PROCESS] Using threshold: {threshold}") - - # Wrap the raw_detections in a list to match expected format for process_ssd_output - if not isinstance(raw_detections, list): - raw_detections = [raw_detections] - - # Process using the more robust method - return self.process_ssd_output(raw_detections) + # Preprocess method using inline utility + def preprocess(self, image): + return preprocess_image(image, self.input_shape[1], self.input_shape[0]) + # Close the Hailo device def close(self): - logger.info("[CLOSE] Closing Hailo device") + logging.info("[CLOSE] Closing HailoDetector") try: - self.target.close() - logger.info("Hailo device closed successfully") + self.inference_engine.hef.close() + logging.info("Hailo device closed successfully") except Exception as e: - logger.error(f"Failed to close Hailo device: {e}") - raise \ No newline at end of file + logging.error(f"Failed to close Hailo device: {e}") + raise + + # Asynchronous detection wrapper + def async_detect(self, tensor_input, callback): + def detection_thread(): + result = self.detect_raw(tensor_input) + callback(result) + thread = threading.Thread(target=detection_thread) + thread.start() + +# ----------------- Configuration Class ----------------- # +class HailoDetectorConfig(BaseDetectorConfig): + type: Literal[DETECTOR_KEY] + device: str = Field(default="PCIe", title="Device Type") + url: Optional[str] = Field(default=None, title="Custom Model URL")