This commit is contained in:
Tim Wesley 2025-07-23 02:12:57 -07:00 committed by GitHub
commit 20acbe1020
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1151 additions and 36 deletions

View File

@ -13,4 +13,4 @@ core
web/dist/
web/node_modules/
web/.npm
web/.npm

View File

@ -176,6 +176,15 @@ RUN wget -q https://bootstrap.pypa.io/get-pip.py -O get-pip.py \
&& sed -i 's/args.append("setuptools")/args.append("setuptools==77.0.3")/' get-pip.py \
&& python3 get-pip.py "pip"
# Build MemryX SDK wheel
RUN pip3 wheel --wheel-dir=/wheels/memx/ \
--index-url https://download.pytorch.org/whl/cpu \
--extra-index-url https://developer.memryx.com/pip \
--extra-index-url https://pypi.org/simple \
memryx~=1.2.0
RUN rm -f /wheels/memx/PySide6* /wheels/memx/Qt* /wheels/memx/qt_material*
COPY docker/main/requirements.txt /requirements.txt
RUN pip3 install -r /requirements.txt
@ -240,11 +249,22 @@ RUN wget -q https://bootstrap.pypa.io/get-pip.py -O get-pip.py \
&& sed -i 's/args.append("setuptools")/args.append("setuptools==77.0.3")/' get-pip.py \
&& python3 get-pip.py "pip"
RUN --mount=type=bind,from=wheels,source=/wheels,target=/deps/wheels \
pip3 install -U /deps/wheels/*.whl
RUN --mount=type=bind,from=wheels,source=/wheels,target=/wheels \
bash -euo pipefail -c '\
echo "→ Installing MemryX SDK wheels…"; \
pip3 install --no-deps /wheels/memx/*.whl; \
echo "→ Installing/overwriting with project wheels…"; \
pip3 install --no-deps --upgrade /wheels/*.whl; \
'
COPY --from=deps-rootfs / /
# Install memx-accl using a custom function that installs the latest 1.2.x version and holds it.
RUN curl -fsSL https://developer.memryx.com/deb/memryx.asc | tee /etc/apt/trusted.gpg.d/memryx.asc && \
echo "deb https://developer.memryx.com/deb stable main" > /etc/apt/sources.list.d/memryx.list && \
apt-get update -qq && \
apt-get install -y --no-install-recommends memx-accl=1.2.2-1
RUN ldconfig
EXPOSE 5000

View File

@ -0,0 +1,94 @@
#!/bin/bash
set -e # Exit immediately if any command fails
set -o pipefail
#############################################
# Function to install a specific package version (latest in the 1.2 series)
# and mark it as held.
install_and_hold() {
local package="$1"
echo "Processing package: $package..."
# Retrieve versions from apt-cache policy that match the 1.2 series.
# This grep pattern handles lines that begin with optional whitespace,
# and optionally a '***' marker (with whitespace before and after),
# followed by a version number starting with "1.2".
local versions
versions=$(apt-cache policy "$package" | grep -E '^\s*(\*{3}\s*)?1\.2' | awk '{if($1=="***") print $2; else print $1}')
# Check if any 1.2 versions were found.
if [ -z "$versions" ]; then
echo "No 1.2 versions found for package $package" >&2
return 1
fi
# Sort versions in a version-aware manner and pick the latest one.
local latest
latest=$(echo "$versions" | sort -V | tail -n 1)
echo "Latest 1.2.x version of $package is: $latest"
# Install the specific version.
echo "Installing $package version $latest... "
sudo apt install -y "${package}=${latest}" || { echo "Installation of ${package} version ${latest} failed." >&2; return 1; }
# Mark the package on hold so it is not automatically upgraded.
echo "Marking $package at version $latest as held..."
sudo apt-mark hold "$package" || { echo "Failed to hold package $package." >&2; return 1; }
echo "Package $package installed and pinned at version $latest successfully."
echo "-------------------------------------------"
}
#############################################
echo "Starting MemryX driver and runtime installation..."
# Detect architecture
arch=$(uname -m)
# Purge existing packages and repo
echo "Removing old MemryX installations..."
sudo apt purge -y memx-* || true
sudo rm -f /etc/apt/sources.list.d/memryx.list /etc/apt/trusted.gpg.d/memryx.asc
# Install kernel headers
echo "Installing kernel headers for: $(uname -r)"
sudo apt update
sudo apt install -y linux-headers-$(uname -r)
# Add MemryX key and repo
echo "Adding MemryX GPG key and repository..."
wget -qO- https://developer.memryx.com/deb/memryx.asc | sudo tee /etc/apt/trusted.gpg.d/memryx.asc >/dev/null
echo 'deb https://developer.memryx.com/deb stable main' | sudo tee /etc/apt/sources.list.d/memryx.list >/dev/null
# Update and install memx-drivers using install_and_hold function.
echo "Installing memx-drivers..."
sudo apt update
install_and_hold "memx-drivers"
# ARM-specific board setup
if [[ "$arch" == "aarch64" || "$arch" == "arm64" ]]; then
echo " Running ARM board setup..."
sudo mx_arm_setup
fi
echo -e "\n\n\033[1;31mYOU MUST RESTART YOUR COMPUTER NOW\033[0m\n\n"
# Install mxa-manager and memx-accl using install_and_hold
# List of packages to process.
packages=("memx-accl" "mxa-manager")
for pkg in "${packages[@]}"; do
install_and_hold "$pkg"
done
# Update the configuration file to set the listen address to 0.0.0.0
# This is easier containers to connect to the host's manager daemon,
# since the default addr is 127.0.0.1 and some users might not
# have docker-host networking allowed
echo "Configuring mxa_manager.conf to listen on 0.0.0.0..."
sudo sed -i 's/^LISTEN_ADDRESS=.*/LISTEN_ADDRESS="0.0.0.0"/' /etc/memryx/mxa_manager.conf
# Restart mxa-manager service to apply configuration changes
echo "Restarting mxa-manager service..."
sudo service mxa-manager restart
echo "MemryX installation complete!"

View File

@ -13,6 +13,7 @@ Frigate supports multiple different detectors that work on different types of ha
- [Coral EdgeTPU](#edge-tpu-detector): The Google Coral EdgeTPU is available in USB and m.2 format allowing for a wide range of compatibility with devices.
- [Hailo](#hailo-8): The Hailo8 and Hailo8L AI Acceleration module is available in m.2 format with a HAT for RPi devices, offering a wide range of compatibility with devices.
- [MemryX](#memryx-mx3): The MX3 Acceleration module is available in M.2 format, offering broad compatibility across various platforms.
**AMD**
@ -52,7 +53,7 @@ This does not affect using hardware for accelerating other tasks such as [semant
# Officially Supported Detectors
Frigate provides the following builtin detector types: `cpu`, `edgetpu`, `hailo8l`, `onnx`, `openvino`, `rknn`, and `tensorrt`. By default, Frigate will use a single CPU detector. Other detectors may require additional configuration as described below. When using multiple detectors they will run in dedicated processes, but pull from a common queue of detection requests from across all cameras.
Frigate provides the following builtin detector types: `cpu`, `edgetpu`, `hailo8l`, `memryx` `onnx`, `openvino`, `rknn`, and `tensorrt`. By default, Frigate will use a single CPU detector. Other detectors may require additional configuration as described below. When using multiple detectors they will run in dedicated processes, but pull from a common queue of detection requests from across all cameras.
## Edge TPU Detector
@ -240,6 +241,155 @@ Hailo8 supports all models in the Hailo Model Zoo that include HailoRT post-proc
---
## MemryX MX3
This detector is available for use with the MemryX MX3 accelerator M.2 module. Frigate supports the MX3 on compatible hardware platforms, providing efficient and high-performance object detection.
See the [installation docs](../frigate/installation.md#memryx-mx3) for information on configuring the MemryX hardware.
To configure a MemryX detector, simply set the `type` attribute to `memryx` and follow the configuration guide below.
### Configuration
To configure the MemryX detector, use the following example configuration:
#### Single PCIe MemryX MX3
```yaml
detectors:
memx0:
type: memryx
device: PCIe:0
```
#### Multiple PCIe MemryX MX3 Modules
```yaml
detectors:
memx0:
type: memryx
device: PCIe:0
memx1:
type: memryx
device: PCIe:1
memx2:
type: memryx
device: PCIe:2
```
### Supported Models
MemryX `.dfp` models are automatically downloaded at runtime, if enabled, to the container at `/memryx_models/model_folder/`.
#### YOLO-NAS
The [YOLO-NAS](https://github.com/Deci-AI/super-gradients/blob/master/YOLONAS.md) model included in this detector is downloaded from the [Models Section](#downloading-yolo-nas-model) and compiled to DFP with [mx_nc](https://developer.memryx.com/tools/neural_compiler.html#usage).
The input size for **YOLO-NAS** can be set to either **320x320** (default) or **640x640**.
- The default size of **320x320** is optimized for lower CPU usage and faster inference times.
##### Configuration
Below is the recommended configuration for using the **YOLO-NAS** (small) model with the MemryX detector:
```yaml
detectors:
memx0:
type: memryx
device: PCIe:0
model:
model_type: yolonas
width: 320 # (Can be set to 640 for higher resolution)
height: 320 # (Can be set to 640 for higher resolution)
input_tensor: hwnc
input_dtype: float
# path: yolo_nas_s.dfp ##Model is normally fetched through the runtime, so 'path' can be omitted.##
labelmap_path: /labelmap/coco-80.txt
```
#### YOLOX
The model is sourced from the [OpenCV Model Zoo](https://github.com/opencv/opencv_zoo) and precompiled to DFP.
##### Configuration
Below is the recommended configuration for using the **YOLOX** (small) model with the MemryX detector:
```yaml
detectors:
memx0:
type: memryx
device: PCIe:0
model:
model_type: yolox
width: 640
height: 640
input_tensor: hwnc
input_dtype: float_denorm
# path: YOLOX_640_640_3_onnx.dfp ##Model is normally fetched through the runtime, so 'path' can be omitted.##
labelmap_path: /labelmap/coco-80.txt
```
#### YOLOv9
The YOLOv9s model included in this detector is downloaded from [the original GitHub](https://github.com/WongKinYiu/yolov9) like in the [Models Section](#yolov9-1) and compiled to DFP with [mx_nc](https://developer.memryx.com/tools/neural_compiler.html#usage).
##### Configuration
Below is the recommended configuration for using the **YOLOv9** (small) model with the MemryX detector:
```yaml
detectors:
memx0:
type: memryx
device: PCIe:0
model:
model_type: yolo-generic
width: 320 # (Can be set to 640 for higher resolution)
height: 320 # (Can be set to 640 for higher resolution)
input_tensor: hwnc
input_dtype: float
# path: YOLO_v9_small_onnx.dfp ##Model is normally fetched through the runtime, so 'path' can be omitted.##
labelmap_path: /labelmap/coco-80.txt
```
#### SSDLite MobileNet v2
The model is sourced from the [OpenMMLab Model Zoo](https://mmdeploy-oss.openmmlab.com/model/mmdet-det/ssdlite-e8679f.onnx) and has been converted to DFP.
##### Configuration
Below is the recommended configuration for using the **SSDLite MobileNet v2** model with the MemryX detector:
```yaml
detectors:
memx0:
type: memryx
device: PCIe:0
model:
model_type: ssd
width: 320
height: 320
input_tensor: hwnc
input_dtype: float
# path: SSDlite_MobileNet_v2_320_320_3_onnx.dfp ##Model is normally fetched during runtime, so 'path' can be omitted.##
labelmap_path: /labelmap/coco-80.txt
```
#### Using a Custom Model
To use your own model, bind-mount the compiled `.dfp` file into the container and specify its path using `model.path`. You will also have to update the `labelmap` accordingly.
For detailed instructions on compiling models, refer to the [MemryX Compiler](https://developer.memryx.com/tools/neural_compiler.html#usage) docs and [Tutorials](https://developer.memryx.com/tutorials/tutorials.html).
---
## OpenVINO Detector
The OpenVINO detector type runs an OpenVINO IR model on AMD and Intel CPUs, Intel GPUs and Intel VPU hardware. To configure an OpenVINO detector, set the `"type"` attribute to `"openvino"`.

View File

@ -58,6 +58,10 @@ Frigate supports multiple different detectors that work on different types of ha
- [Google Coral EdgeTPU](#google-coral-tpu): The Google Coral EdgeTPU is available in USB and m.2 format allowing for a wide range of compatibility with devices.
- [Supports primarily ssdlite and mobilenet model architectures](../../configuration/object_detectors#edge-tpu-detector)
- [MemryX](#memryx-mx3): The MX3 M.2 accelerator module is available in m.2 format allowing for a wide range of compatibility with devices.
- [Supports many model architectures](../../configuration/object_detectors#memryx-mx3)
- Runs best with tiny, small, or medium-size models
**AMD**
- [ROCm](#amd-gpus): ROCm can run on AMD Discrete GPUs to provide efficient object detection
@ -185,6 +189,36 @@ Frigate supports all Jetson boards, from the inexpensive Jetson Nano to the powe
Inference speed will vary depending on the YOLO model, jetson platform and jetson nvpmodel (GPU/DLA/EMC clock speed). It is typically 20-40 ms for most models. The DLA is more efficient than the GPU, but not faster, so using the DLA will reduce power consumption but will slightly increase inference time.
### MemryX MX3
Frigate supports the MemryX MX3 M.2 AI Acceleration Module on compatible hardware platforms, including both x86 (Intel/AMD) and ARM-based SBCs such as RPi 5.
A single MemryX MX3 module is capable of handling multiple camera streams using the default models, making it sufficient for most users. For larger deployments with more cameras or bigger models, multiple MX3 modules can be used. Frigate supports multi-detector configurations, allowing you to connect multiple MX3 modules to scale inference capacity seamlessly.
Detailed information is available [in the detector docs](/configuration/object_detectors#memryx-mx3).
Frigate supports the following models resolutions with the MemryX MX3 module:
- **Yolo-NAS**: 320 (default), 640
- **YOLOv9**: 320 (default), 640
- **YOLOX**: 640
- **SSDlite MobileNet v2**: 320
Due to the MX3's architecture, the maximum frames per second supported cannot be calculated as `1/inference time` and is measured separately. When deciding how many camera streams you may support with your configuration, use the **MX3 Total FPS** column to approximate of the detector's limit, not the Inference Time.
| Model | Input Size | MX3 Inference Time | MX3 Total FPS |
|----------------------|------------|--------------------|---------------|
| YOLO-NAS-Small | 320 | ~ 9 ms | ~ 378 |
| YOLO-NAS-Small | 640 | ~ 21 ms | ~ 138 |
| YOLOv9s | 320 | ~ 16 ms | ~ 382 |
| YOLOv9s | 640 | ~ 41 ms | ~ 110 |
| YOLOX-Small | 640 | ~ 16 ms | ~ 263 |
| SSDlite MobileNet v2 | 320 | ~ 5 ms | ~ 1056 |
Inference speeds may vary depending on the host platforms CPU performance. The above data was measured on an **Intel 13700 CPU**. Platforms like Raspberry Pi, x86 hosts, Orange Pi, and other ARM-based SBCs have different levels of processing capability, which will increase post-processing time and may result in lower FPS.
### Rockchip platform
Frigate supports hardware video processing on all Rockchip boards. However, hardware object detection is only supported on these boards:

View File

@ -132,6 +132,78 @@ If you are using `docker run`, add this option to your command `--device /dev/ha
Finally, configure [hardware object detection](/configuration/object_detectors#hailo-8l) to complete the setup.
### MemryX MX3
The MemryX MX3 Accelerator is available in the M.2 2280 form factor (like an NVMe SSD), and supports a variety of configurations:
- x86 (Intel/AMD) PCs
- Raspberry Pi 5
- Orange Pi 5 Plus/Max
- Multi-M.2 PCIe carrier cards
#### Configuration
#### Installation
To get started with MX3 hardware setup for your system, refer to the [Hardware Setup Guide](https://developer.memryx.com/get_started/hardware_setup.html).
Then follow these steps for installing the correct driver/runtime configuration:
1. Copy or download [this script](https://github.com/blakeblackshear/frigate/blob/dev/docker/memryx/user_installation.sh).
2. Ensure it has execution permissions with `sudo chmod +x user_installation.sh`
3. Run the script with `./user_installation.sh`
4. **Restart your computer** to complete driver installation.
#### Setup
To set up Frigate, follow the default installation instructions, for example: `ghcr.io/blakeblackshear/frigate:stable`
Next, grant Docker permissions to access your hardware by adding the following lines to your `docker-compose.yml` file:
```yaml
devices:
- /dev/memx0
```
During configuration, you must run Docker in privileged mode and ensure the container can access the host network gateway.
In your `docker-compose.yml`, also add:
```yaml
privileged: true
extra_hosts:
- "gateway.docker.internal:host-gateway"
```
If you can't use Docker Compose, you can run the container with something similar to this:
```bash
docker run -d \
--name frigate-memx \
--restart=unless-stopped \
--mount type=tmpfs,target=/tmp/cache,tmpfs-size=1000000000 \
--shm-size=256m \
-v /path/to/your/storage:/media/frigate \
-v /path/to/your/config:/config \
-v /etc/localtime:/etc/localtime:ro \
-e FRIGATE_RTSP_PASSWORD='password' \
--add-host gateway.docker.internal:host-gateway \
--privileged=true \
-p 8971:8971 \
-p 8554:8554 \
-p 5000:5000 \
-p 8555:8555/tcp \
-p 8555:8555/udp \
--device /dev/memx0 \
ghcr.io/blakeblackshear/frigate:stable
```
#### Configuration
Finally, configure [hardware object detection](/configuration/object_detectors#memryx-mx3) to complete the setup.
### Rockchip platform
Make sure that you use a linux distribution that comes with the rockchip BSP kernel 5.10 or 6.1 and necessary drivers (especially rkvdec2 and rknpu). To check, enter the following commands:

View File

@ -212,4 +212,4 @@ class BaseDetectorConfig(BaseModel):
)
model_config = ConfigDict(
extra="allow", arbitrary_types_allowed=True, protected_namespaces=()
)
)

View File

@ -0,0 +1,633 @@
import logging
import numpy as np
import cv2
import os
import urllib.request
import zipfile
from queue import Queue
import time
try:
# from memryx import AsyncAccl # Import MemryX SDK
from memryx import AsyncAccl
except ModuleNotFoundError:
raise ImportError(
"MemryX SDK is not installed. Install it and set up MIX environment."
)
from pydantic import BaseModel, Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
from frigate.util.model import post_process_yolo
logger = logging.getLogger(__name__)
DETECTOR_KEY = "memryx"
# Check if custom environment variable is set for mxa_manager IP otherwise
# fall back to 'gateway.docker.internal'
mxserver_addr = os.getenv('MXA_MANAGER_ADDRESS', 'gateway.docker.internal')
# Configuration class for model settings
class ModelConfig(BaseModel):
path: str = Field(default=None, title="Model Path") # Path to the DFP file
labelmap_path: str = Field(default=None, title="Path to Label Map")
class MemryXDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
device: str = Field(default="PCIe", title="Device Path")
class MemryXDetector(DetectionApi):
type_key = DETECTOR_KEY # Set the type key
supported_models = [
ModelTypeEnum.ssd,
ModelTypeEnum.yolonas,
ModelTypeEnum.yologeneric, # Treated as yolov9 in MemryX implementation
ModelTypeEnum.yolox,
]
def __init__(self, detector_config):
"""Initialize MemryX detector with the provided configuration."""
self.capture_queue = Queue(maxsize=10)
self.output_queue = Queue(maxsize=10)
self.capture_id_queue = Queue(maxsize=10)
self.logger = logger
self.memx_model_path = detector_config.model.path # Path to .dfp file
self.memx_post_model = None # Path to .post file
self.expected_post_model = None
self.memx_device_path = detector_config.device # Device path
# Parse the device string to split PCIe:<index>
device_str = self.memx_device_path
device_id = int(device_str.split(":")[1])
self.memx_model_height = detector_config.model.height
self.memx_model_width = detector_config.model.width
self.memx_model_type = detector_config.model.model_type
self.cache_dir = "/memryx_models"
if self.memx_model_type == ModelTypeEnum.yologeneric:
model_mapping = {
(640, 640): ("https://developer.memryx.com/example_files/1p2_frigate/yolov9_640.zip", "yolov9_640"),
(320, 320): ("https://developer.memryx.com/example_files/1p2_frigate/yolov9_320.zip", "yolov9_320")
}
self.model_url, self.model_folder = model_mapping.get(
(self.memx_model_height, self.memx_model_width),
("https://developer.memryx.com/example_files/1p2_frigate/yolov9_320.zip", "yolov9_320")
)
self.expected_dfp_model = (
"YOLO_v9_small_onnx.dfp"
)
elif self.memx_model_type == ModelTypeEnum.yolonas:
model_mapping = {
(640, 640): ("https://developer.memryx.com/example_files/1p2_frigate/yolonas_640.zip", "yolonas_640"),
(320, 320): ("https://developer.memryx.com/example_files/1p2_frigate/yolonas_320.zip", "yolonas_320")
}
self.model_url, self.model_folder = model_mapping.get(
(self.memx_model_height, self.memx_model_width),
("https://developer.memryx.com/example_files/1p2_frigate/yolonas_320.zip", "yolonas_320")
)
self.expected_dfp_model = (
"yolo_nas_s.dfp"
)
self.expected_post_model = (
"yolo_nas_s_post.onnx"
)
elif self.memx_model_type == ModelTypeEnum.yolox:
self.model_folder = "yolox"
self.model_url = (
"https://developer.memryx.com/example_files/1p2_frigate/yolox.zip"
)
self.expected_dfp_model = (
"YOLOX_640_640_3_onnx.dfp"
)
self.set_strides_grids()
elif self.memx_model_type == ModelTypeEnum.ssd:
self.model_folder = "ssd"
self.model_url = (
"https://developer.memryx.com/example_files/1p2_frigate/ssd.zip"
)
self.expected_dfp_model = (
"SSDlite_MobileNet_v2_320_320_3_onnx.dfp"
)
self.expected_post_model = (
"SSDlite_MobileNet_v2_320_320_3_onnx_post.onnx"
)
self.check_and_prepare_model()
logger.info(
f"Initializing MemryX with model: {self.memx_model_path} on device {self.memx_device_path}"
)
try:
# Load MemryX Model
logger.info(f"dfp path: {self.memx_model_path}")
# Initialization code
# Load MemryX Model with a unique device target
self.accl = AsyncAccl(
self.memx_model_path,
mxserver_addr = mxserver_addr,
group_id=device_id, # AsyncAccl device id
)
# Models that use cropped post-processing sections (YOLO-NAS and SSD)
# --> These will be moved to pure numpy in the future to improve performance on low-end CPUs
if self.memx_post_model:
self.accl.set_postprocessing_model(self.memx_post_model, model_idx=0)
self.accl.connect_input(self.process_input)
self.accl.connect_output(self.process_output)
logger.info(
f"Loaded MemryX model from {self.memx_model_path} and {self.memx_post_model}"
)
except Exception as e:
logger.error(f"Failed to initialize MemryX model: {e}")
raise
def load_yolo_constants(self):
base = f"{self.cache_dir}/{self.model_folder}"
# constants for yolov9 post-processing
self.const_A = np.load(
f"{base}/_model_22_Constant_9_output_0.npy"
)
self.const_B = np.load(
f"{base}/_model_22_Constant_10_output_0.npy"
)
self.const_C = np.load(
f"{base}/_model_22_Constant_12_output_0.npy"
)
def check_and_prepare_model(self):
"""Check if models exist; if not, download and extract them."""
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
dfp_exists = os.path.exists(self.memx_model_path) if self.memx_model_path else False
post_exists = os.path.exists(self.expected_post_model) if self.expected_post_model else True # ok if no post model
if dfp_exists and post_exists:
logger.info("Using cached models.")
return
logger.info(f"Model files not found. Downloading from {self.model_url}...")
zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip")
try:
# Before downloading, check if already downloaded
if not os.path.exists(zip_path):
# Download only if zip does not exist
urllib.request.urlretrieve(self.model_url, zip_path)
logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...")
# Before extracting, check if model folder exists already
model_subdir = os.path.join(self.cache_dir, self.model_folder)
if not os.path.exists(model_subdir):
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(self.cache_dir)
logger.info(f"Model extracted to {self.cache_dir}.")
# Assign extracted files to correct paths
for file in os.listdir(model_subdir):
file_path = os.path.join(model_subdir, file)
if file == self.expected_dfp_model:
self.memx_model_path = file_path
elif file == self.expected_post_model:
self.memx_post_model = file_path
logger.info(f"Assigned Model Path: {self.memx_model_path}")
logger.info(f"Assigned Post-processing Model Path: {self.memx_post_model}")
if self.memx_model_type == ModelTypeEnum.yologeneric:
self.load_yolo_constants()
except Exception as e:
logger.error(f"Failed to prepare model: {e}")
raise
finally:
# Remove zip only if we still have it and models exist
if os.path.exists(zip_path):
os.remove(zip_path)
logger.info("Cleaned up ZIP file after extraction.")
def send_input(self, connection_id, tensor_input: np.ndarray):
"""Pre-process (if needed) and send frame to MemryX input queue"""
if tensor_input is None:
raise ValueError("[send_input] No image data provided for inference")
if self.memx_model_type == ModelTypeEnum.yolox:
tensor_input = tensor_input.squeeze(2)
padded_img = np.ones((640, 640, 3), dtype=np.uint8) * 114
scale = min(
640 / float(tensor_input.shape[0]), 640 / float(tensor_input.shape[1])
)
sx, sy = (
int(tensor_input.shape[1] * scale),
int(tensor_input.shape[0] * scale),
)
resized_img = cv2.resize(
tensor_input, (sx, sy), interpolation=cv2.INTER_LINEAR
)
padded_img[:sy, :sx] = resized_img.astype(np.uint8)
# Step 4: Slice the padded image into 4 quadrants and concatenate them into 12 channels
x0 = padded_img[0::2, 0::2, :] # Top-left
x1 = padded_img[1::2, 0::2, :] # Bottom-left
x2 = padded_img[0::2, 1::2, :] # Top-right
x3 = padded_img[1::2, 1::2, :] # Bottom-right
# Step 5: Concatenate along the channel dimension (axis 2)
concatenated_img = np.concatenate([x0, x1, x2, x3], axis=2)
tensor_input = concatenated_img.astype(np.float32)
# Send frame to MemryX for processing
self.capture_queue.put(tensor_input)
self.capture_id_queue.put(connection_id)
def process_input(self):
"""Input callback function: wait for frames in the input queue, preprocess, and send to MX3 (return)"""
while True:
try:
# Wait for a frame from the queue (blocking call)
frame = self.capture_queue.get(
block=True
) # Blocks until data is available
return frame
except Exception as e:
logger.info(f"[process_input] Error processing input: {e}")
time.sleep(0.1) # Prevent busy waiting in case of error
def receive_output(self):
"""Retrieve processed results from MemryX output queue + a copy of the original frame"""
connection_id = (
self.capture_id_queue.get()
) # Get the corresponding connection ID
detections = self.output_queue.get() # Get detections from MemryX
return connection_id, detections
def post_process_yolonas(self, output):
predictions = output[0]
detections = np.zeros((20, 6), np.float32)
for i, prediction in enumerate(predictions):
if i == 20:
break
(_, x_min, y_min, x_max, y_max, confidence, class_id) = prediction
if class_id < 0:
break
detections[i] = [
class_id,
confidence,
y_min / self.memx_model_height,
x_min / self.memx_model_width,
y_max / self.memx_model_height,
x_max / self.memx_model_width,
]
# Return the list of final detections
self.output_queue.put(detections)
def process_yolo(self, class_id, conf, pos):
"""
Takes in class ID, confidence score, and array of [x, y, w, h] that describes detection position,
returns an array that's easily passable back to Frigate.
"""
return [
class_id, # class ID
conf, # confidence score
(pos[1] - (pos[3] / 2)) / self.memx_model_height, # y_min
(pos[0] - (pos[2] / 2)) / self.memx_model_width, # x_min
(pos[1] + (pos[3] / 2)) / self.memx_model_height, # y_max
(pos[0] + (pos[2] / 2)) / self.memx_model_width, # x_max
]
def set_strides_grids(self):
grids = []
expanded_strides = []
strides = [8, 16, 32]
hsize_list = [self.memx_model_height // stride for stride in strides]
wsize_list = [self.memx_model_width // stride for stride in strides]
for hsize, wsize, stride in zip(hsize_list, wsize_list, strides):
xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
grids.append(grid)
shape = grid.shape[:2]
expanded_strides.append(np.full((*shape, 1), stride))
self.grids = np.concatenate(grids, 1)
self.expanded_strides = np.concatenate(expanded_strides, 1)
def sigmoid(self, x: np.ndarray) -> np.ndarray:
return 1 / (1 + np.exp(-x))
def onnx_concat(self, inputs: list, axis: int) -> np.ndarray:
# Ensure all inputs are numpy arrays
if not all(isinstance(x, np.ndarray) for x in inputs):
raise TypeError("All inputs must be numpy arrays.")
# Ensure shapes match on non-concat axes
ref_shape = list(inputs[0].shape)
for i, tensor in enumerate(inputs[1:], start=1):
for ax in range(len(ref_shape)):
if ax == axis:
continue
if tensor.shape[ax] != ref_shape[ax]:
raise ValueError(
f"Shape mismatch at axis {ax} between input[0] and input[{i}]"
)
return np.concatenate(inputs, axis=axis)
def onnx_reshape(self, data: np.ndarray, shape: np.ndarray) -> np.ndarray:
# Ensure shape is a 1D array of integers
target_shape = shape.astype(int).tolist()
# Use NumPy reshape with dynamic handling of -1
reshaped = np.reshape(data, target_shape)
return reshaped
def post_process_yolox(self, output):
output = [
np.expand_dims(tensor, axis=0) for tensor in output
] # Shape: (1, H, W, C)
# Move channel axis from 3rd (last) position to 1st position → (1, C, H, W)
output = [np.transpose(tensor, (0, 3, 1, 2)) for tensor in output]
output_785 = output[0] # 785
output_794 = output[1] # 794
output_795 = output[2] # 795
output_811 = output[3] # 811
output_820 = output[4] # 820
output_821 = output[5] # 821
output_837 = output[6] # 837
output_846 = output[7] # 846
output_847 = output[8] # 847
output_795 = self.sigmoid(output_795)
output_785 = self.sigmoid(output_785)
output_821 = self.sigmoid(output_821)
output_811 = self.sigmoid(output_811)
output_847 = self.sigmoid(output_847)
output_837 = self.sigmoid(output_837)
concat_1 = self.onnx_concat([output_794, output_795, output_785], axis=1)
concat_2 = self.onnx_concat([output_820, output_821, output_811], axis=1)
concat_3 = self.onnx_concat([output_846, output_847, output_837], axis=1)
shape = np.array([1, 85, -1], dtype=np.int64)
reshape_1 = self.onnx_reshape(concat_1, shape)
reshape_2 = self.onnx_reshape(concat_2, shape)
reshape_3 = self.onnx_reshape(concat_3, shape)
concat_out = self.onnx_concat([reshape_1, reshape_2, reshape_3], axis=2)
output = concat_out.transpose(0, 2, 1) # 1, 840, 85
self.num_classes = output.shape[2] - 5
# [x, y, h, w, box_score, class_no_1, ..., class_no_80],
results = output
results[..., :2] = (results[..., :2] + self.grids) * self.expanded_strides
results[..., 2:4] = np.exp(results[..., 2:4]) * self.expanded_strides
image_pred = results[0, ...]
class_conf = np.max(
image_pred[:, 5 : 5 + self.num_classes], axis=1, keepdims=True
)
class_pred = np.argmax(image_pred[:, 5 : 5 + self.num_classes], axis=1)
class_pred = np.expand_dims(class_pred, axis=1)
conf_mask = (image_pred[:, 4] * class_conf.squeeze() >= 0.3).squeeze()
# Detections ordered as (x1, y1, x2, y2, obj_conf, class_conf, class_pred)
detections = np.concatenate((image_pred[:, :5], class_conf, class_pred), axis=1)
detections = detections[conf_mask]
# Sort by class confidence (index 5) and keep top 20 detections
ordered = detections[detections[:, 5].argsort()[::-1]][:20]
# Prepare a final detections array of shape (20, 6)
final_detections = np.zeros((20, 6), np.float32)
for i, object_detected in enumerate(ordered):
final_detections[i] = self.process_yolo(
object_detected[6], object_detected[5], object_detected[:4]
)
self.output_queue.put(final_detections)
def post_process_ssdlite(self, outputs):
dets = outputs[0].squeeze(0) # Shape: (1, num_dets, 5)
labels = outputs[1].squeeze(0)
detections = []
for i in range(dets.shape[0]):
x_min, y_min, x_max, y_max, confidence = dets[i]
class_id = int(labels[i]) # Convert label to integer
if confidence < 0.45:
continue # Skip detections below threshold
# Convert coordinates to integers
x_min, y_min, x_max, y_max = map(int, [x_min, y_min, x_max, y_max])
# Append valid detections [class_id, confidence, x, y, width, height]
detections.append([class_id, confidence, x_min, y_min, x_max, y_max])
final_detections = np.zeros((20, 6), np.float32)
if len(detections) == 0:
# logger.info("No detections found.")
self.output_queue.put(final_detections)
return
# Convert to NumPy array
detections = np.array(detections, dtype=np.float32)
# Apply Non-Maximum Suppression (NMS)
bboxes = detections[:, 2:6].tolist() # (x_min, y_min, width, height)
scores = detections[:, 1].tolist() # Confidence scores
indices = cv2.dnn.NMSBoxes(bboxes, scores, 0.45, 0.5)
if len(indices) > 0:
indices = indices.flatten()[:20] # Keep only the top 20 detections
selected_detections = detections[indices]
# Normalize coordinates AFTER NMS
for i, det in enumerate(selected_detections):
class_id, confidence, x_min, y_min, x_max, y_max = det
# Normalize coordinates
x_min /= self.memx_model_width
y_min /= self.memx_model_height
x_max /= self.memx_model_width
y_max /= self.memx_model_height
final_detections[i] = [class_id, confidence, y_min, x_min, y_max, x_max]
self.output_queue.put(final_detections)
def onnx_reshape_with_allowzero(
self, data: np.ndarray, shape: np.ndarray, allowzero: int = 0
) -> np.ndarray:
shape = shape.astype(int)
input_shape = data.shape
output_shape = []
for i, dim in enumerate(shape):
if dim == 0 and allowzero == 0:
output_shape.append(input_shape[i]) # Copy dimension from input
else:
output_shape.append(dim)
# Now let NumPy infer any -1 if needed
reshaped = np.reshape(data, output_shape)
return reshaped
def process_output(self, *outputs):
"""Output callback function -- receives frames from the MX3 and triggers post-processing"""
if self.memx_model_type == ModelTypeEnum.yologeneric:
outputs = [
np.expand_dims(tensor, axis=0) for tensor in outputs
] # Shape: (1, H, W, C)
# Move channel axis from 3rd (last) position to 1st position → (1, C, H, W)
outputs = [np.transpose(tensor, (0, 3, 1, 2)) for tensor in outputs]
conv_out1 = outputs[0]
conv_out2 = outputs[1]
conv_out3 = outputs[2]
conv_out4 = outputs[3]
conv_out5 = outputs[4]
conv_out6 = outputs[5]
concat_1 = self.onnx_concat([conv_out1, conv_out2], axis=1)
concat_2 = self.onnx_concat([conv_out3, conv_out4], axis=1)
concat_3 = self.onnx_concat([conv_out5, conv_out6], axis=1)
shape = np.array([1, 144, -1], dtype=np.int64)
reshaped_1 = self.onnx_reshape_with_allowzero(concat_1, shape, allowzero=0)
reshaped_2 = self.onnx_reshape_with_allowzero(concat_2, shape, allowzero=0)
reshaped_3 = self.onnx_reshape_with_allowzero(concat_3, shape, allowzero=0)
concat_4 = self.onnx_concat([reshaped_1, reshaped_2, reshaped_3], 2)
axis = 1
split_sizes = [64, 80]
# Calculate indices at which to split
indices = np.cumsum(split_sizes)[
:-1
] # [64] — split before the second chunk
# Perform split along axis 1
split_0, split_1 = np.split(concat_4, indices, axis=axis)
num_boxes = 2100 if self.memx_model_height == 320 else 8400
shape1 = np.array([1, 4, 16, num_boxes])
reshape_4 = self.onnx_reshape_with_allowzero(split_0, shape1, allowzero=0)
transpose_1 = reshape_4.transpose(0, 2, 1, 3)
axis = 1 # As per ONNX softmax node
# Subtract max for numerical stability
x_max = np.max(transpose_1, axis=axis, keepdims=True)
x_exp = np.exp(transpose_1 - x_max)
x_sum = np.sum(x_exp, axis=axis, keepdims=True)
softmax_output = x_exp / x_sum
# Weight W from the ONNX initializer (1, 16, 1, 1) with values 0 to 15
W = np.arange(16, dtype=np.float32).reshape(1, 16, 1, 1) # (1, 16, 1, 1)
# Apply 1x1 convolution: this is a weighted sum over channels
conv_output = np.sum(
softmax_output * W, axis=1, keepdims=True
) # shape: (1, 1, 4, 8400)
shape2 = np.array([1, 4, num_boxes])
reshape_5 = self.onnx_reshape_with_allowzero(
conv_output, shape2, allowzero=0
)
# ONNX Slice — get first 2 channels: [0:2] along axis 1
slice_output1 = reshape_5[:, 0:2, :] # Result: (1, 2, 8400)
# Slice channels 2 to 4 → axis = 1
slice_output2 = reshape_5[:, 2:4, :]
# Perform Subtraction
sub_output = self.const_A - slice_output1 # Equivalent to ONNX Sub
# Perform the ONNX-style Add
add_output = self.const_B + slice_output2
sub1 = add_output - sub_output
add1 = sub_output + add_output
div_output = add1 / 2.0
concat_5 = self.onnx_concat([div_output, sub1], axis=1)
# Expand B to (1, 1, 8400) so it can broadcast across axis=1 (4 channels)
const_C_expanded = self.const_C[:, np.newaxis, :] # Shape: (1, 1, 8400)
# Perform ONNX-style element-wise multiplication
mul_output = concat_5 * const_C_expanded # Result: (1, 4, 8400)
sigmoid_output = self.sigmoid(split_1)
outputs = self.onnx_concat([mul_output, sigmoid_output], axis=1)
final_detections = post_process_yolo(
outputs, self.memx_model_width, self.memx_model_height
)
self.output_queue.put(final_detections)
elif self.memx_model_type == ModelTypeEnum.yolonas:
return self.post_process_yolonas(outputs)
elif self.memx_model_type == ModelTypeEnum.yolox:
return self.post_process_yolox(outputs)
elif self.memx_model_type == ModelTypeEnum.ssd:
return self.post_process_ssdlite(outputs)
else:
raise Exception(
f"{self.memx_model_type} is currently not supported for memryx. See the docs for more info on supported models."
)
def detect_raw(self, tensor_input: np.ndarray):
"""Removed synchronous detect_raw() function so that we only use async"""
return 0

View File

@ -1,4 +1,5 @@
import datetime
import time
import logging
import multiprocessing as mp
import os
@ -34,7 +35,7 @@ class ObjectDetector(ABC):
pass
class LocalObjectDetector(ObjectDetector):
class BaseLocalDetector(ObjectDetector):
def __init__(
self,
detector_config: BaseDetectorConfig = None,
@ -56,6 +57,18 @@ class LocalObjectDetector(ObjectDetector):
self.detect_api = create_detector(detector_config)
def _transform_input(self, tensor_input: np.ndarray) -> np.ndarray:
if self.input_transform:
tensor_input = np.transpose(tensor_input, self.input_transform)
if self.dtype == InputDTypeEnum.float:
tensor_input = tensor_input.astype(np.float32)
tensor_input /= 255
elif self.dtype == InputDTypeEnum.float_denorm:
tensor_input = tensor_input.astype(np.float32)
return tensor_input
def detect(self, tensor_input: np.ndarray, threshold=0.4):
detections = []
@ -73,27 +86,23 @@ class LocalObjectDetector(ObjectDetector):
self.fps.update()
return detections
class LocalObjectDetector(BaseLocalDetector):
def detect_raw(self, tensor_input: np.ndarray):
if self.input_transform:
tensor_input = np.transpose(tensor_input, self.input_transform)
if self.dtype == InputDTypeEnum.float:
tensor_input = tensor_input.astype(np.float32)
tensor_input /= 255
elif self.dtype == InputDTypeEnum.float_denorm:
tensor_input = tensor_input.astype(np.float32)
tensor_input = self._transform_input(tensor_input)
return self.detect_api.detect_raw(tensor_input=tensor_input)
def run_detector(
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
):
class AsyncLocalObjectDetector(BaseLocalDetector):
def async_send_input(self, tensor_input: np.ndarray, connection_id):
tensor_input = self._transform_input(tensor_input)
return self.detect_api.send_input(connection_id, tensor_input)
def async_receive_output(self):
return self.detect_api.receive_output()
def prepare_detector(name, out_events):
threading.current_thread().name = f"detector:{name}"
logger = logging.getLogger(f"detector.{name}")
logger.info(f"Starting detection process: {os.getpid()}")
@ -109,7 +118,6 @@ def run_detector(
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
outputs = {}
for name in out_events.keys():
@ -117,6 +125,24 @@ def run_detector(
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np}
return stop_event, frame_manager, outputs, logger
def run_detector(
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
):
stop_event, frame_manager, outputs, logger = prepare_detector(
name, out_events
)
object_detector = LocalObjectDetector(detector_config=detector_config)
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=1)
@ -145,6 +171,77 @@ def run_detector(
logger.info("Exited detection process...")
def async_run_detector(
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
):
stop_event, frame_manager, outputs, logger = prepare_detector(
name, out_events
)
object_detector = AsyncLocalObjectDetector(detector_config=detector_config)
def detect_worker():
# Continuously fetch frames and send them to the async detector
logger.info("Starting Detect Worker Thread")
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=1)
except queue.Empty:
continue
# Retrieve the input frame from shared memory
input_frame = frame_manager.get(
connection_id,
(1, detector_config.model.height, detector_config.model.width, 3),
)
if input_frame is None:
logger.warning(f"Failed to get frame {connection_id} from SHM")
continue
# send input to Accelator
start.value = datetime.datetime.now().timestamp()
object_detector.async_send_input(input_frame, connection_id)
def result_worker():
# Continuously receive detection results from the async detector
logger.info("Starting Result Worker Thread")
while not stop_event.is_set():
connection_id, detections = object_detector.async_receive_output()
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
# Update moving average inference time
avg_speed.value = (avg_speed.value * 9 + duration) / 10
if connection_id in outputs and detections is not None:
outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set()
# Initialize tracking variables
start.value = 0.0
avg_speed.value = 0.0
# Start threads for detection input and result output
detect_thread = threading.Thread(target=detect_worker, daemon=True)
result_thread = threading.Thread(target=result_worker, daemon=True)
detect_thread.start()
result_thread.start()
# Keep the main process alive while threads run
while not stop_event.is_set():
time.sleep(5)
logger.info("Exited async detection process...")
class ObjectDetectProcess:
def __init__(
self,
@ -179,18 +276,33 @@ class ObjectDetectProcess:
self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
self.detect_process = util.Process(
target=run_detector,
name=f"detector:{self.name}",
args=(
self.name,
self.detection_queue,
self.out_events,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
)
if self.detector_config.type == "memryx":
# MemryX requires asynchronous detection handling using async_run_detector
self.detect_process = util.Process(
target=async_run_detector,
name=f"detector:{self.name}",
args=(
self.name,
self.detection_queue,
self.out_events,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
)
else:
self.detect_process = util.Process(
target=run_detector,
name=f"detector:{self.name}",
args=(
self.name,
self.detection_queue,
self.out_events,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
)
self.detect_process.daemon = True
self.detect_process.start()