Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_simple_stubs.py: 39%

119 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:30 +0000

1# Copyright 2020 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Functions that obviate explicit stubs and explicit channels.""" 

15 

16import collections 

17import datetime 

18import logging 

19import os 

20import threading 

21from typing import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence, 

22 Tuple, TypeVar, Union) 

23 

24import grpc 

25from grpc.experimental import experimental_api 

26 

# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel options expressed as a sequence of (name, value) pairs.
OptionsType = Sequence[Tuple[str, str]]

# Identifies one cached channel:
# (target, options, channel credentials, compression).
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]

33 

_LOGGER = logging.getLogger(__name__)

# How long a managed channel may stay in the cache before it is evicted.
# Overridable (in seconds) through the environment; 10 minutes otherwise.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY not in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)
else:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY]))
    _LOGGER.debug("Setting managed channel eviction period to %s",
                  _EVICTION_PERIOD)

# Upper bound on the number of managed channels kept in the cache.
# Overridable through the environment; 256 otherwise.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY not in os.environ:
    _MAXIMUM_CHANNELS = 256
else:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default per-RPC timeout in seconds.
# Overridable through the environment; 60 seconds otherwise.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
if _DEFAULT_TIMEOUT_KEY not in os.environ:
    _DEFAULT_TIMEOUT = 60.0
else:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)

58 

59 

def _create_channel(target: str, options: Sequence[Tuple[str, str]],
                    channel_credentials: Optional[grpc.ChannelCredentials],
                    compression: Optional[grpc.Compression]) -> grpc.Channel:
    """Creates a new channel to `target` via grpc.secure_channel.

    Args:
      target: The server address.
      options: Key-value pairs used to configure the channel.
      channel_credentials: The credentials handed to grpc.secure_channel.
      compression: An optional compression setting for the channel.

    Returns:
      The newly created channel.
    """
    # Lazy %-style arguments, matching the other log calls in this module, so
    # the message is only rendered when debug logging is enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'", channel_credentials, options,
        compression)
    return grpc.secure_channel(target,
                               credentials=channel_credentials,
                               options=options,
                               compression=compression)

70 

71 

class ChannelCache:
    """Process-wide cache of channels shared by the stub-less RPC helpers.

    A single instance is created lazily through `ChannelCache.get()`. Entries
    are keyed by (target, options, channel credentials, compression) and carry
    the time at which they become eligible for eviction. A daemon thread
    started by the constructor closes channels whose eviction time has passed
    and trims the cache when it holds more than `_MAXIMUM_CHANNELS` entries.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # Guards `_singleton` and the singleton's `_mapping`.
    _lock: threading.RLock = threading.RLock()
    # Used to wake the eviction thread; shares `_lock`.
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    # Insertion-ordered: the first entry is always the next eviction candidate.
    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Creates an empty cache and starts the daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True)
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Returns the singleton cache, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread must acquire the lock
        # before it can set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Removes `key` from the cache and closes its channel.

        Callers are expected to hold `_lock`.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug("Evicting channel %s with configuration %s.", channel,
                      key)
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever evicting channels."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                if not ChannelCache._singleton._mapping:
                    # Nothing to evict; sleep until a channel is added.
                    ChannelCache._condition.wait()
                elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
                    key = next(iter(ChannelCache._singleton._mapping.keys()))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(
                        iter(ChannelCache._singleton._mapping.items()))
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    else:
                        time_to_eviction = (eviction_time - now).total_seconds()
                        # NOTE: We aim to *eventually* coalesce to a state in
                        # which no overdue channels are in the cache and the
                        # length of the cache is no longer than
                        # _MAXIMUM_CHANNELS. We tolerate momentary states in
                        # which these two criteria are not met.
                        ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
                    channel_credentials: Optional[grpc.ChannelCredentials],
                    insecure: bool,
                    compression: Optional[grpc.Compression]) -> grpc.Channel:
        """Returns a cached channel for the configuration, creating one if needed.

        A cache hit refreshes the entry's eviction time and moves it to the
        back of the eviction order.

        Args:
          target: The server address.
          options: Key-value pairs used to configure the channel. Any sequence
            is accepted (including a list); it is converted to a tuple so that
            it can take part in the cache key.
          channel_credentials: Credentials for the channel. When None and
            `insecure` is False, SSL channel credentials are used.
          insecure: If True, insecure channel credentials are used. Mutually
            exclusive with `channel_credentials`.
          compression: An optional compression setting for the channel.

        Returns:
          The channel matching the requested configuration.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError("The insecure option is mutually exclusive with " +
                             "the channel_credentials option. Please use one " +
                             "or the other.")
        if insecure:
            channel_credentials = grpc.experimental.insecure_channel_credentials(
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # A list of options is unhashable and would make the key lookup raise
        # TypeError, so freeze it into a tuple (None is passed through as-is).
        hashable_options = None if options is None else tuple(options)
        key = (target, hashable_options, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            if channel_data is not None:
                channel = channel_data[0]
                # Pop and re-insert so the entry moves to the back of the
                # ordered mapping; the eviction thread inspects the front.
                self._mapping.pop(key)
                self._mapping[key] = (channel, datetime.datetime.now() +
                                      _EVICTION_PERIOD)
                return channel
            else:
                channel = _create_channel(target, hashable_options,
                                          channel_credentials, compression)
                self._mapping[key] = (channel, datetime.datetime.now() +
                                      _EVICTION_PERIOD)
                # Wake the eviction thread when the cache goes from empty to
                # non-empty or reaches the configured maximum.
                if len(self._mapping) == 1 or len(
                        self._mapping) >= _MAXIMUM_CHANNELS:
                    self._condition.notify()
                return channel

    def _test_only_channel_count(self) -> int:
        """Returns the number of channels currently in the cache."""
        with self._lock:
            return len(self._mapping)

166 

167 

@experimental_api
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials() or
        grpc.insecure_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    channel = ChannelCache.get().get_channel(target, options,
                                             channel_credentials, insecure,
                                             compression)
    multicallable = channel.unary_unary(method, request_serializer,
                                        response_deserializer)
    # An unspecified wait_for_ready means "wait for the connection".
    wait_for_ready = wait_for_ready if wait_for_ready is not None else True
    return multicallable(request,
                         metadata=metadata,
                         wait_for_ready=wait_for_ready,
                         credentials=call_credentials,
                         timeout=timeout)

247 

248 

@experimental_api
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    channel = ChannelCache.get().get_channel(target, options,
                                             channel_credentials, insecure,
                                             compression)
    multicallable = channel.unary_stream(method, request_serializer,
                                         response_deserializer)
    # An unspecified wait_for_ready means "wait for the connection".
    wait_for_ready = wait_for_ready if wait_for_ready is not None else True
    return multicallable(request,
                         metadata=metadata,
                         wait_for_ready=wait_for_ready,
                         credentials=call_credentials,
                         timeout=timeout)

327 

328 

@experimental_api
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread evicts
    channels from that cache once a fixed period has elapsed, and also when
    more than a configured maximum number of channels accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    managed_channel = cache.get_channel(target, options, channel_credentials,
                                        insecure, compression)
    invoke = managed_channel.stream_unary(method, request_serializer,
                                          response_deserializer)
    # Leaving wait_for_ready unspecified means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(request_iterator,
                  metadata=metadata,
                  wait_for_ready=wait_for_ready,
                  credentials=call_credentials,
                  timeout=timeout)

407 

408 

@experimental_api
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread evicts
    channels from that cache once a fixed period has elapsed, and also when
    more than a configured maximum number of channels accumulate.

    The eviction period defaults to 10 minutes and can be configured with the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS".

    The maximum number of channels defaults to 256 and can be configured with
    the environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM".

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    managed_channel = cache.get_channel(target, options, channel_credentials,
                                        insecure, compression)
    invoke = managed_channel.stream_stream(method, request_serializer,
                                           response_deserializer)
    # Leaving wait_for_ready unspecified means "wait for the connection".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(request_iterator,
                  metadata=metadata,
                  wait_for_ready=wait_for_ready,
                  credentials=call_credentials,
                  timeout=timeout)