Make output writing asynchronous (#8530)

* Don't wait for frame write in output process

* Formatting

* Handle websocket exception

* Limit camera queeue size to 1 second
This commit is contained in:
Nicolas Mowen 2023-11-07 16:24:56 -07:00 committed by GitHub
parent ca84732574
commit ef36aabd30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,8 +20,8 @@ from ws4py.server.wsgirefserver import (
WSGIServer, WSGIServer,
) )
from ws4py.server.wsgiutils import WebSocketWSGIApplication from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.websocket import WebSocket
from frigate.comms.ws import WebSocket
from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import BASE_DIR, BIRDSEYE_PIPE from frigate.const import BASE_DIR, BIRDSEYE_PIPE
from frigate.types import CameraMetricsTypes from frigate.types import CameraMetricsTypes
@ -108,9 +108,12 @@ class Canvas:
return camera_aspect return camera_aspect
class FFMpegConverter: class FFMpegConverter(threading.Thread):
def __init__( def __init__(
self, self,
camera: str,
input_queue: queue.Queue,
stop_event: mp.Event,
in_width: int, in_width: int,
in_height: int, in_height: int,
out_width: int, out_width: int,
@ -118,6 +121,11 @@ class FFMpegConverter:
quality: int, quality: int,
birdseye_rtsp: bool = False, birdseye_rtsp: bool = False,
): ):
threading.Thread.__init__(self)
self.name = f"{camera}_output_converter"
self.camera = camera
self.input_queue = input_queue
self.stop_event = stop_event
self.bd_pipe = None self.bd_pipe = None
if birdseye_rtsp: if birdseye_rtsp:
@ -167,7 +175,7 @@ class FFMpegConverter:
os.close(stdin) os.close(stdin)
self.reading_birdseye = False self.reading_birdseye = False
def write(self, b) -> None: def __write(self, b) -> None:
self.process.stdin.write(b) self.process.stdin.write(b)
if self.bd_pipe: if self.bd_pipe:
@ -203,9 +211,25 @@ class FFMpegConverter:
self.process.kill() self.process.kill()
self.process.communicate() self.process.communicate()
def run(self) -> None:
while not self.stop_event.is_set():
try:
frame = self.input_queue.get(True, timeout=1)
self.__write(frame)
except queue.Empty:
pass
self.exit()
class BroadcastThread(threading.Thread): class BroadcastThread(threading.Thread):
def __init__(self, camera, converter, websocket_server, stop_event): def __init__(
self,
camera: str,
converter: FFMpegConverter,
websocket_server,
stop_event: mp.Event,
):
super(BroadcastThread, self).__init__() super(BroadcastThread, self).__init__()
self.camera = camera self.camera = camera
self.converter = converter self.converter = converter
@ -678,15 +702,20 @@ def output_frames(
websocket_server.initialize_websockets_manager() websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever) websocket_thread = threading.Thread(target=websocket_server.serve_forever)
inputs: dict[str, queue.Queue] = {}
converters = {} converters = {}
broadcasters = {} broadcasters = {}
for camera, cam_config in config.cameras.items(): for camera, cam_config in config.cameras.items():
inputs[camera] = queue.Queue(maxsize=cam_config.detect.fps)
width = int( width = int(
cam_config.live.height cam_config.live.height
* (cam_config.frame_shape[1] / cam_config.frame_shape[0]) * (cam_config.frame_shape[1] / cam_config.frame_shape[0])
) )
converters[camera] = FFMpegConverter( converters[camera] = FFMpegConverter(
camera,
inputs[camera],
stop_event,
cam_config.frame_shape[1], cam_config.frame_shape[1],
cam_config.frame_shape[0], cam_config.frame_shape[0],
width, width,
@ -698,7 +727,11 @@ def output_frames(
) )
if config.birdseye.enabled: if config.birdseye.enabled:
inputs["birdseye"] = queue.Queue(maxsize=10)
converters["birdseye"] = FFMpegConverter( converters["birdseye"] = FFMpegConverter(
"birdseye",
inputs["birdseye"],
stop_event,
config.birdseye.width, config.birdseye.width,
config.birdseye.height, config.birdseye.height,
config.birdseye.width, config.birdseye.width,
@ -715,6 +748,9 @@ def output_frames(
websocket_thread.start() websocket_thread.start()
for t in converters.values():
t.start()
for t in broadcasters.values(): for t in broadcasters.values():
t.start() t.start()
@ -749,7 +785,11 @@ def output_frames(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager
): ):
# write to the converter for the camera if clients are listening to the specific camera # write to the converter for the camera if clients are listening to the specific camera
converters[camera].write(frame.tobytes()) try:
inputs[camera].put_nowait(frame.tobytes())
except queue.Full:
# drop frames if queue is full
pass
if config.birdseye.enabled and ( if config.birdseye.enabled and (
config.birdseye.restream config.birdseye.restream
@ -770,7 +810,11 @@ def output_frames(
if config.birdseye.restream: if config.birdseye.restream:
birdseye_buffer[:] = frame_bytes birdseye_buffer[:] = frame_bytes
converters["birdseye"].write(frame_bytes) try:
inputs["birdseye"].put_nowait(frame_bytes)
except queue.Full:
# drop frames if queue is full
pass
if camera in previous_frames: if camera in previous_frames:
frame_manager.delete(f"{camera}{previous_frames[camera]}") frame_manager.delete(f"{camera}{previous_frames[camera]}")
@ -790,10 +834,9 @@ def output_frames(
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
frame_manager.delete(frame_id) frame_manager.delete(frame_id)
for c in converters.values():
c.exit()
for b in broadcasters.values(): for b in broadcasters.values():
b.join() b.join()
websocket_server.manager.close_all() websocket_server.manager.close_all()
websocket_server.manager.stop() websocket_server.manager.stop()
websocket_server.manager.join() websocket_server.manager.join()