Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_simple_stubs.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

127 statements  

1# Copyright 2020 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Functions that obviate explicit stubs and explicit channels.""" 

15 

16import collections 

17import datetime 

18import logging 

19import os 

20import threading 

21from typing import ( 

22 Any, 

23 AnyStr, 

24 Callable, 

25 Dict, 

26 Iterator, 

27 Optional, 

28 Sequence, 

29 Tuple, 

30 TypeVar, 

31 Union, 

32) 

33 

34import grpc 

35from grpc.experimental import experimental_api 

36 

37RequestType = TypeVar("RequestType") 

38ResponseType = TypeVar("ResponseType") 

39 

40OptionsType = Sequence[Tuple[str, str]] 

41CacheKey = Tuple[ 

42 str, 

43 OptionsType, 

44 Optional[grpc.ChannelCredentials], 

45 Optional[grpc.Compression], 

46] 

47 

48_LOGGER = logging.getLogger(__name__) 

49 

50_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" 

51if _EVICTION_PERIOD_KEY in os.environ: 

52 _EVICTION_PERIOD = datetime.timedelta( 

53 seconds=float(os.environ[_EVICTION_PERIOD_KEY]) 

54 ) 

55 _LOGGER.debug( 

56 "Setting managed channel eviction period to %s", _EVICTION_PERIOD 

57 ) 

58else: 

59 _EVICTION_PERIOD = datetime.timedelta(minutes=10) 

60 

61_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" 

62if _MAXIMUM_CHANNELS_KEY in os.environ: 

63 _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY]) 

64 _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS) 

65else: 

66 _MAXIMUM_CHANNELS = 2**8 

67 

68_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS" 

69if _DEFAULT_TIMEOUT_KEY in os.environ: 

70 _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY]) 

71 _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT) 

72else: 

73 _DEFAULT_TIMEOUT = 60.0 

74 

75 

76def _create_channel( 

77 target: str, 

78 options: Sequence[Tuple[str, str]], 

79 channel_credentials: Optional[grpc.ChannelCredentials], 

80 compression: Optional[grpc.Compression], 

81) -> grpc.Channel: 

82 debug_msg = ( 

83 f"Creating secure channel with credentials '{channel_credentials}', " 

84 f"options '{options}' and compression '{compression}'" 

85 ) 

86 _LOGGER.debug(debug_msg) 

87 return grpc.secure_channel( 

88 target, 

89 credentials=channel_credentials, 

90 options=options, 

91 compression=compression, 

92 ) 

93 

94 

95class ChannelCache: 

96 # NOTE(rbellevi): Untyped due to reference cycle. 

97 _singleton = None 

98 _lock: threading.RLock = threading.RLock() 

99 _condition: threading.Condition = threading.Condition(lock=_lock) 

100 _eviction_ready: threading.Event = threading.Event() 

101 

102 _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]] 

103 _eviction_thread: threading.Thread 

104 

105 def __init__(self): 

106 self._mapping = collections.OrderedDict() 

107 self._eviction_thread = threading.Thread( 

108 target=ChannelCache._perform_evictions, daemon=True 

109 ) 

110 self._eviction_thread.start() 

111 

112 @staticmethod 

113 def get(): 

114 with ChannelCache._lock: 

115 if ChannelCache._singleton is None: 

116 ChannelCache._singleton = ChannelCache() 

117 ChannelCache._eviction_ready.wait() 

118 return ChannelCache._singleton 

119 

120 def _evict_locked(self, key: CacheKey): 

121 channel, _ = self._mapping.pop(key) 

122 _LOGGER.debug( 

123 "Evicting channel %s with configuration %s.", channel, key 

124 ) 

125 channel.close() 

126 del channel 

127 

128 @staticmethod 

129 def _perform_evictions(): 

130 while True: 

131 with ChannelCache._lock: 

132 ChannelCache._eviction_ready.set() 

133 if not ChannelCache._singleton._mapping: 

134 ChannelCache._condition.wait() 

135 elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS: 

136 key = next(iter(ChannelCache._singleton._mapping.keys())) 

137 ChannelCache._singleton._evict_locked(key) 

138 # And immediately reevaluate. 

139 else: 

140 key, (_, eviction_time) = next( 

141 iter(ChannelCache._singleton._mapping.items()) 

142 ) 

143 now = datetime.datetime.now() 

144 if eviction_time <= now: 

145 ChannelCache._singleton._evict_locked(key) 

146 continue 

147 time_to_eviction = (eviction_time - now).total_seconds() 

148 # NOTE: We aim to *eventually* coalesce to a state in 

149 # which no overdue channels are in the cache and the 

150 # length of the cache is longer than _MAXIMUM_CHANNELS. 

151 # We tolerate momentary states in which these two 

152 # criteria are not met. 

153 ChannelCache._condition.wait(timeout=time_to_eviction) 

154 

155 def get_channel( 

156 self, 

157 target: str, 

158 options: Sequence[Tuple[str, str]], 

159 channel_credentials: Optional[grpc.ChannelCredentials], 

160 insecure: bool, 

161 compression: Optional[grpc.Compression], 

162 method: str, 

163 _registered_method: bool, 

164 ) -> Tuple[grpc.Channel, Optional[int]]: 

165 """Get a channel from cache or creates a new channel. 

166 

167 This method also takes care of register method for channel, 

168 which means we'll register a new call handle if we're calling a 

169 non-registered method for an existing channel. 

170 

171 Returns: 

172 A tuple with two items. The first item is the channel, second item is 

173 the call handle if the method is registered, None if it's not registered. 

174 """ 

175 if insecure and channel_credentials: 

176 raise ValueError( 

177 "The insecure option is mutually exclusive with " 

178 + "the channel_credentials option. Please use one " 

179 + "or the other." 

180 ) 

181 if insecure: 

182 channel_credentials = ( 

183 grpc.experimental.insecure_channel_credentials() 

184 ) 

185 elif channel_credentials is None: 

186 _LOGGER.debug("Defaulting to SSL channel credentials.") 

187 channel_credentials = grpc.ssl_channel_credentials() 

188 key = (target, options, channel_credentials, compression) 

189 with self._lock: 

190 channel_data = self._mapping.get(key, None) 

191 call_handle = None 

192 if channel_data is not None: 

193 channel = channel_data[0] 

194 # Register a new call handle if we're calling a registered method for an 

195 # existing channel and this method is not registered. 

196 if _registered_method: 

197 call_handle = channel._get_registered_call_handle(method) 

198 self._mapping.pop(key) 

199 self._mapping[key] = ( 

200 channel, 

201 datetime.datetime.now() + _EVICTION_PERIOD, 

202 ) 

203 return channel, call_handle 

204 channel = _create_channel( 

205 target, options, channel_credentials, compression 

206 ) 

207 if _registered_method: 

208 call_handle = channel._get_registered_call_handle(method) 

209 self._mapping[key] = ( 

210 channel, 

211 datetime.datetime.now() + _EVICTION_PERIOD, 

212 ) 

213 if ( 

214 len(self._mapping) == 1 

215 or len(self._mapping) >= _MAXIMUM_CHANNELS 

216 ): 

217 self._condition.notify() 

218 return channel, call_handle 

219 

220 def _test_only_channel_count(self) -> int: 

221 with self._lock: 

222 return len(self._mapping) 

223 

224 

225@experimental_api 

226# pylint: disable=too-many-locals 

227def unary_unary( 

228 request: RequestType, 

229 target: str, 

230 method: str, 

231 request_serializer: Optional[Callable[[Any], bytes]] = None, 

232 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

233 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

234 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

235 insecure: bool = False, 

236 call_credentials: Optional[grpc.CallCredentials] = None, 

237 compression: Optional[grpc.Compression] = None, 

238 wait_for_ready: Optional[bool] = None, 

239 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

240 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

241 _registered_method: Optional[bool] = False, 

242) -> ResponseType: 

243 """Invokes a unary-unary RPC without an explicitly specified channel. 

244 

245 THIS IS AN EXPERIMENTAL API. 

246 

247 This is backed by a per-process cache of channels. Channels are evicted 

248 from the cache after a fixed period by a background. Channels will also be 

249 evicted if more than a configured maximum accumulate. 

250 

251 The default eviction period is 10 minutes. One may set the environment 

252 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

253 

254 The default maximum number of channels is 256. One may set the 

255 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

256 this. 

257 

258 Args: 

259 request: An iterator that yields request values for the RPC. 

260 target: The server address. 

261 method: The name of the RPC method. 

262 request_serializer: Optional :term:`serializer` for serializing the request 

263 message. Request goes unserialized in case None is passed. 

264 response_deserializer: Optional :term:`deserializer` for deserializing the response 

265 message. Response goes undeserialized in case None is passed. 

266 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

267 runtime) to configure the channel. 

268 channel_credentials: A credential applied to the whole channel, e.g. the 

269 return value of grpc.ssl_channel_credentials() or 

270 grpc.insecure_channel_credentials(). 

271 insecure: If True, specifies channel_credentials as 

272 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

273 exclusive with the `channel_credentials` option. 

274 call_credentials: A call credential applied to each call individually, 

275 e.g. the output of grpc.metadata_call_credentials() or 

276 grpc.access_token_call_credentials(). 

277 compression: An optional value indicating the compression method to be 

278 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

279 wait_for_ready: An optional flag indicating whether the RPC should fail 

280 immediately if the connection is not ready at the time the RPC is 

281 invoked, or if it should wait until the connection to the server 

282 becomes ready. When using this option, the user will likely also want 

283 to set a timeout. Defaults to True. 

284 timeout: An optional duration of time in seconds to allow for the RPC, 

285 after which an exception will be raised. If timeout is unspecified, 

286 defaults to a timeout controlled by the 

287 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

288 unset, defaults to 60 seconds. Supply a value of None to indicate that 

289 no timeout should be enforced. 

290 metadata: Optional metadata to send to the server. 

291 

292 Returns: 

293 The response to the RPC. 

294 """ 

295 channel, method_handle = ChannelCache.get().get_channel( 

296 target, 

297 options, 

298 channel_credentials, 

299 insecure, 

300 compression, 

301 method, 

302 _registered_method, 

303 ) 

304 multicallable = channel.unary_unary( 

305 method, request_serializer, response_deserializer, method_handle 

306 ) 

307 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

308 return multicallable( 

309 request, 

310 metadata=metadata, 

311 wait_for_ready=wait_for_ready, 

312 credentials=call_credentials, 

313 timeout=timeout, 

314 ) 

315 

316 

317@experimental_api 

318# pylint: disable=too-many-locals 

319def unary_stream( 

320 request: RequestType, 

321 target: str, 

322 method: str, 

323 request_serializer: Optional[Callable[[Any], bytes]] = None, 

324 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

325 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

326 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

327 insecure: bool = False, 

328 call_credentials: Optional[grpc.CallCredentials] = None, 

329 compression: Optional[grpc.Compression] = None, 

330 wait_for_ready: Optional[bool] = None, 

331 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

332 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

333 _registered_method: Optional[bool] = False, 

334) -> Iterator[ResponseType]: 

335 """Invokes a unary-stream RPC without an explicitly specified channel. 

336 

337 THIS IS AN EXPERIMENTAL API. 

338 

339 This is backed by a per-process cache of channels. Channels are evicted 

340 from the cache after a fixed period by a background. Channels will also be 

341 evicted if more than a configured maximum accumulate. 

342 

343 The default eviction period is 10 minutes. One may set the environment 

344 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

345 

346 The default maximum number of channels is 256. One may set the 

347 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

348 this. 

349 

350 Args: 

351 request: An iterator that yields request values for the RPC. 

352 target: The server address. 

353 method: The name of the RPC method. 

354 request_serializer: Optional :term:`serializer` for serializing the request 

355 message. Request goes unserialized in case None is passed. 

356 response_deserializer: Optional :term:`deserializer` for deserializing the response 

357 message. Response goes undeserialized in case None is passed. 

358 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

359 runtime) to configure the channel. 

360 channel_credentials: A credential applied to the whole channel, e.g. the 

361 return value of grpc.ssl_channel_credentials(). 

362 insecure: If True, specifies channel_credentials as 

363 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

364 exclusive with the `channel_credentials` option. 

365 call_credentials: A call credential applied to each call individually, 

366 e.g. the output of grpc.metadata_call_credentials() or 

367 grpc.access_token_call_credentials(). 

368 compression: An optional value indicating the compression method to be 

369 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

370 wait_for_ready: An optional flag indicating whether the RPC should fail 

371 immediately if the connection is not ready at the time the RPC is 

372 invoked, or if it should wait until the connection to the server 

373 becomes ready. When using this option, the user will likely also want 

374 to set a timeout. Defaults to True. 

375 timeout: An optional duration of time in seconds to allow for the RPC, 

376 after which an exception will be raised. If timeout is unspecified, 

377 defaults to a timeout controlled by the 

378 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

379 unset, defaults to 60 seconds. Supply a value of None to indicate that 

380 no timeout should be enforced. 

381 metadata: Optional metadata to send to the server. 

382 

383 Returns: 

384 An iterator of responses. 

385 """ 

386 channel, method_handle = ChannelCache.get().get_channel( 

387 target, 

388 options, 

389 channel_credentials, 

390 insecure, 

391 compression, 

392 method, 

393 _registered_method, 

394 ) 

395 multicallable = channel.unary_stream( 

396 method, request_serializer, response_deserializer, method_handle 

397 ) 

398 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

399 return multicallable( 

400 request, 

401 metadata=metadata, 

402 wait_for_ready=wait_for_ready, 

403 credentials=call_credentials, 

404 timeout=timeout, 

405 ) 

406 

407 

408@experimental_api 

409# pylint: disable=too-many-locals 

410def stream_unary( 

411 request_iterator: Iterator[RequestType], 

412 target: str, 

413 method: str, 

414 request_serializer: Optional[Callable[[Any], bytes]] = None, 

415 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

416 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

417 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

418 insecure: bool = False, 

419 call_credentials: Optional[grpc.CallCredentials] = None, 

420 compression: Optional[grpc.Compression] = None, 

421 wait_for_ready: Optional[bool] = None, 

422 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

423 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

424 _registered_method: Optional[bool] = False, 

425) -> ResponseType: 

426 """Invokes a stream-unary RPC without an explicitly specified channel. 

427 

428 THIS IS AN EXPERIMENTAL API. 

429 

430 This is backed by a per-process cache of channels. Channels are evicted 

431 from the cache after a fixed period by a background. Channels will also be 

432 evicted if more than a configured maximum accumulate. 

433 

434 The default eviction period is 10 minutes. One may set the environment 

435 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

436 

437 The default maximum number of channels is 256. One may set the 

438 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

439 this. 

440 

441 Args: 

442 request_iterator: An iterator that yields request values for the RPC. 

443 target: The server address. 

444 method: The name of the RPC method. 

445 request_serializer: Optional :term:`serializer` for serializing the request 

446 message. Request goes unserialized in case None is passed. 

447 response_deserializer: Optional :term:`deserializer` for deserializing the response 

448 message. Response goes undeserialized in case None is passed. 

449 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

450 runtime) to configure the channel. 

451 channel_credentials: A credential applied to the whole channel, e.g. the 

452 return value of grpc.ssl_channel_credentials(). 

453 call_credentials: A call credential applied to each call individually, 

454 e.g. the output of grpc.metadata_call_credentials() or 

455 grpc.access_token_call_credentials(). 

456 insecure: If True, specifies channel_credentials as 

457 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

458 exclusive with the `channel_credentials` option. 

459 compression: An optional value indicating the compression method to be 

460 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

461 wait_for_ready: An optional flag indicating whether the RPC should fail 

462 immediately if the connection is not ready at the time the RPC is 

463 invoked, or if it should wait until the connection to the server 

464 becomes ready. When using this option, the user will likely also want 

465 to set a timeout. Defaults to True. 

466 timeout: An optional duration of time in seconds to allow for the RPC, 

467 after which an exception will be raised. If timeout is unspecified, 

468 defaults to a timeout controlled by the 

469 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

470 unset, defaults to 60 seconds. Supply a value of None to indicate that 

471 no timeout should be enforced. 

472 metadata: Optional metadata to send to the server. 

473 

474 Returns: 

475 The response to the RPC. 

476 """ 

477 channel, method_handle = ChannelCache.get().get_channel( 

478 target, 

479 options, 

480 channel_credentials, 

481 insecure, 

482 compression, 

483 method, 

484 _registered_method, 

485 ) 

486 multicallable = channel.stream_unary( 

487 method, request_serializer, response_deserializer, method_handle 

488 ) 

489 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

490 return multicallable( 

491 request_iterator, 

492 metadata=metadata, 

493 wait_for_ready=wait_for_ready, 

494 credentials=call_credentials, 

495 timeout=timeout, 

496 ) 

497 

498 

499@experimental_api 

500# pylint: disable=too-many-locals 

501def stream_stream( 

502 request_iterator: Iterator[RequestType], 

503 target: str, 

504 method: str, 

505 request_serializer: Optional[Callable[[Any], bytes]] = None, 

506 response_deserializer: Optional[Callable[[bytes], Any]] = None, 

507 options: Sequence[Tuple[AnyStr, AnyStr]] = (), 

508 channel_credentials: Optional[grpc.ChannelCredentials] = None, 

509 insecure: bool = False, 

510 call_credentials: Optional[grpc.CallCredentials] = None, 

511 compression: Optional[grpc.Compression] = None, 

512 wait_for_ready: Optional[bool] = None, 

513 timeout: Optional[float] = _DEFAULT_TIMEOUT, 

514 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None, 

515 _registered_method: Optional[bool] = False, 

516) -> Iterator[ResponseType]: 

517 """Invokes a stream-stream RPC without an explicitly specified channel. 

518 

519 THIS IS AN EXPERIMENTAL API. 

520 

521 This is backed by a per-process cache of channels. Channels are evicted 

522 from the cache after a fixed period by a background. Channels will also be 

523 evicted if more than a configured maximum accumulate. 

524 

525 The default eviction period is 10 minutes. One may set the environment 

526 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. 

527 

528 The default maximum number of channels is 256. One may set the 

529 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure 

530 this. 

531 

532 Args: 

533 request_iterator: An iterator that yields request values for the RPC. 

534 target: The server address. 

535 method: The name of the RPC method. 

536 request_serializer: Optional :term:`serializer` for serializing the request 

537 message. Request goes unserialized in case None is passed. 

538 response_deserializer: Optional :term:`deserializer` for deserializing the response 

539 message. Response goes undeserialized in case None is passed. 

540 options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core 

541 runtime) to configure the channel. 

542 channel_credentials: A credential applied to the whole channel, e.g. the 

543 return value of grpc.ssl_channel_credentials(). 

544 call_credentials: A call credential applied to each call individually, 

545 e.g. the output of grpc.metadata_call_credentials() or 

546 grpc.access_token_call_credentials(). 

547 insecure: If True, specifies channel_credentials as 

548 :term:`grpc.insecure_channel_credentials()`. This option is mutually 

549 exclusive with the `channel_credentials` option. 

550 compression: An optional value indicating the compression method to be 

551 used over the lifetime of the channel, e.g. grpc.Compression.Gzip. 

552 wait_for_ready: An optional flag indicating whether the RPC should fail 

553 immediately if the connection is not ready at the time the RPC is 

554 invoked, or if it should wait until the connection to the server 

555 becomes ready. When using this option, the user will likely also want 

556 to set a timeout. Defaults to True. 

557 timeout: An optional duration of time in seconds to allow for the RPC, 

558 after which an exception will be raised. If timeout is unspecified, 

559 defaults to a timeout controlled by the 

560 GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is 

561 unset, defaults to 60 seconds. Supply a value of None to indicate that 

562 no timeout should be enforced. 

563 metadata: Optional metadata to send to the server. 

564 

565 Returns: 

566 An iterator of responses. 

567 """ 

568 channel, method_handle = ChannelCache.get().get_channel( 

569 target, 

570 options, 

571 channel_credentials, 

572 insecure, 

573 compression, 

574 method, 

575 _registered_method, 

576 ) 

577 multicallable = channel.stream_stream( 

578 method, request_serializer, response_deserializer, method_handle 

579 ) 

580 wait_for_ready = wait_for_ready if wait_for_ready is not None else True 

581 return multicallable( 

582 request_iterator, 

583 metadata=metadata, 

584 wait_for_ready=wait_for_ready, 

585 credentials=call_credentials, 

586 timeout=timeout, 

587 )