Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/transports/grpc_asyncio.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

172 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import pickle 

19import logging as std_logging 

20import warnings 

21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

22 

23from google.api_core import gapic_v1 

24from google.api_core import grpc_helpers_async 

25from google.api_core import exceptions as core_exceptions 

26from google.api_core import retry_async as retries 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30import google.protobuf.message 

31 

32import grpc # type: ignore 

33import proto # type: ignore 

34from grpc.experimental import aio # type: ignore 

35 

36from google.iam.v1 import iam_policy_pb2 # type: ignore 

37from google.iam.v1 import policy_pb2 # type: ignore 

38from google.protobuf import empty_pb2 # type: ignore 

39from google.pubsub_v1.types import pubsub 

40from .base import SubscriberTransport, DEFAULT_CLIENT_INFO 

41from .grpc import SubscriberGrpcTransport 

42 

# Detect whether the installed google-api-core ships the optional
# ``client_logging`` module; the interceptor below only logs when it does.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger shared by everything in this transport module.
_LOGGER = std_logging.getLogger(__name__)

51 

52 

class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Unary-unary AsyncIO client interceptor that emits DEBUG logs per RPC.

    Logging happens only when ``CLIENT_LOGGING_SUPPORTED`` is true and the
    module logger is enabled for ``DEBUG``; otherwise the call is simply
    forwarded to ``continuation`` and its result returned.
    """

    @staticmethod
    def _serialize_payload(message):
        """Return a loggable string for a request or response object.

        proto-plus messages and raw protobuf messages are rendered as JSON;
        anything else is rendered as ``"<TypeName>: <pickled bytes>"``.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # NOTE: pickle.dumps raises for unpicklable objects; this branch is
        # only reached for payloads that are not protobuf messages.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Log the outgoing request and incoming response around the RPC.

        Args:
            continuation: Callable that proceeds with the RPC; awaited with
                ``client_call_details`` and ``request``.
            client_call_details: Call details; ``method`` and ``metadata``
                are read for logging.
            request: The request message being sent.

        Returns:
            The call object produced by ``continuation``.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_payload = self._serialize_payload(request)
            # ``metadata`` is optional on the call details; treat a missing
            # value as "no metadata" instead of failing inside the log path.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in client_call_details.metadata or ()
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Flatten the gRPC metadata object into a plain dict of strings.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._serialize_payload(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

118 

119 

120class SubscriberGrpcAsyncIOTransport(SubscriberTransport): 

121 """gRPC AsyncIO backend transport for Subscriber. 

122 

123 The service that an application uses to manipulate subscriptions and 

124 to consume messages from a subscription via the ``Pull`` method or 

125 by establishing a bi-directional stream using the ``StreamingPull`` 

126 method. 

127 

128 This class defines the same methods as the primary client, so the 

129 primary client can load the underlying transport implementation 

130 and call it. 

131 

132 It sends protocol buffers over the wire using gRPC (which is built on 

133 top of HTTP/2); the ``grpcio`` package must be installed. 

134 """ 

135 

136 _grpc_channel: aio.Channel 

137 _stubs: Dict[str, Callable] = {} 

138 

139 @classmethod 

140 def create_channel( 

141 cls, 

142 host: str = "pubsub.googleapis.com", 

143 credentials: Optional[ga_credentials.Credentials] = None, 

144 credentials_file: Optional[str] = None, 

145 scopes: Optional[Sequence[str]] = None, 

146 quota_project_id: Optional[str] = None, 

147 **kwargs, 

148 ) -> aio.Channel: 

149 """Create and return a gRPC AsyncIO channel object. 

150 Args: 

151 host (Optional[str]): The host for the channel to use. 

152 credentials (Optional[~.Credentials]): The 

153 authorization credentials to attach to requests. These 

154 credentials identify this application to the service. If 

155 none are specified, the client will attempt to ascertain 

156 the credentials from the environment. 

157 credentials_file (Optional[str]): A file with credentials that can 

158 be loaded with :func:`google.auth.load_credentials_from_file`. 

159 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

160 service. These are only used when credentials are not specified and 

161 are passed to :func:`google.auth.default`. 

162 quota_project_id (Optional[str]): An optional project to use for billing 

163 and quota. 

164 kwargs (Optional[dict]): Keyword arguments, which are passed to the 

165 channel creation. 

166 Returns: 

167 aio.Channel: A gRPC AsyncIO channel object. 

168 """ 

169 

170 return grpc_helpers_async.create_channel( 

171 host, 

172 credentials=credentials, 

173 credentials_file=credentials_file, 

174 quota_project_id=quota_project_id, 

175 default_scopes=cls.AUTH_SCOPES, 

176 scopes=scopes, 

177 default_host=cls.DEFAULT_HOST, 

178 **kwargs, 

179 ) 

180 

181 def __init__( 

182 self, 

183 *, 

184 host: str = "pubsub.googleapis.com", 

185 credentials: Optional[ga_credentials.Credentials] = None, 

186 credentials_file: Optional[str] = None, 

187 scopes: Optional[Sequence[str]] = None, 

188 channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None, 

189 api_mtls_endpoint: Optional[str] = None, 

190 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

191 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None, 

192 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

193 quota_project_id: Optional[str] = None, 

194 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

195 always_use_jwt_access: Optional[bool] = False, 

196 api_audience: Optional[str] = None, 

197 ) -> None: 

198 """Instantiate the transport. 

199 

200 Args: 

201 host (Optional[str]): 

202 The hostname to connect to (default: 'pubsub.googleapis.com'). 

203 credentials (Optional[google.auth.credentials.Credentials]): The 

204 authorization credentials to attach to requests. These 

205 credentials identify the application to the service; if none 

206 are specified, the client will attempt to ascertain the 

207 credentials from the environment. 

208 This argument is ignored if a ``channel`` instance is provided. 

209 credentials_file (Optional[str]): A file with credentials that can 

210 be loaded with :func:`google.auth.load_credentials_from_file`. 

211 This argument is ignored if a ``channel`` instance is provided. 

212 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

213 service. These are only used when credentials are not specified and 

214 are passed to :func:`google.auth.default`. 

215 channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]): 

216 A ``Channel`` instance through which to make calls, or a Callable 

217 that constructs and returns one. If set to None, ``self.create_channel`` 

218 is used to create the channel. If a Callable is given, it will be called 

219 with the same arguments as used in ``self.create_channel``. 

220 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint. 

221 If provided, it overrides the ``host`` argument and tries to create 

222 a mutual TLS channel with client SSL credentials from 

223 ``client_cert_source`` or application default SSL credentials. 

224 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]): 

225 Deprecated. A callback to provide client SSL certificate bytes and 

226 private key bytes, both in PEM format. It is ignored if 

227 ``api_mtls_endpoint`` is None. 

228 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials 

229 for the grpc channel. It is ignored if a ``channel`` instance is provided. 

230 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]): 

231 A callback to provide client certificate bytes and private key bytes, 

232 both in PEM format. It is used to configure a mutual TLS channel. It is 

233 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided. 

234 quota_project_id (Optional[str]): An optional project to use for billing 

235 and quota. 

236 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

237 The client info used to send a user-agent string along with 

238 API requests. If ``None``, then default info will be used. 

239 Generally, you only need to set this if you're developing 

240 your own client library. 

241 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

242 be used for service account credentials. 

243 

244 Raises: 

245 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport 

246 creation failed for any reason. 

247 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

248 and ``credentials_file`` are passed. 

249 """ 

250 self._grpc_channel = None 

251 self._ssl_channel_credentials = ssl_channel_credentials 

252 self._stubs: Dict[str, Callable] = {} 

253 

254 if api_mtls_endpoint: 

255 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) 

256 if client_cert_source: 

257 warnings.warn("client_cert_source is deprecated", DeprecationWarning) 

258 

259 if isinstance(channel, aio.Channel): 

260 # Ignore credentials if a channel was passed. 

261 credentials = None 

262 self._ignore_credentials = True 

263 # If a channel was explicitly provided, set it. 

264 self._grpc_channel = channel 

265 self._ssl_channel_credentials = None 

266 else: 

267 if api_mtls_endpoint: 

268 host = api_mtls_endpoint 

269 

270 # Create SSL credentials with client_cert_source or application 

271 # default SSL credentials. 

272 if client_cert_source: 

273 cert, key = client_cert_source() 

274 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

275 certificate_chain=cert, private_key=key 

276 ) 

277 else: 

278 self._ssl_channel_credentials = SslCredentials().ssl_credentials 

279 

280 else: 

281 if client_cert_source_for_mtls and not ssl_channel_credentials: 

282 cert, key = client_cert_source_for_mtls() 

283 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

284 certificate_chain=cert, private_key=key 

285 ) 

286 

287 # The base transport sets the host, credentials and scopes 

288 super().__init__( 

289 host=host, 

290 credentials=credentials, 

291 credentials_file=credentials_file, 

292 scopes=scopes, 

293 quota_project_id=quota_project_id, 

294 client_info=client_info, 

295 always_use_jwt_access=always_use_jwt_access, 

296 api_audience=api_audience, 

297 ) 

298 

299 if not self._grpc_channel: 

300 # initialize with the provided callable or the default channel 

301 channel_init = channel or type(self).create_channel 

302 self._grpc_channel = channel_init( 

303 self._host, 

304 # use the credentials which are saved 

305 credentials=self._credentials, 

306 # Set ``credentials_file`` to ``None`` here as 

307 # the credentials that we saved earlier should be used. 

308 credentials_file=None, 

309 scopes=self._scopes, 

310 ssl_credentials=self._ssl_channel_credentials, 

311 quota_project_id=quota_project_id, 

312 options=[ 

313 ("grpc.max_send_message_length", -1), 

314 ("grpc.max_receive_message_length", -1), 

315 ("grpc.max_metadata_size", 4 * 1024 * 1024), 

316 ("grpc.keepalive_time_ms", 30000), 

317 ], 

318 ) 

319 

320 self._interceptor = _LoggingClientAIOInterceptor() 

321 self._grpc_channel._unary_unary_interceptors.append(self._interceptor) 

322 self._logged_channel = self._grpc_channel 

323 self._wrap_with_kind = ( 

324 "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters 

325 ) 

326 # Wrap messages. This must be done after self._logged_channel exists 

327 self._prep_wrapped_messages(client_info) 

328 

329 @property 

330 def grpc_channel(self) -> aio.Channel: 

331 """Create the channel designed to connect to this service. 

332 

333 This property caches on the instance; repeated calls return 

334 the same channel. 

335 """ 

336 # Return the channel from cache. 

337 return self._grpc_channel 

338 

339 @property 

340 def create_subscription( 

341 self, 

342 ) -> Callable[[pubsub.Subscription], Awaitable[pubsub.Subscription]]: 

343 r"""Return a callable for the create subscription method over gRPC. 

344 

345 Creates a subscription to a given topic. See the [resource name 

346 rules] 

347 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

348 If the subscription already exists, returns ``ALREADY_EXISTS``. 

349 If the corresponding topic doesn't exist, returns ``NOT_FOUND``. 

350 

351 If the name is not provided in the request, the server will 

352 assign a random name for this subscription on the same project 

353 as the topic, conforming to the [resource name format] 

354 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

355 The generated name is populated in the returned Subscription 

356 object. Note that for REST API requests, you must specify a name 

357 in the request. 

358 

359 Returns: 

360 Callable[[~.Subscription], 

361 Awaitable[~.Subscription]]: 

362 A function that, when called, will call the underlying RPC 

363 on the server. 

364 """ 

365 # Generate a "stub function" on-the-fly which will actually make 

366 # the request. 

367 # gRPC handles serialization and deserialization, so we just need 

368 # to pass in the functions for each. 

369 if "create_subscription" not in self._stubs: 

370 self._stubs["create_subscription"] = self._logged_channel.unary_unary( 

371 "/google.pubsub.v1.Subscriber/CreateSubscription", 

372 request_serializer=pubsub.Subscription.serialize, 

373 response_deserializer=pubsub.Subscription.deserialize, 

374 ) 

375 return self._stubs["create_subscription"] 

376 

377 @property 

378 def get_subscription( 

379 self, 

380 ) -> Callable[[pubsub.GetSubscriptionRequest], Awaitable[pubsub.Subscription]]: 

381 r"""Return a callable for the get subscription method over gRPC. 

382 

383 Gets the configuration details of a subscription. 

384 

385 Returns: 

386 Callable[[~.GetSubscriptionRequest], 

387 Awaitable[~.Subscription]]: 

388 A function that, when called, will call the underlying RPC 

389 on the server. 

390 """ 

391 # Generate a "stub function" on-the-fly which will actually make 

392 # the request. 

393 # gRPC handles serialization and deserialization, so we just need 

394 # to pass in the functions for each. 

395 if "get_subscription" not in self._stubs: 

396 self._stubs["get_subscription"] = self._logged_channel.unary_unary( 

397 "/google.pubsub.v1.Subscriber/GetSubscription", 

398 request_serializer=pubsub.GetSubscriptionRequest.serialize, 

399 response_deserializer=pubsub.Subscription.deserialize, 

400 ) 

401 return self._stubs["get_subscription"] 

402 

403 @property 

404 def update_subscription( 

405 self, 

406 ) -> Callable[[pubsub.UpdateSubscriptionRequest], Awaitable[pubsub.Subscription]]: 

407 r"""Return a callable for the update subscription method over gRPC. 

408 

409 Updates an existing subscription by updating the 

410 fields specified in the update mask. Note that certain 

411 properties of a subscription, such as its topic, are not 

412 modifiable. 

413 

414 Returns: 

415 Callable[[~.UpdateSubscriptionRequest], 

416 Awaitable[~.Subscription]]: 

417 A function that, when called, will call the underlying RPC 

418 on the server. 

419 """ 

420 # Generate a "stub function" on-the-fly which will actually make 

421 # the request. 

422 # gRPC handles serialization and deserialization, so we just need 

423 # to pass in the functions for each. 

424 if "update_subscription" not in self._stubs: 

425 self._stubs["update_subscription"] = self._logged_channel.unary_unary( 

426 "/google.pubsub.v1.Subscriber/UpdateSubscription", 

427 request_serializer=pubsub.UpdateSubscriptionRequest.serialize, 

428 response_deserializer=pubsub.Subscription.deserialize, 

429 ) 

430 return self._stubs["update_subscription"] 

431 

432 @property 

433 def list_subscriptions( 

434 self, 

435 ) -> Callable[ 

436 [pubsub.ListSubscriptionsRequest], Awaitable[pubsub.ListSubscriptionsResponse] 

437 ]: 

438 r"""Return a callable for the list subscriptions method over gRPC. 

439 

440 Lists matching subscriptions. 

441 

442 Returns: 

443 Callable[[~.ListSubscriptionsRequest], 

444 Awaitable[~.ListSubscriptionsResponse]]: 

445 A function that, when called, will call the underlying RPC 

446 on the server. 

447 """ 

448 # Generate a "stub function" on-the-fly which will actually make 

449 # the request. 

450 # gRPC handles serialization and deserialization, so we just need 

451 # to pass in the functions for each. 

452 if "list_subscriptions" not in self._stubs: 

453 self._stubs["list_subscriptions"] = self._logged_channel.unary_unary( 

454 "/google.pubsub.v1.Subscriber/ListSubscriptions", 

455 request_serializer=pubsub.ListSubscriptionsRequest.serialize, 

456 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize, 

457 ) 

458 return self._stubs["list_subscriptions"] 

459 

460 @property 

461 def delete_subscription( 

462 self, 

463 ) -> Callable[[pubsub.DeleteSubscriptionRequest], Awaitable[empty_pb2.Empty]]: 

464 r"""Return a callable for the delete subscription method over gRPC. 

465 

466 Deletes an existing subscription. All messages retained in the 

467 subscription are immediately dropped. Calls to ``Pull`` after 

468 deletion will return ``NOT_FOUND``. After a subscription is 

469 deleted, a new one may be created with the same name, but the 

470 new one has no association with the old subscription or its 

471 topic unless the same topic is specified. 

472 

473 Returns: 

474 Callable[[~.DeleteSubscriptionRequest], 

475 Awaitable[~.Empty]]: 

476 A function that, when called, will call the underlying RPC 

477 on the server. 

478 """ 

479 # Generate a "stub function" on-the-fly which will actually make 

480 # the request. 

481 # gRPC handles serialization and deserialization, so we just need 

482 # to pass in the functions for each. 

483 if "delete_subscription" not in self._stubs: 

484 self._stubs["delete_subscription"] = self._logged_channel.unary_unary( 

485 "/google.pubsub.v1.Subscriber/DeleteSubscription", 

486 request_serializer=pubsub.DeleteSubscriptionRequest.serialize, 

487 response_deserializer=empty_pb2.Empty.FromString, 

488 ) 

489 return self._stubs["delete_subscription"] 

490 

491 @property 

492 def modify_ack_deadline( 

493 self, 

494 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], Awaitable[empty_pb2.Empty]]: 

495 r"""Return a callable for the modify ack deadline method over gRPC. 

496 

497 Modifies the ack deadline for a specific message. This method is 

498 useful to indicate that more time is needed to process a message 

499 by the subscriber, or to make the message available for 

500 redelivery if the processing was interrupted. Note that this 

501 does not modify the subscription-level ``ackDeadlineSeconds`` 

502 used for subsequent messages. 

503 

504 Returns: 

505 Callable[[~.ModifyAckDeadlineRequest], 

506 Awaitable[~.Empty]]: 

507 A function that, when called, will call the underlying RPC 

508 on the server. 

509 """ 

510 # Generate a "stub function" on-the-fly which will actually make 

511 # the request. 

512 # gRPC handles serialization and deserialization, so we just need 

513 # to pass in the functions for each. 

514 if "modify_ack_deadline" not in self._stubs: 

515 self._stubs["modify_ack_deadline"] = self._logged_channel.unary_unary( 

516 "/google.pubsub.v1.Subscriber/ModifyAckDeadline", 

517 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize, 

518 response_deserializer=empty_pb2.Empty.FromString, 

519 ) 

520 return self._stubs["modify_ack_deadline"] 

521 

522 @property 

523 def acknowledge( 

524 self, 

525 ) -> Callable[[pubsub.AcknowledgeRequest], Awaitable[empty_pb2.Empty]]: 

526 r"""Return a callable for the acknowledge method over gRPC. 

527 

528 Acknowledges the messages associated with the ``ack_ids`` in the 

529 ``AcknowledgeRequest``. The Pub/Sub system can remove the 

530 relevant messages from the subscription. 

531 

532 Acknowledging a message whose ack deadline has expired may 

533 succeed, but such a message may be redelivered later. 

534 Acknowledging a message more than once will not result in an 

535 error. 

536 

537 Returns: 

538 Callable[[~.AcknowledgeRequest], 

539 Awaitable[~.Empty]]: 

540 A function that, when called, will call the underlying RPC 

541 on the server. 

542 """ 

543 # Generate a "stub function" on-the-fly which will actually make 

544 # the request. 

545 # gRPC handles serialization and deserialization, so we just need 

546 # to pass in the functions for each. 

547 if "acknowledge" not in self._stubs: 

548 self._stubs["acknowledge"] = self._logged_channel.unary_unary( 

549 "/google.pubsub.v1.Subscriber/Acknowledge", 

550 request_serializer=pubsub.AcknowledgeRequest.serialize, 

551 response_deserializer=empty_pb2.Empty.FromString, 

552 ) 

553 return self._stubs["acknowledge"] 

554 

555 @property 

556 def pull(self) -> Callable[[pubsub.PullRequest], Awaitable[pubsub.PullResponse]]: 

557 r"""Return a callable for the pull method over gRPC. 

558 

559 Pulls messages from the server. 

560 

561 Returns: 

562 Callable[[~.PullRequest], 

563 Awaitable[~.PullResponse]]: 

564 A function that, when called, will call the underlying RPC 

565 on the server. 

566 """ 

567 # Generate a "stub function" on-the-fly which will actually make 

568 # the request. 

569 # gRPC handles serialization and deserialization, so we just need 

570 # to pass in the functions for each. 

571 if "pull" not in self._stubs: 

572 self._stubs["pull"] = self._logged_channel.unary_unary( 

573 "/google.pubsub.v1.Subscriber/Pull", 

574 request_serializer=pubsub.PullRequest.serialize, 

575 response_deserializer=pubsub.PullResponse.deserialize, 

576 ) 

577 return self._stubs["pull"] 

578 

579 @property 

580 def streaming_pull( 

581 self, 

582 ) -> Callable[ 

583 [pubsub.StreamingPullRequest], Awaitable[pubsub.StreamingPullResponse] 

584 ]: 

585 r"""Return a callable for the streaming pull method over gRPC. 

586 

587 Establishes a stream with the server, which sends messages down 

588 to the client. The client streams acknowledgments and ack 

589 deadline modifications back to the server. The server will close 

590 the stream and return the status on any error. The server may 

591 close the stream with status ``UNAVAILABLE`` to reassign 

592 server-side resources, in which case, the client should 

593 re-establish the stream. Flow control can be achieved by 

594 configuring the underlying RPC channel. 

595 

596 Returns: 

597 Callable[[~.StreamingPullRequest], 

598 Awaitable[~.StreamingPullResponse]]: 

599 A function that, when called, will call the underlying RPC 

600 on the server. 

601 """ 

602 # Generate a "stub function" on-the-fly which will actually make 

603 # the request. 

604 # gRPC handles serialization and deserialization, so we just need 

605 # to pass in the functions for each. 

606 if "streaming_pull" not in self._stubs: 

607 self._stubs["streaming_pull"] = self._logged_channel.stream_stream( 

608 "/google.pubsub.v1.Subscriber/StreamingPull", 

609 request_serializer=pubsub.StreamingPullRequest.serialize, 

610 response_deserializer=pubsub.StreamingPullResponse.deserialize, 

611 ) 

612 return self._stubs["streaming_pull"] 

613 

614 @property 

615 def modify_push_config( 

616 self, 

617 ) -> Callable[[pubsub.ModifyPushConfigRequest], Awaitable[empty_pb2.Empty]]: 

618 r"""Return a callable for the modify push config method over gRPC. 

619 

620 Modifies the ``PushConfig`` for a specified subscription. 

621 

622 This may be used to change a push subscription to a pull one 

623 (signified by an empty ``PushConfig``) or vice versa, or change 

624 the endpoint URL and other attributes of a push subscription. 

625 Messages will accumulate for delivery continuously through the 

626 call regardless of changes to the ``PushConfig``. 

627 

628 Returns: 

629 Callable[[~.ModifyPushConfigRequest], 

630 Awaitable[~.Empty]]: 

631 A function that, when called, will call the underlying RPC 

632 on the server. 

633 """ 

634 # Generate a "stub function" on-the-fly which will actually make 

635 # the request. 

636 # gRPC handles serialization and deserialization, so we just need 

637 # to pass in the functions for each. 

638 if "modify_push_config" not in self._stubs: 

639 self._stubs["modify_push_config"] = self._logged_channel.unary_unary( 

640 "/google.pubsub.v1.Subscriber/ModifyPushConfig", 

641 request_serializer=pubsub.ModifyPushConfigRequest.serialize, 

642 response_deserializer=empty_pb2.Empty.FromString, 

643 ) 

644 return self._stubs["modify_push_config"] 

645 

646 @property 

647 def get_snapshot( 

648 self, 

649 ) -> Callable[[pubsub.GetSnapshotRequest], Awaitable[pubsub.Snapshot]]: 

650 r"""Return a callable for the get snapshot method over gRPC. 

651 

652 Gets the configuration details of a snapshot. Snapshots are used 

653 in 

654 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

655 operations, which allow you to manage message acknowledgments in 

656 bulk. That is, you can set the acknowledgment state of messages 

657 in an existing subscription to the state captured by a snapshot. 

658 

659 Returns: 

660 Callable[[~.GetSnapshotRequest], 

661 Awaitable[~.Snapshot]]: 

662 A function that, when called, will call the underlying RPC 

663 on the server. 

664 """ 

665 # Generate a "stub function" on-the-fly which will actually make 

666 # the request. 

667 # gRPC handles serialization and deserialization, so we just need 

668 # to pass in the functions for each. 

669 if "get_snapshot" not in self._stubs: 

670 self._stubs["get_snapshot"] = self._logged_channel.unary_unary( 

671 "/google.pubsub.v1.Subscriber/GetSnapshot", 

672 request_serializer=pubsub.GetSnapshotRequest.serialize, 

673 response_deserializer=pubsub.Snapshot.deserialize, 

674 ) 

675 return self._stubs["get_snapshot"] 

676 

677 @property 

678 def list_snapshots( 

679 self, 

680 ) -> Callable[ 

681 [pubsub.ListSnapshotsRequest], Awaitable[pubsub.ListSnapshotsResponse] 

682 ]: 

683 r"""Return a callable for the list snapshots method over gRPC. 

684 

685 Lists the existing snapshots. Snapshots are used in 

686 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

687 operations, which allow you to manage message acknowledgments in 

688 bulk. That is, you can set the acknowledgment state of messages 

689 in an existing subscription to the state captured by a snapshot. 

690 

691 Returns: 

692 Callable[[~.ListSnapshotsRequest], 

693 Awaitable[~.ListSnapshotsResponse]]: 

694 A function that, when called, will call the underlying RPC 

695 on the server. 

696 """ 

697 # Generate a "stub function" on-the-fly which will actually make 

698 # the request. 

699 # gRPC handles serialization and deserialization, so we just need 

700 # to pass in the functions for each. 

701 if "list_snapshots" not in self._stubs: 

702 self._stubs["list_snapshots"] = self._logged_channel.unary_unary( 

703 "/google.pubsub.v1.Subscriber/ListSnapshots", 

704 request_serializer=pubsub.ListSnapshotsRequest.serialize, 

705 response_deserializer=pubsub.ListSnapshotsResponse.deserialize, 

706 ) 

707 return self._stubs["list_snapshots"] 

708 

709 @property 

710 def create_snapshot( 

711 self, 

712 ) -> Callable[[pubsub.CreateSnapshotRequest], Awaitable[pubsub.Snapshot]]: 

713 r"""Return a callable for the create snapshot method over gRPC. 

714 

715 Creates a snapshot from the requested subscription. Snapshots 

716 are used in 

717 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

718 operations, which allow you to manage message acknowledgments in 

719 bulk. That is, you can set the acknowledgment state of messages 

720 in an existing subscription to the state captured by a snapshot. 

721 If the snapshot already exists, returns ``ALREADY_EXISTS``. If 

722 the requested subscription doesn't exist, returns ``NOT_FOUND``. 

723 If the backlog in the subscription is too old -- and the 

724 resulting snapshot would expire in less than 1 hour -- then 

725 ``FAILED_PRECONDITION`` is returned. See also the 

726 ``Snapshot.expire_time`` field. If the name is not provided in 

727 the request, the server will assign a random name for this 

728 snapshot on the same project as the subscription, conforming to 

729 the [resource name format] 

730 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

731 The generated name is populated in the returned Snapshot object. 

732 Note that for REST API requests, you must specify a name in the 

733 request. 

734 

735 Returns: 

736 Callable[[~.CreateSnapshotRequest], 

737 Awaitable[~.Snapshot]]: 

738 A function that, when called, will call the underlying RPC 

739 on the server. 

740 """ 

741 # Generate a "stub function" on-the-fly which will actually make 

742 # the request. 

743 # gRPC handles serialization and deserialization, so we just need 

744 # to pass in the functions for each. 

745 if "create_snapshot" not in self._stubs: 

746 self._stubs["create_snapshot"] = self._logged_channel.unary_unary( 

747 "/google.pubsub.v1.Subscriber/CreateSnapshot", 

748 request_serializer=pubsub.CreateSnapshotRequest.serialize, 

749 response_deserializer=pubsub.Snapshot.deserialize, 

750 ) 

751 return self._stubs["create_snapshot"] 

752 

@property
def update_snapshot(
    self,
) -> Callable[[pubsub.UpdateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
    r"""Return the awaitable gRPC callable for the update snapshot method.

    The RPC updates an existing snapshot by updating the fields
    specified in the update mask. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.UpdateSnapshotRequest],
                Awaitable[~.Snapshot]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is created lazily on first access and then cached in
    # ``self._stubs``. gRPC performs serialization/deserialization itself;
    # it only needs to be handed the two functions.
    stubs = self._stubs
    if "update_snapshot" in stubs:
        return stubs["update_snapshot"]

    stubs["update_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSnapshot",
        request_serializer=pubsub.UpdateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return stubs["update_snapshot"]

783 

@property
def delete_snapshot(
    self,
) -> Callable[[pubsub.DeleteSnapshotRequest], Awaitable[empty_pb2.Empty]]:
    r"""Return the awaitable gRPC callable for the delete snapshot method.

    The RPC removes an existing snapshot. Snapshots are used in [Seek]
    (https://cloud.google.com/pubsub/docs/replay-overview)
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.
    When the snapshot is deleted, all messages retained in the
    snapshot are immediately dropped. After a snapshot is deleted, a
    new one may be created with the same name, but the new one has
    no association with the old snapshot or its subscription, unless
    the same subscription is specified.

    Returns:
        Callable[[~.DeleteSnapshotRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Lazily build the stub once and cache it; gRPC only needs the
    # request serializer and the response deserializer.
    stubs = self._stubs
    if "delete_snapshot" in stubs:
        return stubs["delete_snapshot"]

    stubs["delete_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/DeleteSnapshot",
        request_serializer=pubsub.DeleteSnapshotRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stubs["delete_snapshot"]

818 

@property
def seek(self) -> Callable[[pubsub.SeekRequest], Awaitable[pubsub.SeekResponse]]:
    r"""Return the awaitable gRPC callable for the seek method.

    The RPC seeks an existing subscription to a point in time or to a
    given snapshot, whichever is provided in the request. Snapshots are
    used in [Seek]
    (https://cloud.google.com/pubsub/docs/replay-overview)
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.
    Note that both the subscription and the snapshot must be on the
    same topic.

    Returns:
        Callable[[~.SeekRequest],
                Awaitable[~.SeekResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Lazily build the stub once and cache it; gRPC only needs the
    # request serializer and the response deserializer.
    stubs = self._stubs
    if "seek" in stubs:
        return stubs["seek"]

    stubs["seek"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Seek",
        request_serializer=pubsub.SeekRequest.serialize,
        response_deserializer=pubsub.SeekResponse.deserialize,
    )
    return stubs["seek"]

850 

def _prep_wrapped_messages(self, client_info):
    """Precompute the wrapped methods, overriding the base class method to use async wrappers.

    Fills ``self._wrapped_methods`` with one entry per RPC stub: the key is
    the stub callable and the value is that stub passed through
    ``self._wrap_method`` together with its default retry policy, default
    timeout and ``client_info``.

    Args:
        client_info: Passed through unchanged as the ``client_info``
            keyword of every ``self._wrap_method`` call.
    """

    def backoff(*retryable, multiplier=1.3, deadline=60.0):
        # Always build a fresh AsyncRetry so that no two methods share a
        # policy object; only the retryable exception set, the multiplier
        # and the deadline vary between RPCs.
        return retries.AsyncRetry(
            initial=0.1,
            maximum=60.0,
            multiplier=multiplier,
            predicate=retries.if_exception_type(*retryable),
            deadline=deadline,
        )

    def unavailable_only():
        # Policy for RPCs that retry on ServiceUnavailable alone.
        return backoff(core_exceptions.ServiceUnavailable)

    def transient_errors():
        # Policy for RPCs that retry on Aborted, ServiceUnavailable and Unknown.
        return backoff(
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

    def with_retry(stub, retry, timeout=60.0):
        return self._wrap_method(
            stub,
            default_retry=retry,
            default_timeout=timeout,
            client_info=client_info,
        )

    def without_retry(stub):
        # The IAM mixin RPCs are wrapped with no default retry and no
        # default timeout.
        return self._wrap_method(
            stub,
            default_timeout=None,
            client_info=client_info,
        )

    self._wrapped_methods = {
        self.create_subscription: with_retry(
            self.create_subscription, transient_errors()
        ),
        self.get_subscription: with_retry(
            self.get_subscription, transient_errors()
        ),
        self.update_subscription: with_retry(
            self.update_subscription, unavailable_only()
        ),
        self.list_subscriptions: with_retry(
            self.list_subscriptions, transient_errors()
        ),
        self.delete_subscription: with_retry(
            self.delete_subscription, unavailable_only()
        ),
        self.modify_ack_deadline: with_retry(
            self.modify_ack_deadline, unavailable_only()
        ),
        self.acknowledge: with_retry(self.acknowledge, unavailable_only()),
        self.pull: with_retry(
            self.pull,
            backoff(
                core_exceptions.Aborted,
                core_exceptions.InternalServerError,
                core_exceptions.ServiceUnavailable,
                core_exceptions.Unknown,
            ),
        ),
        # The streaming RPC backs off faster and is given a much longer
        # deadline/timeout (900 s) than the unary calls.
        self.streaming_pull: with_retry(
            self.streaming_pull,
            backoff(
                core_exceptions.Aborted,
                core_exceptions.DeadlineExceeded,
                core_exceptions.InternalServerError,
                core_exceptions.ResourceExhausted,
                core_exceptions.ServiceUnavailable,
                multiplier=4,
                deadline=900.0,
            ),
            timeout=900.0,
        ),
        self.modify_push_config: with_retry(
            self.modify_push_config, unavailable_only()
        ),
        self.get_snapshot: with_retry(self.get_snapshot, transient_errors()),
        self.list_snapshots: with_retry(
            self.list_snapshots, transient_errors()
        ),
        self.create_snapshot: with_retry(
            self.create_snapshot, unavailable_only()
        ),
        self.update_snapshot: with_retry(
            self.update_snapshot, unavailable_only()
        ),
        self.delete_snapshot: with_retry(
            self.delete_snapshot, unavailable_only()
        ),
        self.seek: with_retry(self.seek, transient_errors()),
        self.get_iam_policy: without_retry(self.get_iam_policy),
        self.set_iam_policy: without_retry(self.set_iam_policy),
        self.test_iam_permissions: without_retry(self.test_iam_permissions),
    }

1113 

def _wrap_method(self, func, *args, **kwargs):
    """Wrap ``func`` with ``gapic_v1.method_async.wrap_method``.

    When ``self._wrap_with_kind`` is truthy, the transport's ``kind`` is
    added to the keyword arguments before delegating.

    Args:
        func: The callable to wrap.
        *args: Extra positional arguments forwarded to ``wrap_method``.
        **kwargs: Extra keyword arguments forwarded to ``wrap_method``.

    Returns:
        Whatever ``gapic_v1.method_async.wrap_method`` returns.
    """
    if self._wrap_with_kind:  # pragma: NO COVER
        kwargs.update(kind=self.kind)
    return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

1118 

def close(self):
    """Close the logged channel and return the result of its ``close()`` call."""
    channel = self._logged_channel
    return channel.close()

1121 

@property
def kind(self) -> str:
    """Return the identifier string of this transport, ``"grpc_asyncio"``."""
    transport_kind = "grpc_asyncio"
    return transport_kind

1125 

@property
def set_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
    r"""Return a callable for the set iam policy method over gRPC.

    Sets the IAM access control policy on the specified
    resource. Replaces any existing policy.

    Returns:
        Callable[[~.SetIamPolicyRequest],
                Awaitable[~.Policy]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    # The stub is built from the same ``self._logged_channel.unary_unary``
    # call as the other stubs of this transport, so the return annotation
    # is ``Awaitable[...]`` like theirs.
    if "set_iam_policy" not in self._stubs:
        self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/SetIamPolicy",
            request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
    return self._stubs["set_iam_policy"]

1150 

@property
def get_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
    r"""Return a callable for the get iam policy method over gRPC.

    Gets the IAM access control policy for a resource.
    Returns an empty policy if the resource exists and does
    not have a policy set.

    Returns:
        Callable[[~.GetIamPolicyRequest],
                Awaitable[~.Policy]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    # The stub is built from the same ``self._logged_channel.unary_unary``
    # call as the other stubs of this transport, so the return annotation
    # is ``Awaitable[...]`` like theirs.
    if "get_iam_policy" not in self._stubs:
        self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/GetIamPolicy",
            request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
    return self._stubs["get_iam_policy"]

1176 

@property
def test_iam_permissions(
    self,
) -> Callable[
    [iam_policy_pb2.TestIamPermissionsRequest],
    Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
]:
    r"""Return a callable for the test iam permissions method over gRPC.

    Tests the specified permissions against the IAM access control
    policy for a resource. If the resource does not exist, this will
    return an empty set of permissions, not a NOT_FOUND error.

    Returns:
        Callable[[~.TestIamPermissionsRequest],
                Awaitable[~.TestIamPermissionsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    # The stub is built from the same ``self._logged_channel.unary_unary``
    # call as the other stubs of this transport, so the return annotation
    # is ``Awaitable[...]`` like theirs.
    if "test_iam_permissions" not in self._stubs:
        self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/TestIamPermissions",
            request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
            response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
        )
    return self._stubs["test_iam_permissions"]

1205 

1206 

# Names exported by ``from <this module> import *``: only the asyncio gRPC
# transport class.
__all__ = ("SubscriberGrpcAsyncIOTransport",)