Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_simple_stubs.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

126 statements  

1# Copyright 2020 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Functions that obviate explicit stubs and explicit channels.""" 

15 

16import collections 

17import datetime 

18import logging 

19import os 

20import threading 

21from typing import ( 

22 Any, 

23 AnyStr, 

24 Callable, 

25 Dict, 

26 Iterator, 

27 Optional, 

28 Sequence, 

29 Tuple, 

30 TypeVar, 

31 Union, 

32) 

33 

34import grpc 

35from grpc.experimental import experimental_api 

36 

37RequestType = TypeVar("RequestType") 

38ResponseType = TypeVar("ResponseType") 

39 

40OptionsType = Sequence[Tuple[str, str]] 

41CacheKey = Tuple[ 

42 str, 

43 OptionsType, 

44 Optional[grpc.ChannelCredentials], 

45 Optional[grpc.Compression], 

46] 

47 

48_LOGGER = logging.getLogger(__name__) 

49 

50_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" 

51if _EVICTION_PERIOD_KEY in os.environ: 

52 _EVICTION_PERIOD = datetime.timedelta( 

53 seconds=float(os.environ[_EVICTION_PERIOD_KEY]) 

54 ) 

55 _LOGGER.debug( 

56 "Setting managed channel eviction period to %s", _EVICTION_PERIOD 

57 ) 

58else: 

59 _EVICTION_PERIOD = datetime.timedelta(minutes=10) 

60 

61_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" 

62if _MAXIMUM_CHANNELS_KEY in os.environ: 

63 _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY]) 

64 _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS) 

65else: 

66 _MAXIMUM_CHANNELS = 2**8 

67 

68_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS" 

69if _DEFAULT_TIMEOUT_KEY in os.environ: 

70 _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY]) 

71 _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT) 

72else: 

73 _DEFAULT_TIMEOUT = 60.0 

74 

75 

76def _create_channel( 

77 target: str, 

78 options: Sequence[Tuple[str, str]], 

79 channel_credentials: Optional[grpc.ChannelCredentials], 

80 compression: Optional[grpc.Compression], 

81) -> grpc.Channel: 

82 _LOGGER.debug( 

83 f"Creating secure channel with credentials '{channel_credentials}', " 

84 + f"options '{options}' and compression '{compression}'" 

85 ) 

86 return grpc.secure_channel( 

87 target, 

88 credentials=channel_credentials, 

89 options=options, 

90 compression=compression, 

91 ) 

92 

93 

94class ChannelCache: 

95 # NOTE(rbellevi): Untyped due to reference cycle. 

96 _singleton = None 

97 _lock: threading.RLock = threading.RLock() 

98 _condition: threading.Condition = threading.Condition(lock=_lock) 

99 _eviction_ready: threading.Event = threading.Event() 

100 

101 _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]] 

102 _eviction_thread: threading.Thread 

103 

104 def __init__(self): 

105 self._mapping = collections.OrderedDict() 

106 self._eviction_thread = threading.Thread( 

107 target=ChannelCache._perform_evictions, daemon=True 

108 ) 

109 self._eviction_thread.start() 

110 

111 @staticmethod 

112 def get(): 

113 with ChannelCache._lock: 

114 if ChannelCache._singleton is None: 

115 ChannelCache._singleton = ChannelCache() 

116 ChannelCache._eviction_ready.wait() 

117 return ChannelCache._singleton 

118 

119 def _evict_locked(self, key: CacheKey): 

120 channel, _ = self._mapping.pop(key) 

121 _LOGGER.debug( 

122 "Evicting channel %s with configuration %s.", channel, key 

123 ) 

124 channel.close() 

125 del channel 

126 

127 @staticmethod 

128 def _perform_evictions(): 

129 while True: 

130 with ChannelCache._lock: 

131 ChannelCache._eviction_ready.set() 

132 if not ChannelCache._singleton._mapping: 

133 ChannelCache._condition.wait() 

134 elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS: 

135 key = next(iter(ChannelCache._singleton._mapping.keys())) 

136 ChannelCache._singleton._evict_locked(key) 

137 # And immediately reevaluate. 

138 else: 

139 key, (_, eviction_time) = next( 

140 iter(ChannelCache._singleton._mapping.items()) 

141 ) 

142 now = datetime.datetime.now() 

143 if eviction_time <= now: 

144 ChannelCache._singleton._evict_locked(key) 

145 continue 

146 else: 

147 time_to_eviction = (eviction_time - now).total_seconds() 

148 # NOTE: We aim to *eventually* coalesce to a state in 

149 # which no overdue channels are in the cache and the 

150 # length of the cache is longer than _MAXIMUM_CHANNELS. 

151 # We tolerate momentary states in which these two 

152 # criteria are not met. 

153 ChannelCache._condition.wait(timeout=time_to_eviction) 

154 

155 def get_channel( 

156 self, 

157 target: str, 

158 options: Sequence[Tuple[str, str]], 

159 channel_credentials: Optional[grpc.ChannelCredentials], 

160 insecure: bool, 

161 compression: Optional[grpc.Compression], 

162 method: str, 

163 _registered_method: bool, 

164 ) -> Tuple[grpc.Channel, Optional[int]]: 

165 """Get a channel from cache or creates a new channel. 

166 

167 This method also takes care of register method for channel, 

168 which means we'll register a new call handle if we're calling a 

169 non-registered method for an existing channel. 

170 

171 Returns: 

172 A tuple with two items. The first item is the channel, second item is 

173 the call handle if the method is registered, None if it's not registered. 

174 """ 

175 if insecure and channel_credentials: 

176 raise ValueError( 

177 "The insecure option is mutually exclusive with " 

178 + "the channel_credentials option. Please use one " 

179 + "or the other." 

180 ) 

181 if insecure: 

182 channel_credentials = ( 

183 grpc.experimental.insecure_channel_credentials() 

184 ) 

185 elif channel_credentials is None: 

186 _LOGGER.debug("Defaulting to SSL channel credentials.") 

187 channel_credentials = grpc.ssl_channel_credentials() 

188 key = (target, options, channel_credentials, compression) 

189 with self._lock: 

190 channel_data = self._mapping.get(key, None) 

191 call_handle = None 

192 if channel_data is not None: 

193 channel = channel_data[0] 

194 # Register a new call handle if we're calling a registered method for an 

195 # existing channel and this method is not registered. 

196 if _registered_method: 

197 call_handle = channel._get_registered_call_handle(method) 

198 self._mapping.pop(key) 

199 self._mapping[key] = ( 

200 channel, 

201 datetime.datetime.now() + _EVICTION_PERIOD, 

202 ) 

203 return channel, call_handle 

204 else: 

205 channel = _create_channel( 

206 target, options, channel_credentials, compression 

207 ) 

208 if _registered_method: 

209 call_handle = channel._get_registered_call_handle(method) 

210 self._mapping[key] = ( 

211 channel, 

212 datetime.datetime.now() + _EVICTION_PERIOD, 

213 ) 

214 if ( 

215 len(self._mapping) == 1 

216 or len(self._mapping) >= _MAXIMUM_CHANNELS 

217 ): 

218 self._condition.notify() 

219 return channel, call_handle 

220 

221 def _test_only_channel_count(self) -> int: 

222 with self._lock: 

223 return len(self._mapping) 

224 

225 

226@experimental_api 

227# pylint: disable=too-many-locals 

228def unary_unary( 

229 request: RequestType, 

230 target: str, 

231 method: str, 

232 request_serializer: Optional[Callable[[Any], bytes]] = None, 

233 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

234 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

235 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

236 insecure: bool = False, 

237 call_credentials: Optional[grpc.CallCredentials] = None, 

238 compression: Optional[grpc.Compression] = None, 

239 wait_for_ready: Optional[bool] = None, 

240 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

241 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

242 _registered_method: Optional[bool] = False, 

243) -> ResponseType: 

244 """Invokes a unary-unary RPC without an explicitly specified channel. 

245 

246 THIS IS AN EXPERIMENTAL API. 

247 

248 This is backed by a per-process cache of channels. Channels are evicted 

249 from the cache after a fixed period by a background. Channels will also be 

250 evicted if more than a configured maximum accumulate. 

251 

252 The default eviction period is 10 minutes. One may set the environment 

253 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

254 

255 The default maximum number of channels is 256. One may set the 

256 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

257 this. 

258 

259 Args: 

260 request: An iterator that yields request values for the RPC. 

261 target: The server address. 

262 method: The name of the RPC method. 

263 request_serializer: Optional :term:`serializer` for serializing the request 

264 message. Request goes unserialized in case None is passed. 

265 response_deserializer: Optional :term:`deserializer` for deserializing the response 

266 message. Response goes undeserialized in case None is passed. 

267 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

268 runtime) to configure the channel. 

269 channel_credentials: A credential applied to the whole channel, e.g. the 

270 return value of grpc.ssl_channel_credentials() or 

271 grpc.insecure_channel_credentials(). 

272 insecure: If True, specifies channel_credentials as 

273 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

274 exclusive with the `channel_credentials` option. 

275 call_credentials: A call credential applied to each call individually, 

276 e.g. the output of grpc.metadata_call_credentials() or 

277 grpc.access_token_call_credentials(). 

278 compression: An optional value indicating the compression method to be 

279 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

280 wait_for_ready: An optional flag indicating whether the RPC should fail 

281 immediately if the connection is not ready at the time the RPC is 

282 invoked, or if it should wait until the connection to the server 

283 becomes ready. When using this option, the user will likely also want 

284 to set a timeout. Defaults to True. 

285 timeout: An optional duration of time in seconds to allow for the RPC, 

286 after which an exception will be raised. If timeout is unspecified, 

287 defaults to a timeout controlled by the 

288 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

289 unset, defaults to 60 seconds. Supply a value of None to indicate that 

290 no timeout should be enforced. 

291 metadata: Optional metadata to send to the server. 

292 

293 Returns: 

294 The response to the RPC. 

295 """ 

296 channel, method_handle = ChannelCache.get().get_channel( 

297 target, 

298 options, 

299 channel_credentials, 

300 insecure, 

301 compression, 

302 method, 

303 _registered_method, 

304 ) 

305 multicallable = channel.unary_unary( 

306 method, request_serializer, response_deserializer, method_handle 

307 ) 

308 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

309 return multicallable( 

310 request, 

311 metadata=metadata, 

312 wait_for_ready=wait_for_ready, 

313 credentials=call_credentials, 

314 timeout=timeout, 

315 ) 

316 

317 

318@experimental_api 

319# pylint: disable=too-many-locals 

320def unary_stream( 

321 request: RequestType, 

322 target: str, 

323 method: str, 

324 request_serializer: Optional[Callable[[Any], bytes]] = None, 

325 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

326 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

327 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

328 insecure: bool = False, 

329 call_credentials: Optional[grpc.CallCredentials] = None, 

330 compression: Optional[grpc.Compression] = None, 

331 wait_for_ready: Optional[bool] = None, 

332 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

333 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

334 _registered_method: Optional[bool] = False, 

335) -> Iterator[ResponseType]: 

336 """Invokes a unary-stream RPC without an explicitly specified channel. 

337 

338 THIS IS AN EXPERIMENTAL API. 

339 

340 This is backed by a per-process cache of channels. Channels are evicted 

341 from the cache after a fixed period by a background. Channels will also be 

342 evicted if more than a configured maximum accumulate. 

343 

344 The default eviction period is 10 minutes. One may set the environment 

345 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

346 

347 The default maximum number of channels is 256. One may set the 

348 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

349 this. 

350 

351 Args: 

352 request: An iterator that yields request values for the RPC. 

353 target: The server address. 

354 method: The name of the RPC method. 

355 request_serializer: Optional :term:`serializer` for serializing the request 

356 message. Request goes unserialized in case None is passed. 

357 response_deserializer: Optional :term:`deserializer` for deserializing the response 

358 message. Response goes undeserialized in case None is passed. 

359 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

360 runtime) to configure the channel. 

361 channel_credentials: A credential applied to the whole channel, e.g. the 

362 return value of grpc.ssl_channel_credentials(). 

363 insecure: If True, specifies channel_credentials as 

364 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

365 exclusive with the `channel_credentials` option. 

366 call_credentials: A call credential applied to each call individually, 

367 e.g. the output of grpc.metadata_call_credentials() or 

368 grpc.access_token_call_credentials(). 

369 compression: An optional value indicating the compression method to be 

370 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

371 wait_for_ready: An optional flag indicating whether the RPC should fail 

372 immediately if the connection is not ready at the time the RPC is 

373 invoked, or if it should wait until the connection to the server 

374 becomes ready. When using this option, the user will likely also want 

375 to set a timeout. Defaults to True. 

376 timeout: An optional duration of time in seconds to allow for the RPC, 

377 after which an exception will be raised. If timeout is unspecified, 

378 defaults to a timeout controlled by the 

379 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

380 unset, defaults to 60 seconds. Supply a value of None to indicate that 

381 no timeout should be enforced. 

382 metadata: Optional metadata to send to the server. 

383 

384 Returns: 

385 An iterator of responses. 

386 """ 

387 channel, method_handle = ChannelCache.get().get_channel( 

388 target, 

389 options, 

390 channel_credentials, 

391 insecure, 

392 compression, 

393 method, 

394 _registered_method, 

395 ) 

396 multicallable = channel.unary_stream( 

397 method, request_serializer, response_deserializer, method_handle 

398 ) 

399 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

400 return multicallable( 

401 request, 

402 metadata=metadata, 

403 wait_for_ready=wait_for_ready, 

404 credentials=call_credentials, 

405 timeout=timeout, 

406 ) 

407 

408 

409@experimental_api 

410# pylint: disable=too-many-locals 

411def stream_unary( 

412 request_iterator: Iterator[RequestType], 

413 target: str, 

414 method: str, 

415 request_serializer: Optional[Callable[[Any], bytes]] = None, 

416 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

417 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

418 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

419 insecure: bool = False, 

420 call_credentials: Optional[grpc.CallCredentials] = None, 

421 compression: Optional[grpc.Compression] = None, 

422 wait_for_ready: Optional[bool] = None, 

423 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

424 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

425 _registered_method: Optional[bool] = False, 

426) -> ResponseType: 

427 """Invokes a stream-unary RPC without an explicitly specified channel. 

428 

429 THIS IS AN EXPERIMENTAL API. 

430 

431 This is backed by a per-process cache of channels. Channels are evicted 

432 from the cache after a fixed period by a background. Channels will also be 

433 evicted if more than a configured maximum accumulate. 

434 

435 The default eviction period is 10 minutes. One may set the environment 

436 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

437 

438 The default maximum number of channels is 256. One may set the 

439 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

440 this. 

441 

442 Args: 

443 request_iterator: An iterator that yields request values for the RPC. 

444 target: The server address. 

445 method: The name of the RPC method. 

446 request_serializer: Optional :term:`serializer` for serializing the request 

447 message. Request goes unserialized in case None is passed. 

448 response_deserializer: Optional :term:`deserializer` for deserializing the response 

449 message. Response goes undeserialized in case None is passed. 

450 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

451 runtime) to configure the channel. 

452 channel_credentials: A credential applied to the whole channel, e.g. the 

453 return value of grpc.ssl_channel_credentials(). 

454 call_credentials: A call credential applied to each call individually, 

455 e.g. the output of grpc.metadata_call_credentials() or 

456 grpc.access_token_call_credentials(). 

457 insecure: If True, specifies channel_credentials as 

458 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

459 exclusive with the `channel_credentials` option. 

460 compression: An optional value indicating the compression method to be 

461 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

462 wait_for_ready: An optional flag indicating whether the RPC should fail 

463 immediately if the connection is not ready at the time the RPC is 

464 invoked, or if it should wait until the connection to the server 

465 becomes ready. When using this option, the user will likely also want 

466 to set a timeout. Defaults to True. 

467 timeout: An optional duration of time in seconds to allow for the RPC, 

468 after which an exception will be raised. If timeout is unspecified, 

469 defaults to a timeout controlled by the 

470 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

471 unset, defaults to 60 seconds. Supply a value of None to indicate that 

472 no timeout should be enforced. 

473 metadata: Optional metadata to send to the server. 

474 

475 Returns: 

476 The response to the RPC. 

477 """ 

478 channel, method_handle = ChannelCache.get().get_channel( 

479 target, 

480 options, 

481 channel_credentials, 

482 insecure, 

483 compression, 

484 method, 

485 _registered_method, 

486 ) 

487 multicallable = channel.stream_unary( 

488 method, request_serializer, response_deserializer, method_handle 

489 ) 

490 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

491 return multicallable( 

492 request_iterator, 

493 metadata=metadata, 

494 wait_for_ready=wait_for_ready, 

495 credentials=call_credentials, 

496 timeout=timeout, 

497 ) 

498 

499 

500@experimental_api 

501# pylint: disable=too-many-locals 

502def stream_stream( 

503 request_iterator: Iterator[RequestType], 

504 target: str, 

505 method: str, 

506 request_serializer: Optional[Callable[[Any], bytes]] = None, 

507 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

508 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

509 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

510 insecure: bool = False, 

511 call_credentials: Optional[grpc.CallCredentials] = None, 

512 compression: Optional[grpc.Compression] = None, 

513 wait_for_ready: Optional[bool] = None, 

514 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

515 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

516 _registered_method: Optional[bool] = False, 

517) -> Iterator[ResponseType]: 

518 """Invokes a stream-stream RPC without an explicitly specified channel. 

519 

520 THIS IS AN EXPERIMENTAL API. 

521 

522 This is backed by a per-process cache of channels. Channels are evicted 

523 from the cache after a fixed period by a background. Channels will also be 

524 evicted if more than a configured maximum accumulate. 

525 

526 The default eviction period is 10 minutes. One may set the environment 

527 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

528 

529 The default maximum number of channels is 256. One may set the 

530 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

531 this. 

532 

533 Args: 

534 request_iterator: An iterator that yields request values for the RPC. 

535 target: The server address. 

536 method: The name of the RPC method. 

537 request_serializer: Optional :term:`serializer` for serializing the request 

538 message. Request goes unserialized in case None is passed. 

539 response_deserializer: Optional :term:`deserializer` for deserializing the response 

540 message. Response goes undeserialized in case None is passed. 

541 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

542 runtime) to configure the channel. 

543 channel_credentials: A credential applied to the whole channel, e.g. the 

544 return value of grpc.ssl_channel_credentials(). 

545 call_credentials: A call credential applied to each call individually, 

546 e.g. the output of grpc.metadata_call_credentials() or 

547 grpc.access_token_call_credentials(). 

548 insecure: If True, specifies channel_credentials as 

549 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

550 exclusive with the `channel_credentials` option. 

551 compression: An optional value indicating the compression method to be 

552 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

553 wait_for_ready: An optional flag indicating whether the RPC should fail 

554 immediately if the connection is not ready at the time the RPC is 

555 invoked, or if it should wait until the connection to the server 

556 becomes ready. When using this option, the user will likely also want 

557 to set a timeout. Defaults to True. 

558 timeout: An optional duration of time in seconds to allow for the RPC, 

559 after which an exception will be raised. If timeout is unspecified, 

560 defaults to a timeout controlled by the 

561 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

562 unset, defaults to 60 seconds. Supply a value of None to indicate that 

563 no timeout should be enforced. 

564 metadata: Optional metadata to send to the server. 

565 

566 Returns: 

567 An iterator of responses. 

568 """ 

569 channel, method_handle = ChannelCache.get().get_channel( 

570 target, 

571 options, 

572 channel_credentials, 

573 insecure, 

574 compression, 

575 method, 

576 _registered_method, 

577 ) 

578 multicallable = channel.stream_stream( 

579 method, request_serializer, response_deserializer, method_handle 

580 ) 

581 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

582 return multicallable( 

583 request_iterator, 

584 metadata=metadata, 

585 wait_for_ready=wait_for_ready, 

586 credentials=call_credentials, 

587 timeout=timeout, 

588 )