Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/grpc_asyncio.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

137 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import pickle 

19import logging as std_logging 

20import warnings 

21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

22 

23from google.api_core import gapic_v1 

24from google.api_core import grpc_helpers_async 

25from google.api_core import exceptions as core_exceptions 

26from google.api_core import retry_async as retries 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30import google.protobuf.message 

31 

32import grpc # type: ignore 

33import proto # type: ignore 

34from grpc.experimental import aio # type: ignore 

35 

36from google.iam.v1 import iam_policy_pb2 # type: ignore 

37from google.iam.v1 import policy_pb2 # type: ignore 

38from google.protobuf import empty_pb2 # type: ignore 

39from google.pubsub_v1.types import pubsub 

40from .base import PublisherTransport, DEFAULT_CLIENT_INFO 

41from .grpc import PublisherGrpcTransport 

42 

43try: 

44 from google.api_core import client_logging # type: ignore 

45 

46 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

47except ImportError: # pragma: NO COVER 

48 CLIENT_LOGGING_SUPPORTED = False 

49 

50_LOGGER = std_logging.getLogger(__name__) 

51 

52 

53class _LoggingClientAIOInterceptor( 

54 grpc.aio.UnaryUnaryClientInterceptor 

55): # pragma: NO COVER 

56 async def intercept_unary_unary(self, continuation, client_call_details, request): 

57 logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

58 std_logging.DEBUG 

59 ) 

60 if logging_enabled: # pragma: NO COVER 

61 request_metadata = client_call_details.metadata 

62 if isinstance(request, proto.Message): 

63 request_payload = type(request).to_json(request) 

64 elif isinstance(request, google.protobuf.message.Message): 

65 request_payload = MessageToJson(request) 

66 else: 

67 request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" 

68 

69 request_metadata = { 

70 key: value.decode("utf-8") if isinstance(value, bytes) else value 

71 for key, value in request_metadata 

72 } 

73 grpc_request = { 

74 "payload": request_payload, 

75 "requestMethod": "grpc", 

76 "metadata": dict(request_metadata), 

77 } 

78 _LOGGER.debug( 

79 f"Sending request for {client_call_details.method}", 

80 extra={ 

81 "serviceName": "google.pubsub.v1.Publisher", 

82 "rpcName": str(client_call_details.method), 

83 "request": grpc_request, 

84 "metadata": grpc_request["metadata"], 

85 }, 

86 ) 

87 response = await continuation(client_call_details, request) 

88 if logging_enabled: # pragma: NO COVER 

89 response_metadata = await response.trailing_metadata() 

90 # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples 

91 metadata = ( 

92 dict([(k, str(v)) for k, v in response_metadata]) 

93 if response_metadata 

94 else None 

95 ) 

96 result = await response 

97 if isinstance(result, proto.Message): 

98 response_payload = type(result).to_json(result) 

99 elif isinstance(result, google.protobuf.message.Message): 

100 response_payload = MessageToJson(result) 

101 else: 

102 response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" 

103 grpc_response = { 

104 "payload": response_payload, 

105 "metadata": metadata, 

106 "status": "OK", 

107 } 

108 _LOGGER.debug( 

109 f"Received response to rpc {client_call_details.method}.", 

110 extra={ 

111 "serviceName": "google.pubsub.v1.Publisher", 

112 "rpcName": str(client_call_details.method), 

113 "response": grpc_response, 

114 "metadata": grpc_response["metadata"], 

115 }, 

116 ) 

117 return response 

118 

119 

120class PublisherGrpcAsyncIOTransport(PublisherTransport): 

121 """gRPC AsyncIO backend transport for Publisher. 

122 

123 The service that an application uses to manipulate topics, 

124 and to send messages to a topic. 

125 

126 This class defines the same methods as the primary client, so the 

127 primary client can load the underlying transport implementation 

128 and call it. 

129 

130 It sends protocol buffers over the wire using gRPC (which is built on 

131 top of HTTP/2); the ``grpcio`` package must be installed. 

132 """ 

133 

134 _grpc_channel: aio.Channel 

135 _stubs: Dict[str, Callable] = {} 

136 

137 @classmethod 

138 def create_channel( 

139 cls, 

140 host: str = "pubsub.googleapis.com", 

141 credentials: Optional[ga_credentials.Credentials] = None, 

142 credentials_file: Optional[str] = None, 

143 scopes: Optional[Sequence[str]] = None, 

144 quota_project_id: Optional[str] = None, 

145 **kwargs, 

146 ) -> aio.Channel: 

147 """Create and return a gRPC AsyncIO channel object. 

148 Args: 

149 host (Optional[str]): The host for the channel to use. 

150 credentials (Optional[~.Credentials]): The 

151 authorization credentials to attach to requests. These 

152 credentials identify this application to the service. If 

153 none are specified, the client will attempt to ascertain 

154 the credentials from the environment. 

155 credentials_file (Optional[str]): Deprecated. A file with credentials that can 

156 be loaded with :func:`google.auth.load_credentials_from_file`. This argument will be 

157 removed in the next major version of this library. 

158 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

159 service. These are only used when credentials are not specified and 

160 are passed to :func:`google.auth.default`. 

161 quota_project_id (Optional[str]): An optional project to use for billing 

162 and quota. 

163 kwargs (Optional[dict]): Keyword arguments, which are passed to the 

164 channel creation. 

165 Returns: 

166 aio.Channel: A gRPC AsyncIO channel object. 

167 """ 

168 

169 return grpc_helpers_async.create_channel( 

170 host, 

171 credentials=credentials, 

172 credentials_file=credentials_file, 

173 quota_project_id=quota_project_id, 

174 default_scopes=cls.AUTH_SCOPES, 

175 scopes=scopes, 

176 default_host=cls.DEFAULT_HOST, 

177 **kwargs, 

178 ) 

179 

180 def __init__( 

181 self, 

182 *, 

183 host: str = "pubsub.googleapis.com", 

184 credentials: Optional[ga_credentials.Credentials] = None, 

185 credentials_file: Optional[str] = None, 

186 scopes: Optional[Sequence[str]] = None, 

187 channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None, 

188 api_mtls_endpoint: Optional[str] = None, 

189 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

190 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None, 

191 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

192 quota_project_id: Optional[str] = None, 

193 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

194 always_use_jwt_access: Optional[bool] = False, 

195 api_audience: Optional[str] = None, 

196 ) -> None: 

197 """Instantiate the transport. 

198 

199 Args: 

200 host (Optional[str]): 

201 The hostname to connect to (default: 'pubsub.googleapis.com'). 

202 credentials (Optional[google.auth.credentials.Credentials]): The 

203 authorization credentials to attach to requests. These 

204 credentials identify the application to the service; if none 

205 are specified, the client will attempt to ascertain the 

206 credentials from the environment. 

207 This argument is ignored if a ``channel`` instance is provided. 

208 credentials_file (Optional[str]): Deprecated. A file with credentials that can 

209 be loaded with :func:`google.auth.load_credentials_from_file`. 

210 This argument is ignored if a ``channel`` instance is provided. 

211 This argument will be removed in the next major version of this library. 

212 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

213 service. These are only used when credentials are not specified and 

214 are passed to :func:`google.auth.default`. 

215 channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]): 

216 A ``Channel`` instance through which to make calls, or a Callable 

217 that constructs and returns one. If set to None, ``self.create_channel`` 

218 is used to create the channel. If a Callable is given, it will be called 

219 with the same arguments as used in ``self.create_channel``. 

220 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint. 

221 If provided, it overrides the ``host`` argument and tries to create 

222 a mutual TLS channel with client SSL credentials from 

223 ``client_cert_source`` or application default SSL credentials. 

224 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]): 

225 Deprecated. A callback to provide client SSL certificate bytes and 

226 private key bytes, both in PEM format. It is ignored if 

227 ``api_mtls_endpoint`` is None. 

228 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials 

229 for the grpc channel. It is ignored if a ``channel`` instance is provided. 

230 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]): 

231 A callback to provide client certificate bytes and private key bytes, 

232 both in PEM format. It is used to configure a mutual TLS channel. It is 

233 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided. 

234 quota_project_id (Optional[str]): An optional project to use for billing 

235 and quota. 

236 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

237 The client info used to send a user-agent string along with 

238 API requests. If ``None``, then default info will be used. 

239 Generally, you only need to set this if you're developing 

240 your own client library. 

241 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

242 be used for service account credentials. 

243 

244 Raises: 

245 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport 

246 creation failed for any reason. 

247 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

248 and ``credentials_file`` are passed. 

249 """ 

250 self._grpc_channel = None 

251 self._ssl_channel_credentials = ssl_channel_credentials 

252 self._stubs: Dict[str, Callable] = {} 

253 

254 if api_mtls_endpoint: 

255 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) 

256 if client_cert_source: 

257 warnings.warn("client_cert_source is deprecated", DeprecationWarning) 

258 

259 if isinstance(channel, aio.Channel): 

260 # Ignore credentials if a channel was passed. 

261 credentials = None 

262 self._ignore_credentials = True 

263 # If a channel was explicitly provided, set it. 

264 self._grpc_channel = channel 

265 self._ssl_channel_credentials = None 

266 else: 

267 if api_mtls_endpoint: 

268 host = api_mtls_endpoint 

269 

270 # Create SSL credentials with client_cert_source or application 

271 # default SSL credentials. 

272 if client_cert_source: 

273 cert, key = client_cert_source() 

274 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

275 certificate_chain=cert, private_key=key 

276 ) 

277 else: 

278 self._ssl_channel_credentials = SslCredentials().ssl_credentials 

279 

280 else: 

281 if client_cert_source_for_mtls and not ssl_channel_credentials: 

282 cert, key = client_cert_source_for_mtls() 

283 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

284 certificate_chain=cert, private_key=key 

285 ) 

286 

287 # The base transport sets the host, credentials and scopes 

288 super().__init__( 

289 host=host, 

290 credentials=credentials, 

291 credentials_file=credentials_file, 

292 scopes=scopes, 

293 quota_project_id=quota_project_id, 

294 client_info=client_info, 

295 always_use_jwt_access=always_use_jwt_access, 

296 api_audience=api_audience, 

297 ) 

298 

299 if not self._grpc_channel: 

300 # initialize with the provided callable or the default channel 

301 channel_init = channel or type(self).create_channel 

302 self._grpc_channel = channel_init( 

303 self._host, 

304 # use the credentials which are saved 

305 credentials=self._credentials, 

306 # Set ``credentials_file`` to ``None`` here as 

307 # the credentials that we saved earlier should be used. 

308 credentials_file=None, 

309 scopes=self._scopes, 

310 ssl_credentials=self._ssl_channel_credentials, 

311 quota_project_id=quota_project_id, 

312 options=[ 

313 ("grpc.max_send_message_length", -1), 

314 ("grpc.max_receive_message_length", -1), 

315 ("grpc.max_metadata_size", 4 * 1024 * 1024), 

316 ("grpc.keepalive_time_ms", 30000), 

317 ], 

318 ) 

319 

320 self._interceptor = _LoggingClientAIOInterceptor() 

321 self._grpc_channel._unary_unary_interceptors.append(self._interceptor) 

322 self._logged_channel = self._grpc_channel 

323 self._wrap_with_kind = ( 

324 "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters 

325 ) 

326 # Wrap messages. This must be done after self._logged_channel exists 

327 self._prep_wrapped_messages(client_info) 

328 

329 @property 

330 def grpc_channel(self) -> aio.Channel: 

331 """Create the channel designed to connect to this service. 

332 

333 This property caches on the instance; repeated calls return 

334 the same channel. 

335 """ 

336 # Return the channel from cache. 

337 return self._grpc_channel 

338 

339 @property 

340 def create_topic(self) -> Callable[[pubsub.Topic], Awaitable[pubsub.Topic]]: 

341 r"""Return a callable for the create topic method over gRPC. 

342 

343 Creates the given topic with the given name. See the [resource 

344 name rules] 

345 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

346 

347 Returns: 

348 Callable[[~.Topic], 

349 Awaitable[~.Topic]]: 

350 A function that, when called, will call the underlying RPC 

351 on the server. 

352 """ 

353 # Generate a "stub function" on-the-fly which will actually make 

354 # the request. 

355 # gRPC handles serialization and deserialization, so we just need 

356 # to pass in the functions for each. 

357 if "create_topic" not in self._stubs: 

358 self._stubs["create_topic"] = self._logged_channel.unary_unary( 

359 "/google.pubsub.v1.Publisher/CreateTopic", 

360 request_serializer=pubsub.Topic.serialize, 

361 response_deserializer=pubsub.Topic.deserialize, 

362 ) 

363 return self._stubs["create_topic"] 

364 

365 @property 

366 def update_topic( 

367 self, 

368 ) -> Callable[[pubsub.UpdateTopicRequest], Awaitable[pubsub.Topic]]: 

369 r"""Return a callable for the update topic method over gRPC. 

370 

371 Updates an existing topic by updating the fields 

372 specified in the update mask. Note that certain 

373 properties of a topic are not modifiable. 

374 

375 Returns: 

376 Callable[[~.UpdateTopicRequest], 

377 Awaitable[~.Topic]]: 

378 A function that, when called, will call the underlying RPC 

379 on the server. 

380 """ 

381 # Generate a "stub function" on-the-fly which will actually make 

382 # the request. 

383 # gRPC handles serialization and deserialization, so we just need 

384 # to pass in the functions for each. 

385 if "update_topic" not in self._stubs: 

386 self._stubs["update_topic"] = self._logged_channel.unary_unary( 

387 "/google.pubsub.v1.Publisher/UpdateTopic", 

388 request_serializer=pubsub.UpdateTopicRequest.serialize, 

389 response_deserializer=pubsub.Topic.deserialize, 

390 ) 

391 return self._stubs["update_topic"] 

392 

393 @property 

394 def publish( 

395 self, 

396 ) -> Callable[[pubsub.PublishRequest], Awaitable[pubsub.PublishResponse]]: 

397 r"""Return a callable for the publish method over gRPC. 

398 

399 Adds one or more messages to the topic. Returns ``NOT_FOUND`` if 

400 the topic does not exist. 

401 

402 Returns: 

403 Callable[[~.PublishRequest], 

404 Awaitable[~.PublishResponse]]: 

405 A function that, when called, will call the underlying RPC 

406 on the server. 

407 """ 

408 # Generate a "stub function" on-the-fly which will actually make 

409 # the request. 

410 # gRPC handles serialization and deserialization, so we just need 

411 # to pass in the functions for each. 

412 if "publish" not in self._stubs: 

413 self._stubs["publish"] = self._logged_channel.unary_unary( 

414 "/google.pubsub.v1.Publisher/Publish", 

415 request_serializer=pubsub.PublishRequest.serialize, 

416 response_deserializer=pubsub.PublishResponse.deserialize, 

417 ) 

418 return self._stubs["publish"] 

419 

420 @property 

421 def get_topic(self) -> Callable[[pubsub.GetTopicRequest], Awaitable[pubsub.Topic]]: 

422 r"""Return a callable for the get topic method over gRPC. 

423 

424 Gets the configuration of a topic. 

425 

426 Returns: 

427 Callable[[~.GetTopicRequest], 

428 Awaitable[~.Topic]]: 

429 A function that, when called, will call the underlying RPC 

430 on the server. 

431 """ 

432 # Generate a "stub function" on-the-fly which will actually make 

433 # the request. 

434 # gRPC handles serialization and deserialization, so we just need 

435 # to pass in the functions for each. 

436 if "get_topic" not in self._stubs: 

437 self._stubs["get_topic"] = self._logged_channel.unary_unary( 

438 "/google.pubsub.v1.Publisher/GetTopic", 

439 request_serializer=pubsub.GetTopicRequest.serialize, 

440 response_deserializer=pubsub.Topic.deserialize, 

441 ) 

442 return self._stubs["get_topic"] 

443 

444 @property 

445 def list_topics( 

446 self, 

447 ) -> Callable[[pubsub.ListTopicsRequest], Awaitable[pubsub.ListTopicsResponse]]: 

448 r"""Return a callable for the list topics method over gRPC. 

449 

450 Lists matching topics. 

451 

452 Returns: 

453 Callable[[~.ListTopicsRequest], 

454 Awaitable[~.ListTopicsResponse]]: 

455 A function that, when called, will call the underlying RPC 

456 on the server. 

457 """ 

458 # Generate a "stub function" on-the-fly which will actually make 

459 # the request. 

460 # gRPC handles serialization and deserialization, so we just need 

461 # to pass in the functions for each. 

462 if "list_topics" not in self._stubs: 

463 self._stubs["list_topics"] = self._logged_channel.unary_unary( 

464 "/google.pubsub.v1.Publisher/ListTopics", 

465 request_serializer=pubsub.ListTopicsRequest.serialize, 

466 response_deserializer=pubsub.ListTopicsResponse.deserialize, 

467 ) 

468 return self._stubs["list_topics"] 

469 

470 @property 

471 def list_topic_subscriptions( 

472 self, 

473 ) -> Callable[ 

474 [pubsub.ListTopicSubscriptionsRequest], 

475 Awaitable[pubsub.ListTopicSubscriptionsResponse], 

476 ]: 

477 r"""Return a callable for the list topic subscriptions method over gRPC. 

478 

479 Lists the names of the attached subscriptions on this 

480 topic. 

481 

482 Returns: 

483 Callable[[~.ListTopicSubscriptionsRequest], 

484 Awaitable[~.ListTopicSubscriptionsResponse]]: 

485 A function that, when called, will call the underlying RPC 

486 on the server. 

487 """ 

488 # Generate a "stub function" on-the-fly which will actually make 

489 # the request. 

490 # gRPC handles serialization and deserialization, so we just need 

491 # to pass in the functions for each. 

492 if "list_topic_subscriptions" not in self._stubs: 

493 self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary( 

494 "/google.pubsub.v1.Publisher/ListTopicSubscriptions", 

495 request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize, 

496 response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize, 

497 ) 

498 return self._stubs["list_topic_subscriptions"] 

499 

500 @property 

501 def list_topic_snapshots( 

502 self, 

503 ) -> Callable[ 

504 [pubsub.ListTopicSnapshotsRequest], Awaitable[pubsub.ListTopicSnapshotsResponse] 

505 ]: 

506 r"""Return a callable for the list topic snapshots method over gRPC. 

507 

508 Lists the names of the snapshots on this topic. Snapshots are 

509 used in 

510 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

511 operations, which allow you to manage message acknowledgments in 

512 bulk. That is, you can set the acknowledgment state of messages 

513 in an existing subscription to the state captured by a snapshot. 

514 

515 Returns: 

516 Callable[[~.ListTopicSnapshotsRequest], 

517 Awaitable[~.ListTopicSnapshotsResponse]]: 

518 A function that, when called, will call the underlying RPC 

519 on the server. 

520 """ 

521 # Generate a "stub function" on-the-fly which will actually make 

522 # the request. 

523 # gRPC handles serialization and deserialization, so we just need 

524 # to pass in the functions for each. 

525 if "list_topic_snapshots" not in self._stubs: 

526 self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary( 

527 "/google.pubsub.v1.Publisher/ListTopicSnapshots", 

528 request_serializer=pubsub.ListTopicSnapshotsRequest.serialize, 

529 response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize, 

530 ) 

531 return self._stubs["list_topic_snapshots"] 

532 

533 @property 

534 def delete_topic( 

535 self, 

536 ) -> Callable[[pubsub.DeleteTopicRequest], Awaitable[empty_pb2.Empty]]: 

537 r"""Return a callable for the delete topic method over gRPC. 

538 

539 Deletes the topic with the given name. Returns ``NOT_FOUND`` if 

540 the topic does not exist. After a topic is deleted, a new topic 

541 may be created with the same name; this is an entirely new topic 

542 with none of the old configuration or subscriptions. Existing 

543 subscriptions to this topic are not deleted, but their ``topic`` 

544 field is set to ``_deleted-topic_``. 

545 

546 Returns: 

547 Callable[[~.DeleteTopicRequest], 

548 Awaitable[~.Empty]]: 

549 A function that, when called, will call the underlying RPC 

550 on the server. 

551 """ 

552 # Generate a "stub function" on-the-fly which will actually make 

553 # the request. 

554 # gRPC handles serialization and deserialization, so we just need 

555 # to pass in the functions for each. 

556 if "delete_topic" not in self._stubs: 

557 self._stubs["delete_topic"] = self._logged_channel.unary_unary( 

558 "/google.pubsub.v1.Publisher/DeleteTopic", 

559 request_serializer=pubsub.DeleteTopicRequest.serialize, 

560 response_deserializer=empty_pb2.Empty.FromString, 

561 ) 

562 return self._stubs["delete_topic"] 

563 

564 @property 

565 def detach_subscription( 

566 self, 

567 ) -> Callable[ 

568 [pubsub.DetachSubscriptionRequest], Awaitable[pubsub.DetachSubscriptionResponse] 

569 ]: 

570 r"""Return a callable for the detach subscription method over gRPC. 

571 

572 Detaches a subscription from this topic. All messages retained 

573 in the subscription are dropped. Subsequent ``Pull`` and 

574 ``StreamingPull`` requests will return FAILED_PRECONDITION. If 

575 the subscription is a push subscription, pushes to the endpoint 

576 will stop. 

577 

578 Returns: 

579 Callable[[~.DetachSubscriptionRequest], 

580 Awaitable[~.DetachSubscriptionResponse]]: 

581 A function that, when called, will call the underlying RPC 

582 on the server. 

583 """ 

584 # Generate a "stub function" on-the-fly which will actually make 

585 # the request. 

586 # gRPC handles serialization and deserialization, so we just need 

587 # to pass in the functions for each. 

588 if "detach_subscription" not in self._stubs: 

589 self._stubs["detach_subscription"] = self._logged_channel.unary_unary( 

590 "/google.pubsub.v1.Publisher/DetachSubscription", 

591 request_serializer=pubsub.DetachSubscriptionRequest.serialize, 

592 response_deserializer=pubsub.DetachSubscriptionResponse.deserialize, 

593 ) 

594 return self._stubs["detach_subscription"] 

595 

596 def _prep_wrapped_messages(self, client_info): 

597 """Precompute the wrapped methods, overriding the base class method to use async wrappers.""" 

598 self._wrapped_methods = { 

599 self.create_topic: self._wrap_method( 

600 self.create_topic, 

601 default_retry=retries.AsyncRetry( 

602 initial=0.1, 

603 maximum=60.0, 

604 multiplier=1.3, 

605 predicate=retries.if_exception_type( 

606 core_exceptions.ServiceUnavailable, 

607 ), 

608 deadline=60.0, 

609 ), 

610 default_timeout=60.0, 

611 client_info=client_info, 

612 ), 

613 self.update_topic: self._wrap_method( 

614 self.update_topic, 

615 default_retry=retries.AsyncRetry( 

616 initial=0.1, 

617 maximum=60.0, 

618 multiplier=1.3, 

619 predicate=retries.if_exception_type( 

620 core_exceptions.ServiceUnavailable, 

621 ), 

622 deadline=60.0, 

623 ), 

624 default_timeout=60.0, 

625 client_info=client_info, 

626 ), 

627 self.publish: self._wrap_method( 

628 self.publish, 

629 default_retry=retries.AsyncRetry( 

630 initial=0.1, 

631 maximum=60.0, 

632 multiplier=4, 

633 predicate=retries.if_exception_type( 

634 core_exceptions.Aborted, 

635 core_exceptions.Cancelled, 

636 core_exceptions.DeadlineExceeded, 

637 core_exceptions.InternalServerError, 

638 core_exceptions.ResourceExhausted, 

639 core_exceptions.ServiceUnavailable, 

640 core_exceptions.Unknown, 

641 ), 

642 deadline=60.0, 

643 ), 

644 default_timeout=60.0, 

645 client_info=client_info, 

646 ), 

647 self.get_topic: self._wrap_method( 

648 self.get_topic, 

649 default_retry=retries.AsyncRetry( 

650 initial=0.1, 

651 maximum=60.0, 

652 multiplier=1.3, 

653 predicate=retries.if_exception_type( 

654 core_exceptions.Aborted, 

655 core_exceptions.ServiceUnavailable, 

656 core_exceptions.Unknown, 

657 ), 

658 deadline=60.0, 

659 ), 

660 default_timeout=60.0, 

661 client_info=client_info, 

662 ), 

663 self.list_topics: self._wrap_method( 

664 self.list_topics, 

665 default_retry=retries.AsyncRetry( 

666 initial=0.1, 

667 maximum=60.0, 

668 multiplier=1.3, 

669 predicate=retries.if_exception_type( 

670 core_exceptions.Aborted, 

671 core_exceptions.ServiceUnavailable, 

672 core_exceptions.Unknown, 

673 ), 

674 deadline=60.0, 

675 ), 

676 default_timeout=60.0, 

677 client_info=client_info, 

678 ), 

679 self.list_topic_subscriptions: self._wrap_method( 

680 self.list_topic_subscriptions, 

681 default_retry=retries.AsyncRetry( 

682 initial=0.1, 

683 maximum=60.0, 

684 multiplier=1.3, 

685 predicate=retries.if_exception_type( 

686 core_exceptions.Aborted, 

687 core_exceptions.ServiceUnavailable, 

688 core_exceptions.Unknown, 

689 ), 

690 deadline=60.0, 

691 ), 

692 default_timeout=60.0, 

693 client_info=client_info, 

694 ), 

695 self.list_topic_snapshots: self._wrap_method( 

696 self.list_topic_snapshots, 

697 default_retry=retries.AsyncRetry( 

698 initial=0.1, 

699 maximum=60.0, 

700 multiplier=1.3, 

701 predicate=retries.if_exception_type( 

702 core_exceptions.Aborted, 

703 core_exceptions.ServiceUnavailable, 

704 core_exceptions.Unknown, 

705 ), 

706 deadline=60.0, 

707 ), 

708 default_timeout=60.0, 

709 client_info=client_info, 

710 ), 

711 self.delete_topic: self._wrap_method( 

712 self.delete_topic, 

713 default_retry=retries.AsyncRetry( 

714 initial=0.1, 

715 maximum=60.0, 

716 multiplier=1.3, 

717 predicate=retries.if_exception_type( 

718 core_exceptions.ServiceUnavailable, 

719 ), 

720 deadline=60.0, 

721 ), 

722 default_timeout=60.0, 

723 client_info=client_info, 

724 ), 

725 self.detach_subscription: self._wrap_method( 

726 self.detach_subscription, 

727 default_retry=retries.AsyncRetry( 

728 initial=0.1, 

729 maximum=60.0, 

730 multiplier=1.3, 

731 predicate=retries.if_exception_type( 

732 core_exceptions.ServiceUnavailable, 

733 ), 

734 deadline=60.0, 

735 ), 

736 default_timeout=60.0, 

737 client_info=client_info, 

738 ), 

739 self.get_iam_policy: self._wrap_method( 

740 self.get_iam_policy, 

741 default_timeout=None, 

742 client_info=client_info, 

743 ), 

744 self.set_iam_policy: self._wrap_method( 

745 self.set_iam_policy, 

746 default_timeout=None, 

747 client_info=client_info, 

748 ), 

749 self.test_iam_permissions: self._wrap_method( 

750 self.test_iam_permissions, 

751 default_timeout=None, 

752 client_info=client_info, 

753 ), 

754 } 

755 

756 def _wrap_method(self, func, *args, **kwargs): 

757 if self._wrap_with_kind: # pragma: NO COVER 

758 kwargs["kind"] = self.kind 

759 return gapic_v1.method_async.wrap_method(func, *args, **kwargs) 

760 

761 def close(self): 

762 return self._logged_channel.close() 

763 

764 @property 

765 def kind(self) -> str: 

766 return "grpc_asyncio" 

767 

768 @property 

769 def set_iam_policy( 

770 self, 

771 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]: 

772 r"""Return a callable for the set iam policy method over gRPC. 

773 Sets the IAM access control policy on the specified 

774 function. Replaces any existing policy. 

775 Returns: 

776 Callable[[~.SetIamPolicyRequest], 

777 ~.Policy]: 

778 A function that, when called, will call the underlying RPC 

779 on the server. 

780 """ 

781 # Generate a "stub function" on-the-fly which will actually make 

782 # the request. 

783 # gRPC handles serialization and deserialization, so we just need 

784 # to pass in the functions for each. 

785 if "set_iam_policy" not in self._stubs: 

786 self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( 

787 "/google.iam.v1.IAMPolicy/SetIamPolicy", 

788 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, 

789 response_deserializer=policy_pb2.Policy.FromString, 

790 ) 

791 return self._stubs["set_iam_policy"] 

792 

793 @property 

794 def get_iam_policy( 

795 self, 

796 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]: 

797 r"""Return a callable for the get iam policy method over gRPC. 

798 Gets the IAM access control policy for a function. 

799 Returns an empty policy if the function exists and does 

800 not have a policy set. 

801 Returns: 

802 Callable[[~.GetIamPolicyRequest], 

803 ~.Policy]: 

804 A function that, when called, will call the underlying RPC 

805 on the server. 

806 """ 

807 # Generate a "stub function" on-the-fly which will actually make 

808 # the request. 

809 # gRPC handles serialization and deserialization, so we just need 

810 # to pass in the functions for each. 

811 if "get_iam_policy" not in self._stubs: 

812 self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( 

813 "/google.iam.v1.IAMPolicy/GetIamPolicy", 

814 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, 

815 response_deserializer=policy_pb2.Policy.FromString, 

816 ) 

817 return self._stubs["get_iam_policy"] 

818 

819 @property 

820 def test_iam_permissions( 

821 self, 

822 ) -> Callable[ 

823 [iam_policy_pb2.TestIamPermissionsRequest], 

824 iam_policy_pb2.TestIamPermissionsResponse, 

825 ]: 

826 r"""Return a callable for the test iam permissions method over gRPC. 

827 Tests the specified permissions against the IAM access control 

828 policy for a function. If the function does not exist, this will 

829 return an empty set of permissions, not a NOT_FOUND error. 

830 Returns: 

831 Callable[[~.TestIamPermissionsRequest], 

832 ~.TestIamPermissionsResponse]: 

833 A function that, when called, will call the underlying RPC 

834 on the server. 

835 """ 

836 # Generate a "stub function" on-the-fly which will actually make 

837 # the request. 

838 # gRPC handles serialization and deserialization, so we just need 

839 # to pass in the functions for each. 

840 if "test_iam_permissions" not in self._stubs: 

841 self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( 

842 "/google.iam.v1.IAMPolicy/TestIamPermissions", 

843 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, 

844 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, 

845 ) 

846 return self._stubs["test_iam_permissions"] 

847 

848 

849__all__ = ("PublisherGrpcAsyncIOTransport",)