1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Helpers for wrapping low-level gRPC methods with common functionality. 
    16 
    17This is used by gapic clients to provide common error mapping, retry, timeout, 
    18compression, pagination, and long-running operations to gRPC methods. 
    19""" 
    20 
    21import enum 
    22import functools 
    23 
    24from google.api_core import grpc_helpers 
    25from google.api_core.gapic_v1 import client_info 
    26from google.api_core.timeout import TimeToDeadlineTimeout 
    27 
    28USE_DEFAULT_METADATA = object() 
    29 
    30 
    31class _MethodDefault(enum.Enum): 
    32    # Uses enum so that pytype/mypy knows that this is the only possible value. 
    33    # https://stackoverflow.com/a/60605919/101923 
    34    # 
    35    # Literal[_DEFAULT_VALUE] is an alternative, but only added in Python 3.8. 
    36    # https://docs.python.org/3/library/typing.html#typing.Literal 
    37    _DEFAULT_VALUE = object() 
    38 
    39 
    40DEFAULT = _MethodDefault._DEFAULT_VALUE 
    41"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified, 
    42so the default should be used.""" 
    43 
    44 
    45def _is_not_none_or_false(value): 
    46    return value is not None and value is not False 
    47 
    48 
    49def _apply_decorators(func, decorators): 
    50    """Apply a list of decorators to a given function. 
    51 
    52    ``decorators`` may contain items that are ``None`` or ``False`` which will 
    53    be ignored. 
    54    """ 
    55    filtered_decorators = filter(_is_not_none_or_false, reversed(decorators)) 
    56 
    57    for decorator in filtered_decorators: 
    58        func = decorator(func) 
    59 
    60    return func 
    61 
    62 
    63class _GapicCallable(object): 
    64    """Callable that applies retry, timeout, and metadata logic. 
    65 
    66    Args: 
    67        target (Callable): The low-level RPC method. 
    68        retry (google.api_core.retry.Retry): The default retry for the 
    69            callable. If ``None``, this callable will not retry by default 
    70        timeout (google.api_core.timeout.Timeout): The default timeout for the 
    71            callable (i.e. duration of time within which an RPC must terminate 
    72            after its start, not to be confused with deadline). If ``None``, 
    73            this callable will not specify a timeout argument to the low-level 
    74            RPC method. 
    75        compression (grpc.Compression): The default compression for the callable. 
    76            If ``None``, this callable will not specify a compression argument 
    77            to the low-level RPC method. 
    78        metadata (Sequence[Tuple[str, str]]): Additional metadata that is 
    79            provided to the RPC method on every invocation. This is merged with 
    80            any metadata specified during invocation. If ``None``, no 
    81            additional metadata will be passed to the RPC method. 
    82    """ 
    83 
    84    def __init__( 
    85        self, 
    86        target, 
    87        retry, 
    88        timeout, 
    89        compression, 
    90        metadata=None, 
    91    ): 
    92        self._target = target 
    93        self._retry = retry 
    94        self._timeout = timeout 
    95        self._compression = compression 
    96        self._metadata = metadata 
    97 
    98    def __call__( 
    99        self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs 
    100    ): 
    101        """Invoke the low-level RPC with retry, timeout, compression, and metadata.""" 
    102 
    103        if retry is DEFAULT: 
    104            retry = self._retry 
    105 
    106        if timeout is DEFAULT: 
    107            timeout = self._timeout 
    108 
    109        if compression is DEFAULT: 
    110            compression = self._compression 
    111 
    112        if isinstance(timeout, (int, float)): 
    113            timeout = TimeToDeadlineTimeout(timeout=timeout) 
    114 
    115        # Apply all applicable decorators. 
    116        wrapped_func = _apply_decorators(self._target, [retry, timeout]) 
    117 
    118        # Add the user agent metadata to the call. 
    119        if self._metadata is not None: 
    120            metadata = kwargs.get("metadata", []) 
    121            # Due to the nature of invocation, None should be treated the same 
    122            # as not specified. 
    123            if metadata is None: 
    124                metadata = [] 
    125            metadata = list(metadata) 
    126            metadata.extend(self._metadata) 
    127            kwargs["metadata"] = metadata 
    128        if self._compression is not None: 
    129            kwargs["compression"] = compression 
    130 
    131        return wrapped_func(*args, **kwargs) 
    132 
    133 
    134def wrap_method( 
    135    func, 
    136    default_retry=None, 
    137    default_timeout=None, 
    138    default_compression=None, 
    139    client_info=client_info.DEFAULT_CLIENT_INFO, 
    140    *, 
    141    with_call=False, 
    142): 
    143    """Wrap an RPC method with common behavior. 
    144 
    145    This applies common error wrapping, retry, timeout, and compression behavior to a function. 
    146    The wrapped function will take optional ``retry``, ``timeout``, and ``compression`` 
    147    arguments. 
    148 
    149    For example:: 
    150 
    151        import google.api_core.gapic_v1.method 
    152        from google.api_core import retry 
    153        from google.api_core import timeout 
    154        from grpc import Compression 
    155 
    156        # The original RPC method. 
    157        def get_topic(name, timeout=None): 
    158            request = publisher_v2.GetTopicRequest(name=name) 
    159            return publisher_stub.GetTopic(request, timeout=timeout) 
    160 
    161        default_retry = retry.Retry(deadline=60) 
    162        default_timeout = timeout.Timeout(deadline=60) 
    163        default_compression = Compression.NoCompression 
    164        wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method( 
    165            get_topic, default_retry) 
    166 
    167        # Execute get_topic with default retry and timeout: 
    168        response = wrapped_get_topic() 
    169 
    170        # Execute get_topic without doing any retying but with the default 
    171        # timeout: 
    172        response = wrapped_get_topic(retry=None) 
    173 
    174        # Execute get_topic but only retry on 5xx errors: 
    175        my_retry = retry.Retry(retry.if_exception_type( 
    176            exceptions.InternalServerError)) 
    177        response = wrapped_get_topic(retry=my_retry) 
    178 
    179    The way this works is by late-wrapping the given function with the retry 
    180    and timeout decorators. Essentially, when ``wrapped_get_topic()`` is 
    181    called: 
    182 
    183    * ``get_topic()`` is first wrapped with the ``timeout`` into 
    184      ``get_topic_with_timeout``. 
    185    * ``get_topic_with_timeout`` is wrapped with the ``retry`` into 
    186      ``get_topic_with_timeout_and_retry()``. 
    187    * The final ``get_topic_with_timeout_and_retry`` is called passing through 
    188      the ``args``  and ``kwargs``. 
    189 
    190    The callstack is therefore:: 
    191 
    192        method.__call__() -> 
    193            Retry.__call__() -> 
    194                Timeout.__call__() -> 
    195                    wrap_errors() -> 
    196                        get_topic() 
    197 
    198    Note that if ``timeout`` or ``retry`` is ``None``, then they are not 
    199    applied to the function. For example, 
    200    ``wrapped_get_topic(timeout=None, retry=None)`` is more or less 
    201    equivalent to just calling ``get_topic`` but with error re-mapping. 
    202 
    203    Args: 
    204        func (Callable[Any]): The function to wrap. It should accept an 
    205            optional ``timeout`` argument. If ``metadata`` is not ``None``, it 
    206            should accept a ``metadata`` argument. 
    207        default_retry (Optional[google.api_core.Retry]): The default retry 
    208            strategy. If ``None``, the method will not retry by default. 
    209        default_timeout (Optional[google.api_core.Timeout]): The default 
    210            timeout strategy. Can also be specified as an int or float. If 
    211            ``None``, the method will not have timeout specified by default. 
    212        default_compression (Optional[grpc.Compression]): The default 
    213            grpc.Compression. If ``None``, the method will not have 
    214            compression specified by default. 
    215        client_info 
    216            (Optional[google.api_core.gapic_v1.client_info.ClientInfo]): 
    217                Client information used to create a user-agent string that's 
    218                passed as gRPC metadata to the method. If unspecified, then 
    219                a sane default will be used. If ``None``, then no user agent 
    220                metadata will be provided to the RPC method. 
    221        with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will 
    222            return a tuple of (response, grpc.Call) instead of just the response. 
    223            This is useful for extracting trailing metadata from unary calls. 
    224            Defaults to False. 
    225 
    226    Returns: 
    227        Callable: A new callable that takes optional ``retry``, ``timeout``, 
    228            and ``compression`` 
    229            arguments and applies the common error mapping, retry, timeout, compression, 
    230            and metadata behavior to the low-level RPC method. 
    231    """ 
    232    if with_call: 
    233        try: 
    234            func = func.with_call 
    235        except AttributeError as exc: 
    236            raise ValueError( 
    237                "with_call=True is only supported for unary calls." 
    238            ) from exc 
    239    func = grpc_helpers.wrap_errors(func) 
    240    if client_info is not None: 
    241        user_agent_metadata = [client_info.to_grpc_metadata()] 
    242    else: 
    243        user_agent_metadata = None 
    244 
    245    return functools.wraps(func)( 
    246        _GapicCallable( 
    247            func, 
    248            default_retry, 
    249            default_timeout, 
    250            default_compression, 
    251            metadata=user_agent_metadata, 
    252        ) 
    253    )