Don't try to close or join mp manager queues (#18866)

Multiprocessing Manager queues don't have a close() or join_thread() method, and the Manager will clean it up appropriately after we empty it. This prevents an infinite loop when an AttributeError exception fires for Manager AutoProxy queue objects.
This commit is contained in:
Josh Hawkins 2025-06-24 16:19:09 -05:00 committed by Nicolas Mowen
parent 671bd0e7d1
commit 326fac2e9d

View File

@ -5,7 +5,7 @@ import copy
import datetime import datetime
import logging import logging
import math import math
import multiprocessing as mp import multiprocessing.queues
import queue import queue
import re import re
import shlex import shlex
@ -338,16 +338,23 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
file.unlink(missing_ok=missing_ok) file.unlink(missing_ok=missing_ok)
def empty_and_close_queue(q: mp.Queue): def empty_and_close_queue(q):
while True: while True:
try: try:
try: q.get(block=True, timeout=0.5)
q.get(block=True, timeout=0.5) except (queue.Empty, EOFError):
except (queue.Empty, EOFError): break
q.close() except Exception as e:
q.join_thread() logger.debug(f"Error while emptying queue: {e}")
return break
except AttributeError:
# close the queue if it is a multiprocessing queue
# manager proxy queues do not have close or join_thread method
if isinstance(q, multiprocessing.queues.Queue):
try:
q.close()
q.join_thread()
except Exception:
pass pass