From 30dfdf47d4bc440ea1f558e4c3965798f64bb904 Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Thu, 6 Jul 2023 21:54:55 +0300 Subject: [PATCH] Add thread-safety to LimitedQueue by implementing a lock for put and get methods (#7053) --- frigate/util/builtin.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index f55ea5e37..2f623567c 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -78,29 +78,33 @@ class LimitedQueue(FFQueue): self.size = multiprocessing.RawValue( ctypes.c_int, 0 ) # Add a counter for the number of items in the queue + self.lock = multiprocessing.Lock() # Add a lock for thread-safety def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): - if self.maxsize > 0 and self.size.value >= self.maxsize: - if block: - start_time = time.time() - while self.size.value >= self.maxsize: - remaining = timeout - (time.time() - start_time) - if remaining <= 0.0: - raise Full - time.sleep(min(remaining, 0.1)) - else: - raise Full - self.size.value += 1 + with self.lock: # Ensure thread-safety + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 return super().put(x, block=block, timeout=timeout) def get(self, block=True, timeout=DEFAULT_TIMEOUT): - if self.size.value <= 0 and not block: - raise Empty - self.size.value -= 1 - return super().get(block=block, timeout=timeout) + item = super().get(block=block, timeout=timeout) + with self.lock: # Ensure thread-safety + if self.size.value <= 0 and not block: + raise Empty + self.size.value -= 1 + return item def qsize(self): - return self.size + return self.size.value def empty(self): return self.qsize() == 0