Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/bidi_base.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

24 statements  

1# Copyright 2025, Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# You may obtain a copy of the License at 

5# https://www.apache.org/licenses/LICENSE-2.0 

6# 

7# Unless required by applicable law or agreed to in writing, software 

8# distributed under the License is distributed on an "AS IS" BASIS, 

9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

10# See the License for the specific language governing permissions and 

11# limitations under the License. 

12 

13"""Base class for bi-directional streaming RPC helpers.""" 

14 

15 

16class BidiRpcBase: 

17 """A base class for consuming a bi-directional streaming RPC. 

18 

19 This maps gRPC's built-in interface which uses a request iterator and a 

20 response iterator into a socket-like :func:`send` and :func:`recv`. This 

21 is a more useful pattern for long-running or asymmetric streams (streams 

22 where there is not a direct correlation between the requests and 

23 responses). 

24 

25 This does *not* retry the stream on errors. 

26 

27 Args: 

28 start_rpc (Union[grpc.StreamStreamMultiCallable, 

29 grpc.aio.StreamStreamMultiCallable]): The gRPC method used 

30 to start the RPC. 

31 initial_request (Union[protobuf.Message, 

32 Callable[[], protobuf.Message]]): The initial request to 

33 yield. This is useful if an initial request is needed to start the 

34 stream. 

35 metadata (Sequence[Tuple(str, str)]): RPC metadata to include in 

36 the request. 

37 """ 

38 

39 def __init__(self, start_rpc, initial_request=None, metadata=None): 

40 self._start_rpc = start_rpc 

41 self._initial_request = initial_request 

42 self._rpc_metadata = metadata 

43 self._request_queue = self._create_queue() 

44 self._request_generator = None 

45 self._callbacks = [] 

46 self.call = None 

47 

48 def _create_queue(self): 

49 """Create a queue for requests.""" 

50 raise NotImplementedError("`_create_queue` is not implemented.") 

51 

52 def add_done_callback(self, callback): 

53 """Adds a callback that will be called when the RPC terminates. 

54 

55 This occurs when the RPC errors or is successfully terminated. 

56 

57 Args: 

58 callback (Union[Callable[[grpc.Future], None], Callable[[Any], None]]): 

59 The callback to execute after gRPC call completed (success or 

60 failure). 

61 

62 For sync streaming gRPC: Callable[[grpc.Future], None] 

63 

64 For async streaming gRPC: Callable[[Any], None] 

65 """ 

66 self._callbacks.append(callback) 

67 

68 def _on_call_done(self, future): 

69 # This occurs when the RPC errors or is successfully terminated. 

70 # Note that grpc's "future" here can also be a grpc.RpcError. 

71 # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 

72 # that `grpc.RpcError` is also `grpc.Call`. 

73 # for asynchronous gRPC call it would be `grpc.aio.AioRpcError` 

74 

75 # Note: sync callbacks can be limiting for async code, because you can't 

76 # await anything in a sync callback. 

77 for callback in self._callbacks: 

78 callback(future) 

79 

80 @property 

81 def is_active(self): 

82 """True if the gRPC call is not done yet.""" 

83 raise NotImplementedError("`is_active` is not implemented.") 

84 

85 @property 

86 def pending_requests(self): 

87 """Estimate of the number of queued requests.""" 

88 return self._request_queue.qsize()