Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/grpc_asyncio.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

137 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import pickle 

19import logging as std_logging 

20import warnings 

21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

22 

23from google.api_core import gapic_v1 

24from google.api_core import grpc_helpers_async 

25from google.api_core import exceptions as core_exceptions 

26from google.api_core import retry_async as retries 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30import google.protobuf.message 

31 

32import grpc # type: ignore 

33import proto # type: ignore 

34from grpc.experimental import aio # type: ignore 

35 

36from google.iam.v1 import iam_policy_pb2 # type: ignore 

37from google.iam.v1 import policy_pb2 # type: ignore 

38from google.protobuf import empty_pb2 # type: ignore 

39from google.pubsub_v1.types import pubsub 

40from .base import PublisherTransport, DEFAULT_CLIENT_INFO 

41from .grpc import PublisherGrpcTransport 

42 

43try: 

44 from google.api_core import client_logging # type: ignore 

45 

46 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

47except ImportError: # pragma: NO COVER 

48 CLIENT_LOGGING_SUPPORTED = False 

49 

50_LOGGER = std_logging.getLogger(__name__) 

51 

52 

class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Unary-unary AsyncIO client interceptor that DEBUG-logs each RPC.

    Logging only happens when ``CLIENT_LOGGING_SUPPORTED`` is true and
    ``_LOGGER`` is enabled for ``DEBUG``; otherwise the call is forwarded to
    ``continuation`` untouched.
    """

    @staticmethod
    def _payload_for_log(message):
        """Return a loggable string for a request or response object.

        proto-plus messages and raw protobuf messages are rendered as JSON.
        Anything else is rendered as ``"<TypeName>: <pickled bytes>"``.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        try:
            rendered = pickle.dumps(message)
        except (pickle.PicklingError, TypeError, AttributeError):
            # Debug logging must never make the RPC itself fail, so fall back
            # to ``repr`` when the object cannot be pickled.
            rendered = repr(message)
        return f"{type(message).__name__}: {rendered}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward the call and, when DEBUG logging is on, log both directions.

        Args:
            continuation: Callable that performs the next step of the RPC; it
                is awaited with ``client_call_details`` and ``request``.
            client_call_details: Call details; ``method`` and ``metadata``
                are read for logging.
            request: The request object being sent.

        Returns:
            The call object produced by awaiting ``continuation``.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_payload = self._payload_for_log(request)
            # ``metadata`` may be None on the call details; treat it as empty
            # rather than failing on iteration. Byte values are decoded so the
            # log record contains text only.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Convert the gRPC metadata object into a plain ``dict`` of
            # strings (or None when there is no trailing metadata).
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

118 

119 

class PublisherGrpcAsyncIOTransport(PublisherTransport):
    """gRPC AsyncIO backend transport for Publisher.

    The service that an application uses to manipulate topics,
    and to send messages to a topic.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _grpc_channel: aio.Channel
    # Class-level default only; ``__init__`` always rebinds a per-instance dict.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'pubsub.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor; this class does not read it itself.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
              creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, aio.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # initialize with the provided callable or the default channel
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                    ("grpc.max_metadata_size", 4 * 1024 * 1024),
                    ("grpc.keepalive_time_ms", 30000),
                ],
            )

        # Register the logging interceptor on the channel; every stub below is
        # created from ``self._logged_channel``.
        self._interceptor = _LoggingClientAIOInterceptor()
        self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
        self._logged_channel = self._grpc_channel
        # Only pass ``kind`` to ``wrap_method`` when the installed api_core
        # version accepts that keyword.
        self._wrap_with_kind = (
            "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
        )
        # Wrap messages. This must be done after self._logged_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Return the channel used to connect to this service.

        The channel is set once in ``__init__`` and cached on the instance;
        repeated calls return the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_topic(self) -> Callable[[pubsub.Topic], Awaitable[pubsub.Topic]]:
        r"""Return a callable for the create topic method over gRPC.

        Creates the given topic with the given name. See the [resource
        name rules]
        (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).

        Returns:
            Callable[[~.Topic],
                    Awaitable[~.Topic]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_topic" not in self._stubs:
            self._stubs["create_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/CreateTopic",
                request_serializer=pubsub.Topic.serialize,
                response_deserializer=pubsub.Topic.deserialize,
            )
        return self._stubs["create_topic"]

    @property
    def update_topic(
        self,
    ) -> Callable[[pubsub.UpdateTopicRequest], Awaitable[pubsub.Topic]]:
        r"""Return a callable for the update topic method over gRPC.

        Updates an existing topic by updating the fields
        specified in the update mask. Note that certain
        properties of a topic are not modifiable.

        Returns:
            Callable[[~.UpdateTopicRequest],
                    Awaitable[~.Topic]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "update_topic" not in self._stubs:
            self._stubs["update_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/UpdateTopic",
                request_serializer=pubsub.UpdateTopicRequest.serialize,
                response_deserializer=pubsub.Topic.deserialize,
            )
        return self._stubs["update_topic"]

    @property
    def publish(
        self,
    ) -> Callable[[pubsub.PublishRequest], Awaitable[pubsub.PublishResponse]]:
        r"""Return a callable for the publish method over gRPC.

        Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
        the topic does not exist.

        Returns:
            Callable[[~.PublishRequest],
                    Awaitable[~.PublishResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "publish" not in self._stubs:
            self._stubs["publish"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/Publish",
                request_serializer=pubsub.PublishRequest.serialize,
                response_deserializer=pubsub.PublishResponse.deserialize,
            )
        return self._stubs["publish"]

    @property
    def get_topic(self) -> Callable[[pubsub.GetTopicRequest], Awaitable[pubsub.Topic]]:
        r"""Return a callable for the get topic method over gRPC.

        Gets the configuration of a topic.

        Returns:
            Callable[[~.GetTopicRequest],
                    Awaitable[~.Topic]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_topic" not in self._stubs:
            self._stubs["get_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/GetTopic",
                request_serializer=pubsub.GetTopicRequest.serialize,
                response_deserializer=pubsub.Topic.deserialize,
            )
        return self._stubs["get_topic"]

    @property
    def list_topics(
        self,
    ) -> Callable[[pubsub.ListTopicsRequest], Awaitable[pubsub.ListTopicsResponse]]:
        r"""Return a callable for the list topics method over gRPC.

        Lists matching topics.

        Returns:
            Callable[[~.ListTopicsRequest],
                    Awaitable[~.ListTopicsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "list_topics" not in self._stubs:
            self._stubs["list_topics"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/ListTopics",
                request_serializer=pubsub.ListTopicsRequest.serialize,
                response_deserializer=pubsub.ListTopicsResponse.deserialize,
            )
        return self._stubs["list_topics"]

    @property
    def list_topic_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListTopicSubscriptionsRequest],
        Awaitable[pubsub.ListTopicSubscriptionsResponse],
    ]:
        r"""Return a callable for the list topic subscriptions method over gRPC.

        Lists the names of the attached subscriptions on this
        topic.

        Returns:
            Callable[[~.ListTopicSubscriptionsRequest],
                    Awaitable[~.ListTopicSubscriptionsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "list_topic_subscriptions" not in self._stubs:
            self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/ListTopicSubscriptions",
                request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
                response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
            )
        return self._stubs["list_topic_subscriptions"]

    @property
    def list_topic_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListTopicSnapshotsRequest], Awaitable[pubsub.ListTopicSnapshotsResponse]
    ]:
        r"""Return a callable for the list topic snapshots method over gRPC.

        Lists the names of the snapshots on this topic. Snapshots are
        used in
        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
        operations, which allow you to manage message acknowledgments in
        bulk. That is, you can set the acknowledgment state of messages
        in an existing subscription to the state captured by a snapshot.

        Returns:
            Callable[[~.ListTopicSnapshotsRequest],
                    Awaitable[~.ListTopicSnapshotsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "list_topic_snapshots" not in self._stubs:
            self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/ListTopicSnapshots",
                request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
                response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
            )
        return self._stubs["list_topic_snapshots"]

    @property
    def delete_topic(
        self,
    ) -> Callable[[pubsub.DeleteTopicRequest], Awaitable[empty_pb2.Empty]]:
        r"""Return a callable for the delete topic method over gRPC.

        Deletes the topic with the given name. Returns ``NOT_FOUND`` if
        the topic does not exist. After a topic is deleted, a new topic
        may be created with the same name; this is an entirely new topic
        with none of the old configuration or subscriptions. Existing
        subscriptions to this topic are not deleted, but their ``topic``
        field is set to ``_deleted-topic_``.

        Returns:
            Callable[[~.DeleteTopicRequest],
                    Awaitable[~.Empty]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "delete_topic" not in self._stubs:
            self._stubs["delete_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/DeleteTopic",
                request_serializer=pubsub.DeleteTopicRequest.serialize,
                response_deserializer=empty_pb2.Empty.FromString,
            )
        return self._stubs["delete_topic"]

    @property
    def detach_subscription(
        self,
    ) -> Callable[
        [pubsub.DetachSubscriptionRequest], Awaitable[pubsub.DetachSubscriptionResponse]
    ]:
        r"""Return a callable for the detach subscription method over gRPC.

        Detaches a subscription from this topic. All messages retained
        in the subscription are dropped. Subsequent ``Pull`` and
        ``StreamingPull`` requests will return FAILED_PRECONDITION. If
        the subscription is a push subscription, pushes to the endpoint
        will stop.

        Returns:
            Callable[[~.DetachSubscriptionRequest],
                    Awaitable[~.DetachSubscriptionResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "detach_subscription" not in self._stubs:
            self._stubs["detach_subscription"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/DetachSubscription",
                request_serializer=pubsub.DetachSubscriptionRequest.serialize,
                response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
            )
        return self._stubs["detach_subscription"]

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods, overriding the base class method to use async wrappers.

        Builds ``self._wrapped_methods``, a dict keyed by each stub callable
        whose value is that stub wrapped via ``self._wrap_method`` with its
        default retry/timeout settings.

        Args:
            client_info: Passed through to every ``self._wrap_method`` call.
        """

        def retry_on(*exception_types, multiplier=1.3):
            # Every retried RPC uses the same backoff bounds; only the set of
            # retryable exceptions and (for publish) the multiplier differ.
            return retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=multiplier,
                predicate=retries.if_exception_type(*exception_types),
                deadline=60.0,
            )

        def with_retry(stub, *exception_types, multiplier=1.3):
            # Wrap a stub with a default retry policy and a 60 s default timeout.
            return self._wrap_method(
                stub,
                default_retry=retry_on(*exception_types, multiplier=multiplier),
                default_timeout=60.0,
                client_info=client_info,
            )

        def without_retry(stub):
            # Wrap a stub with no default retry and no default timeout.
            return self._wrap_method(
                stub,
                default_timeout=None,
                client_info=client_info,
            )

        unavailable_only = (core_exceptions.ServiceUnavailable,)
        read_transients = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

        self._wrapped_methods = {
            self.create_topic: with_retry(self.create_topic, *unavailable_only),
            self.update_topic: with_retry(self.update_topic, *unavailable_only),
            self.publish: with_retry(
                self.publish,
                core_exceptions.Aborted,
                core_exceptions.Cancelled,
                core_exceptions.DeadlineExceeded,
                core_exceptions.InternalServerError,
                core_exceptions.ResourceExhausted,
                core_exceptions.ServiceUnavailable,
                core_exceptions.Unknown,
                multiplier=4,
            ),
            self.get_topic: with_retry(self.get_topic, *read_transients),
            self.list_topics: with_retry(self.list_topics, *read_transients),
            self.list_topic_subscriptions: with_retry(
                self.list_topic_subscriptions, *read_transients
            ),
            self.list_topic_snapshots: with_retry(
                self.list_topic_snapshots, *read_transients
            ),
            self.delete_topic: with_retry(self.delete_topic, *unavailable_only),
            self.detach_subscription: with_retry(
                self.detach_subscription, *unavailable_only
            ),
            self.get_iam_policy: without_retry(self.get_iam_policy),
            self.set_iam_policy: without_retry(self.set_iam_policy),
            self.test_iam_permissions: without_retry(self.test_iam_permissions),
        }

    def _wrap_method(self, func, *args, **kwargs):
        """Wrap ``func`` with ``gapic_v1.method_async.wrap_method``.

        Adds ``kind=self.kind`` to the keyword arguments when
        ``self._wrap_with_kind`` is true, i.e. when ``wrap_method`` was found
        to accept a ``kind`` parameter during ``__init__``.
        """
        if self._wrap_with_kind:  # pragma: NO COVER
            kwargs["kind"] = self.kind
        return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

    def close(self):
        """Close the logged channel and return the result of its ``close()`` call."""
        return self._logged_channel.close()

    @property
    def kind(self) -> str:
        """Return the transport kind identifier, ``"grpc_asyncio"``."""
        return "grpc_asyncio"

    @property
    def set_iam_policy(
        self,
    ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
        r"""Return a callable for the set iam policy method over gRPC.
        Sets the IAM access control policy on the specified
        function. Replaces any existing policy.
        Returns:
            Callable[[~.SetIamPolicyRequest],
                    Awaitable[~.Policy]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "set_iam_policy" not in self._stubs:
            self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
                "/google.iam.v1.IAMPolicy/SetIamPolicy",
                request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
                response_deserializer=policy_pb2.Policy.FromString,
            )
        return self._stubs["set_iam_policy"]

    @property
    def get_iam_policy(
        self,
    ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
        r"""Return a callable for the get iam policy method over gRPC.
        Gets the IAM access control policy for a function.
        Returns an empty policy if the function exists and does
        not have a policy set.
        Returns:
            Callable[[~.GetIamPolicyRequest],
                    Awaitable[~.Policy]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_iam_policy" not in self._stubs:
            self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
                "/google.iam.v1.IAMPolicy/GetIamPolicy",
                request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
                response_deserializer=policy_pb2.Policy.FromString,
            )
        return self._stubs["get_iam_policy"]

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
    ]:
        r"""Return a callable for the test iam permissions method over gRPC.
        Tests the specified permissions against the IAM access control
        policy for a function. If the function does not exist, this will
        return an empty set of permissions, not a NOT_FOUND error.
        Returns:
            Callable[[~.TestIamPermissionsRequest],
                    Awaitable[~.TestIamPermissionsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "test_iam_permissions" not in self._stubs:
            self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
                "/google.iam.v1.IAMPolicy/TestIamPermissions",
                request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
                response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
            )
        return self._stubs["test_iam_permissions"]

845 

846 

847__all__ = ("PublisherGrpcAsyncIOTransport",)