From 6cd1d1f2054dcf027957b9ff18db0da855257a57 Mon Sep 17 00:00:00 2001 From: Tim Wesley <49914745+tim-memryx@users.noreply.github.com> Date: Mon, 15 Sep 2025 09:48:55 -0400 Subject: [PATCH] memryx: fix model download bug when using multiple detectors (#20030) * Add locking for model download files * ruff format --------- Co-authored-by: Abinila Siva --- frigate/detectors/plugins/memryx.py | 246 ++++++++++++++++------------ 1 file changed, 141 insertions(+), 105 deletions(-) diff --git a/frigate/detectors/plugins/memryx.py b/frigate/detectors/plugins/memryx.py index 2c741e0f6..3b424bcc0 100644 --- a/frigate/detectors/plugins/memryx.py +++ b/frigate/detectors/plugins/memryx.py @@ -177,6 +177,29 @@ class MemryXDetector(DetectionApi): logger.error(f"Failed to initialize MemryX model: {e}") raise + def _acquire_file_lock(self, lock_path: str, timeout: int = 60, poll: float = 0.2): + """ + Create an exclusive lock file. Blocks (with polling) until it can acquire, + or raises TimeoutError. Uses only stdlib (os.O_EXCL). + """ + start = time.time() + while True: + try: + fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR) + os.close(fd) + return + except FileExistsError: + if time.time() - start > timeout: + raise TimeoutError(f"Timeout waiting for lock: {lock_path}") + time.sleep(poll) + + def _release_file_lock(self, lock_path: str): + """Best-effort removal of the lock file.""" + try: + os.remove(lock_path) + except FileNotFoundError: + pass + def load_yolo_constants(self): base = f"{self.cache_dir}/{self.model_folder}" # constants for yolov9 post-processing @@ -188,122 +211,135 @@ class MemryXDetector(DetectionApi): if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir, exist_ok=True) - # ---------- CASE 1: user provided a custom model path ---------- - if self.memx_model_path: - if not self.memx_model_path.endswith(".zip"): - raise ValueError( - f"Invalid model path: {self.memx_model_path}. " - "Only .zip files are supported. Please provide a .zip model archive." - ) - if not os.path.exists(self.memx_model_path): - raise FileNotFoundError( - f"Custom model zip not found: {self.memx_model_path}" - ) - - logger.info(f"User provided zip model: {self.memx_model_path}") - - # Extract custom zip into a separate area so it never clashes with MemryX cache - custom_dir = os.path.join( - self.cache_dir, "custom_models", self.model_folder - ) - if os.path.isdir(custom_dir): - shutil.rmtree(custom_dir) - os.makedirs(custom_dir, exist_ok=True) - - with zipfile.ZipFile(self.memx_model_path, "r") as zip_ref: - zip_ref.extractall(custom_dir) - logger.info(f"Custom model extracted to {custom_dir}.") - - # Find .dfp and optional *_post.onnx recursively - dfp_candidates = glob.glob( - os.path.join(custom_dir, "**", "*.dfp"), recursive=True - ) - post_candidates = glob.glob( - os.path.join(custom_dir, "**", "*_post.onnx"), recursive=True - ) - - if not dfp_candidates: - raise FileNotFoundError( - "No .dfp file found in custom model zip after extraction." - ) - - self.memx_model_path = dfp_candidates[0] - - # Handle post model requirements by model type - if self.memx_model_type in [ - ModelTypeEnum.yologeneric, - ModelTypeEnum.yolonas, - ModelTypeEnum.ssd, - ]: - if not post_candidates: - raise FileNotFoundError( - f"No *_post.onnx file found in custom model zip for {self.memx_model_type.name}." - ) - self.memx_post_model = post_candidates[0] - elif self.memx_model_type == ModelTypeEnum.yolox: - # Explicitly ignore any post model even if present - self.memx_post_model = None - else: - # Future model types can optionally use post if present - self.memx_post_model = post_candidates[0] if post_candidates else None - - logger.info(f"Using custom model: {self.memx_model_path}") - return - - # ---------- CASE 2: no custom model path -> use MemryX cached models ---------- - model_subdir = os.path.join(self.cache_dir, self.model_folder) - dfp_path = os.path.join(model_subdir, self.expected_dfp_model) - post_path = ( - os.path.join(model_subdir, self.expected_post_model) - if self.expected_post_model - else None - ) - - dfp_exists = os.path.exists(dfp_path) - post_exists = os.path.exists(post_path) if post_path else True - - if dfp_exists and post_exists: - logger.info("Using cached models.") - self.memx_model_path = dfp_path - self.memx_post_model = post_path - if self.memx_model_type == ModelTypeEnum.yologeneric: - self.load_yolo_constants() - return - - # ---------- CASE 3: download MemryX model (no cache) ---------- - logger.info( - f"Model files not found locally. Downloading from {self.model_url}..." - ) - zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip") + lock_path = os.path.join(self.cache_dir, f".{self.model_folder}.lock") + self._acquire_file_lock(lock_path) try: - if not os.path.exists(zip_path): - urllib.request.urlretrieve(self.model_url, zip_path) - logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...") + # ---------- CASE 1: user provided a custom model path ---------- + if self.memx_model_path: + if not self.memx_model_path.endswith(".zip"): + raise ValueError( + f"Invalid model path: {self.memx_model_path}. " + "Only .zip files are supported. Please provide a .zip model archive." + ) + if not os.path.exists(self.memx_model_path): + raise FileNotFoundError( + f"Custom model zip not found: {self.memx_model_path}" + ) - if not os.path.exists(model_subdir): - with zipfile.ZipFile(zip_path, "r") as zip_ref: - zip_ref.extractall(self.cache_dir) - logger.info(f"Model extracted to {self.cache_dir}.") + logger.info(f"User provided zip model: {self.memx_model_path}") - # Re-assign model paths after extraction - self.memx_model_path = os.path.join(model_subdir, self.expected_dfp_model) - self.memx_post_model = ( + # Extract custom zip into a separate area so it never clashes with MemryX cache + custom_dir = os.path.join( + self.cache_dir, "custom_models", self.model_folder + ) + if os.path.isdir(custom_dir): + shutil.rmtree(custom_dir) + os.makedirs(custom_dir, exist_ok=True) + + with zipfile.ZipFile(self.memx_model_path, "r") as zip_ref: + zip_ref.extractall(custom_dir) + logger.info(f"Custom model extracted to {custom_dir}.") + + # Find .dfp and optional *_post.onnx recursively + dfp_candidates = glob.glob( + os.path.join(custom_dir, "**", "*.dfp"), recursive=True + ) + post_candidates = glob.glob( + os.path.join(custom_dir, "**", "*_post.onnx"), recursive=True + ) + + if not dfp_candidates: + raise FileNotFoundError( + "No .dfp file found in custom model zip after extraction." + ) + + self.memx_model_path = dfp_candidates[0] + + # Handle post model requirements by model type + if self.memx_model_type in [ + ModelTypeEnum.yologeneric, + ModelTypeEnum.yolonas, + ModelTypeEnum.ssd, + ]: + if not post_candidates: + raise FileNotFoundError( + f"No *_post.onnx file found in custom model zip for {self.memx_model_type.name}." + ) + self.memx_post_model = post_candidates[0] + elif self.memx_model_type == ModelTypeEnum.yolox: + # Explicitly ignore any post model even if present + self.memx_post_model = None + else: + # Future model types can optionally use post if present + self.memx_post_model = ( + post_candidates[0] if post_candidates else None + ) + + logger.info(f"Using custom model: {self.memx_model_path}") + return + + # ---------- CASE 2: no custom model path -> use MemryX cached models ---------- + model_subdir = os.path.join(self.cache_dir, self.model_folder) + dfp_path = os.path.join(model_subdir, self.expected_dfp_model) + post_path = ( os.path.join(model_subdir, self.expected_post_model) if self.expected_post_model else None ) - if self.memx_model_type == ModelTypeEnum.yologeneric: - self.load_yolo_constants() + dfp_exists = os.path.exists(dfp_path) + post_exists = os.path.exists(post_path) if post_path else True + + if dfp_exists and post_exists: + logger.info("Using cached models.") + self.memx_model_path = dfp_path + self.memx_post_model = post_path + if self.memx_model_type == ModelTypeEnum.yologeneric: + self.load_yolo_constants() + return + + # ---------- CASE 3: download MemryX model (no cache) ---------- + logger.info( + f"Model files not found locally. Downloading from {self.model_url}..." + ) + zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip") + + try: + if not os.path.exists(zip_path): + urllib.request.urlretrieve(self.model_url, zip_path) + logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...") + + if not os.path.exists(model_subdir): + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(self.cache_dir) + logger.info(f"Model extracted to {self.cache_dir}.") + + # Re-assign model paths after extraction + self.memx_model_path = os.path.join( + model_subdir, self.expected_dfp_model + ) + self.memx_post_model = ( + os.path.join(model_subdir, self.expected_post_model) + if self.expected_post_model + else None + ) + + if self.memx_model_type == ModelTypeEnum.yologeneric: + self.load_yolo_constants() + + finally: + if os.path.exists(zip_path): + try: + os.remove(zip_path) + logger.info("Cleaned up ZIP file after extraction.") + except Exception as e: + logger.warning( + f"Failed to remove downloaded zip {zip_path}: {e}" + ) finally: - if os.path.exists(zip_path): - try: - os.remove(zip_path) - logger.info("Cleaned up ZIP file after extraction.") - except Exception as e: - logger.warning(f"Failed to remove downloaded zip {zip_path}: {e}") + self._release_file_lock(lock_path) def send_input(self, connection_id, tensor_input: np.ndarray): """Pre-process (if needed) and send frame to MemryX input queue"""