mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-01-16 00:09:14 +01:00
ab50d0b006
* Add isort and ruff linter

  Both linters are pretty common among modern Python code bases. The isort tool provides stable sorting and grouping, as well as pruning of unused imports. Ruff is a modern linter that is very fast because it is written in Rust. It can detect many common issues in a Python codebase.

  Removes the pylint dev requirement, since ruff replaces it.
* treewide: fix issues detected by ruff
* treewide: fix bare except clauses
* .devcontainer: Set up isort
* treewide: optimize imports
* treewide: apply black
* treewide: make regex patterns raw strings

  This is necessary for escape sequences to be properly recognized.
81 lines
2.6 KiB
Python
81 lines
2.6 KiB
Python
import logging
|
|
|
|
import numpy as np
|
|
from pydantic import Field
|
|
from typing_extensions import Literal
|
|
|
|
from frigate.detectors.detection_api import DetectionApi
|
|
from frigate.detectors.detector_config import BaseDetectorConfig
|
|
|
|
try:
|
|
from tflite_runtime.interpreter import Interpreter, load_delegate
|
|
except ModuleNotFoundError:
|
|
from tensorflow.lite.python.interpreter import Interpreter, load_delegate
|
|
|
|
|
|
# Module-scoped logger; records are emitted under this module's dotted name.
logger = logging.getLogger(__name__)

# Identifier for this detector type. It is used both as the accepted value of
# the config's ``type`` field and as the detector class's ``type_key``.
DETECTOR_KEY = "edgetpu"
|
|
|
|
|
|
class EdgeTpuDetectorConfig(BaseDetectorConfig):
    """Configuration schema for a detector of type ``DETECTOR_KEY``."""

    # Discriminator field: only the literal DETECTOR_KEY value is accepted.
    type: Literal[DETECTOR_KEY]

    # Device string passed to the Edge TPU delegate. Defaults to None, in
    # which case EdgeTpuTfl falls back to "usb".
    # NOTE(review): annotated as ``str`` although the default is None —
    # confirm whether this should be an optional type.
    device: str = Field(default=None, title="Device Type")
|
|
|
|
|
|
class EdgeTpuTfl(DetectionApi):
    """Detector that runs a TFLite model through the Edge TPU delegate."""

    type_key = DETECTOR_KEY

    # Number of rows in the array returned by detect_raw(); detections beyond
    # this count are dropped.
    _MAX_DETECTIONS = 20
    # Iteration over the model output stops at the first score below this.
    _MIN_SCORE = 0.4

    def __init__(self, detector_config: EdgeTpuDetectorConfig):
        """Load the Edge TPU delegate and build the TFLite interpreter.

        Args:
            detector_config: Detector settings. ``device`` selects the TPU
                (falls back to ``"usb"`` when it is None) and ``model.path``
                points at the model file to load.

        Raises:
            ValueError: Re-raised unchanged when the delegate cannot be
                loaded or the interpreter cannot be created.
        """
        device = "usb" if detector_config.device is None else detector_config.device

        # Guard only the delegate load with the "No EdgeTPU" message, so that
        # an interpreter/model failure is not misreported as a missing TPU.
        try:
            logger.info("Attempting to load TPU as %s", device)
            edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", {"device": device})
            logger.info("TPU found")
        except ValueError:
            logger.error(
                "No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors."
            )
            raise

        try:
            self.interpreter = Interpreter(
                model_path=detector_config.model.path,
                experimental_delegates=[edge_tpu_delegate],
            )
        except ValueError:
            logger.error(
                "Unable to create the TFLite interpreter for model %s",
                detector_config.model.path,
            )
            raise

        self.interpreter.allocate_tensors()

        self.tensor_input_details = self.interpreter.get_input_details()
        self.tensor_output_details = self.interpreter.get_output_details()

    def _read_output(self, position):
        """Return element 0 of the output tensor at ``position``."""
        tensor_index = self.tensor_output_details[position]["index"]
        return self.interpreter.tensor(tensor_index)()[0]

    def detect_raw(self, tensor_input):
        """Run one inference and return the detections as a fixed-size array.

        Args:
            tensor_input: Data written to the model's first input tensor.
                NOTE(review): shape and dtype are not validated here and
                must match what the loaded model expects.

        Returns:
            A ``(20, 6)`` float32 array. Each populated row is
            ``[class_id, score, box[0], box[1], box[2], box[3]]``; rows that
            are not filled remain all zeros.
        """
        self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
        self.interpreter.invoke()

        # Output tensors are read by position: boxes, class ids, scores, count.
        boxes = self._read_output(0)
        class_ids = self._read_output(1)
        scores = self._read_output(2)
        count = int(self._read_output(3))

        detections = np.zeros((self._MAX_DETECTIONS, 6), np.float32)

        # Stops at the first low score rather than skipping it — presumably
        # the model emits scores in descending order; verify for new models.
        for i in range(min(count, self._MAX_DETECTIONS)):
            if scores[i] < self._MIN_SCORE:
                break
            detections[i] = [
                class_ids[i],
                float(scores[i]),
                boxes[i][0],
                boxes[i][1],
                boxes[i][2],
                boxes[i][3],
            ]

        return detections
|