1# Copyright 2020 The gRPC Authors 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14"""Abstract base classes for Channel objects and Multicallable objects.""" 
    15 
    16import abc 
    17from typing import Generic, Optional 
    18 
    19import grpc 
    20 
    21from . import _base_call 
    22from ._typing import DeserializingFunction 
    23from ._typing import MetadataType 
    24from ._typing import RequestIterableType 
    25from ._typing import RequestType 
    26from ._typing import ResponseType 
    27from ._typing import SerializingFunction 
    28 
    29 
    30class UnaryUnaryMultiCallable(Generic[RequestType, ResponseType], abc.ABC): 
    31    """Enables asynchronous invocation of a unary-call RPC.""" 
    32 
    33    @abc.abstractmethod 
    34    def __call__( 
    35        self, 
    36        request: RequestType, 
    37        *, 
    38        timeout: Optional[float] = None, 
    39        metadata: Optional[MetadataType] = None, 
    40        credentials: Optional[grpc.CallCredentials] = None, 
    41        wait_for_ready: Optional[bool] = None, 
    42        compression: Optional[grpc.Compression] = None, 
    43    ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]: 
    44        """Asynchronously invokes the underlying RPC. 
    45 
    46        Args: 
    47          request: The request value for the RPC. 
    48          timeout: An optional duration of time in seconds to allow 
    49            for the RPC. 
    50          metadata: Optional :term:`metadata` to be transmitted to the 
    51            service-side of the RPC. 
    52          credentials: An optional CallCredentials for the RPC. Only valid for 
    53            secure Channel. 
    54          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 
    55          compression: An element of grpc.compression, e.g. 
    56            grpc.compression.Gzip. 
    57 
    58        Returns: 
    59          A UnaryUnaryCall object. 
    60 
    61        Raises: 
    62          RpcError: Indicates that the RPC terminated with non-OK status. The 
    63            raised RpcError will also be a Call for the RPC affording the RPC's 
    64            metadata, status code, and details. 
    65        """ 
    66 
    67 
    68class UnaryStreamMultiCallable(Generic[RequestType, ResponseType], abc.ABC): 
    69    """Enables asynchronous invocation of a server-streaming RPC.""" 
    70 
    71    @abc.abstractmethod 
    72    def __call__( 
    73        self, 
    74        request: RequestType, 
    75        *, 
    76        timeout: Optional[float] = None, 
    77        metadata: Optional[MetadataType] = None, 
    78        credentials: Optional[grpc.CallCredentials] = None, 
    79        wait_for_ready: Optional[bool] = None, 
    80        compression: Optional[grpc.Compression] = None, 
    81    ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]: 
    82        """Asynchronously invokes the underlying RPC. 
    83 
    84        Args: 
    85          request: The request value for the RPC. 
    86          timeout: An optional duration of time in seconds to allow 
    87            for the RPC. 
    88          metadata: Optional :term:`metadata` to be transmitted to the 
    89            service-side of the RPC. 
    90          credentials: An optional CallCredentials for the RPC. Only valid for 
    91            secure Channel. 
    92          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 
    93          compression: An element of grpc.compression, e.g. 
    94            grpc.compression.Gzip. 
    95 
    96        Returns: 
    97          A UnaryStreamCall object. 
    98 
    99        Raises: 
    100          RpcError: Indicates that the RPC terminated with non-OK status. The 
    101            raised RpcError will also be a Call for the RPC affording the RPC's 
    102            metadata, status code, and details. 
    103        """ 
    104 
    105 
    106class StreamUnaryMultiCallable(abc.ABC): 
    107    """Enables asynchronous invocation of a client-streaming RPC.""" 
    108 
    109    @abc.abstractmethod 
    110    def __call__( 
    111        self, 
    112        request_iterator: Optional[RequestIterableType] = None, 
    113        timeout: Optional[float] = None, 
    114        metadata: Optional[MetadataType] = None, 
    115        credentials: Optional[grpc.CallCredentials] = None, 
    116        wait_for_ready: Optional[bool] = None, 
    117        compression: Optional[grpc.Compression] = None, 
    118    ) -> _base_call.StreamUnaryCall: 
    119        """Asynchronously invokes the underlying RPC. 
    120 
    121        Args: 
    122          request_iterator: An optional async iterable or iterable of request 
    123            messages for the RPC. 
    124          timeout: An optional duration of time in seconds to allow 
    125            for the RPC. 
    126          metadata: Optional :term:`metadata` to be transmitted to the 
    127            service-side of the RPC. 
    128          credentials: An optional CallCredentials for the RPC. Only valid for 
    129            secure Channel. 
    130          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 
    131          compression: An element of grpc.compression, e.g. 
    132            grpc.compression.Gzip. 
    133 
    134        Returns: 
    135          A StreamUnaryCall object. 
    136 
    137        Raises: 
    138          RpcError: Indicates that the RPC terminated with non-OK status. The 
    139            raised RpcError will also be a Call for the RPC affording the RPC's 
    140            metadata, status code, and details. 
    141        """ 
    142 
    143 
    144class StreamStreamMultiCallable(abc.ABC): 
    145    """Enables asynchronous invocation of a bidirectional-streaming RPC.""" 
    146 
    147    @abc.abstractmethod 
    148    def __call__( 
    149        self, 
    150        request_iterator: Optional[RequestIterableType] = None, 
    151        timeout: Optional[float] = None, 
    152        metadata: Optional[MetadataType] = None, 
    153        credentials: Optional[grpc.CallCredentials] = None, 
    154        wait_for_ready: Optional[bool] = None, 
    155        compression: Optional[grpc.Compression] = None, 
    156    ) -> _base_call.StreamStreamCall: 
    157        """Asynchronously invokes the underlying RPC. 
    158 
    159        Args: 
    160          request_iterator: An optional async iterable or iterable of request 
    161            messages for the RPC. 
    162          timeout: An optional duration of time in seconds to allow 
    163            for the RPC. 
    164          metadata: Optional :term:`metadata` to be transmitted to the 
    165            service-side of the RPC. 
    166          credentials: An optional CallCredentials for the RPC. Only valid for 
    167            secure Channel. 
    168          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 
    169          compression: An element of grpc.compression, e.g. 
    170            grpc.compression.Gzip. 
    171 
    172        Returns: 
    173          A StreamStreamCall object. 
    174 
    175        Raises: 
    176          RpcError: Indicates that the RPC terminated with non-OK status. The 
    177            raised RpcError will also be a Call for the RPC affording the RPC's 
    178            metadata, status code, and details. 
    179        """ 
    180 
    181 
    182class Channel(abc.ABC): 
    183    """Enables asynchronous RPC invocation as a client. 
    184 
    185    Channel objects implement the Asynchronous Context Manager (aka. async 
    186    with) type, although they are not supported to be entered and exited 
    187    multiple times. 
    188    """ 
    189 
    190    @abc.abstractmethod 
    191    async def __aenter__(self): 
    192        """Starts an asynchronous context manager. 
    193 
    194        Returns: 
    195          Channel the channel that was instantiated. 
    196        """ 
    197 
    198    @abc.abstractmethod 
    199    async def __aexit__(self, exc_type, exc_val, exc_tb): 
    200        """Finishes the asynchronous context manager by closing the channel. 
    201 
    202        Still active RPCs will be cancelled. 
    203        """ 
    204 
    205    @abc.abstractmethod 
    206    async def close(self, grace: Optional[float] = None): 
    207        """Closes this Channel and releases all resources held by it. 
    208 
    209        This method immediately stops the channel from executing new RPCs in 
    210        all cases. 
    211 
    212        If a grace period is specified, this method waits until all active 
    213        RPCs are finished or until the grace period is reached. RPCs that haven't 
    214        been terminated within the grace period are aborted. 
    215        If a grace period is not specified (by passing None for grace), 
    216        all existing RPCs are cancelled immediately. 
    217 
    218        This method is idempotent. 
    219        """ 
    220 
    221    @abc.abstractmethod 
    222    def get_state( 
    223        self, try_to_connect: bool = False 
    224    ) -> grpc.ChannelConnectivity: 
    225        """Checks the connectivity state of a channel. 
    226 
    227        This is an EXPERIMENTAL API. 
    228 
    229        If the channel reaches a stable connectivity state, it is guaranteed 
    230        that the return value of this function will eventually converge to that 
    231        state. 
    232 
    233        Args: 
    234          try_to_connect: a bool indicate whether the Channel should try to 
    235            connect to peer or not. 
    236 
    237        Returns: A ChannelConnectivity object. 
    238        """ 
    239 
    240    @abc.abstractmethod 
    241    async def wait_for_state_change( 
    242        self, 
    243        last_observed_state: grpc.ChannelConnectivity, 
    244    ) -> None: 
    245        """Waits for a change in connectivity state. 
    246 
    247        This is an EXPERIMENTAL API. 
    248 
    249        The function blocks until there is a change in the channel connectivity 
    250        state from the "last_observed_state". If the state is already 
    251        different, this function will return immediately. 
    252 
    253        There is an inherent race between the invocation of 
    254        "Channel.wait_for_state_change" and "Channel.get_state". The state can 
    255        change arbitrary many times during the race, so there is no way to 
    256        observe every state transition. 
    257 
    258        If there is a need to put a timeout for this function, please refer to 
    259        "asyncio.wait_for". 
    260 
    261        Args: 
    262          last_observed_state: A grpc.ChannelConnectivity object representing 
    263            the last known state. 
    264        """ 
    265 
    266    @abc.abstractmethod 
    267    async def channel_ready(self) -> None: 
    268        """Creates a coroutine that blocks until the Channel is READY.""" 
    269 
    270    @abc.abstractmethod 
    271    def unary_unary( 
    272        self, 
    273        method: str, 
    274        request_serializer: Optional[SerializingFunction] = None, 
    275        response_deserializer: Optional[DeserializingFunction] = None, 
    276        _registered_method: Optional[bool] = False, 
    277    ) -> UnaryUnaryMultiCallable: 
    278        """Creates a UnaryUnaryMultiCallable for a unary-unary method. 
    279 
    280        Args: 
    281          method: The name of the RPC method. 
    282          request_serializer: Optional :term:`serializer` for serializing the request 
    283            message. Request goes unserialized in case None is passed. 
    284          response_deserializer: Optional :term:`deserializer` for deserializing the 
    285            response message. Response goes undeserialized in case None 
    286            is passed. 
    287          _registered_method: Implementation Private. Optional: A bool representing 
    288            whether the method is registered. 
    289 
    290        Returns: 
    291          A UnaryUnaryMultiCallable value for the named unary-unary method. 
    292        """ 
    293 
    294    @abc.abstractmethod 
    295    def unary_stream( 
    296        self, 
    297        method: str, 
    298        request_serializer: Optional[SerializingFunction] = None, 
    299        response_deserializer: Optional[DeserializingFunction] = None, 
    300        _registered_method: Optional[bool] = False, 
    301    ) -> UnaryStreamMultiCallable: 
    302        """Creates a UnaryStreamMultiCallable for a unary-stream method. 
    303 
    304        Args: 
    305          method: The name of the RPC method. 
    306          request_serializer: Optional :term:`serializer` for serializing the request 
    307            message. Request goes unserialized in case None is passed. 
    308          response_deserializer: Optional :term:`deserializer` for deserializing the 
    309            response message. Response goes undeserialized in case None 
    310            is passed. 
    311          _registered_method: Implementation Private. Optional: A bool representing 
    312            whether the method is registered. 
    313 
    314        Returns: 
    315          A UnaryStreamMultiCallable value for the named unary-stream method. 
    316        """ 
    317 
    318    @abc.abstractmethod 
    319    def stream_unary( 
    320        self, 
    321        method: str, 
    322        request_serializer: Optional[SerializingFunction] = None, 
    323        response_deserializer: Optional[DeserializingFunction] = None, 
    324        _registered_method: Optional[bool] = False, 
    325    ) -> StreamUnaryMultiCallable: 
    326        """Creates a StreamUnaryMultiCallable for a stream-unary method. 
    327 
    328        Args: 
    329          method: The name of the RPC method. 
    330          request_serializer: Optional :term:`serializer` for serializing the request 
    331            message. Request goes unserialized in case None is passed. 
    332          response_deserializer: Optional :term:`deserializer` for deserializing the 
    333            response message. Response goes undeserialized in case None 
    334            is passed. 
    335          _registered_method: Implementation Private. Optional: A bool representing 
    336            whether the method is registered. 
    337 
    338        Returns: 
    339          A StreamUnaryMultiCallable value for the named stream-unary method. 
    340        """ 
    341 
    342    @abc.abstractmethod 
    343    def stream_stream( 
    344        self, 
    345        method: str, 
    346        request_serializer: Optional[SerializingFunction] = None, 
    347        response_deserializer: Optional[DeserializingFunction] = None, 
    348        _registered_method: Optional[bool] = False, 
    349    ) -> StreamStreamMultiCallable: 
    350        """Creates a StreamStreamMultiCallable for a stream-stream method. 
    351 
    352        Args: 
    353          method: The name of the RPC method. 
    354          request_serializer: Optional :term:`serializer` for serializing the request 
    355            message. Request goes unserialized in case None is passed. 
    356          response_deserializer: Optional :term:`deserializer` for deserializing the 
    357            response message. Response goes undeserialized in case None 
    358            is passed. 
    359          _registered_method: Implementation Private. Optional: A bool representing 
    360            whether the method is registered. 
    361 
    362        Returns: 
    363          A StreamStreamMultiCallable value for the named stream-stream method. 
    364        """