Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_simple_stubs.py: 39%

119 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2020 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Functions that obviate explicit stubs and explicit channels.""" 

15 

16import collections 

17import datetime 

18import logging 

19import os 

20import threading 

21from typing import ( 

22 Any, 

23 AnyStr, 

24 Callable, 

25 Dict, 

26 Iterator, 

27 Optional, 

28 Sequence, 

29 Tuple, 

30 TypeVar, 

31 Union, 

32) 

33 

34import grpc 

35from grpc.experimental import experimental_api 

36 

# Generic placeholders for the request and response values of an RPC.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel options, expressed as a sequence of (key, value) pairs.
OptionsType = Sequence[Tuple[str, str]]

# Shape of the key identifying a cached channel: target, channel options,
# channel credentials and compression setting, in that order.
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]

47 

_LOGGER = logging.getLogger(__name__)

# Time added to "now" to compute a cached channel's eviction time. Read from
# the environment (as a number of seconds) when set, ten minutes otherwise.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY not in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)
else:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )

# Number of channels the cache may hold before surplus ones are evicted.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY not in os.environ:
    _MAXIMUM_CHANNELS = 256
else:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default RPC timeout, in seconds, used when a caller does not supply one.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
if _DEFAULT_TIMEOUT_KEY not in os.environ:
    _DEFAULT_TIMEOUT = 60.0
else:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)

74 

75 

def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Open a new channel to ``target`` through ``grpc.secure_channel``.

    Args:
        target: The server address.
        options: Key-value pairs used to configure the channel.
        channel_credentials: Credentials passed on to ``grpc.secure_channel``.
        compression: Compression setting passed on to
            ``grpc.secure_channel``.

    Returns:
        The channel returned by ``grpc.secure_channel``.
    """
    # Hand the values to the logger as arguments so that the message is only
    # rendered when debug logging is enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )

92 

93 

class ChannelCache:
    """Per-process cache of channels keyed by their configuration.

    Each entry stores a channel together with the time at which it becomes
    eligible for eviction. A daemon thread closes channels whose eviction
    time has passed, and also the entries at the front of the ordering
    whenever the cache holds more than ``_MAXIMUM_CHANNELS`` channels.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # Guards ``_singleton`` and every access to ``_mapping``.
    _lock: threading.RLock = threading.RLock()
    # Notified by ``get_channel`` to wake the eviction thread.
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has started running.
    _eviction_ready: threading.Event = threading.Event()

    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Create an empty cache and start its daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Return the shared ``ChannelCache``, creating it on first use."""
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread must acquire the lock
        # before it can signal that it is ready.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Remove ``key`` from the cache and close its channel.

        The caller must hold ``_lock``. Raises ``KeyError`` if ``key`` is not
        in the cache.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()

    @staticmethod
    def _perform_evictions():
        """Evict overdue and surplus channels in an endless loop."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                cache = ChannelCache._singleton
                mapping = cache._mapping
                if not mapping:
                    # Nothing to evict; sleep until notified.
                    ChannelCache._condition.wait()
                elif len(mapping) > _MAXIMUM_CHANNELS:
                    # Over capacity: evict the entry at the front of the
                    # ordering and immediately reevaluate.
                    cache._evict_locked(next(iter(mapping)))
                else:
                    key, (_, eviction_time) = next(iter(mapping.items()))
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        cache._evict_locked(key)
                        continue
                    time_to_eviction = (eviction_time - now).total_seconds()
                    # NOTE: We aim to *eventually* coalesce to a state in
                    # which no overdue channels are in the cache and the
                    # length of the cache is no longer than
                    # _MAXIMUM_CHANNELS. We tolerate momentary states in
                    # which these two criteria are not met.
                    ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
    ) -> grpc.Channel:
        """Return a cached channel for this configuration, creating one if needed.

        Every call pushes the channel's eviction time ``_EVICTION_PERIOD``
        into the future and moves its entry to the back of the ordering.

        Args:
            target: The server address.
            options: Key-value pairs used to configure the channel.
            channel_credentials: Credentials for the channel. When None and
                ``insecure`` is false, ``grpc.ssl_channel_credentials()`` is
                used.
            insecure: If True, the channel uses
                ``grpc.experimental.insecure_channel_credentials()``.
            compression: Compression setting for the channel.

        Returns:
            The channel associated with the given configuration.

        Raises:
            ValueError: If ``insecure`` is true and ``channel_credentials`` is
                also supplied.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                "the channel_credentials option. Please use one "
                "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # The options become part of a dictionary key below. Freeze them so
        # that callers may pass a list, which would otherwise be unhashable.
        if options is not None:
            options = tuple(options)
        key = (target, options, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            if channel_data is not None:
                channel = channel_data[0]
                # Remove and re-add so the entry moves to the back of the
                # ordering with a refreshed eviction time.
                self._mapping.pop(key)
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                return channel
            channel = _create_channel(
                target, options, channel_credentials, compression
            )
            self._mapping[key] = (
                channel,
                datetime.datetime.now() + _EVICTION_PERIOD,
            )
            # Wake the eviction thread when the cache stops being empty or
            # reaches its capacity.
            if (
                len(self._mapping) == 1
                or len(self._mapping) >= _MAXIMUM_CHANNELS
            ):
                self._condition.notify()
            return channel

    def _test_only_channel_count(self) -> int:
        """Return the number of channels currently held in the cache."""
        with self._lock:
            return len(self._mapping)

205 

206 

@experimental_api
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread
    evicts channels from the cache a fixed period after they were last
    requested, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure it.

    Args:
        request: The request value for the RPC.
        target: The server address.
        method: The name of the RPC method.
        request_serializer: Optional :term:`serializer` for serializing the
            request message. Request goes unserialized in case None is
            passed.
        response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. Response goes undeserialized
            in case None is passed.
        options: An optional sequence of key-value pairs
            (:term:`channel_arguments` in gRPC Core runtime) to configure the
            channel.
        channel_credentials: A credential applied to the whole channel, e.g.
            the return value of grpc.ssl_channel_credentials() or
            grpc.insecure_channel_credentials().
        insecure: If True, insecure channel credentials are used. This option
            is mutually exclusive with the `channel_credentials` option.
        call_credentials: A call credential applied to each call
            individually, e.g. the output of grpc.metadata_call_credentials()
            or grpc.access_token_call_credentials().
        compression: An optional value indicating the compression method to
            be used over the lifetime of the channel, e.g.
            grpc.Compression.Gzip.
        wait_for_ready: An optional flag indicating whether the RPC should
            fail immediately if the connection is not ready at the time the
            RPC is invoked, or if it should wait until the connection to the
            server becomes ready. When using this option, the user will
            likely also want to set a timeout. Defaults to True.
        timeout: An optional duration of time in seconds to allow for the
            RPC, after which an exception will be raised. If timeout is
            unspecified, defaults to a timeout controlled by the
            GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that
            is unset, defaults to 60 seconds. Supply a value of None to
            indicate that no timeout should be enforced.
        metadata: Optional metadata to send to the server.

    Returns:
        The response to the RPC.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression
    )
    invoke = managed_channel.unary_unary(
        method, request_serializer, response_deserializer
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

289 

290 

@experimental_api
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread
    evicts channels from the cache a fixed period after they were last
    requested, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure it.

    Args:
        request: The request value for the RPC.
        target: The server address.
        method: The name of the RPC method.
        request_serializer: Optional :term:`serializer` for serializing the
            request message. Request goes unserialized in case None is
            passed.
        response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. Response goes undeserialized
            in case None is passed.
        options: An optional sequence of key-value pairs
            (:term:`channel_arguments` in gRPC Core runtime) to configure the
            channel.
        channel_credentials: A credential applied to the whole channel, e.g.
            the return value of grpc.ssl_channel_credentials().
        insecure: If True, insecure channel credentials are used. This option
            is mutually exclusive with the `channel_credentials` option.
        call_credentials: A call credential applied to each call
            individually, e.g. the output of grpc.metadata_call_credentials()
            or grpc.access_token_call_credentials().
        compression: An optional value indicating the compression method to
            be used over the lifetime of the channel, e.g.
            grpc.Compression.Gzip.
        wait_for_ready: An optional flag indicating whether the RPC should
            fail immediately if the connection is not ready at the time the
            RPC is invoked, or if it should wait until the connection to the
            server becomes ready. When using this option, the user will
            likely also want to set a timeout. Defaults to True.
        timeout: An optional duration of time in seconds to allow for the
            RPC, after which an exception will be raised. If timeout is
            unspecified, defaults to a timeout controlled by the
            GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that
            is unset, defaults to 60 seconds. Supply a value of None to
            indicate that no timeout should be enforced.
        metadata: Optional metadata to send to the server.

    Returns:
        An iterator of responses.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression
    )
    invoke = managed_channel.unary_stream(
        method, request_serializer, response_deserializer
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

372 

373 

@experimental_api
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread
    evicts channels from the cache a fixed period after they were last
    requested, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure it.

    Args:
        request_iterator: An iterator that yields request values for the RPC.
        target: The server address.
        method: The name of the RPC method.
        request_serializer: Optional :term:`serializer` for serializing the
            request message. Request goes unserialized in case None is
            passed.
        response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. Response goes undeserialized
            in case None is passed.
        options: An optional sequence of key-value pairs
            (:term:`channel_arguments` in gRPC Core runtime) to configure the
            channel.
        channel_credentials: A credential applied to the whole channel, e.g.
            the return value of grpc.ssl_channel_credentials().
        insecure: If True, insecure channel credentials are used. This option
            is mutually exclusive with the `channel_credentials` option.
        call_credentials: A call credential applied to each call
            individually, e.g. the output of grpc.metadata_call_credentials()
            or grpc.access_token_call_credentials().
        compression: An optional value indicating the compression method to
            be used over the lifetime of the channel, e.g.
            grpc.Compression.Gzip.
        wait_for_ready: An optional flag indicating whether the RPC should
            fail immediately if the connection is not ready at the time the
            RPC is invoked, or if it should wait until the connection to the
            server becomes ready. When using this option, the user will
            likely also want to set a timeout. Defaults to True.
        timeout: An optional duration of time in seconds to allow for the
            RPC, after which an exception will be raised. If timeout is
            unspecified, defaults to a timeout controlled by the
            GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that
            is unset, defaults to 60 seconds. Supply a value of None to
            indicate that no timeout should be enforced.
        metadata: Optional metadata to send to the server.

    Returns:
        The response to the RPC.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression
    )
    invoke = managed_channel.stream_unary(
        method, request_serializer, response_deserializer
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

455 

456 

@experimental_api
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The channel is taken from a per-process cache. A background thread
    evicts channels from the cache a fixed period after they were last
    requested, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure it.

    Args:
        request_iterator: An iterator that yields request values for the RPC.
        target: The server address.
        method: The name of the RPC method.
        request_serializer: Optional :term:`serializer` for serializing the
            request message. Request goes unserialized in case None is
            passed.
        response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. Response goes undeserialized
            in case None is passed.
        options: An optional sequence of key-value pairs
            (:term:`channel_arguments` in gRPC Core runtime) to configure the
            channel.
        channel_credentials: A credential applied to the whole channel, e.g.
            the return value of grpc.ssl_channel_credentials().
        insecure: If True, insecure channel credentials are used. This option
            is mutually exclusive with the `channel_credentials` option.
        call_credentials: A call credential applied to each call
            individually, e.g. the output of grpc.metadata_call_credentials()
            or grpc.access_token_call_credentials().
        compression: An optional value indicating the compression method to
            be used over the lifetime of the channel, e.g.
            grpc.Compression.Gzip.
        wait_for_ready: An optional flag indicating whether the RPC should
            fail immediately if the connection is not ready at the time the
            RPC is invoked, or if it should wait until the connection to the
            server becomes ready. When using this option, the user will
            likely also want to set a timeout. Defaults to True.
        timeout: An optional duration of time in seconds to allow for the
            RPC, after which an exception will be raised. If timeout is
            unspecified, defaults to a timeout controlled by the
            GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that
            is unset, defaults to 60 seconds. Supply a value of None to
            indicate that no timeout should be enforced.
        metadata: Optional metadata to send to the server.

    Returns:
        An iterator of responses.
    """
    managed_channel = ChannelCache.get().get_channel(
        target, options, channel_credentials, insecure, compression
    )
    invoke = managed_channel.stream_stream(
        method, request_serializer, response_deserializer
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )