1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import inspect
17import json
18import pickle
19import logging as std_logging
20import warnings
21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
22
23from google.api_core import gapic_v1
24from google.api_core import grpc_helpers_async
25from google.api_core import exceptions as core_exceptions
26from google.api_core import retry_async as retries
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.grpc import SslCredentials # type: ignore
29from google.protobuf.json_format import MessageToJson
30import google.protobuf.message
31
32import grpc # type: ignore
33import proto # type: ignore
34from grpc.experimental import aio # type: ignore
35
36from google.iam.v1 import iam_policy_pb2 # type: ignore
37from google.iam.v1 import policy_pb2 # type: ignore
38from google.protobuf import empty_pb2 # type: ignore
39from google.pubsub_v1.types import pubsub
40from .base import PublisherTransport, DEFAULT_CLIENT_INFO
41from .grpc import PublisherGrpcTransport
42
# Feature probe: structured client logging is only available when the
# installed ``google-api-core`` ships the ``client_logging`` module.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger used by the interceptor below for DEBUG request and
# response records.
_LOGGER = std_logging.getLogger(__name__)
51
52
class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Client interceptor that emits DEBUG log records for unary-unary RPCs.

    Records are produced only when ``CLIENT_LOGGING_SUPPORTED`` is true and
    ``_LOGGER`` is enabled for ``DEBUG``. Otherwise the call is simply
    forwarded to ``continuation``.
    """

    @staticmethod
    def _payload_for_log(message):
        """Return a string representation of ``message`` suitable for logging.

        proto-plus messages and raw protobuf messages are rendered as JSON.
        Any other object falls back to its type name followed by its pickled
        bytes.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward the RPC, logging the request and the response at DEBUG level.

        Args:
            continuation: Callable that proceeds with the RPC; it is awaited
                with ``client_call_details`` and ``request``.
            client_call_details: Details of the outgoing call; ``method`` and
                ``metadata`` are read for logging.
            request: The request message for the RPC.

        Returns:
            The call object produced by ``continuation``.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_payload = self._payload_for_log(request)
            # ``metadata`` may be ``None`` when the caller attached none; treat
            # it as empty so that enabling logging can never break the RPC.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Flatten the gRPC trailing metadata into a plain ``dict`` of
            # strings (or ``None`` when there is none) so it can be logged.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
118
119
class PublisherGrpcAsyncIOTransport(PublisherTransport):
    """gRPC AsyncIO backend transport for Publisher.

    The service that an application uses to manipulate topics,
    and to send messages to a topic.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _grpc_channel: aio.Channel
    # Class-level placeholder only: ``__init__`` rebinds ``self._stubs`` to a
    # fresh dict, so cached stubs are per-instance.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`. This argument will be
                removed in the next major version of this library.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.

        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'pubsub.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): Deprecated. A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
                This argument will be removed in the next major version of this library.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
              creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, aio.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                # Explicit ``ssl_channel_credentials`` win over the cert callback.
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # initialize with the provided callable or the default channel
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                    ("grpc.max_metadata_size", 4 * 1024 * 1024),
                    ("grpc.keepalive_time_ms", 30000),
                ],
            )

        self._interceptor = _LoggingClientAIOInterceptor()
        # NOTE: this reaches into a private attribute of the channel object to
        # register the logging interceptor after the channel has been built.
        self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
        self._logged_channel = self._grpc_channel
        # Only pass ``kind`` to ``wrap_method`` when the installed api-core
        # version accepts that parameter (see ``_wrap_method``).
        self._wrap_with_kind = (
            "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
        )
        # Wrap messages. This must be done after self._logged_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Return the gRPC AsyncIO channel used by this transport.

        The channel is set up once in ``__init__``; repeated accesses return
        the same channel object.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_topic(self) -> Callable[[pubsub.Topic], Awaitable[pubsub.Topic]]:
        r"""Return a callable for the create topic method over gRPC.

        Creates the given topic with the given name. See the [resource
        name rules]
        (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).

        Returns:
            Callable[[~.Topic],
                    Awaitable[~.Topic]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_topic" not in self._stubs:
            self._stubs["create_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/CreateTopic",
                request_serializer=pubsub.Topic.serialize,
                response_deserializer=pubsub.Topic.deserialize,
            )
        return self._stubs["create_topic"]

    @property
    def update_topic(
        self,
    ) -> Callable[[pubsub.UpdateTopicRequest], Awaitable[pubsub.Topic]]:
        r"""Return a callable for the update topic method over gRPC.

        Updates an existing topic by updating the fields
        specified in the update mask. Note that certain
        properties of a topic are not modifiable.

        Returns:
            Callable[[~.UpdateTopicRequest],
                    Awaitable[~.Topic]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "update_topic" not in self._stubs:
            self._stubs["update_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/UpdateTopic",
                request_serializer=pubsub.UpdateTopicRequest.serialize,
                response_deserializer=pubsub.Topic.deserialize,
            )
        return self._stubs["update_topic"]

    @property
    def publish(
        self,
    ) -> Callable[[pubsub.PublishRequest], Awaitable[pubsub.PublishResponse]]:
        r"""Return a callable for the publish method over gRPC.

        Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
        the topic does not exist.

        Returns:
            Callable[[~.PublishRequest],
                    Awaitable[~.PublishResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "publish" not in self._stubs:
            self._stubs["publish"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/Publish",
                request_serializer=pubsub.PublishRequest.serialize,
                response_deserializer=pubsub.PublishResponse.deserialize,
            )
        return self._stubs["publish"]

    @property
    def get_topic(self) -> Callable[[pubsub.GetTopicRequest], Awaitable[pubsub.Topic]]:
        r"""Return a callable for the get topic method over gRPC.

        Gets the configuration of a topic.

        Returns:
            Callable[[~.GetTopicRequest],
                    Awaitable[~.Topic]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_topic" not in self._stubs:
            self._stubs["get_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/GetTopic",
                request_serializer=pubsub.GetTopicRequest.serialize,
                response_deserializer=pubsub.Topic.deserialize,
            )
        return self._stubs["get_topic"]

    @property
    def list_topics(
        self,
    ) -> Callable[[pubsub.ListTopicsRequest], Awaitable[pubsub.ListTopicsResponse]]:
        r"""Return a callable for the list topics method over gRPC.

        Lists matching topics.

        Returns:
            Callable[[~.ListTopicsRequest],
                    Awaitable[~.ListTopicsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "list_topics" not in self._stubs:
            self._stubs["list_topics"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/ListTopics",
                request_serializer=pubsub.ListTopicsRequest.serialize,
                response_deserializer=pubsub.ListTopicsResponse.deserialize,
            )
        return self._stubs["list_topics"]

    @property
    def list_topic_subscriptions(
        self,
    ) -> Callable[
        [pubsub.ListTopicSubscriptionsRequest],
        Awaitable[pubsub.ListTopicSubscriptionsResponse],
    ]:
        r"""Return a callable for the list topic subscriptions method over gRPC.

        Lists the names of the attached subscriptions on this
        topic.

        Returns:
            Callable[[~.ListTopicSubscriptionsRequest],
                    Awaitable[~.ListTopicSubscriptionsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "list_topic_subscriptions" not in self._stubs:
            self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/ListTopicSubscriptions",
                request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
                response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
            )
        return self._stubs["list_topic_subscriptions"]

    @property
    def list_topic_snapshots(
        self,
    ) -> Callable[
        [pubsub.ListTopicSnapshotsRequest], Awaitable[pubsub.ListTopicSnapshotsResponse]
    ]:
        r"""Return a callable for the list topic snapshots method over gRPC.

        Lists the names of the snapshots on this topic. Snapshots are
        used in
        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
        operations, which allow you to manage message acknowledgments in
        bulk. That is, you can set the acknowledgment state of messages
        in an existing subscription to the state captured by a snapshot.

        Returns:
            Callable[[~.ListTopicSnapshotsRequest],
                    Awaitable[~.ListTopicSnapshotsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "list_topic_snapshots" not in self._stubs:
            self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/ListTopicSnapshots",
                request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
                response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
            )
        return self._stubs["list_topic_snapshots"]

    @property
    def delete_topic(
        self,
    ) -> Callable[[pubsub.DeleteTopicRequest], Awaitable[empty_pb2.Empty]]:
        r"""Return a callable for the delete topic method over gRPC.

        Deletes the topic with the given name. Returns ``NOT_FOUND`` if
        the topic does not exist. After a topic is deleted, a new topic
        may be created with the same name; this is an entirely new topic
        with none of the old configuration or subscriptions. Existing
        subscriptions to this topic are not deleted, but their ``topic``
        field is set to ``_deleted-topic_``.

        Returns:
            Callable[[~.DeleteTopicRequest],
                    Awaitable[~.Empty]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "delete_topic" not in self._stubs:
            self._stubs["delete_topic"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/DeleteTopic",
                request_serializer=pubsub.DeleteTopicRequest.serialize,
                response_deserializer=empty_pb2.Empty.FromString,
            )
        return self._stubs["delete_topic"]

    @property
    def detach_subscription(
        self,
    ) -> Callable[
        [pubsub.DetachSubscriptionRequest], Awaitable[pubsub.DetachSubscriptionResponse]
    ]:
        r"""Return a callable for the detach subscription method over gRPC.

        Detaches a subscription from this topic. All messages retained
        in the subscription are dropped. Subsequent ``Pull`` and
        ``StreamingPull`` requests will return FAILED_PRECONDITION. If
        the subscription is a push subscription, pushes to the endpoint
        will stop.

        Returns:
            Callable[[~.DetachSubscriptionRequest],
                    Awaitable[~.DetachSubscriptionResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "detach_subscription" not in self._stubs:
            self._stubs["detach_subscription"] = self._logged_channel.unary_unary(
                "/google.pubsub.v1.Publisher/DetachSubscription",
                request_serializer=pubsub.DetachSubscriptionRequest.serialize,
                response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
            )
        return self._stubs["detach_subscription"]

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods, overriding the base class method to use async wrappers.

        Populates ``self._wrapped_methods`` with one entry per RPC, keyed by
        the stub callable and holding that stub wrapped with its default
        retry/timeout settings.

        Args:
            client_info: Client info passed through to every wrapped method.
        """

        def _retry_on(*exception_types, multiplier=1.3):
            # Every Publisher RPC shares the same initial/maximum backoff and
            # overall deadline; only the retryable error set and (for
            # ``publish``) the multiplier vary.
            return retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=multiplier,
                predicate=retries.if_exception_type(*exception_types),
                deadline=60.0,
            )

        # Retryable error sets, named once so the table below stays readable.
        unavailable_only = (core_exceptions.ServiceUnavailable,)
        read_errors = (
            core_exceptions.Aborted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )
        publish_errors = (
            core_exceptions.Aborted,
            core_exceptions.Cancelled,
            core_exceptions.DeadlineExceeded,
            core_exceptions.InternalServerError,
            core_exceptions.ResourceExhausted,
            core_exceptions.ServiceUnavailable,
            core_exceptions.Unknown,
        )

        self._wrapped_methods = {
            self.create_topic: self._wrap_method(
                self.create_topic,
                default_retry=_retry_on(*unavailable_only),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.update_topic: self._wrap_method(
                self.update_topic,
                default_retry=_retry_on(*unavailable_only),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.publish: self._wrap_method(
                self.publish,
                default_retry=_retry_on(*publish_errors, multiplier=4),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.get_topic: self._wrap_method(
                self.get_topic,
                default_retry=_retry_on(*read_errors),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.list_topics: self._wrap_method(
                self.list_topics,
                default_retry=_retry_on(*read_errors),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.list_topic_subscriptions: self._wrap_method(
                self.list_topic_subscriptions,
                default_retry=_retry_on(*read_errors),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.list_topic_snapshots: self._wrap_method(
                self.list_topic_snapshots,
                default_retry=_retry_on(*read_errors),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.delete_topic: self._wrap_method(
                self.delete_topic,
                default_retry=_retry_on(*unavailable_only),
                default_timeout=60.0,
                client_info=client_info,
            ),
            self.detach_subscription: self._wrap_method(
                self.detach_subscription,
                default_retry=_retry_on(*unavailable_only),
                default_timeout=60.0,
                client_info=client_info,
            ),
            # The IAM mixin methods get no default retry and no default timeout.
            self.get_iam_policy: self._wrap_method(
                self.get_iam_policy,
                default_timeout=None,
                client_info=client_info,
            ),
            self.set_iam_policy: self._wrap_method(
                self.set_iam_policy,
                default_timeout=None,
                client_info=client_info,
            ),
            self.test_iam_permissions: self._wrap_method(
                self.test_iam_permissions,
                default_timeout=None,
                client_info=client_info,
            ),
        }

    def _wrap_method(self, func, *args, **kwargs):
        """Wrap ``func`` with ``gapic_v1.method_async.wrap_method``.

        ``kind`` is added to the keyword arguments only when ``__init__``
        detected that the installed ``wrap_method`` accepts it.
        """
        if self._wrap_with_kind:  # pragma: NO COVER
            kwargs["kind"] = self.kind
        return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

    def close(self):
        """Close the underlying channel.

        Returns:
            Whatever the channel's ``close()`` returns; for a
            ``grpc.aio.Channel`` this is an awaitable the caller must await.
        """
        return self._logged_channel.close()

    @property
    def kind(self) -> str:
        """Return the identifier of this transport flavour."""
        return "grpc_asyncio"

    @property
    def set_iam_policy(
        self,
    ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
        r"""Return a callable for the set iam policy method over gRPC.

        Sets the IAM access control policy on the specified
        resource. Replaces any existing policy.

        Returns:
            Callable[[~.SetIamPolicyRequest],
                    Awaitable[~.Policy]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "set_iam_policy" not in self._stubs:
            self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
                "/google.iam.v1.IAMPolicy/SetIamPolicy",
                request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
                response_deserializer=policy_pb2.Policy.FromString,
            )
        return self._stubs["set_iam_policy"]

    @property
    def get_iam_policy(
        self,
    ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
        r"""Return a callable for the get iam policy method over gRPC.

        Gets the IAM access control policy for a resource.
        Returns an empty policy if the resource exists and does
        not have a policy set.

        Returns:
            Callable[[~.GetIamPolicyRequest],
                    Awaitable[~.Policy]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_iam_policy" not in self._stubs:
            self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
                "/google.iam.v1.IAMPolicy/GetIamPolicy",
                request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
                response_deserializer=policy_pb2.Policy.FromString,
            )
        return self._stubs["get_iam_policy"]

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
    ]:
        r"""Return a callable for the test iam permissions method over gRPC.

        Tests the specified permissions against the IAM access control
        policy for a resource. If the resource does not exist, this will
        return an empty set of permissions, not a NOT_FOUND error.

        Returns:
            Callable[[~.TestIamPermissionsRequest],
                    Awaitable[~.TestIamPermissionsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "test_iam_permissions" not in self._stubs:
            self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
                "/google.iam.v1.IAMPolicy/TestIamPermissions",
                request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
                response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
            )
        return self._stubs["test_iam_permissions"]
847
848
# Public API of this module: only the AsyncIO transport class is exported.
__all__ = ("PublisherGrpcAsyncIOTransport",)