Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/aio/_base_channel.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# Copyright 2020 The gRPC Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Abstract base classes for Channel objects and Multicallable objects.""" 

15 

16import abc 

17from typing import Generic, Optional 

18 

19import grpc 

20 

21from . import _base_call 

22from ._typing import DeserializingFunction 

23from ._typing import MetadataType 

24from ._typing import RequestIterableType 

25from ._typing import RequestType 

26from ._typing import ResponseType 

27from ._typing import SerializingFunction 

28 

29 

30class UnaryUnaryMultiCallable(Generic[RequestType, ResponseType], abc.ABC): 

31 """Enables asynchronous invocation of a unary-call RPC.""" 

32 

33 @abc.abstractmethod 

34 def __call__( 

35 self, 

36 request: RequestType, 

37 *, 

38 timeout: Optional[float] = None, 

39 metadata: Optional[MetadataType] = None, 

40 credentials: Optional[grpc.CallCredentials] = None, 

41 wait_for_ready: Optional[bool] = None, 

42 compression: Optional[grpc.Compression] = None, 

43 ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]: 

44 """Asynchronously invokes the underlying RPC. 

45 

46 Args: 

47 request: The request value for the RPC. 

48 timeout: An optional duration of time in seconds to allow 

49 for the RPC. 

50 metadata: Optional :term:`metadata` to be transmitted to the 

51 service-side of the RPC. 

52 credentials: An optional CallCredentials for the RPC. Only valid for 

53 secure Channel. 

54 wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 

55 compression: An element of grpc.compression, e.g. 

56 grpc.compression.Gzip. 

57 

58 Returns: 

59 A UnaryUnaryCall object. 

60 

61 Raises: 

62 RpcError: Indicates that the RPC terminated with non-OK status. The 

63 raised RpcError will also be a Call for the RPC affording the RPC's 

64 metadata, status code, and details. 

65 """ 

66 

67 

68class UnaryStreamMultiCallable(Generic[RequestType, ResponseType], abc.ABC): 

69 """Enables asynchronous invocation of a server-streaming RPC.""" 

70 

71 @abc.abstractmethod 

72 def __call__( 

73 self, 

74 request: RequestType, 

75 *, 

76 timeout: Optional[float] = None, 

77 metadata: Optional[MetadataType] = None, 

78 credentials: Optional[grpc.CallCredentials] = None, 

79 wait_for_ready: Optional[bool] = None, 

80 compression: Optional[grpc.Compression] = None, 

81 ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]: 

82 """Asynchronously invokes the underlying RPC. 

83 

84 Args: 

85 request: The request value for the RPC. 

86 timeout: An optional duration of time in seconds to allow 

87 for the RPC. 

88 metadata: Optional :term:`metadata` to be transmitted to the 

89 service-side of the RPC. 

90 credentials: An optional CallCredentials for the RPC. Only valid for 

91 secure Channel. 

92 wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 

93 compression: An element of grpc.compression, e.g. 

94 grpc.compression.Gzip. 

95 

96 Returns: 

97 A UnaryStreamCall object. 

98 

99 Raises: 

100 RpcError: Indicates that the RPC terminated with non-OK status. The 

101 raised RpcError will also be a Call for the RPC affording the RPC's 

102 metadata, status code, and details. 

103 """ 

104 

105 

106class StreamUnaryMultiCallable(abc.ABC): 

107 """Enables asynchronous invocation of a client-streaming RPC.""" 

108 

109 @abc.abstractmethod 

110 def __call__( 

111 self, 

112 request_iterator: Optional[RequestIterableType] = None, 

113 timeout: Optional[float] = None, 

114 metadata: Optional[MetadataType] = None, 

115 credentials: Optional[grpc.CallCredentials] = None, 

116 wait_for_ready: Optional[bool] = None, 

117 compression: Optional[grpc.Compression] = None, 

118 ) -> _base_call.StreamUnaryCall: 

119 """Asynchronously invokes the underlying RPC. 

120 

121 Args: 

122 request_iterator: An optional async iterable or iterable of request 

123 messages for the RPC. 

124 timeout: An optional duration of time in seconds to allow 

125 for the RPC. 

126 metadata: Optional :term:`metadata` to be transmitted to the 

127 service-side of the RPC. 

128 credentials: An optional CallCredentials for the RPC. Only valid for 

129 secure Channel. 

130 wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 

131 compression: An element of grpc.compression, e.g. 

132 grpc.compression.Gzip. 

133 

134 Returns: 

135 A StreamUnaryCall object. 

136 

137 Raises: 

138 RpcError: Indicates that the RPC terminated with non-OK status. The 

139 raised RpcError will also be a Call for the RPC affording the RPC's 

140 metadata, status code, and details. 

141 """ 

142 

143 

144class StreamStreamMultiCallable(abc.ABC): 

145 """Enables asynchronous invocation of a bidirectional-streaming RPC.""" 

146 

147 @abc.abstractmethod 

148 def __call__( 

149 self, 

150 request_iterator: Optional[RequestIterableType] = None, 

151 timeout: Optional[float] = None, 

152 metadata: Optional[MetadataType] = None, 

153 credentials: Optional[grpc.CallCredentials] = None, 

154 wait_for_ready: Optional[bool] = None, 

155 compression: Optional[grpc.Compression] = None, 

156 ) -> _base_call.StreamStreamCall: 

157 """Asynchronously invokes the underlying RPC. 

158 

159 Args: 

160 request_iterator: An optional async iterable or iterable of request 

161 messages for the RPC. 

162 timeout: An optional duration of time in seconds to allow 

163 for the RPC. 

164 metadata: Optional :term:`metadata` to be transmitted to the 

165 service-side of the RPC. 

166 credentials: An optional CallCredentials for the RPC. Only valid for 

167 secure Channel. 

168 wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism. 

169 compression: An element of grpc.compression, e.g. 

170 grpc.compression.Gzip. 

171 

172 Returns: 

173 A StreamStreamCall object. 

174 

175 Raises: 

176 RpcError: Indicates that the RPC terminated with non-OK status. The 

177 raised RpcError will also be a Call for the RPC affording the RPC's 

178 metadata, status code, and details. 

179 """ 

180 

181 

182class Channel(abc.ABC): 

183 """Enables asynchronous RPC invocation as a client. 

184 

185 Channel objects implement the Asynchronous Context Manager (aka. async 

186 with) type, although they are not supportted to be entered and exited 

187 multiple times. 

188 """ 

189 

190 @abc.abstractmethod 

191 async def __aenter__(self): 

192 """Starts an asynchronous context manager. 

193 

194 Returns: 

195 Channel the channel that was instantiated. 

196 """ 

197 

198 @abc.abstractmethod 

199 async def __aexit__(self, exc_type, exc_val, exc_tb): 

200 """Finishes the asynchronous context manager by closing the channel. 

201 

202 Still active RPCs will be cancelled. 

203 """ 

204 

205 @abc.abstractmethod 

206 async def close(self, grace: Optional[float] = None): 

207 """Closes this Channel and releases all resources held by it. 

208 

209 This method immediately stops the channel from executing new RPCs in 

210 all cases. 

211 

212 If a grace period is specified, this method wait until all active 

213 RPCs are finshed, once the grace period is reached the ones that haven't 

214 been terminated are cancelled. If a grace period is not specified 

215 (by passing None for grace), all existing RPCs are cancelled immediately. 

216 

217 This method is idempotent. 

218 """ 

219 

220 @abc.abstractmethod 

221 def get_state( 

222 self, try_to_connect: bool = False 

223 ) -> grpc.ChannelConnectivity: 

224 """Checks the connectivity state of a channel. 

225 

226 This is an EXPERIMENTAL API. 

227 

228 If the channel reaches a stable connectivity state, it is guaranteed 

229 that the return value of this function will eventually converge to that 

230 state. 

231 

232 Args: 

233 try_to_connect: a bool indicate whether the Channel should try to 

234 connect to peer or not. 

235 

236 Returns: A ChannelConnectivity object. 

237 """ 

238 

239 @abc.abstractmethod 

240 async def wait_for_state_change( 

241 self, 

242 last_observed_state: grpc.ChannelConnectivity, 

243 ) -> None: 

244 """Waits for a change in connectivity state. 

245 

246 This is an EXPERIMENTAL API. 

247 

248 The function blocks until there is a change in the channel connectivity 

249 state from the "last_observed_state". If the state is already 

250 different, this function will return immediately. 

251 

252 There is an inherent race between the invocation of 

253 "Channel.wait_for_state_change" and "Channel.get_state". The state can 

254 change arbitrary many times during the race, so there is no way to 

255 observe every state transition. 

256 

257 If there is a need to put a timeout for this function, please refer to 

258 "asyncio.wait_for". 

259 

260 Args: 

261 last_observed_state: A grpc.ChannelConnectivity object representing 

262 the last known state. 

263 """ 

264 

265 @abc.abstractmethod 

266 async def channel_ready(self) -> None: 

267 """Creates a coroutine that blocks until the Channel is READY.""" 

268 

269 @abc.abstractmethod 

270 def unary_unary( 

271 self, 

272 method: str, 

273 request_serializer: Optional[SerializingFunction] = None, 

274 response_deserializer: Optional[DeserializingFunction] = None, 

275 ) -> UnaryUnaryMultiCallable: 

276 """Creates a UnaryUnaryMultiCallable for a unary-unary method. 

277 

278 Args: 

279 method: The name of the RPC method. 

280 request_serializer: Optional :term:`serializer` for serializing the request 

281 message. Request goes unserialized in case None is passed. 

282 response_deserializer: Optional :term:`deserializer` for deserializing the 

283 response message. Response goes undeserialized in case None 

284 is passed. 

285 

286 Returns: 

287 A UnaryUnaryMultiCallable value for the named unary-unary method. 

288 """ 

289 

290 @abc.abstractmethod 

291 def unary_stream( 

292 self, 

293 method: str, 

294 request_serializer: Optional[SerializingFunction] = None, 

295 response_deserializer: Optional[DeserializingFunction] = None, 

296 ) -> UnaryStreamMultiCallable: 

297 """Creates a UnaryStreamMultiCallable for a unary-stream method. 

298 

299 Args: 

300 method: The name of the RPC method. 

301 request_serializer: Optional :term:`serializer` for serializing the request 

302 message. Request goes unserialized in case None is passed. 

303 response_deserializer: Optional :term:`deserializer` for deserializing the 

304 response message. Response goes undeserialized in case None 

305 is passed. 

306 

307 Returns: 

308 A UnarySteramMultiCallable value for the named unary-stream method. 

309 """ 

310 

311 @abc.abstractmethod 

312 def stream_unary( 

313 self, 

314 method: str, 

315 request_serializer: Optional[SerializingFunction] = None, 

316 response_deserializer: Optional[DeserializingFunction] = None, 

317 ) -> StreamUnaryMultiCallable: 

318 """Creates a StreamUnaryMultiCallable for a stream-unary method. 

319 

320 Args: 

321 method: The name of the RPC method. 

322 request_serializer: Optional :term:`serializer` for serializing the request 

323 message. Request goes unserialized in case None is passed. 

324 response_deserializer: Optional :term:`deserializer` for deserializing the 

325 response message. Response goes undeserialized in case None 

326 is passed. 

327 

328 Returns: 

329 A StreamUnaryMultiCallable value for the named stream-unary method. 

330 """ 

331 

332 @abc.abstractmethod 

333 def stream_stream( 

334 self, 

335 method: str, 

336 request_serializer: Optional[SerializingFunction] = None, 

337 response_deserializer: Optional[DeserializingFunction] = None, 

338 ) -> StreamStreamMultiCallable: 

339 """Creates a StreamStreamMultiCallable for a stream-stream method. 

340 

341 Args: 

342 method: The name of the RPC method. 

343 request_serializer: Optional :term:`serializer` for serializing the request 

344 message. Request goes unserialized in case None is passed. 

345 response_deserializer: Optional :term:`deserializer` for deserializing the 

346 response message. Response goes undeserialized in case None 

347 is passed. 

348 

349 Returns: 

350 A StreamStreamMultiCallable value for the named stream-stream method. 

351 """