Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/transports/grpc_asyncio.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

172 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import pickle 

19import logging as std_logging 

20import warnings 

21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

22 

23from google.api_core import gapic_v1 

24from google.api_core import grpc_helpers_async 

25from google.api_core import exceptions as core_exceptions 

26from google.api_core import retry_async as retries 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30import google.protobuf.message 

31 

32import grpc # type: ignore 

33import proto # type: ignore 

34from grpc.experimental import aio # type: ignore 

35 

36from google.iam.v1 import iam_policy_pb2 # type: ignore 

37from google.iam.v1 import policy_pb2 # type: ignore 

38from google.protobuf import empty_pb2 # type: ignore 

39from google.pubsub_v1.types import pubsub 

40from .base import SubscriberTransport, DEFAULT_CLIENT_INFO 

41from .grpc import SubscriberGrpcTransport 

42 

# Feature-detect structured client logging: ``client_logging`` is only
# importable from google-api-core releases that ship it, so its presence is
# recorded in a flag instead of being required.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger shared by the interceptor defined in this module.
_LOGGER = std_logging.getLogger(__name__)

51 

52 

class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """AsyncIO client interceptor that DEBUG-logs unary-unary RPC traffic.

    Logging only happens when ``CLIENT_LOGGING_SUPPORTED`` is true and
    ``_LOGGER`` is enabled for ``DEBUG``; otherwise the call is forwarded
    to ``continuation`` untouched.
    """

    @staticmethod
    def _payload_for_log(message):
        """Render a request/response object as a string for the log record.

        Args:
            message: The object to render. ``proto.Message`` instances are
                rendered with their class-level ``to_json``; raw protobuf
                messages with ``MessageToJson``.

        Returns:
            str: A JSON string for the two message families, otherwise
            ``"<TypeName>: <pickled bytes repr>"``.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # NOTE(review): pickle is only used to *dump* here (never load), so
        # no untrusted data is deserialized; objects that cannot be pickled
        # will still raise from this call.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward a unary-unary call, logging it when DEBUG logging is on.

        Args:
            continuation: Callable that proceeds with the RPC; awaited with
                ``client_call_details`` and ``request``.
            client_call_details: Call description; its ``method`` and
                ``metadata`` attributes are read for the log record.
            request: The request message for the RPC.

        Returns:
            The call object produced by awaiting ``continuation``.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_payload = self._payload_for_log(request)

            # ``metadata`` may be None on the call details; treat that as
            # "no metadata" instead of failing while trying to iterate it.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Flatten the gRPC metadata object into a plain dict of strings.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

118 

119 

120class SubscriberGrpcAsyncIOTransport(SubscriberTransport): 

121 """gRPC AsyncIO backend transport for Subscriber. 

122 

123 The service that an application uses to manipulate subscriptions and 

124 to consume messages from a subscription via the ``Pull`` method or 

125 by establishing a bi-directional stream using the ``StreamingPull`` 

126 method. 

127 

128 This class defines the same methods as the primary client, so the 

129 primary client can load the underlying transport implementation 

130 and call it. 

131 

132 It sends protocol buffers over the wire using gRPC (which is built on 

133 top of HTTP/2); the ``grpcio`` package must be installed. 

134 """ 

135 

136 _grpc_channel: aio.Channel 

137 _stubs: Dict[str, Callable] = {} 

138 

@classmethod
def create_channel(
    cls,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    quota_project_id: Optional[str] = None,
    **kwargs,
) -> aio.Channel:
    """Build a gRPC AsyncIO channel via ``grpc_helpers_async.create_channel``.

    Args:
        host (Optional[str]): Host the channel should target.
        credentials (Optional[~.Credentials]): Authorization credentials
            to attach to requests, identifying this application to the
            service. When omitted, the client attempts to determine them
            from the environment.
        credentials_file (Optional[str]): Deprecated. Path to a file that
            :func:`google.auth.load_credentials_from_file` can load. This
            argument will be removed in the next major version of this
            library.
        scopes (Optional[Sequence[str]]): An optional list of scopes for
            this service. Only used when credentials are not specified;
            they are passed to :func:`google.auth.default`.
        quota_project_id (Optional[str]): Optional project to use for
            billing and quota.
        kwargs (Optional[dict]): Extra keyword arguments forwarded to the
            channel creation call.

    Returns:
        aio.Channel: A gRPC AsyncIO channel object.
    """
    # The class-level defaults (scopes and host) accompany whatever the
    # caller supplied explicitly.
    new_channel = grpc_helpers_async.create_channel(
        host,
        credentials=credentials,
        credentials_file=credentials_file,
        quota_project_id=quota_project_id,
        default_scopes=cls.AUTH_SCOPES,
        scopes=scopes,
        default_host=cls.DEFAULT_HOST,
        **kwargs,
    )
    return new_channel

181 

def __init__(
    self,
    *,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
    api_mtls_endpoint: Optional[str] = None,
    client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
             The hostname to connect to (default: 'pubsub.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
            This argument is ignored if a ``channel`` instance is provided.
        credentials_file (Optional[str]): Deprecated. A file with credentials that can
            be loaded with :func:`google.auth.load_credentials_from_file`.
            This argument is ignored if a ``channel`` instance is provided.
            This argument will be removed in the next major version of this library.
        scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
            service. These are only used when credentials are not specified and
            are passed to :func:`google.auth.default`.
        channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
            A ``Channel`` instance through which to make calls, or a Callable
            that constructs and returns one. If set to None, ``self.create_channel``
            is used to create the channel. If a Callable is given, it will be called
            with the same arguments as used in ``self.create_channel``.
        api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
            If provided, it overrides the ``host`` argument and tries to create
            a mutual TLS channel with client SSL credentials from
            ``client_cert_source`` or application default SSL credentials.
        client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
            Deprecated. A callback to provide client SSL certificate bytes and
            private key bytes, both in PEM format. It is ignored if
            ``api_mtls_endpoint`` is None.
        ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
            for the grpc channel. It is ignored if a ``channel`` instance is provided.
        client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
            A callback to provide client certificate bytes and private key bytes,
            both in PEM format. It is used to configure a mutual TLS channel. It is
            ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
        quota_project_id (Optional[str]): An optional project to use for billing
            and quota.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you're developing
            your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT should
            be used for service account credentials.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport's constructor. NOTE(review): presumably the audience
            used for self-signed JWTs -- confirm against ``SubscriberTransport``.

    Raises:
        google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
          creation failed for any reason.
        google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
            and ``credentials_file`` are passed.
    """
    self._grpc_channel = None
    self._ssl_channel_credentials = ssl_channel_credentials
    self._stubs: Dict[str, Callable] = {}

    if api_mtls_endpoint:
        warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
    if client_cert_source:
        warnings.warn("client_cert_source is deprecated", DeprecationWarning)

    if isinstance(channel, aio.Channel):
        # A ready-made channel wins: credentials and SSL settings supplied
        # alongside it are deliberately discarded.
        credentials = None
        self._ignore_credentials = True
        self._grpc_channel = channel
        self._ssl_channel_credentials = None
    elif api_mtls_endpoint:
        # Deprecated mTLS path: the endpoint replaces ``host`` and SSL
        # credentials come from ``client_cert_source`` or, failing that,
        # from the application default SSL credentials.
        host = api_mtls_endpoint
        if client_cert_source:
            cert, key = client_cert_source()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        else:
            self._ssl_channel_credentials = SslCredentials().ssl_credentials
    elif client_cert_source_for_mtls and not ssl_channel_credentials:
        # Explicit ``ssl_channel_credentials`` take precedence over the
        # certificate callback.
        cert, key = client_cert_source_for_mtls()
        self._ssl_channel_credentials = grpc.ssl_channel_credentials(
            certificate_chain=cert, private_key=key
        )

    # The base transport sets the host, credentials and scopes
    super().__init__(
        host=host,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        quota_project_id=quota_project_id,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        api_audience=api_audience,
    )

    # Explicit ``is None`` so that a caller-supplied channel is never
    # replaced merely because of how it evaluates in a boolean context.
    if self._grpc_channel is None:
        # initialize with the provided callable or the default channel
        channel_init = channel or type(self).create_channel
        self._grpc_channel = channel_init(
            self._host,
            # use the credentials which are saved
            credentials=self._credentials,
            # Set ``credentials_file`` to ``None`` here as
            # the credentials that we saved earlier should be used.
            credentials_file=None,
            scopes=self._scopes,
            ssl_credentials=self._ssl_channel_credentials,
            quota_project_id=quota_project_id,
            options=[
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
                ("grpc.max_metadata_size", 4 * 1024 * 1024),
                ("grpc.keepalive_time_ms", 30000),
            ],
        )

    # NOTE(review): this reaches into the channel's private
    # ``_unary_unary_interceptors`` list to install the logging interceptor.
    self._interceptor = _LoggingClientAIOInterceptor()
    self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
    self._logged_channel = self._grpc_channel
    # Record whether the installed ``wrap_method`` accepts a ``kind`` argument.
    self._wrap_with_kind = (
        "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
    )
    # Wrap messages. This must be done after self._logged_channel exists
    self._prep_wrapped_messages(client_info)

330 

@property
def grpc_channel(self) -> aio.Channel:
    """Return the gRPC AsyncIO channel held by this transport.

    Nothing is created here: the object stored in ``self._grpc_channel``
    is handed back as-is, so repeated accesses yield the same channel.
    """
    cached_channel = self._grpc_channel
    return cached_channel

340 

@property
def create_subscription(
    self,
) -> Callable[[pubsub.Subscription], Awaitable[pubsub.Subscription]]:
    r"""Callable bound to the ``CreateSubscription`` RPC.

    Creates a subscription to a given topic. See the [resource name
    rules]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
    The RPC returns ``ALREADY_EXISTS`` when the subscription already
    exists and ``NOT_FOUND`` when the corresponding topic doesn't.

    When the request carries no name, the server assigns a random one
    in the topic's project, conforming to the [resource name format]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names),
    and populates it in the returned Subscription object. REST API
    requests must specify a name.

    Returns:
        Callable[[~.Subscription],
                Awaitable[~.Subscription]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "create_subscription"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/CreateSubscription",
        request_serializer=pubsub.Subscription.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return self._stubs[stub_name]

378 

@property
def get_subscription(
    self,
) -> Callable[[pubsub.GetSubscriptionRequest], Awaitable[pubsub.Subscription]]:
    r"""Callable bound to the ``GetSubscription`` RPC.

    Gets the configuration details of a subscription.

    Returns:
        Callable[[~.GetSubscriptionRequest],
                Awaitable[~.Subscription]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "get_subscription"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/GetSubscription",
        request_serializer=pubsub.GetSubscriptionRequest.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return self._stubs[stub_name]

404 

@property
def update_subscription(
    self,
) -> Callable[[pubsub.UpdateSubscriptionRequest], Awaitable[pubsub.Subscription]]:
    r"""Callable bound to the ``UpdateSubscription`` RPC.

    Updates an existing subscription by updating the fields named in
    the update mask. Certain properties of a subscription, such as its
    topic, are not modifiable.

    Returns:
        Callable[[~.UpdateSubscriptionRequest],
                Awaitable[~.Subscription]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "update_subscription"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSubscription",
        request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return self._stubs[stub_name]

433 

@property
def list_subscriptions(
    self,
) -> Callable[
    [pubsub.ListSubscriptionsRequest], Awaitable[pubsub.ListSubscriptionsResponse]
]:
    r"""Callable bound to the ``ListSubscriptions`` RPC.

    Lists matching subscriptions.

    Returns:
        Callable[[~.ListSubscriptionsRequest],
                Awaitable[~.ListSubscriptionsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "list_subscriptions"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ListSubscriptions",
        request_serializer=pubsub.ListSubscriptionsRequest.serialize,
        response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
    )
    return self._stubs[stub_name]

461 

@property
def delete_subscription(
    self,
) -> Callable[[pubsub.DeleteSubscriptionRequest], Awaitable[empty_pb2.Empty]]:
    r"""Callable bound to the ``DeleteSubscription`` RPC.

    Deletes an existing subscription. All messages retained in the
    subscription are immediately dropped, and calls to ``Pull`` after
    deletion return ``NOT_FOUND``. A new subscription may later be
    created with the same name, but it has no association with the old
    subscription or its topic unless the same topic is specified.

    Returns:
        Callable[[~.DeleteSubscriptionRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "delete_subscription"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/DeleteSubscription",
        request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_name]

492 

@property
def modify_ack_deadline(
    self,
) -> Callable[[pubsub.ModifyAckDeadlineRequest], Awaitable[empty_pb2.Empty]]:
    r"""Callable bound to the ``ModifyAckDeadline`` RPC.

    Modifies the ack deadline for a specific message. Useful either to
    signal that the subscriber needs more time to process a message, or
    to make the message available for redelivery if processing was
    interrupted. The subscription-level ``ackDeadlineSeconds`` used for
    subsequent messages is not modified.

    Returns:
        Callable[[~.ModifyAckDeadlineRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "modify_ack_deadline"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
        request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_name]

523 

@property
def acknowledge(
    self,
) -> Callable[[pubsub.AcknowledgeRequest], Awaitable[empty_pb2.Empty]]:
    r"""Callable bound to the ``Acknowledge`` RPC.

    Acknowledges the messages associated with the ``ack_ids`` in the
    ``AcknowledgeRequest``. The Pub/Sub system can remove the relevant
    messages from the subscription.

    Acknowledging a message whose ack deadline has expired may succeed,
    but such a message may be redelivered later. Acknowledging a
    message more than once will not result in an error.

    Returns:
        Callable[[~.AcknowledgeRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "acknowledge"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Acknowledge",
        request_serializer=pubsub.AcknowledgeRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_name]

556 

@property
def pull(self) -> Callable[[pubsub.PullRequest], Awaitable[pubsub.PullResponse]]:
    r"""Callable bound to the ``Pull`` RPC.

    Pulls messages from the server.

    Returns:
        Callable[[~.PullRequest],
                Awaitable[~.PullResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "pull"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Pull",
        request_serializer=pubsub.PullRequest.serialize,
        response_deserializer=pubsub.PullResponse.deserialize,
    )
    return self._stubs[stub_name]

580 

@property
def streaming_pull(
    self,
) -> Callable[
    [pubsub.StreamingPullRequest], Awaitable[pubsub.StreamingPullResponse]
]:
    r"""Callable bound to the ``StreamingPull`` RPC.

    Establishes a stream with the server, which sends messages down to
    the client. The client streams acknowledgments and ack deadline
    modifications back to the server. On any error the server closes
    the stream and returns the status. The server may also close the
    stream with status ``UNAVAILABLE`` to reassign server-side
    resources, in which case the client should re-establish the stream.
    Flow control can be achieved by configuring the underlying RPC
    channel.

    Returns:
        Callable[[~.StreamingPullRequest],
                Awaitable[~.StreamingPullResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "streaming_pull"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Unlike the other RPCs here, this one is registered through
    # ``stream_stream``; gRPC still performs the (de)serialization itself.
    self._stubs[stub_name] = self._logged_channel.stream_stream(
        "/google.pubsub.v1.Subscriber/StreamingPull",
        request_serializer=pubsub.StreamingPullRequest.serialize,
        response_deserializer=pubsub.StreamingPullResponse.deserialize,
    )
    return self._stubs[stub_name]

615 

@property
def modify_push_config(
    self,
) -> Callable[[pubsub.ModifyPushConfigRequest], Awaitable[empty_pb2.Empty]]:
    r"""Callable bound to the ``ModifyPushConfig`` RPC.

    Modifies the ``PushConfig`` for a specified subscription.

    This may be used to change a push subscription to a pull one
    (signified by an empty ``PushConfig``) or vice versa, or to change
    the endpoint URL and other attributes of a push subscription.
    Messages will accumulate for delivery continuously through the
    call regardless of changes to the ``PushConfig``.

    Returns:
        Callable[[~.ModifyPushConfigRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "modify_push_config"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ModifyPushConfig",
        request_serializer=pubsub.ModifyPushConfigRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_name]

647 

@property
def get_snapshot(
    self,
) -> Callable[[pubsub.GetSnapshotRequest], Awaitable[pubsub.Snapshot]]:
    r"""Callable bound to the ``GetSnapshot`` RPC.

    Gets the configuration details of a snapshot. Snapshots are used
    in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk: the acknowledgment state of messages in an existing
    subscription can be set to the state captured by a snapshot.

    Returns:
        Callable[[~.GetSnapshotRequest],
                Awaitable[~.Snapshot]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "get_snapshot"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/GetSnapshot",
        request_serializer=pubsub.GetSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return self._stubs[stub_name]

678 

@property
def list_snapshots(
    self,
) -> Callable[
    [pubsub.ListSnapshotsRequest], Awaitable[pubsub.ListSnapshotsResponse]
]:
    r"""Callable bound to the ``ListSnapshots`` RPC.

    Lists the existing snapshots. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk: the acknowledgment state of messages in an existing
    subscription can be set to the state captured by a snapshot.

    Returns:
        Callable[[~.ListSnapshotsRequest],
                Awaitable[~.ListSnapshotsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "list_snapshots"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ListSnapshots",
        request_serializer=pubsub.ListSnapshotsRequest.serialize,
        response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
    )
    return self._stubs[stub_name]

710 

@property
def create_snapshot(
    self,
) -> Callable[[pubsub.CreateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
    r"""Callable bound to the ``CreateSnapshot`` RPC.

    Creates a snapshot from the requested subscription. Snapshots
    are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk: the acknowledgment state of messages in an existing
    subscription can be set to the state captured by a snapshot.

    The RPC returns ``ALREADY_EXISTS`` if the snapshot already exists
    and ``NOT_FOUND`` if the requested subscription doesn't exist. If
    the backlog in the subscription is too old -- and the resulting
    snapshot would expire in less than 1 hour -- then
    ``FAILED_PRECONDITION`` is returned. See also the
    ``Snapshot.expire_time`` field.

    When the request carries no name, the server assigns a random one
    in the subscription's project, conforming to the [resource name
    format]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names),
    and populates it in the returned Snapshot object. REST API requests
    must specify a name.

    Returns:
        Callable[[~.CreateSnapshotRequest],
                Awaitable[~.Snapshot]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "create_snapshot"
    # Reuse the stub if an earlier access already built it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # Otherwise build it on the fly; gRPC performs the (de)serialization,
    # so only the two conversion functions have to be supplied.
    self._stubs[stub_name] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/CreateSnapshot",
        request_serializer=pubsub.CreateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return self._stubs[stub_name]

754 

@property
def update_snapshot(
    self,
) -> Callable[[pubsub.UpdateSnapshotRequest], Awaitable[pubsub.Snapshot]]:
    r"""Return the awaitable gRPC callable for the UpdateSnapshot RPC.

    UpdateSnapshot updates an existing snapshot by updating the fields
    named in the update mask. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can be
    set to the state captured by a snapshot.

    Returns:
        Callable[[~.UpdateSnapshotRequest],
                Awaitable[~.Snapshot]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is created lazily on first access and cached afterwards.
    # gRPC performs serialization and deserialization itself, so only the
    # corresponding functions have to be handed over.
    stubs = self._stubs
    if "update_snapshot" in stubs:
        return stubs["update_snapshot"]
    stubs["update_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSnapshot",
        request_serializer=pubsub.UpdateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return stubs["update_snapshot"]

785 

@property
def delete_snapshot(
    self,
) -> Callable[[pubsub.DeleteSnapshotRequest], Awaitable[empty_pb2.Empty]]:
    r"""Return the awaitable gRPC callable for the DeleteSnapshot RPC.

    DeleteSnapshot removes an existing snapshot. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can be
    set to the state captured by a snapshot.

    When the snapshot is deleted, all messages retained in it are
    immediately dropped. After deletion a new snapshot may be created
    with the same name, but it has no association with the old snapshot
    or its subscription unless the same subscription is specified.

    Returns:
        Callable[[~.DeleteSnapshotRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is created lazily on first access and cached afterwards.
    # gRPC performs serialization and deserialization itself, so only the
    # corresponding functions have to be handed over.
    stubs = self._stubs
    if "delete_snapshot" in stubs:
        return stubs["delete_snapshot"]
    stubs["delete_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/DeleteSnapshot",
        request_serializer=pubsub.DeleteSnapshotRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stubs["delete_snapshot"]

820 

@property
def seek(self) -> Callable[[pubsub.SeekRequest], Awaitable[pubsub.SeekResponse]]:
    r"""Return the awaitable gRPC callable for the Seek RPC.

    Seek moves an existing subscription to a point in time or to a given
    snapshot, whichever is provided in the request. Snapshots are used
    in `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can be
    set to the state captured by a snapshot. Note that both the
    subscription and the snapshot must be on the same topic.

    Returns:
        Callable[[~.SeekRequest],
                Awaitable[~.SeekResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is created lazily on first access and cached afterwards.
    # gRPC performs serialization and deserialization itself, so only the
    # corresponding functions have to be handed over.
    stubs = self._stubs
    if "seek" in stubs:
        return stubs["seek"]
    stubs["seek"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Seek",
        request_serializer=pubsub.SeekRequest.serialize,
        response_deserializer=pubsub.SeekResponse.deserialize,
    )
    return stubs["seek"]

852 

def _prep_wrapped_messages(self, client_info):
    """Precompute the wrapped methods, overriding the base class method to use async wrappers.

    Fills ``self._wrapped_methods`` with one entry per RPC. Each key is the
    raw stub callable exposed by the matching property and each value is the
    result of passing that stub through ``self._wrap_method`` together with
    its default retry policy (where one is configured) and default timeout.

    Args:
        client_info: Forwarded unchanged as the ``client_info`` keyword
            argument of every ``self._wrap_method`` call.
    """

    def backoff(*retryable, multiplier=1.3, deadline=60.0):
        # Every call builds a brand-new AsyncRetry, so no policy object is
        # shared between two RPCs.
        return retries.AsyncRetry(
            initial=0.1,
            maximum=60.0,
            multiplier=multiplier,
            predicate=retries.if_exception_type(*retryable),
            deadline=deadline,
        )

    def unavailable_only():
        # Policy that retries on ServiceUnavailable alone.
        return backoff(core_exceptions.ServiceUnavailable)

    def transient():
        # Policy that retries on Aborted, ServiceUnavailable and Unknown.
        return backoff(
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

    def retried(stub, policy, timeout=60.0):
        # Wrap an RPC that has a default retry policy.
        return self._wrap_method(
            stub,
            default_retry=policy,
            default_timeout=timeout,
            client_info=client_info,
        )

    def unretried(stub):
        # Wrap an RPC with no default retry and no default timeout.
        return self._wrap_method(
            stub,
            default_timeout=None,
            client_info=client_info,
        )

    self._wrapped_methods = {
        self.create_subscription: retried(self.create_subscription, transient()),
        self.get_subscription: retried(self.get_subscription, transient()),
        self.update_subscription: retried(
            self.update_subscription, unavailable_only()
        ),
        self.list_subscriptions: retried(self.list_subscriptions, transient()),
        self.delete_subscription: retried(
            self.delete_subscription, unavailable_only()
        ),
        self.modify_ack_deadline: retried(
            self.modify_ack_deadline, unavailable_only()
        ),
        self.acknowledge: retried(self.acknowledge, unavailable_only()),
        self.pull: retried(
            self.pull,
            backoff(
                core_exceptions.Aborted,
                core_exceptions.InternalServerError,
                core_exceptions.ServiceUnavailable,
                core_exceptions.Unknown,
            ),
        ),
        # streaming_pull is the only RPC with a different multiplier,
        # retry deadline and default timeout.
        self.streaming_pull: retried(
            self.streaming_pull,
            backoff(
                core_exceptions.Aborted,
                core_exceptions.DeadlineExceeded,
                core_exceptions.InternalServerError,
                core_exceptions.ResourceExhausted,
                core_exceptions.ServiceUnavailable,
                multiplier=4,
                deadline=900.0,
            ),
            timeout=900.0,
        ),
        self.modify_push_config: retried(
            self.modify_push_config, unavailable_only()
        ),
        self.get_snapshot: retried(self.get_snapshot, transient()),
        self.list_snapshots: retried(self.list_snapshots, transient()),
        self.create_snapshot: retried(self.create_snapshot, unavailable_only()),
        self.update_snapshot: retried(self.update_snapshot, unavailable_only()),
        self.delete_snapshot: retried(self.delete_snapshot, unavailable_only()),
        self.seek: retried(self.seek, transient()),
        self.get_iam_policy: unretried(self.get_iam_policy),
        self.set_iam_policy: unretried(self.set_iam_policy),
        self.test_iam_permissions: unretried(self.test_iam_permissions),
    }

1115 

def _wrap_method(self, func, *args, **kwargs):
    """Wrap ``func`` via ``gapic_v1.method_async.wrap_method``.

    When ``self._wrap_with_kind`` is truthy, the ``kind`` keyword argument
    is set to ``self.kind`` (overriding any caller-supplied value) before
    delegating. All other positional and keyword arguments are passed
    through untouched.

    Returns:
        Whatever ``gapic_v1.method_async.wrap_method`` returns.
    """
    if self._wrap_with_kind:  # pragma: NO COVER
        kwargs.update(kind=self.kind)
    wrapped = gapic_v1.method_async.wrap_method(func, *args, **kwargs)
    return wrapped

1120 

def close(self):
    """Close the logged channel and return the result of its ``close()`` call."""
    channel = self._logged_channel
    return channel.close()

1123 

@property
def kind(self) -> str:
    """Name identifying this transport flavor; always ``"grpc_asyncio"``."""
    transport_kind = "grpc_asyncio"
    return transport_kind

1127 

@property
def set_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
    r"""Return a callable for the set iam policy method over gRPC.

    Sets the IAM access control policy on the specified resource.
    Replaces any existing policy.

    Returns:
        Callable[[~.SetIamPolicyRequest],
                Awaitable[~.Policy]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request. The stub is cached so later accesses reuse it.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    if "set_iam_policy" not in self._stubs:
        self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/SetIamPolicy",
            request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
    return self._stubs["set_iam_policy"]

1152 

@property
def get_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
    r"""Return a callable for the get iam policy method over gRPC.

    Gets the IAM access control policy for a resource.
    Returns an empty policy if the resource exists and does
    not have a policy set.

    Returns:
        Callable[[~.GetIamPolicyRequest],
                Awaitable[~.Policy]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request. The stub is cached so later accesses reuse it.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    if "get_iam_policy" not in self._stubs:
        self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/GetIamPolicy",
            request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
    return self._stubs["get_iam_policy"]

1178 

@property
def test_iam_permissions(
    self,
) -> Callable[
    [iam_policy_pb2.TestIamPermissionsRequest],
    Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
]:
    r"""Return a callable for the test iam permissions method over gRPC.

    Tests the specified permissions against the IAM access control
    policy for a resource. If the resource does not exist, this will
    return an empty set of permissions, not a NOT_FOUND error.

    Returns:
        Callable[[~.TestIamPermissionsRequest],
                Awaitable[~.TestIamPermissionsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # Generate a "stub function" on-the-fly which will actually make
    # the request. The stub is cached so later accesses reuse it.
    # gRPC handles serialization and deserialization, so we just need
    # to pass in the functions for each.
    if "test_iam_permissions" not in self._stubs:
        self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/TestIamPermissions",
            request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
            response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
        )
    return self._stubs["test_iam_permissions"]

1207 

1208 

# Restrict the module's star-import surface to the single name listed here.
__all__ = ("SubscriberGrpcAsyncIOTransport",)