1# Copyright 2025, Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# You may obtain a copy of the License at 
    5#     https://www.apache.org/licenses/LICENSE-2.0 
    6# 
    7# Unless required by applicable law or agreed to in writing, software 
    8# distributed under the License is distributed on an "AS IS" BASIS, 
    9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    10# See the License for the specific language governing permissions and 
    11# limitations under the License. 
    12 
    13"""Base class for bi-directional streaming RPC helpers.""" 
    14 
    15 
    16class BidiRpcBase: 
    17    """A base class for consuming a bi-directional streaming RPC. 
    18 
    19    This maps gRPC's built-in interface which uses a request iterator and a 
    20    response iterator into a socket-like :func:`send` and :func:`recv`. This 
    21    is a more useful pattern for long-running or asymmetric streams (streams 
    22    where there is not a direct correlation between the requests and 
    23    responses). 
    24 
    25    This does *not* retry the stream on errors. 
    26 
    27    Args: 
    28        start_rpc (Union[grpc.StreamStreamMultiCallable, 
    29                    grpc.aio.StreamStreamMultiCallable]): The gRPC method used 
    30                    to start the RPC. 
    31        initial_request (Union[protobuf.Message, 
    32                Callable[[], protobuf.Message]]): The initial request to 
    33            yield. This is useful if an initial request is needed to start the 
    34            stream. 
    35        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in 
    36            the request. 
    37    """ 
    38 
    39    def __init__(self, start_rpc, initial_request=None, metadata=None): 
    40        self._start_rpc = start_rpc 
    41        self._initial_request = initial_request 
    42        self._rpc_metadata = metadata 
    43        self._request_queue = self._create_queue() 
    44        self._request_generator = None 
    45        self._callbacks = [] 
    46        self.call = None 
    47 
    48    def _create_queue(self): 
    49        """Create a queue for requests.""" 
    50        raise NotImplementedError("`_create_queue` is not implemented.") 
    51 
    52    def add_done_callback(self, callback): 
    53        """Adds a callback that will be called when the RPC terminates. 
    54 
    55        This occurs when the RPC errors or is successfully terminated. 
    56 
    57        Args: 
    58            callback (Union[Callable[[grpc.Future], None], Callable[[Any], None]]): 
    59                The callback to execute after gRPC call completed (success or 
    60                failure). 
    61 
    62                For sync streaming gRPC: Callable[[grpc.Future], None] 
    63 
    64                For async streaming gRPC: Callable[[Any], None] 
    65        """ 
    66        self._callbacks.append(callback) 
    67 
    68    def _on_call_done(self, future): 
    69        # This occurs when the RPC errors or is successfully terminated. 
    70        # Note that grpc's "future" here can also be a grpc.RpcError. 
    71        # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 
    72        # that `grpc.RpcError` is also `grpc.Call`. 
    73        # for asynchronous gRPC call it would be `grpc.aio.AioRpcError` 
    74 
    75        # Note: sync callbacks can be limiting for async code, because you can't 
    76        # await anything in a sync callback. 
    77        for callback in self._callbacks: 
    78            callback(future) 
    79 
    80    @property 
    81    def is_active(self): 
    82        """True if the gRPC call is not done yet.""" 
    83        raise NotImplementedError("`is_active` is not implemented.") 
    84 
    85    @property 
    86    def pending_requests(self): 
    87        """Estimate of the number of queued requests.""" 
    88        return self._request_queue.qsize()