diff --git a/frigate/output.py b/frigate/output.py
index 2f240cef7..5d8013d2e 100644
--- a/frigate/output.py
+++ b/frigate/output.py
@@ -20,8 +20,8 @@ from ws4py.server.wsgirefserver import (
     WSGIServer,
 )
 from ws4py.server.wsgiutils import WebSocketWSGIApplication
-from ws4py.websocket import WebSocket
+from frigate.comms.ws import WebSocket
 
 from frigate.config import BirdseyeModeEnum, FrigateConfig
 from frigate.const import BASE_DIR, BIRDSEYE_PIPE
 from frigate.types import CameraMetricsTypes
@@ -108,9 +108,12 @@ class Canvas:
         return camera_aspect
 
 
-class FFMpegConverter:
+class FFMpegConverter(threading.Thread):
     def __init__(
         self,
+        camera: str,
+        input_queue: queue.Queue,
+        stop_event: mp.Event,
         in_width: int,
         in_height: int,
         out_width: int,
@@ -118,6 +121,11 @@ class FFMpegConverter:
         quality: int,
         birdseye_rtsp: bool = False,
     ):
+        threading.Thread.__init__(self)
+        self.name = f"{camera}_output_converter"
+        self.camera = camera
+        self.input_queue = input_queue
+        self.stop_event = stop_event
         self.bd_pipe = None
 
         if birdseye_rtsp:
@@ -167,7 +175,7 @@ class FFMpegConverter:
         os.close(stdin)
         self.reading_birdseye = False
 
-    def write(self, b) -> None:
+    def __write(self, b) -> None:
         self.process.stdin.write(b)
 
         if self.bd_pipe:
@@ -203,9 +211,25 @@ class FFMpegConverter:
             self.process.kill()
             self.process.communicate()
 
+    def run(self) -> None:
+        while not self.stop_event.is_set():
+            try:
+                frame = self.input_queue.get(True, timeout=1)
+                self.__write(frame)
+            except queue.Empty:
+                pass
+
+        self.exit()
+
 
 class BroadcastThread(threading.Thread):
-    def __init__(self, camera, converter, websocket_server, stop_event):
+    def __init__(
+        self,
+        camera: str,
+        converter: FFMpegConverter,
+        websocket_server,
+        stop_event: mp.Event,
+    ):
         super(BroadcastThread, self).__init__()
         self.camera = camera
         self.converter = converter
@@ -678,15 +702,20 @@ def output_frames(
     websocket_server.initialize_websockets_manager()
     websocket_thread = threading.Thread(target=websocket_server.serve_forever)
 
+    inputs: dict[str, queue.Queue] = {}
     converters = {}
     broadcasters = {}
 
     for camera, cam_config in config.cameras.items():
+        inputs[camera] = queue.Queue(maxsize=cam_config.detect.fps)
         width = int(
             cam_config.live.height
             * (cam_config.frame_shape[1] / cam_config.frame_shape[0])
         )
         converters[camera] = FFMpegConverter(
+            camera,
+            inputs[camera],
+            stop_event,
             cam_config.frame_shape[1],
             cam_config.frame_shape[0],
             width,
@@ -698,7 +727,11 @@ def output_frames(
         )
 
     if config.birdseye.enabled:
+        inputs["birdseye"] = queue.Queue(maxsize=10)
         converters["birdseye"] = FFMpegConverter(
+            "birdseye",
+            inputs["birdseye"],
+            stop_event,
             config.birdseye.width,
             config.birdseye.height,
             config.birdseye.width,
@@ -715,6 +748,9 @@ def output_frames(
 
     websocket_thread.start()
 
+    for t in converters.values():
+        t.start()
+
     for t in broadcasters.values():
         t.start()
 
@@ -749,7 +785,11 @@ def output_frames(
             ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
         ):
             # write to the converter for the camera if clients are listening to the specific camera
-            converters[camera].write(frame.tobytes())
+            try:
+                inputs[camera].put_nowait(frame.tobytes())
+            except queue.Full:
+                # drop frames if queue is full
+                pass
 
         if config.birdseye.enabled and (
             config.birdseye.restream
@@ -770,7 +810,11 @@ def output_frames(
                 if config.birdseye.restream:
                     birdseye_buffer[:] = frame_bytes
 
-                converters["birdseye"].write(frame_bytes)
+                try:
+                    inputs["birdseye"].put_nowait(frame_bytes)
+                except queue.Full:
+                    # drop frames if queue is full
+                    pass
 
         if camera in previous_frames:
             frame_manager.delete(f"{camera}{previous_frames[camera]}")
@@ -790,10 +834,9 @@ def output_frames(
         frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
         frame_manager.delete(frame_id)
 
-    for c in converters.values():
-        c.exit()
     for b in broadcasters.values():
         b.join()
 
+    websocket_server.manager.close_all()
     websocket_server.manager.stop()
     websocket_server.manager.join()
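
For context, the change above replaces direct converters[camera].write() calls with bounded per-camera queues that are drained by the converter threads, so a stalled ffmpeg pipe can no longer block the main output loop. Below is a minimal, standalone sketch of that producer/consumer pattern; the names (FrameConverter, produce_frames, "front_door") are illustrative only and not part of Frigate.

# Sketch of the bounded-queue pattern: a consumer thread drains frames from a
# queue, and the producer drops frames with put_nowait() when the queue is
# full instead of blocking.
import queue
import threading
import time


class FrameConverter(threading.Thread):
    def __init__(self, camera: str, input_queue: queue.Queue, stop_event: threading.Event):
        super().__init__(name=f"{camera}_output_converter")
        self.camera = camera
        self.input_queue = input_queue
        self.stop_event = stop_event

    def run(self) -> None:
        while not self.stop_event.is_set():
            try:
                # Block briefly so stop_event is re-checked about once a second.
                frame = self.input_queue.get(True, timeout=1)
            except queue.Empty:
                continue
            # Stand-in for writing the frame to an ffmpeg stdin pipe.
            print(f"{self.camera}: converted {len(frame)} bytes")


def produce_frames(frames: queue.Queue, stop_event: threading.Event, fps: int = 5) -> None:
    while not stop_event.is_set():
        try:
            # Drop the frame rather than blocking if the consumer falls behind.
            frames.put_nowait(b"\x00" * 1024)
        except queue.Full:
            pass
        time.sleep(1 / fps)


if __name__ == "__main__":
    stop_event = threading.Event()
    frames: queue.Queue = queue.Queue(maxsize=5)  # bounded, like maxsize=cam_config.detect.fps
    converter = FrameConverter("front_door", frames, stop_event)
    converter.start()

    producer = threading.Thread(target=produce_frames, args=(frames, stop_event))
    producer.start()

    time.sleep(3)
    stop_event.set()
    producer.join()
    converter.join()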