From f3765bc391eb629b049fc494544a9a1dc8ce4120 Mon Sep 17 00:00:00 2001 From: leccelecce <24962424+leccelecce@users.noreply.github.com> Date: Mon, 3 Mar 2025 17:01:02 +0000 Subject: [PATCH] GenAI minor refactor (#16916) --- frigate/embeddings/maintainer.py | 200 +++++++++++++++---------------- 1 file changed, 96 insertions(+), 104 deletions(-) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index c9b6062c9..dfaed532e 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -293,6 +293,7 @@ class EmbeddingMaintainer(threading.Thread): # Embed the thumbnail self._embed_thumbnail(event_id, thumbnail) + # Run GenAI if ( camera_config.genai.enabled and self.genai_client is not None @@ -306,82 +307,7 @@ class EmbeddingMaintainer(threading.Thread): or set(event.zones) & set(camera_config.genai.required_zones) ) ): - if event.has_snapshot and camera_config.genai.use_snapshot: - with open( - os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"), - "rb", - ) as image_file: - snapshot_image = image_file.read() - - img = cv2.imdecode( - np.frombuffer(snapshot_image, dtype=np.int8), - cv2.IMREAD_COLOR, - ) - - # crop snapshot based on region before sending off to genai - height, width = img.shape[:2] - x1_rel, y1_rel, width_rel, height_rel = event.data["region"] - - x1, y1 = int(x1_rel * width), int(y1_rel * height) - cropped_image = img[ - y1 : y1 + int(height_rel * height), - x1 : x1 + int(width_rel * width), - ] - - _, buffer = cv2.imencode(".jpg", cropped_image) - snapshot_image = buffer.tobytes() - - num_thumbnails = len(self.tracked_events.get(event_id, [])) - - embed_image = ( - [snapshot_image] - if event.has_snapshot and camera_config.genai.use_snapshot - else ( - [ - data["thumbnail"] - for data in self.tracked_events[event_id] - ] - if num_thumbnails > 0 - else [thumbnail] - ) - ) - - if camera_config.genai.debug_save_thumbnails and num_thumbnails > 0: - logger.debug( - f"Saving {num_thumbnails} thumbnails for event {event.id}" - ) - - Path( - os.path.join(CLIPS_DIR, f"genai-requests/{event.id}") - ).mkdir(parents=True, exist_ok=True) - - for idx, data in enumerate(self.tracked_events[event_id], 1): - jpg_bytes: bytes = data["thumbnail"] - - if jpg_bytes is None: - logger.warning( - f"Unable to save thumbnail {idx} for {event.id}." - ) - else: - with open( - os.path.join( - CLIPS_DIR, - f"genai-requests/{event.id}/{idx}.jpg", - ), - "wb", - ) as j: - j.write(jpg_bytes) - - # Generate the description. Call happens in a thread since it is network bound. - threading.Thread( - target=self._embed_description, - name=f"_embed_description_{event.id}", - daemon=True, - args=( - event, - embed_image, - ), - ).start() + self._process_genai_description(event, camera_config, thumbnail) # Delete tracked events based on the event_id if event_id in self.tracked_events: @@ -440,7 +366,58 @@ class EmbeddingMaintainer(threading.Thread): self.embeddings.embed_thumbnail(event_id, thumbnail) - def _embed_description(self, event: Event, thumbnails: list[bytes]) -> None: + def _process_genai_description(self, event, camera_config, thumbnail) -> None: + if event.has_snapshot and camera_config.genai.use_snapshot: + snapshot_image = self._read_and_crop_snapshot(event, camera_config) + if not snapshot_image: + return + + num_thumbnails = len(self.tracked_events.get(event.id, [])) + + embed_image = ( + [snapshot_image] + if event.has_snapshot and camera_config.genai.use_snapshot + else ( + [data["thumbnail"] for data in self.tracked_events[event.id]] + if num_thumbnails > 0 + else [thumbnail] + ) + ) + + if camera_config.genai.debug_save_thumbnails and num_thumbnails > 0: + logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}") + + Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir( + parents=True, exist_ok=True + ) + + for idx, data in enumerate(self.tracked_events[event.id], 1): + jpg_bytes: bytes = data["thumbnail"] + + if jpg_bytes is None: + logger.warning(f"Unable to save thumbnail {idx} for {event.id}.") + else: + with open( + os.path.join( + CLIPS_DIR, + f"genai-requests/{event.id}/{idx}.jpg", + ), + "wb", + ) as j: + j.write(jpg_bytes) + + # Generate the description. Call happens in a thread since it is network bound. + threading.Thread( + target=self._genai_embed_description, + name=f"_genai_embed_description_{event.id}", + daemon=True, + args=( + event, + embed_image, + ), + ).start() + + def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None: """Embed the description for an event.""" camera_config = self.config.cameras[event.camera] @@ -473,6 +450,45 @@ class EmbeddingMaintainer(threading.Thread): description, ) + def _read_and_crop_snapshot(self, event: Event, camera_config) -> bytes | None: + """Read, decode, and crop the snapshot image.""" + + snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg") + + if not os.path.isfile(snapshot_file): + logger.error( + f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}" + ) + return None + + try: + with open(snapshot_file, "rb") as image_file: + snapshot_image = image_file.read() + + img = cv2.imdecode( + np.frombuffer(snapshot_image, dtype=np.int8), + cv2.IMREAD_COLOR, + ) + + # Crop snapshot based on region + # provide full image if region doesn't exist (manual events) + height, width = img.shape[:2] + x1_rel, y1_rel, width_rel, height_rel = event.data.get( + "region", [0, 0, 1, 1] + ) + x1, y1 = int(x1_rel * width), int(y1_rel * height) + + cropped_image = img[ + y1 : y1 + int(height_rel * height), + x1 : x1 + int(width_rel * width), + ] + + _, buffer = cv2.imencode(".jpg", cropped_image) + + return buffer.tobytes() + except Exception: + return None + def handle_regenerate_description(self, event_id: str, source: str) -> None: try: event: Event = Event.get(Event.id == event_id) @@ -492,34 +508,10 @@ class EmbeddingMaintainer(threading.Thread): ) if event.has_snapshot and source == "snapshot": - snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg") - - if not os.path.isfile(snapshot_file): - logger.error( - f"Cannot regenerate description for {event.id}, snapshot file not found: {snapshot_file}" - ) + snapshot_image = self._read_and_crop_snapshot(event, camera_config) + if not snapshot_image: return - with open(snapshot_file, "rb") as image_file: - snapshot_image = image_file.read() - img = cv2.imdecode( - np.frombuffer(snapshot_image, dtype=np.int8), cv2.IMREAD_COLOR - ) - - # crop snapshot based on region before sending off to genai - # provide full image if region doesn't exist (manual events) - region = event.data.get("region", [0, 0, 1, 1]) - height, width = img.shape[:2] - x1_rel, y1_rel, width_rel, height_rel = region - - x1, y1 = int(x1_rel * width), int(y1_rel * height) - cropped_image = img[ - y1 : y1 + int(height_rel * height), x1 : x1 + int(width_rel * width) - ] - - _, buffer = cv2.imencode(".jpg", cropped_image) - snapshot_image = buffer.tobytes() - embed_image = ( [snapshot_image] if event.has_snapshot and source == "snapshot" @@ -530,4 +522,4 @@ class EmbeddingMaintainer(threading.Thread): ) ) - self._embed_description(event, embed_image) + self._genai_embed_description(event, embed_image)