Add thread-safety to LimitedQueue by implementing a lock for put and get methods (#7053)

This commit is contained in:
Sergey Krashevich 2023-07-06 21:54:55 +03:00 committed by GitHub
parent f48dd8c1ab
commit 30dfdf47d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -78,29 +78,33 @@ class LimitedQueue(FFQueue):
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock for thread-safety
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
with self.lock: # Ensure thread-safety
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return super().get(block=block, timeout=timeout)
item = super().get(block=block, timeout=timeout)
with self.lock: # Ensure thread-safety
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return item
def qsize(self):
return self.size
return self.size.value
def empty(self):
return self.qsize() == 0