1# Copyright 2017, Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15import logging 
    16import queue 
    17import time 
    18from typing import Any, Callable, List, Sequence, Optional 
    19import uuid 
    20 
    21 
    22__all__ = ("QueueCallbackWorker", "STOP") 
    23 
    24_LOGGER = logging.getLogger(__name__) 
    25 
    26 
    27# Helper thread stop indicator. This could be a sentinel object or None, 
    28# but the sentinel object's ID can change if the process is forked, and 
    29# None has the possibility of a user accidentally killing the helper 
    30# thread. 
    31STOP = uuid.uuid4() 
    32 
    33 
    34def _get_many( 
    35    queue_: queue.Queue, max_items: Optional[int] = None, max_latency: float = 0 
    36) -> List[Any]: 
    37    """Get multiple items from a Queue. 
    38 
    39    Gets at least one (blocking) and at most ``max_items`` items 
    40    (non-blocking) from a given Queue. Does not mark the items as done. 
    41 
    42    Args: 
    43        queue_: The Queue to get items from. 
    44        max_items: 
    45            The maximum number of items to get. If ``None``, then all available items 
    46            in the queue are returned. 
    47        max_latency: 
    48            The maximum number of seconds to wait for more than one item from a queue. 
    49            This number includes the time required to retrieve the first item. 
    50 
    51    Returns: 
    52        A sequence of items retrieved from the queue. 
    53    """ 
    54    start = time.time() 
    55    # Always return at least one item. 
    56    items = [queue_.get()] 
    57    while max_items is None or len(items) < max_items: 
    58        try: 
    59            elapsed = time.time() - start 
    60            timeout = max(0, max_latency - elapsed) 
    61            items.append(queue_.get(timeout=timeout)) 
    62        except queue.Empty: 
    63            break 
    64    return items 
    65 
    66 
    67class QueueCallbackWorker(object): 
    68    """A helper that executes a callback for items sent in a queue. 
    69 
    70    Calls a blocking ``get()`` on the ``queue`` until it encounters 
    71    :attr:`STOP`. 
    72 
    73    Args: 
    74        queue: 
    75            A Queue instance, appropriate for crossing the concurrency boundary 
    76            implemented by ``executor``. Items will be popped off (with a blocking 
    77            ``get()``) until :attr:`STOP` is encountered. 
    78        callback: 
    79            A callback that can process items pulled off of the queue. Multiple items 
    80            will be passed to the callback in batches. 
    81        max_items: 
    82            The maximum amount of items that will be passed to the callback at a time. 
    83        max_latency: 
    84            The maximum amount of time in seconds to wait for additional items before 
    85            executing the callback. 
    86    """ 
    87 
    88    def __init__( 
    89        self, 
    90        queue: queue.Queue, 
    91        callback: Callable[[Sequence[Any]], Any], 
    92        max_items: int = 100, 
    93        max_latency: float = 0, 
    94    ): 
    95        self.queue = queue 
    96        self._callback = callback 
    97        self.max_items = max_items 
    98        self.max_latency = max_latency 
    99 
    100    def __call__(self) -> None: 
    101        continue_ = True 
    102        while continue_: 
    103            items = _get_many( 
    104                self.queue, max_items=self.max_items, max_latency=self.max_latency 
    105            ) 
    106 
    107            # If stop is in the items, process all items up to STOP and then 
    108            # exit. 
    109            try: 
    110                items = items[: items.index(STOP)] 
    111                continue_ = False 
    112            except ValueError: 
    113                pass 
    114 
    115            # Run the callback. If any exceptions occur, log them and 
    116            # continue. 
    117            try: 
    118                self._callback(items) 
    119            except Exception as exc: 
    120                _LOGGER.exception("Error in queue callback worker: %s", exc) 
    121 
    122        _LOGGER.debug("Exiting the QueueCallbackWorker.")