Files
blakeblackshear.frigate/frigate/output/camera.py
Josh Hawkins 77831304a7 Camera access fixes (#22987)
* only send monitoring notifications to users with camera access

* check access to similarity search event id camera

* require admin role for storage usage endpoint

* check camera access for jsmpeg and birdseye cameras

* tests

* formatting
2026-04-23 12:27:49 -06:00

185 lines
5.0 KiB
Python

"""Handle outputting individual cameras via jsmpeg."""
import logging
import queue
import subprocess as sp
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import CameraConfig, FfmpegConfig, FrigateConfig
from frigate.output.ws_auth import ws_has_camera_access
logger = logging.getLogger(__name__)
class FFMpegConverter(threading.Thread):
def __init__(
self,
camera: str,
ffmpeg: FfmpegConfig,
input_queue: queue.Queue,
stop_event: MpEvent,
in_width: int,
in_height: int,
out_width: int,
out_height: int,
quality: int,
):
super().__init__(name=f"{camera}_output_converter")
self.camera = camera
self.input_queue = input_queue
self.stop_event = stop_event
ffmpeg_cmd = [
ffmpeg.ffmpeg_path,
"-threads",
"1",
"-f",
"rawvideo",
"-pix_fmt",
"yuv420p",
"-video_size",
f"{in_width}x{in_height}",
"-i",
"pipe:",
"-threads",
"1",
"-f",
"mpegts",
"-s",
f"{out_width}x{out_height}",
"-codec:v",
"mpeg1video",
"-q",
f"{quality}",
"-bf",
"0",
"pipe:",
]
self.process = sp.Popen(
ffmpeg_cmd,
stdout=sp.PIPE,
stderr=sp.DEVNULL,
stdin=sp.PIPE,
start_new_session=True,
)
def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b)
def read(self, length: int) -> Any:
try:
return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError:
return False
def exit(self) -> None:
self.process.terminate()
try:
self.process.communicate(timeout=30)
except sp.TimeoutExpired:
self.process.kill()
self.process.communicate()
def run(self) -> None:
while not self.stop_event.is_set():
try:
frame = self.input_queue.get(True, timeout=1)
self.__write(frame)
except queue.Empty:
pass
self.exit()
class BroadcastThread(threading.Thread):
def __init__(
self,
camera: str,
converter: FFMpegConverter,
websocket_server: Any,
stop_event: MpEvent,
config: FrigateConfig,
):
super().__init__()
self.camera = camera
self.converter = converter
self.websocket_server = websocket_server
self.stop_event = stop_event
self.config = config
def run(self) -> None:
while not self.stop_event.is_set():
buf = self.converter.read(65536)
if buf:
manager = self.websocket_server.manager
with manager.lock:
websockets = manager.websockets.copy()
ws_iter = iter(websockets.values())
for ws in ws_iter:
if (
not ws.terminated
and ws.environ["PATH_INFO"] == f"/{self.camera}"
and ws_has_camera_access(ws, self.camera, self.config)
):
try:
ws.send(buf, binary=True)
except ValueError:
pass
except (BrokenPipeError, ConnectionResetError) as e:
logger.debug(f"Websocket unexpectedly closed {e}")
elif self.converter.process.poll() is not None:
break
class JsmpegCamera:
def __init__(
self,
config: CameraConfig,
frigate_config: FrigateConfig,
stop_event: MpEvent,
websocket_server: Any,
) -> None:
self.config = config
self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps)
width = int(
config.live.height * (config.frame_shape[1] / config.frame_shape[0])
)
self.converter = FFMpegConverter(
config.name or "",
config.ffmpeg,
self.input,
stop_event,
config.frame_shape[1],
config.frame_shape[0],
width,
config.live.height,
config.live.quality,
)
self.broadcaster = BroadcastThread(
config.name or "",
self.converter,
websocket_server,
stop_event,
frigate_config,
)
self.converter.start()
self.broadcaster.start()
def write_frame(self, frame_bytes: bytes) -> None:
try:
self.input.put_nowait(frame_bytes)
except queue.Full:
# drop frames if queue is full
pass
def stop(self) -> None:
self.converter.join()
self.broadcaster.join()