1# util/queue.py 
    2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: http://www.opensource.org/licenses/mit-license.php 
    7 
    8"""An adaptation of Py2.3/2.4's Queue module which supports reentrant 
    9behavior, using RLock instead of Lock for its mutex object.  The 
    10Queue object is used exclusively by the sqlalchemy.pool.QueuePool 
    11class. 
    12 
    13This is to support the connection pool's usage of weakref callbacks to return 
    14connections to the underlying Queue, which can in extremely 
    15rare cases be invoked within the ``get()`` method of the Queue itself, 
    16producing a ``put()`` inside the ``get()`` and therefore a reentrant 
    17condition. 
    18 
    19""" 
    20 
    21from collections import deque 
    22from time import time as _time 
    23 
    24from .compat import threading 
    25 
    26 
    27__all__ = ["Empty", "Full", "Queue"] 
    28 
    29 
    30class Empty(Exception): 
    31    "Exception raised by Queue.get(block=0)/get_nowait()." 
    32 
    33    pass 
    34 
    35 
    36class Full(Exception): 
    37    "Exception raised by Queue.put(block=0)/put_nowait()." 
    38 
    39    pass 
    40 
    41 
    42class Queue: 
    43    def __init__(self, maxsize=0, use_lifo=False): 
    44        """Initialize a queue object with a given maximum size. 
    45 
    46        If `maxsize` is <= 0, the queue size is infinite. 
    47 
    48        If `use_lifo` is True, this Queue acts like a Stack (LIFO). 
    49        """ 
    50 
    51        self._init(maxsize) 
    52        # mutex must be held whenever the queue is mutating.  All methods 
    53        # that acquire mutex must release it before returning.  mutex 
    54        # is shared between the two conditions, so acquiring and 
    55        # releasing the conditions also acquires and releases mutex. 
    56        self.mutex = threading.RLock() 
    57        # Notify not_empty whenever an item is added to the queue; a 
    58        # thread waiting to get is notified then. 
    59        self.not_empty = threading.Condition(self.mutex) 
    60        # Notify not_full whenever an item is removed from the queue; 
    61        # a thread waiting to put is notified then. 
    62        self.not_full = threading.Condition(self.mutex) 
    63        # If this queue uses LIFO or FIFO 
    64        self.use_lifo = use_lifo 
    65 
    66    def qsize(self): 
    67        """Return the approximate size of the queue (not reliable!).""" 
    68 
    69        self.mutex.acquire() 
    70        n = self._qsize() 
    71        self.mutex.release() 
    72        return n 
    73 
    74    def empty(self): 
    75        """Return True if the queue is empty, False otherwise (not 
    76        reliable!).""" 
    77 
    78        self.mutex.acquire() 
    79        n = self._empty() 
    80        self.mutex.release() 
    81        return n 
    82 
    83    def full(self): 
    84        """Return True if the queue is full, False otherwise (not 
    85        reliable!).""" 
    86 
    87        self.mutex.acquire() 
    88        n = self._full() 
    89        self.mutex.release() 
    90        return n 
    91 
    92    def put(self, item, block=True, timeout=None): 
    93        """Put an item into the queue. 
    94 
    95        If optional args `block` is True and `timeout` is None (the 
    96        default), block if necessary until a free slot is 
    97        available. If `timeout` is a positive number, it blocks at 
    98        most `timeout` seconds and raises the ``Full`` exception if no 
    99        free slot was available within that time.  Otherwise (`block` 
    100        is false), put an item on the queue if a free slot is 
    101        immediately available, else raise the ``Full`` exception 
    102        (`timeout` is ignored in that case). 
    103        """ 
    104 
    105        self.not_full.acquire() 
    106        try: 
    107            if not block: 
    108                if self._full(): 
    109                    raise Full 
    110            elif timeout is None: 
    111                while self._full(): 
    112                    self.not_full.wait() 
    113            else: 
    114                if timeout < 0: 
    115                    raise ValueError("'timeout' must be a positive number") 
    116                endtime = _time() + timeout 
    117                while self._full(): 
    118                    remaining = endtime - _time() 
    119                    if remaining <= 0.0: 
    120                        raise Full 
    121                    self.not_full.wait(remaining) 
    122            self._put(item) 
    123            self.not_empty.notify() 
    124        finally: 
    125            self.not_full.release() 
    126 
    127    def put_nowait(self, item): 
    128        """Put an item into the queue without blocking. 
    129 
    130        Only enqueue the item if a free slot is immediately available. 
    131        Otherwise raise the ``Full`` exception. 
    132        """ 
    133        return self.put(item, False) 
    134 
    135    def get(self, block=True, timeout=None): 
    136        """Remove and return an item from the queue. 
    137 
    138        If optional args `block` is True and `timeout` is None (the 
    139        default), block if necessary until an item is available. If 
    140        `timeout` is a positive number, it blocks at most `timeout` 
    141        seconds and raises the ``Empty`` exception if no item was 
    142        available within that time.  Otherwise (`block` is false), 
    143        return an item if one is immediately available, else raise the 
    144        ``Empty`` exception (`timeout` is ignored in that case). 
    145        """ 
    146        self.not_empty.acquire() 
    147        try: 
    148            if not block: 
    149                if self._empty(): 
    150                    raise Empty 
    151            elif timeout is None: 
    152                while self._empty(): 
    153                    self.not_empty.wait() 
    154            else: 
    155                if timeout < 0: 
    156                    raise ValueError("'timeout' must be a positive number") 
    157                endtime = _time() + timeout 
    158                while self._empty(): 
    159                    remaining = endtime - _time() 
    160                    if remaining <= 0.0: 
    161                        raise Empty 
    162                    self.not_empty.wait(remaining) 
    163            item = self._get() 
    164            self.not_full.notify() 
    165            return item 
    166        finally: 
    167            self.not_empty.release() 
    168 
    169    def get_nowait(self): 
    170        """Remove and return an item from the queue without blocking. 
    171 
    172        Only get an item if one is immediately available. Otherwise 
    173        raise the ``Empty`` exception. 
    174        """ 
    175 
    176        return self.get(False) 
    177 
    178    # Override these methods to implement other queue organizations 
    179    # (e.g. stack or priority queue). 
    180    # These will only be called with appropriate locks held 
    181 
    182    # Initialize the queue representation 
    183    def _init(self, maxsize): 
    184        self.maxsize = maxsize 
    185        self.queue = deque() 
    186 
    187    def _qsize(self): 
    188        return len(self.queue) 
    189 
    190    # Check whether the queue is empty 
    191    def _empty(self): 
    192        return not self.queue 
    193 
    194    # Check whether the queue is full 
    195    def _full(self): 
    196        return self.maxsize > 0 and len(self.queue) == self.maxsize 
    197 
    198    # Put a new item in the queue 
    199    def _put(self, item): 
    200        self.queue.append(item) 
    201 
    202    # Get an item from the queue 
    203    def _get(self): 
    204        if self.use_lifo: 
    205            # LIFO 
    206            return self.queue.pop() 
    207        else: 
    208            # FIFO 
    209            return self.queue.popleft()