Fix camera access and improve typing (#15272)

* Fix camera access and improve typing:

* Formatting
This commit is contained in:
Nicolas Mowen 2024-11-30 18:22:36 -06:00 committed by GitHub
parent f094c59cd0
commit ee816b2251
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 18 deletions

View File

@ -35,8 +35,9 @@ from frigate.const import (
CLIPS_DIR,
)
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.models import Event, ReviewSegment, Timeline
from frigate.object_processing import TrackedObject
from frigate.object_processing import TrackedObject, TrackedObjectProcessor
from frigate.util.builtin import get_tz_modifiers
logger = logging.getLogger(__name__)
@ -1087,9 +1088,11 @@ def create_event(
)
try:
frame = request.app.detected_frames_processor.get_current_frame(camera_name)
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
external_processor: ExternalEventProcessor = request.app.external_processor
event_id = request.app.external_processor.create_manual_event(
frame = frame_processor.get_current_frame(camera_name)
event_id = external_processor.create_manual_event(
camera_name,
label,
body.source_type,

View File

@ -36,6 +36,7 @@ from frigate.const import (
RECORD_DIR,
)
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.object_processing import TrackedObjectProcessor
from frigate.util.builtin import get_tz_modifiers
from frigate.util.image import get_image_from_recording
@ -79,7 +80,11 @@ def mjpeg_feed(
def imagestream(
detected_frames_processor, camera_name: str, fps: int, height: int, draw_options
detected_frames_processor: TrackedObjectProcessor,
camera_name: str,
fps: int,
height: int,
draw_options: dict[str, any],
):
while True:
# max out at specified FPS
@ -118,6 +123,7 @@ def latest_frame(
extension: Extension,
params: MediaLatestFrameQueryParams = Depends(),
):
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
draw_options = {
"bounding_boxes": params.bbox,
"timestamp": params.timestamp,
@ -129,17 +135,14 @@ def latest_frame(
quality = params.quality
if camera_name in request.app.frigate_config.cameras:
frame = request.app.detected_frames_processor.get_current_frame(
camera_name, draw_options
)
frame = frame_processor.get_current_frame(camera_name, draw_options)
retry_interval = float(
request.app.frigate_config.cameras.get(camera_name).ffmpeg.retry_interval
or 10
)
if frame is None or datetime.now().timestamp() > (
request.app.detected_frames_processor.get_current_frame_time(camera_name)
+ retry_interval
frame_processor.get_current_frame_time(camera_name) + retry_interval
):
if request.app.camera_error_image is None:
error_image = glob.glob("/opt/frigate/frigate/images/camera-error.jpg")
@ -180,7 +183,7 @@ def latest_frame(
)
elif camera_name == "birdseye" and request.app.frigate_config.birdseye.restream:
frame = cv2.cvtColor(
request.app.detected_frames_processor.get_current_frame(camera_name),
frame_processor.get_current_frame(camera_name),
cv2.COLOR_YUV2BGR_I420,
)
@ -813,15 +816,15 @@ def grid_snapshot(
):
if camera_name in request.app.frigate_config.cameras:
detect = request.app.frigate_config.cameras[camera_name].detect
frame = request.app.detected_frames_processor.get_current_frame(camera_name, {})
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
frame = frame_processor.get_current_frame(camera_name, {})
retry_interval = float(
request.app.frigate_config.cameras.get(camera_name).ffmpeg.retry_interval
or 10
)
if frame is None or datetime.now().timestamp() > (
request.app.detected_frames_processor.get_current_frame_time(camera_name)
+ retry_interval
frame_processor.get_current_frame_time(camera_name) + retry_interval
):
return JSONResponse(
content={"success": False, "message": "Unable to get valid frame"},

View File

@ -10,6 +10,7 @@ from enum import Enum
from typing import Optional
import cv2
from numpy import ndarray
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.events_updater import EventUpdatePublisher
@ -45,7 +46,7 @@ class ExternalEventProcessor:
duration: Optional[int],
include_recording: bool,
draw: dict[str, any],
snapshot_frame: any,
snapshot_frame: Optional[ndarray],
) -> str:
now = datetime.datetime.now().timestamp()
camera_config = self.config.cameras.get(camera)
@ -131,8 +132,11 @@ class ExternalEventProcessor:
label: str,
event_id: str,
draw: dict[str, any],
img_frame: any,
) -> str:
img_frame: Optional[ndarray],
) -> Optional[str]:
if not img_frame:
return None
# write clean snapshot if enabled
if camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)

View File

@ -6,7 +6,7 @@ import queue
import threading
from collections import Counter, defaultdict
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from typing import Callable, Optional
import cv2
import numpy as np
@ -784,13 +784,18 @@ class TrackedObjectProcessor(threading.Thread):
else:
return {}
def get_current_frame(self, camera, draw_options={}):
def get_current_frame(
self, camera: str, draw_options: dict[str, any] = {}
) -> Optional[np.ndarray]:
if camera == "birdseye":
return self.frame_manager.get(
"birdseye",
(self.config.birdseye.height * 3 // 2, self.config.birdseye.width),
)
if camera not in self.camera_states:
return None
return self.camera_states[camera].get_current_frame(draw_options)
def get_current_frame_time(self, camera) -> int: