1# util/queue.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7# mypy: allow-untyped-defs, allow-untyped-calls 
    8 
    9"""An adaptation of Py2.3/2.4's Queue module which supports reentrant 
    10behavior, using RLock instead of Lock for its mutex object.  The 
    11Queue object is used exclusively by the sqlalchemy.pool.QueuePool 
    12class. 
    13 
    14This is to support the connection pool's usage of weakref callbacks to return 
    15connections to the underlying Queue, which can in extremely 
    16rare cases be invoked within the ``get()`` method of the Queue itself, 
    17producing a ``put()`` inside the ``get()`` and therefore a reentrant 
    18condition. 
    19 
    20""" 
    21from __future__ import annotations 
    22 
    23import asyncio 
    24from collections import deque 
    25import threading 
    26from time import time as _time 
    27from typing import Any 
    28from typing import Deque 
    29from typing import Generic 
    30from typing import Optional 
    31from typing import TypeVar 
    32 
    33from .concurrency import await_ 
    34from .langhelpers import memoized_property 
    35 
    36 
    37_T = TypeVar("_T", bound=Any) 
    38__all__ = ["Empty", "Full", "Queue"] 
    39 
    40 
    41class Empty(Exception): 
    42    "Exception raised by Queue.get(block=0)/get_nowait()." 
    43 
    44    pass 
    45 
    46 
    47class Full(Exception): 
    48    "Exception raised by Queue.put(block=0)/put_nowait()." 
    49 
    50    pass 
    51 
    52 
    53class QueueCommon(Generic[_T]): 
    54    maxsize: int 
    55    use_lifo: bool 
    56 
    57    def __init__(self, maxsize: int = 0, use_lifo: bool = False): ... 
    58 
    59    def empty(self) -> bool: 
    60        raise NotImplementedError() 
    61 
    62    def full(self) -> bool: 
    63        raise NotImplementedError() 
    64 
    65    def qsize(self) -> int: 
    66        raise NotImplementedError() 
    67 
    68    def put_nowait(self, item: _T) -> None: 
    69        raise NotImplementedError() 
    70 
    71    def put( 
    72        self, item: _T, block: bool = True, timeout: Optional[float] = None 
    73    ) -> None: 
    74        raise NotImplementedError() 
    75 
    76    def get_nowait(self) -> _T: 
    77        raise NotImplementedError() 
    78 
    79    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: 
    80        raise NotImplementedError() 
    81 
    82 
    83class Queue(QueueCommon[_T]): 
    84    queue: Deque[_T] 
    85 
    86    def __init__(self, maxsize: int = 0, use_lifo: bool = False): 
    87        """Initialize a queue object with a given maximum size. 
    88 
    89        If `maxsize` is <= 0, the queue size is infinite. 
    90 
    91        If `use_lifo` is True, this Queue acts like a Stack (LIFO). 
    92        """ 
    93 
    94        self._init(maxsize) 
    95        # mutex must be held whenever the queue is mutating.  All methods 
    96        # that acquire mutex must release it before returning.  mutex 
    97        # is shared between the two conditions, so acquiring and 
    98        # releasing the conditions also acquires and releases mutex. 
    99        self.mutex = threading.RLock() 
    100        # Notify not_empty whenever an item is added to the queue; a 
    101        # thread waiting to get is notified then. 
    102        self.not_empty = threading.Condition(self.mutex) 
    103        # Notify not_full whenever an item is removed from the queue; 
    104        # a thread waiting to put is notified then. 
    105        self.not_full = threading.Condition(self.mutex) 
    106        # If this queue uses LIFO or FIFO 
    107        self.use_lifo = use_lifo 
    108 
    109    def qsize(self) -> int: 
    110        """Return the approximate size of the queue (not reliable!).""" 
    111 
    112        with self.mutex: 
    113            return self._qsize() 
    114 
    115    def empty(self) -> bool: 
    116        """Return True if the queue is empty, False otherwise (not 
    117        reliable!).""" 
    118 
    119        with self.mutex: 
    120            return self._empty() 
    121 
    122    def full(self) -> bool: 
    123        """Return True if the queue is full, False otherwise (not 
    124        reliable!).""" 
    125 
    126        with self.mutex: 
    127            return self._full() 
    128 
    129    def put( 
    130        self, item: _T, block: bool = True, timeout: Optional[float] = None 
    131    ) -> None: 
    132        """Put an item into the queue. 
    133 
    134        If optional args `block` is True and `timeout` is None (the 
    135        default), block if necessary until a free slot is 
    136        available. If `timeout` is a positive number, it blocks at 
    137        most `timeout` seconds and raises the ``Full`` exception if no 
    138        free slot was available within that time.  Otherwise (`block` 
    139        is false), put an item on the queue if a free slot is 
    140        immediately available, else raise the ``Full`` exception 
    141        (`timeout` is ignored in that case). 
    142        """ 
    143 
    144        with self.not_full: 
    145            if not block: 
    146                if self._full(): 
    147                    raise Full 
    148            elif timeout is None: 
    149                while self._full(): 
    150                    self.not_full.wait() 
    151            else: 
    152                if timeout < 0: 
    153                    raise ValueError("'timeout' must be a positive number") 
    154                endtime = _time() + timeout 
    155                while self._full(): 
    156                    remaining = endtime - _time() 
    157                    if remaining <= 0.0: 
    158                        raise Full 
    159                    self.not_full.wait(remaining) 
    160            self._put(item) 
    161            self.not_empty.notify() 
    162 
    163    def put_nowait(self, item: _T) -> None: 
    164        """Put an item into the queue without blocking. 
    165 
    166        Only enqueue the item if a free slot is immediately available. 
    167        Otherwise raise the ``Full`` exception. 
    168        """ 
    169        return self.put(item, False) 
    170 
    171    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: 
    172        """Remove and return an item from the queue. 
    173 
    174        If optional args `block` is True and `timeout` is None (the 
    175        default), block if necessary until an item is available. If 
    176        `timeout` is a positive number, it blocks at most `timeout` 
    177        seconds and raises the ``Empty`` exception if no item was 
    178        available within that time.  Otherwise (`block` is false), 
    179        return an item if one is immediately available, else raise the 
    180        ``Empty`` exception (`timeout` is ignored in that case). 
    181 
    182        """ 
    183        with self.not_empty: 
    184            if not block: 
    185                if self._empty(): 
    186                    raise Empty 
    187            elif timeout is None: 
    188                while self._empty(): 
    189                    self.not_empty.wait() 
    190            else: 
    191                if timeout < 0: 
    192                    raise ValueError("'timeout' must be a positive number") 
    193                endtime = _time() + timeout 
    194                while self._empty(): 
    195                    remaining = endtime - _time() 
    196                    if remaining <= 0.0: 
    197                        raise Empty 
    198                    self.not_empty.wait(remaining) 
    199            item = self._get() 
    200            self.not_full.notify() 
    201            return item 
    202 
    203    def get_nowait(self) -> _T: 
    204        """Remove and return an item from the queue without blocking. 
    205 
    206        Only get an item if one is immediately available. Otherwise 
    207        raise the ``Empty`` exception. 
    208        """ 
    209 
    210        return self.get(False) 
    211 
    212    def _init(self, maxsize: int) -> None: 
    213        self.maxsize = maxsize 
    214        self.queue = deque() 
    215 
    216    def _qsize(self) -> int: 
    217        return len(self.queue) 
    218 
    219    def _empty(self) -> bool: 
    220        return not self.queue 
    221 
    222    def _full(self) -> bool: 
    223        return self.maxsize > 0 and len(self.queue) == self.maxsize 
    224 
    225    def _put(self, item: _T) -> None: 
    226        self.queue.append(item) 
    227 
    228    def _get(self) -> _T: 
    229        if self.use_lifo: 
    230            # LIFO 
    231            return self.queue.pop() 
    232        else: 
    233            # FIFO 
    234            return self.queue.popleft() 
    235 
    236 
    237class AsyncAdaptedQueue(QueueCommon[_T]): 
    238    def __init__(self, maxsize: int = 0, use_lifo: bool = False): 
    239        self.use_lifo = use_lifo 
    240        self.maxsize = maxsize 
    241 
    242    def empty(self) -> bool: 
    243        return self._queue.empty() 
    244 
    245    def full(self): 
    246        return self._queue.full() 
    247 
    248    def qsize(self): 
    249        return self._queue.qsize() 
    250 
    251    @memoized_property 
    252    def _queue(self) -> asyncio.Queue[_T]: 
    253        # Delay creation of the queue until it is first used, to avoid 
    254        # binding it to a possibly wrong event loop. 
    255        # By delaying the creation of the pool we accommodate the common 
    256        # usage pattern of instantiating the engine at module level, where a 
    257        # different event loop is in present compared to when the application 
    258        # is actually run. 
    259 
    260        queue: asyncio.Queue[_T] 
    261 
    262        if self.use_lifo: 
    263            queue = asyncio.LifoQueue(maxsize=self.maxsize) 
    264        else: 
    265            queue = asyncio.Queue(maxsize=self.maxsize) 
    266        return queue 
    267 
    268    def put_nowait(self, item: _T) -> None: 
    269        try: 
    270            self._queue.put_nowait(item) 
    271        except asyncio.QueueFull as err: 
    272            raise Full() from err 
    273 
    274    def put( 
    275        self, item: _T, block: bool = True, timeout: Optional[float] = None 
    276    ) -> None: 
    277        if not block: 
    278            return self.put_nowait(item) 
    279 
    280        try: 
    281            if timeout is not None: 
    282                await_(asyncio.wait_for(self._queue.put(item), timeout)) 
    283            else: 
    284                await_(self._queue.put(item)) 
    285        except (asyncio.QueueFull, asyncio.TimeoutError) as err: 
    286            raise Full() from err 
    287 
    288    def get_nowait(self) -> _T: 
    289        try: 
    290            return self._queue.get_nowait() 
    291        except asyncio.QueueEmpty as err: 
    292            raise Empty() from err 
    293 
    294    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: 
    295        if not block: 
    296            return self.get_nowait() 
    297 
    298        try: 
    299            if timeout is not None: 
    300                return await_(asyncio.wait_for(self._queue.get(), timeout)) 
    301            else: 
    302                return await_(self._queue.get()) 
    303        except (asyncio.QueueEmpty, asyncio.TimeoutError) as err: 
    304            raise Empty() from err