1# Copyright 2025, Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# You may obtain a copy of the License at
5# https://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
13"""Base class for bi-directional streaming RPC helpers."""
14
15
16class BidiRpcBase:
17 """A base class for consuming a bi-directional streaming RPC.
18
19 This maps gRPC's built-in interface which uses a request iterator and a
20 response iterator into a socket-like :func:`send` and :func:`recv`. This
21 is a more useful pattern for long-running or asymmetric streams (streams
22 where there is not a direct correlation between the requests and
23 responses).
24
25 This does *not* retry the stream on errors.
26
27 Args:
28 start_rpc (Union[grpc.StreamStreamMultiCallable,
29 grpc.aio.StreamStreamMultiCallable]): The gRPC method used
30 to start the RPC.
31 initial_request (Union[protobuf.Message,
32 Callable[[], protobuf.Message]]): The initial request to
33 yield. This is useful if an initial request is needed to start the
34 stream.
35 metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
36 the request.
37 """
38
39 def __init__(self, start_rpc, initial_request=None, metadata=None):
40 self._start_rpc = start_rpc
41 self._initial_request = initial_request
42 self._rpc_metadata = metadata
43 self._request_queue = self._create_queue()
44 self._request_generator = None
45 self._callbacks = []
46 self.call = None
47
48 def _create_queue(self):
49 """Create a queue for requests."""
50 raise NotImplementedError("`_create_queue` is not implemented.")
51
52 def add_done_callback(self, callback):
53 """Adds a callback that will be called when the RPC terminates.
54
55 This occurs when the RPC errors or is successfully terminated.
56
57 Args:
58 callback (Union[Callable[[grpc.Future], None], Callable[[Any], None]]):
59 The callback to execute after gRPC call completed (success or
60 failure).
61
62 For sync streaming gRPC: Callable[[grpc.Future], None]
63
64 For async streaming gRPC: Callable[[Any], None]
65 """
66 self._callbacks.append(callback)
67
68 def _on_call_done(self, future):
69 # This occurs when the RPC errors or is successfully terminated.
70 # Note that grpc's "future" here can also be a grpc.RpcError.
71 # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
72 # that `grpc.RpcError` is also `grpc.Call`.
73 # for asynchronous gRPC call it would be `grpc.aio.AioRpcError`
74
75 # Note: sync callbacks can be limiting for async code, because you can't
76 # await anything in a sync callback.
77 for callback in self._callbacks:
78 callback(future)
79
80 @property
81 def is_active(self):
82 """True if the gRPC call is not done yet."""
83 raise NotImplementedError("`is_active` is not implemented.")
84
85 @property
86 def pending_requests(self):
87 """Estimate of the number of queued requests."""
88 return self._request_queue.qsize()