Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/grpc/_simple_stubs.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

127 statements  

1# Copyright 2020 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Functions that obviate explicit stubs and explicit channels.""" 

15 

16import collections 

17import datetime 

18import logging 

19import os 

20import threading 

21from typing import ( 

22 Any, 

23 AnyStr, 

24 Callable, 

25 Dict, 

26 Iterator, 

27 Optional, 

28 Sequence, 

29 Tuple, 

30 TypeVar, 

31 Union, 

32) 

33 

34import grpc 

35from grpc.experimental import experimental_api 

36 

# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel options are a sequence of (name, value) pairs.
OptionsType = Sequence[Tuple[str, str]]

# Identity of a cached channel; the fields are, in order:
# target, options, channel credentials, compression.
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]

47 

_LOGGER = logging.getLogger(__name__)

# Each tunable below starts at its built-in default and is overridden when the
# matching environment variable is present. A malformed value raises
# ValueError at import time, from the float()/int() conversion.

# How long a managed channel stays cached before it becomes evictable.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
_EVICTION_PERIOD = datetime.timedelta(minutes=10)
if _EVICTION_PERIOD_KEY in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )

# Upper bound on the number of managed channels kept in the cache.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
_MAXIMUM_CHANNELS = 2**8
if _MAXIMUM_CHANNELS_KEY in os.environ:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)

# Default per-RPC timeout, in seconds, used by the stub functions.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
_DEFAULT_TIMEOUT = 60.0
if _DEFAULT_TIMEOUT_KEY in os.environ:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)

74 

75 

def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Create a new channel to `target` via grpc.secure_channel.

    Args:
      target: The server address.
      options: Key-value pairs used to configure the channel.
      channel_credentials: Credentials handed to grpc.secure_channel.
      compression: Optional compression setting for the channel.

    Returns:
      The channel returned by grpc.secure_channel.
    """
    # Lazy %-style arguments keep this consistent with the other log calls in
    # this module and avoid formatting the arguments when DEBUG is disabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )

93 

94 

class ChannelCache:
    """Process-wide cache of channels with a background eviction thread.

    A single instance is shared through `ChannelCache.get()`. Entries live in
    an insertion-ordered mapping from a cache key (target, options, channel
    credentials, compression) to a `(channel, eviction_time)` pair. Using a
    cached channel re-inserts it at the end of the mapping with a fresh
    eviction time, so the first entry is always the least recently used one.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # Guards `_singleton` and `_mapping`; re-entrant and shared with
    # `_condition` below.
    _lock: threading.RLock = threading.RLock()
    # Used to wake the eviction thread when the mapping changes.
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Create an empty cache and start the daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Return the shared ChannelCache, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread needs the lock in order
        # to set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Remove `key` from the cache and close its channel.

        The caller must hold `_lock`.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Eviction loop run by the background thread; never returns."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                if not ChannelCache._singleton._mapping:
                    # Nothing cached: sleep until get_channel notifies.
                    ChannelCache._condition.wait()
                elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
                    # Over capacity: drop the oldest entry.
                    key = next(iter(ChannelCache._singleton._mapping.keys()))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(
                        iter(ChannelCache._singleton._mapping.items())
                    )
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    else:
                        time_to_eviction = (eviction_time - now).total_seconds()
                        # NOTE: We aim to *eventually* coalesce to a state in
                        # which no overdue channels are in the cache and the
                        # length of the cache is no longer than
                        # _MAXIMUM_CHANNELS. We tolerate momentary states in
                        # which these two criteria are not met.
                        ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
        method: str,
        _registered_method: bool,
    ) -> Tuple[grpc.Channel, Optional[int]]:
        """Get a channel from cache or creates a new channel.

        This method also takes care of register method for channel,
        which means we'll register a new call handle if we're calling a
        non-registered method for an existing channel.

        Args:
          target: The server address.
          options: Key-value pairs used to configure the channel. Lists are
            accepted; they are converted to tuples for the cache key only.
          channel_credentials: Credentials for the channel. When None and
            `insecure` is False, grpc.ssl_channel_credentials() is used.
          insecure: If True, use
            grpc.experimental.insecure_channel_credentials(). Mutually
            exclusive with `channel_credentials`.
          compression: Optional compression setting for the channel.
          method: The name of the RPC method.
          _registered_method: If truthy, a call handle for `method` is
            requested from the channel.

        Returns:
          A tuple with two items. The first item is the channel, second item is
          the call handle if the method is registered, None if it's not registered.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                + "the channel_credentials option. Please use one "
                + "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # The cache key must be hashable, but callers may pass `options` as a
        # list (or a list of lists). Normalize to tuples for the key only; the
        # channel itself still receives the caller's original `options`.
        if options is None:
            options_key = None
        else:
            options_key = tuple(
                tuple(option) if isinstance(option, list) else option
                for option in options
            )
        key = (target, options_key, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            call_handle = None
            if channel_data is not None:
                channel = channel_data[0]
                # Register a new call handle if we're calling a registered method for an
                # existing channel and this method is not registered.
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                # Pop and re-insert so the entry moves to the end of the
                # ordered mapping with a refreshed eviction time.
                self._mapping.pop(key)
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                return channel, call_handle
            else:
                channel = _create_channel(
                    target, options, channel_credentials, compression
                )
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                # Wake the eviction thread when the cache stops being empty or
                # reaches its size limit.
                if (
                    len(self._mapping) == 1
                    or len(self._mapping) >= _MAXIMUM_CHANNELS
                ):
                    self._condition.notify()
                return channel, call_handle

    def _test_only_channel_count(self) -> int:
        """Return the number of channels currently cached (test helper)."""
        with self._lock:
            return len(self._mapping)

225 

226 

@experimental_api
# pylint: disable=too-many-locals
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The call is served by a per-process cache of channels. A background
    thread evicts channels from the cache once their eviction period has
    passed, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials() or
        grpc.insecure_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.unary_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

317 

318 

@experimental_api
# pylint: disable=too-many-locals
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The call is served by a per-process cache of channels. A background
    thread evicts channels from the cache once their eviction period has
    passed, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.unary_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

408 

409 

@experimental_api
# pylint: disable=too-many-locals
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The call is served by a per-process cache of channels. A background
    thread evicts channels from the cache once their eviction period has
    passed, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.stream_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

499 

500 

@experimental_api
# pylint: disable=too-many-locals
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    The call is served by a per-process cache of channels. A background
    thread evicts channels from the cache once their eviction period has
    passed, and also when more than a configured maximum accumulate.

    The default eviction period is 10 minutes; set the environment variable
    "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to change it.

    The default maximum number of channels is 256; set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to change it.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the
        request message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing
        the response message. Response goes undeserialized in case None is
        passed.
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    cache = ChannelCache.get()
    channel, method_handle = cache.get_channel(
        target=target,
        options=options,
        channel_credentials=channel_credentials,
        insecure=insecure,
        compression=compression,
        method=method,
        _registered_method=_registered_method,
    )
    invoke = channel.stream_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # An unspecified wait_for_ready means "wait".
    if wait_for_ready is None:
        wait_for_ready = True
    return invoke(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )

589 )