From 78134765008945ad03ef3de2257a1e301b13f36a Mon Sep 17 00:00:00 2001
From: OmriAx
Date: Sun, 19 Jan 2025 16:16:43 +0200
Subject: [PATCH] Final Async Update

---
 frigate/detectors/plugins/hailo8l.py | 416 +++++++++------------------
 1 file changed, 143 insertions(+), 273 deletions(-)

diff --git a/frigate/detectors/plugins/hailo8l.py b/frigate/detectors/plugins/hailo8l.py
index 7453b6e0e..f60d5eb56 100644
--- a/frigate/detectors/plugins/hailo8l.py
+++ b/frigate/detectors/plugins/hailo8l.py
@@ -1,322 +1,192 @@
 import logging
 import os
+import queue
+import subprocess
+import threading
 import urllib.request
-from typing import Optional
-
 import numpy as np
-
-try:
-    from hailo_platform import (
-        HEF,
-        ConfigureParams,
-        FormatType,
-        HailoRTException,
-        HailoStreamInterface,
-        InferVStreams,
-        InputVStreamParams,
-        OutputVStreamParams,
-        VDevice,
-    )
-except ModuleNotFoundError:
-    pass
-
-from pydantic import BaseModel, Field
-from typing_extensions import Literal
-
+from hailo_platform import (
+    HEF,
+    ConfigureParams,
+    FormatType,
+    HailoRTException,
+    HailoStreamInterface,
+    VDevice,
+    HailoSchedulingAlgorithm,
+    InferVStreams,
+    InputVStreamParams,
+    OutputVStreamParams,
+)
 from frigate.detectors.detection_api import DetectionApi
 from frigate.detectors.detector_config import BaseDetectorConfig
+from pydantic import BaseModel, Field
+from typing_extensions import Literal
+from typing import Optional
 
-# Set up logging
 logger = logging.getLogger(__name__)
 
-# Define the detector key for Hailo
 DETECTOR_KEY = "hailo8l"
 
-# Configuration class for model settings
+
+def get_device_architecture():
+    """Get the device architecture from hailortcli."""
+    try:
+        result = subprocess.run(['hailortcli', 'fw-control', 'identify'], capture_output=True, text=True)
+        for line in result.stdout.split('\n'):
+            if "Device Architecture" in line:
+                return line.split(':')[1].strip().lower()
+    except Exception:
+        pass
+    # Fall back when hailortcli is unavailable or no architecture line is found
+    return "unknown"
+
+
 class ModelConfig(BaseModel):
     path: Optional[str] = Field(default=None, title="Model Path")
-    type: str = Field(default="ssd_mobilenet_v1", title="Model Type")
+    type: str = Field(default="yolov8s", title="Model Type")
     url: str = Field(default="", title="Model URL")
-    width: int = Field(default=300, title="Model Width")
-    height: int = Field(default=300, title="Model Height")
+    width: int = Field(default=640, title="Model Width")
+    height: int = Field(default=640, title="Model Height")
     score_threshold: float = Field(default=0.3, title="Score Threshold")
     max_detections: int = Field(default=30, title="Maximum Detections")
     input_tensor: str = Field(default="input_tensor", title="Input Tensor Name")
     input_pixel_format: str = Field(default="RGB", title="Input Pixel Format")
 
-# Configuration class for Hailo detector
+
 class HailoDetectorConfig(BaseDetectorConfig):
     type: Literal[DETECTOR_KEY]
     device: str = Field(default="PCIe", title="Device Type")
     model: ModelConfig
 
-# Hailo detector class implementation
+
+class HailoAsyncInference:
+    def __init__(self, config: HailoDetectorConfig):
+        self.config = config
+        self.input_queue = queue.Queue()
+        self.output_queue = queue.Queue()
+        params = VDevice.create_params()
+        params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
+        self.target = VDevice(params)
+        self.hef = HEF(self.config.model.path)
+        self.infer_model = self.target.create_infer_model(self.config.model.path)
+        self.infer_model.set_batch_size(1)
+
+    def infer(self):
+        while True:
+            batch_data = self.input_queue.get()
+            if batch_data is None:
+                break
+
+            bindings = []
+            for frame in batch_data:
+                binding = self.infer_model.create_bindings()
+                binding.input().set_buffer(frame)
+                bindings.append(binding)
+
+            self.infer_model.run_async(bindings, self._callback, batch_data)
+
+    def _callback(self, completion_info, bindings_list, batch_data):
+        if completion_info.exception:
+            logger.error(f"Inference error: {completion_info.exception}")
+        else:
+            results = [binding.output().get_buffer() for binding in bindings_list]
+            self.output_queue.put((batch_data, results))
+
+    def stop(self):
+        self.input_queue.put(None)
+
+
 class HailoDetector(DetectionApi):
     type_key = DETECTOR_KEY
 
-    def __init__(self, detector_config: HailoDetectorConfig):
-        # Initialize base configuration
-        self.h8l_device_type = detector_config.device
-        self.h8l_model_path = detector_config.model.path
-        self.h8l_model_height = detector_config.model.height
-        self.h8l_model_width = detector_config.model.width
-        self.h8l_model_type = detector_config.model.type
-        self.h8l_tensor_format = detector_config.model.input_tensor
-        self.h8l_pixel_format = detector_config.model.input_pixel_format
-        self.model_url = detector_config.model.url
-        self.score_threshold = detector_config.model.score_threshold
-        self.max_detections = detector_config.model.max_detections
-
-        self.cache_dir = "/config/model_cache/h8l_cache"
-
-        logger.info(f"Initializing Hailo device as {self.h8l_device_type}")
+    def __init__(self, config: HailoDetectorConfig):
+        super().__init__()
+        self.config = config
+
+        # Determine device architecture
+        self.device_architecture = get_device_architecture()
+        if self.device_architecture not in ["hailo8", "hailo8l"]:
+            raise RuntimeError(f"Unsupported device architecture: {self.device_architecture}")
+        logger.info(f"Device architecture detected: {self.device_architecture}")
+
+        # Ensure the model file is in place before the HEF is loaded
+        self.cache_dir = "/config/model_cache/h8l_cache"
+        self.expected_model_filename = f"{config.model.type}.hef"
         self.check_and_prepare_model()
 
+        # Start the async inference worker once the model path is known
+        self.async_inference = HailoAsyncInference(config)
+        self.worker_thread = threading.Thread(target=self.async_inference.infer)
+        self.worker_thread.start()
+
-        try:
-            # Validate device type
-            if self.h8l_device_type not in ["PCIe", "M.2"]:
-                raise ValueError(f"Unsupported device type: {self.h8l_device_type}")
-
-            # Initialize the Hailo device
-            self.target = VDevice()
-            # Load the HEF (Hailo's binary format for neural networks)
-            self.hef = HEF(self.h8l_model_path)
-            # Create configuration parameters from the HEF
-            self.configure_params = ConfigureParams.create_from_hef(
-                hef=self.hef, interface=HailoStreamInterface.PCIe
-            )
-            # Configure the device with the HEF
-            self.network_groups = self.target.configure(self.hef, self.configure_params)
-            self.network_group = self.network_groups[0]
-            self.network_group_params = self.network_group.create_params()
-
-            # Create input and output virtual stream parameters
-            self.input_vstream_params = InputVStreamParams.make(
-                self.network_group,
-                format_type=self.hef.get_input_vstream_infos()[0].format.type,
-            )
-            self.output_vstream_params = OutputVStreamParams.make(
-                self.network_group,
-                format_type=FormatType.FLOAT32
-            )
-
-            # Get input and output stream information from the HEF
-            self.input_vstream_info = self.hef.get_input_vstream_infos()
-            self.output_vstream_info = self.hef.get_output_vstream_infos()
-
-            logger.info("Hailo device initialized successfully")
-            logger.debug(f"[__init__] Model Path: {self.h8l_model_path}")
-            logger.debug(f"[__init__] Input Tensor Format: {self.h8l_tensor_format}")
-            logger.debug(f"[__init__] Input Pixel Format: {self.h8l_pixel_format}")
-            logger.debug(f"[__init__] Input VStream Info: {self.input_vstream_info[0]}")
-            logger.debug(f"[__init__] Output VStream Info: {self.output_vstream_info[0]}")
-
-        except HailoRTException as e:
-            logger.error(f"HailoRTException during initialization: {e}")
-            raise
-        except Exception as e:
-            logger.error(f"Failed to initialize Hailo device: {e}")
-            raise
-
     def check_and_prepare_model(self):
-        """Download and prepare the model if necessary"""
+        # Ensure cache directory exists
         if not os.path.exists(self.cache_dir):
             os.makedirs(self.cache_dir)
 
-        model_filename = f"{self.h8l_model_type}.hef"
-        model_file_path = os.path.join(self.cache_dir, model_filename)
-        self.h8l_model_path = model_file_path
+        # Check for the expected model file and download it if a URL was configured
+        model_file_path = os.path.join(self.cache_dir, self.expected_model_filename)
         if not os.path.isfile(model_file_path):
-            logger.info(
-                f"A model file was not found at {model_file_path}, Downloading one from {self.model_url}."
-            )
-            urllib.request.urlretrieve(self.model_url, model_file_path)
-            logger.info(f"A model file was downloaded to {model_file_path}.")
+            if self.config.model.url:
+                logger.info(
+                    f"A model file was not found at {model_file_path}, downloading one from {self.config.model.url}."
+                )
+                urllib.request.urlretrieve(self.config.model.url, model_file_path)
+                logger.info(f"A model file was downloaded to {model_file_path}.")
+            else:
+                raise RuntimeError("Model file is missing and no model URL was provided.")
        else:
             logger.info(
                 f"A model file already exists at {model_file_path} not downloading one."
             )
+        self.config.model.path = model_file_path
 
     def detect_raw(self, tensor_input):
-        logger.debug("[detect_raw] Entering function")
-        logger.debug(
-            f"[detect_raw] The `tensor_input` = {tensor_input} tensor_input shape = {tensor_input.shape}"
-        )
-
-        if tensor_input is None:
-            raise ValueError("[detect_raw] The 'tensor_input' argument must be provided")
-
-        # Ensure tensor_input is a numpy array
-        if isinstance(tensor_input, list):
-            tensor_input = np.array(tensor_input)
-            logger.debug(
-                f"[detect_raw] Converted tensor_input to numpy array: shape {tensor_input.shape}"
-            )
-
-        input_data = tensor_input
-        logger.debug(
-            f"[detect_raw] Input data for inference shape: {tensor_input.shape}, dtype: {tensor_input.dtype}"
-        )
-
+        """
+        Perform inference and return raw detection results.
+        """
+        preprocessed_input = self.preprocess(tensor_input)
+        self.async_inference.input_queue.put([preprocessed_input])
         try:
-            with InferVStreams(
-                self.network_group,
-                self.input_vstream_params,
-                self.output_vstream_params,
-            ) as infer_pipeline:
-                input_dict = {}
-                if isinstance(input_data, dict):
-                    input_dict = input_data
-                    logger.debug("[detect_raw] it a dictionary.")
-                elif isinstance(input_data, (list, tuple)):
-                    for idx, layer_info in enumerate(self.input_vstream_info):
-                        input_dict[layer_info.name] = input_data[idx]
-                    logger.debug("[detect_raw] converted from list/tuple.")
-                else:
-                    if len(input_data.shape) == 3:
-                        input_data = np.expand_dims(input_data, axis=0)
-                        logger.debug("[detect_raw] converted from an array.")
-                    input_dict[self.input_vstream_info[0].name] = input_data
+            batch_data, raw_results = self.async_inference.output_queue.get(timeout=5)
+            return self.postprocess(raw_results)
+        except queue.Empty:
+            logger.warning("Inference timed out")
+            return np.zeros((20, 6), np.float32)
 
-                logger.debug(
-                    f"[detect_raw] Input dictionary for inference keys: {input_dict.keys()}"
-                )
+    def preprocess(self, frame):
+        input_shape = (self.async_inference.hef.get_input_vstream_infos()[0].shape)
+        resized_frame = np.resize(frame, input_shape)
+        return resized_frame / 255.0
 
-            with self.network_group.activate(self.network_group_params):
-                raw_output = infer_pipeline.infer(input_dict)
-                logger.debug(f"[detect_raw] Raw inference output: {raw_output}")
-
-                if self.output_vstream_info[0].name not in raw_output:
-                    logger.error(
-                        f"[detect_raw] Missing output stream {self.output_vstream_info[0].name} in inference results"
-                    )
-                    return np.zeros((self.max_detections, 6), np.float32)
-
-                raw_output = raw_output[self.output_vstream_info[0].name][0]
-                logger.debug(
-                    f"[detect_raw] Raw output for stream {self.output_vstream_info[0].name}: {raw_output}"
-                )
-
-            # Process the raw output based on model type
-            detections = self.process_detections(raw_output)
-            if len(detections) == 0:
-                logger.debug(
-                    "[detect_raw] No detections found after processing. Setting default values."
-                )
-                return np.zeros((self.max_detections, 6), np.float32)
-            else:
-                return detections
-
-        except HailoRTException as e:
-            logger.error(f"[detect_raw] HailoRTException during inference: {e}")
-            return np.zeros((self.max_detections, 6), np.float32)
-        except Exception as e:
-            logger.error(f"[detect_raw] Exception during inference: {e}")
-            return np.zeros((self.max_detections, 6), np.float32)
-        finally:
-            logger.debug("[detect_raw] Exiting function")
-
-    def process_detections(self, raw_detections, threshold=None):
-        """Process detections based on model type"""
-        if threshold is None:
-            threshold = self.score_threshold
-
-        if self.h8l_model_type == "ssd_mobilenet_v1":
-            return self._process_ssd_detections(raw_detections, threshold)
-        elif self.h8l_model_type == "yolov8s":
-            return self._process_yolo_detections(raw_detections, threshold, version=8)
-        elif self.h8l_model_type == "yolov6n":
-            return self._process_yolo_detections(raw_detections, threshold, version=6)
+    def postprocess(self, raw_output):
+        model_type = self.async_inference.config.model.type
+        if model_type == "ssd_mobilenet_v1":
+            return self._process_ssd(raw_output)
+        elif model_type in ["yolov8s", "yolov8m", "yolov6n"]:
+            # The trailing character of the model name is its size suffix ("s", "m", "n"),
+            # so derive the YOLO version from the name rather than using model_type[-1].
+            version = "8" if "yolov8" in model_type else "6"
+            return self._process_yolo(raw_output, version=version)
         else:
-            logger.error(f"Unsupported model type: {self.h8l_model_type}")
-            return np.zeros((self.max_detections, 6), np.float32)
+            logger.error(f"Unsupported model type: {model_type}")
+            return []
 
-    def _process_ssd_detections(self, raw_detections, threshold):
-        """Process SSD MobileNet detections"""
-        boxes, scores, classes = [], [], []
-        num_detections = 0
-
-        try:
-            for detection_set in raw_detections:
-                if not isinstance(detection_set, np.ndarray) or detection_set.size == 0:
-                    continue
-
-                for detection in detection_set:
-                    if detection.shape[0] == 0:
-                        continue
-
-                    ymin, xmin, ymax, xmax = detection[:4]
-                    score = np.clip(detection[4], 0, 1)
-
-                    if score < threshold:
-                        continue
-
-                    boxes.append([ymin, xmin, ymax, xmax])
-                    scores.append(score)
-                    classes.append(int(detection[5]))
-                    num_detections += 1
-
-            return self._format_output(boxes, scores, classes)
-
-        except Exception as e:
-            logger.error(f"Error processing SSD detections: {e}")
-            return np.zeros((self.max_detections, 6), np.float32)
-
-    def _process_yolo_detections(self, raw_detections, threshold, version):
-        """Process YOLO detections (v6 and v8)"""
-        boxes, scores, classes = [], [], []
-
-        try:
-            detections = raw_detections[0]
-
-            for detection in detections:
-                if version == 8:
-                    confidence = detection[4]
-                    if confidence < threshold:
-                        continue
-                    class_scores = detection[5:]
-                else:  # YOLOv6
-                    class_scores = detection[4:]
-                    confidence = np.max(class_scores)
-                    if confidence < threshold:
-                        continue
+    def _process_ssd(self, raw_output):
+        detections = []
+        for detection in raw_output[1]:
+            score = detection[4]
+            if score >= self.async_inference.config.model.score_threshold:
+                ymin, xmin, ymax, xmax = detection[:4]
+                detections.append({
+                    "bounding_box": [xmin, ymin, xmax, ymax],
+                    "score": score,
+                    "class": int(detection[5])
+                })
+        return detections
+
+    def _process_yolo(self, raw_output, version):
+        detections = []
+        for detection in raw_output[1]:
+            confidence = detection[4] if version == "8" else np.max(detection[5:])
+            if confidence >= self.async_inference.config.model.score_threshold:
                 x, y, w, h = detection[:4]
-
-                # Convert to corner format
-                ymin = y - h/2
-                xmin = x - w/2
-                ymax = y + h/2
-                xmax = x + w/2
-
-                class_id = np.argmax(class_scores)
-
-                boxes.append([ymin, xmin, ymax, xmax])
-                scores.append(confidence)
-                classes.append(class_id)
+                ymin, xmin, ymax, xmax = y - h / 2, x - w / 2, y + h / 2, x + w / 2
+                class_id = np.argmax(detection[5:])
+                detections.append({
+                    "bounding_box": [xmin, ymin, xmax, ymax],
+                    "score": confidence,
+                    "class": class_id
+                })
+        return detections
 
-            return self._format_output(boxes, scores, classes)
-
-        except Exception as e:
-            logger.error(f"Error processing YOLO detections: {e}")
-            return np.zeros((self.max_detections, 6), np.float32)
-
-    def _format_output(self, boxes, scores, classes):
-        """Format detections to standard output format"""
-        if not boxes:
-            return np.zeros((self.max_detections, 6), np.float32)
-
-        combined = np.hstack((
-            np.array(classes)[:, np.newaxis],
-            np.array(scores)[:, np.newaxis],
-            np.array(boxes)
-        ))
-
-        if combined.shape[0] < self.max_detections:
-            padding = np.zeros((self.max_detections - combined.shape[0], 6), dtype=np.float32)
-            combined = np.vstack((combined, padding))
-        else:
-            combined = combined[:self.max_detections]
-
-        return combined
\ No newline at end of file
+    def stop(self):
+        self.async_inference.stop()
+        self.worker_thread.join()
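
Note on the detector output: Frigate's detect_raw() is expected to return a fixed-size float32 array of [class, score, ymin, xmin, ymax, xmax] rows, which is the shape the timeout fallback above produces with np.zeros((20, 6)), while the new postprocess() helpers return a list of dictionaries. Below is a minimal sketch of how a caller could convert those dictionaries into that layout, modeled on the removed _format_output method; the helper name, the max_detections default, and the assumption that the boxes are already normalized are illustrative, not part of the patch.

    import numpy as np

    def format_detections(detections, max_detections=20):
        """Convert postprocess()-style dicts into the padded array Frigate expects."""
        output = np.zeros((max_detections, 6), np.float32)
        for i, det in enumerate(detections[:max_detections]):
            xmin, ymin, xmax, ymax = det["bounding_box"]
            # Each row is [class, score, ymin, xmin, ymax, xmax].
            output[i] = [det["class"], det["score"], ymin, xmin, ymax, xmax]
        return output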
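
For reference, a minimal sketch of a Frigate configuration that would exercise this detector, assuming HailoDetectorConfig.model is populated from the detector's model block as the classes above imply; the detector name and the placeholder model URL are illustrative only:

    detectors:
      hailo:
        type: hailo8l
        device: PCIe
        model:
          type: yolov8s
          width: 640
          height: 640
          url: <URL of the compiled .hef model>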