Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/aio/_base_channel.py: 98%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

45 statements  

1# Copyright 2020 The gRPC Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Abstract base classes for Channel objects and Multicallable objects.""" 

15 

16import abc 

17from typing import Generic, Optional 

18 

19import grpc 

20 

21from . import _base_call 

22from ._typing import DeserializingFunction 

23from ._typing import MetadataType 

24from ._typing import RequestIterableType 

25from ._typing import RequestType 

26from ._typing import ResponseType 

27from ._typing import SerializingFunction 

28 

29 

class UnaryUnaryMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Interface for asynchronously invoking a unary-call RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
        """Start the underlying RPC asynchronously.

        Everything after ``request`` must be passed by keyword.

        Args:
            request: The request value for the RPC.
            timeout: Optional length of time, in seconds, that the RPC is
                allowed to take.
            metadata: Optional :term:`metadata` to send to the service-side
                of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
                on a secure Channel.
            wait_for_ready: Optional flag that enables the
                :term:`wait_for_ready` mechanism.
            compression: An element of grpc.Compression, e.g.
                grpc.Compression.Gzip.

        Returns:
            A UnaryUnaryCall object.

        Raises:
            RpcError: The RPC terminated with a non-OK status. The raised
                RpcError is also a Call for the RPC, giving access to the
                RPC's metadata, status code, and details.
        """

66 

67 

class UnaryStreamMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Interface for asynchronously invoking a server-streaming RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
        """Start the underlying RPC asynchronously.

        Everything after ``request`` must be passed by keyword.

        Args:
            request: The request value for the RPC.
            timeout: Optional length of time, in seconds, that the RPC is
                allowed to take.
            metadata: Optional :term:`metadata` to send to the service-side
                of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
                on a secure Channel.
            wait_for_ready: Optional flag that enables the
                :term:`wait_for_ready` mechanism.
            compression: An element of grpc.Compression, e.g.
                grpc.Compression.Gzip.

        Returns:
            A UnaryStreamCall object.

        Raises:
            RpcError: The RPC terminated with a non-OK status. The raised
                RpcError is also a Call for the RPC, giving access to the
                RPC's metadata, status code, and details.
        """

104 

105 

class StreamUnaryMultiCallable(abc.ABC):
    """Interface for asynchronously invoking a client-streaming RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamUnaryCall:
        """Start the underlying RPC asynchronously.

        Args:
            request_iterator: Optional async iterable or iterable of the
                request messages for the RPC.
            timeout: Optional length of time, in seconds, that the RPC is
                allowed to take.
            metadata: Optional :term:`metadata` to send to the service-side
                of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
                on a secure Channel.
            wait_for_ready: Optional flag that enables the
                :term:`wait_for_ready` mechanism.
            compression: An element of grpc.Compression, e.g.
                grpc.Compression.Gzip.

        Returns:
            A StreamUnaryCall object.

        Raises:
            RpcError: The RPC terminated with a non-OK status. The raised
                RpcError is also a Call for the RPC, giving access to the
                RPC's metadata, status code, and details.
        """

142 

143 

class StreamStreamMultiCallable(abc.ABC):
    """Interface for asynchronously invoking a bidirectional-streaming RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamStreamCall:
        """Start the underlying RPC asynchronously.

        Args:
            request_iterator: Optional async iterable or iterable of the
                request messages for the RPC.
            timeout: Optional length of time, in seconds, that the RPC is
                allowed to take.
            metadata: Optional :term:`metadata` to send to the service-side
                of the RPC.
            credentials: Optional CallCredentials for the RPC. Only valid
                on a secure Channel.
            wait_for_ready: Optional flag that enables the
                :term:`wait_for_ready` mechanism.
            compression: An element of grpc.Compression, e.g.
                grpc.Compression.Gzip.

        Returns:
            A StreamStreamCall object.

        Raises:
            RpcError: The RPC terminated with a non-OK status. The raised
                RpcError is also a Call for the RPC, giving access to the
                RPC's metadata, status code, and details.
        """

180 

181 

class Channel(abc.ABC):
    """Client-side entry point for asynchronous RPC invocation.

    A Channel is an asynchronous context manager (usable with
    ``async with``), but entering and exiting the same Channel more than
    once is not supported.
    """

    @abc.abstractmethod
    async def __aenter__(self):
        """Enter the asynchronous context manager.

        Returns:
            The channel that was instantiated.
        """

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the asynchronous context manager, closing the channel.

        RPCs that are still active will be cancelled.
        """

    @abc.abstractmethod
    async def close(self, grace: Optional[float] = None):
        """Close this Channel and release every resource it holds.

        In all cases the channel immediately stops executing new RPCs.

        When a grace period is given, this method waits until either all
        active RPCs have finished or the grace period has been reached;
        RPCs that have not terminated within the grace period are aborted.
        When no grace period is given (``grace`` is None), all existing
        RPCs are cancelled immediately.

        This method is idempotent.
        """

    @abc.abstractmethod
    def get_state(
        self, try_to_connect: bool = False
    ) -> grpc.ChannelConnectivity:
        """Report the connectivity state of the channel.

        This is an EXPERIMENTAL API.

        Once the channel reaches a stable connectivity state, the value
        returned by this function is guaranteed to eventually converge to
        that state.

        Args:
            try_to_connect: A bool indicating whether the Channel should
                try to connect to the peer.

        Returns:
            A ChannelConnectivity object.
        """

    @abc.abstractmethod
    async def wait_for_state_change(
        self,
        last_observed_state: grpc.ChannelConnectivity,
    ) -> None:
        """Wait until the connectivity state changes.

        This is an EXPERIMENTAL API.

        The function blocks until the channel connectivity state differs
        from "last_observed_state". If the state is already different, it
        returns immediately.

        "Channel.wait_for_state_change" and "Channel.get_state" are
        inherently racy with respect to each other: the state can change
        arbitrarily many times during the race, so observing every state
        transition is not possible.

        To apply a timeout to this function, please refer to
        "asyncio.wait_for".

        Args:
            last_observed_state: A grpc.ChannelConnectivity object
                representing the last known state.
        """

    @abc.abstractmethod
    async def channel_ready(self) -> None:
        """Create a coroutine that blocks until the Channel is READY."""

    @abc.abstractmethod
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryUnaryMultiCallable:
        """Create a UnaryUnaryMultiCallable for a unary-unary method.

        Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` used to
                serialize the request message. When None is passed the
                request goes unserialized.
            response_deserializer: Optional :term:`deserializer` used to
                deserialize the response message. When None is passed the
                response goes undeserialized.
            _registered_method: Implementation Private. Optional: A bool
                representing whether the method is registered.

        Returns:
            A UnaryUnaryMultiCallable value for the named unary-unary
            method.
        """

    @abc.abstractmethod
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryStreamMultiCallable:
        """Create a UnaryStreamMultiCallable for a unary-stream method.

        Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` used to
                serialize the request message. When None is passed the
                request goes unserialized.
            response_deserializer: Optional :term:`deserializer` used to
                deserialize the response message. When None is passed the
                response goes undeserialized.
            _registered_method: Implementation Private. Optional: A bool
                representing whether the method is registered.

        Returns:
            A UnaryStreamMultiCallable value for the named unary-stream
            method.
        """

    @abc.abstractmethod
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamUnaryMultiCallable:
        """Create a StreamUnaryMultiCallable for a stream-unary method.

        Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` used to
                serialize the request message. When None is passed the
                request goes unserialized.
            response_deserializer: Optional :term:`deserializer` used to
                deserialize the response message. When None is passed the
                response goes undeserialized.
            _registered_method: Implementation Private. Optional: A bool
                representing whether the method is registered.

        Returns:
            A StreamUnaryMultiCallable value for the named stream-unary
            method.
        """

    @abc.abstractmethod
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamStreamMultiCallable:
        """Create a StreamStreamMultiCallable for a stream-stream method.

        Args:
            method: The name of the RPC method.
            request_serializer: Optional :term:`serializer` used to
                serialize the request message. When None is passed the
                request goes unserialized.
            response_deserializer: Optional :term:`deserializer` used to
                deserialize the response message. When None is passed the
                response goes undeserialized.
            _registered_method: Implementation Private. Optional: A bool
                representing whether the method is registered.

        Returns:
            A StreamStreamMultiCallable value for the named stream-stream
            method.
        """