1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import inspect
17import json
18import pickle
19import logging as std_logging
20import warnings
21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
22
23from google.api_core import gapic_v1
24from google.api_core import grpc_helpers_async
25from google.api_core import exceptions as core_exceptions
26from google.api_core import retry_async as retries
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.grpc import SslCredentials # type: ignore
29from google.protobuf.json_format import MessageToJson
30import google.protobuf.message
31
32import grpc # type: ignore
33import proto # type: ignore
34from grpc.experimental import aio # type: ignore
35
36from google.iam.v1 import iam_policy_pb2 # type: ignore
37from google.iam.v1 import policy_pb2 # type: ignore
38from google.protobuf import empty_pb2 # type: ignore
39from google.pubsub_v1.types import pubsub
40from .base import PublisherTransport, DEFAULT_CLIENT_INFO
41from .grpc import PublisherGrpcTransport
42
# Probe for the optional ``client_logging`` module of google-api-core and
# record whether the import succeeded instead of failing at import time.
try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger used for the request/response debug logging in this module.
_LOGGER = std_logging.getLogger(__name__)
51
52
class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Client interceptor that debug-logs unary-unary requests and responses.

    Logging happens only when ``CLIENT_LOGGING_SUPPORTED`` is true and
    ``_LOGGER`` is enabled for ``DEBUG``; otherwise the call is simply
    forwarded to ``continuation``.
    """

    @staticmethod
    def _payload_for_log(message):
        """Return a loggable string for a request or response message."""
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # Neither proto-plus nor protobuf: fall back to the pickled bytes,
        # labelled with the type name. This only serializes; nothing is loaded.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward the call, logging the request and the response at DEBUG.

        Args:
            continuation: Callable that proceeds with the RPC; it is awaited
                with ``client_call_details`` and ``request``.
            client_call_details: Details of the call; ``method`` and
                ``metadata`` are read for logging.
            request: The request message being sent.

        Returns:
            The call object returned by ``continuation``.
        """
        # Evaluated once so the request and response halves always agree.
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` may be None on the call details; treat that as
            # "no metadata" rather than failing while merely logging.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._payload_for_log(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Convert the gRPC trailing metadata into a plain dict of strings
            # (or None when there is no metadata).
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
118
119
120class PublisherGrpcAsyncIOTransport(PublisherTransport):
121 """gRPC AsyncIO backend transport for Publisher.
122
123 The service that an application uses to manipulate topics,
124 and to send messages to a topic.
125
126 This class defines the same methods as the primary client, so the
127 primary client can load the underlying transport implementation
128 and call it.
129
130 It sends protocol buffers over the wire using gRPC (which is built on
131 top of HTTP/2); the ``grpcio`` package must be installed.
132 """
133
134 _grpc_channel: aio.Channel
135 _stubs: Dict[str, Callable] = {}
136
137 @classmethod
138 def create_channel(
139 cls,
140 host: str = "pubsub.googleapis.com",
141 credentials: Optional[ga_credentials.Credentials] = None,
142 credentials_file: Optional[str] = None,
143 scopes: Optional[Sequence[str]] = None,
144 quota_project_id: Optional[str] = None,
145 **kwargs,
146 ) -> aio.Channel:
147 """Create and return a gRPC AsyncIO channel object.
148 Args:
149 host (Optional[str]): The host for the channel to use.
150 credentials (Optional[~.Credentials]): The
151 authorization credentials to attach to requests. These
152 credentials identify this application to the service. If
153 none are specified, the client will attempt to ascertain
154 the credentials from the environment.
155 credentials_file (Optional[str]): A file with credentials that can
156 be loaded with :func:`google.auth.load_credentials_from_file`.
157 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
158 service. These are only used when credentials are not specified and
159 are passed to :func:`google.auth.default`.
160 quota_project_id (Optional[str]): An optional project to use for billing
161 and quota.
162 kwargs (Optional[dict]): Keyword arguments, which are passed to the
163 channel creation.
164 Returns:
165 aio.Channel: A gRPC AsyncIO channel object.
166 """
167
168 return grpc_helpers_async.create_channel(
169 host,
170 credentials=credentials,
171 credentials_file=credentials_file,
172 quota_project_id=quota_project_id,
173 default_scopes=cls.AUTH_SCOPES,
174 scopes=scopes,
175 default_host=cls.DEFAULT_HOST,
176 **kwargs,
177 )
178
179 def __init__(
180 self,
181 *,
182 host: str = "pubsub.googleapis.com",
183 credentials: Optional[ga_credentials.Credentials] = None,
184 credentials_file: Optional[str] = None,
185 scopes: Optional[Sequence[str]] = None,
186 channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
187 api_mtls_endpoint: Optional[str] = None,
188 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
189 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
190 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
191 quota_project_id: Optional[str] = None,
192 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
193 always_use_jwt_access: Optional[bool] = False,
194 api_audience: Optional[str] = None,
195 ) -> None:
196 """Instantiate the transport.
197
198 Args:
199 host (Optional[str]):
200 The hostname to connect to (default: 'pubsub.googleapis.com').
201 credentials (Optional[google.auth.credentials.Credentials]): The
202 authorization credentials to attach to requests. These
203 credentials identify the application to the service; if none
204 are specified, the client will attempt to ascertain the
205 credentials from the environment.
206 This argument is ignored if a ``channel`` instance is provided.
207 credentials_file (Optional[str]): A file with credentials that can
208 be loaded with :func:`google.auth.load_credentials_from_file`.
209 This argument is ignored if a ``channel`` instance is provided.
210 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
211 service. These are only used when credentials are not specified and
212 are passed to :func:`google.auth.default`.
213 channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
214 A ``Channel`` instance through which to make calls, or a Callable
215 that constructs and returns one. If set to None, ``self.create_channel``
216 is used to create the channel. If a Callable is given, it will be called
217 with the same arguments as used in ``self.create_channel``.
218 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
219 If provided, it overrides the ``host`` argument and tries to create
220 a mutual TLS channel with client SSL credentials from
221 ``client_cert_source`` or application default SSL credentials.
222 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
223 Deprecated. A callback to provide client SSL certificate bytes and
224 private key bytes, both in PEM format. It is ignored if
225 ``api_mtls_endpoint`` is None.
226 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
227 for the grpc channel. It is ignored if a ``channel`` instance is provided.
228 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
229 A callback to provide client certificate bytes and private key bytes,
230 both in PEM format. It is used to configure a mutual TLS channel. It is
231 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
232 quota_project_id (Optional[str]): An optional project to use for billing
233 and quota.
234 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
235 The client info used to send a user-agent string along with
236 API requests. If ``None``, then default info will be used.
237 Generally, you only need to set this if you're developing
238 your own client library.
239 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
240 be used for service account credentials.
241
242 Raises:
243 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
244 creation failed for any reason.
245 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
246 and ``credentials_file`` are passed.
247 """
248 self._grpc_channel = None
249 self._ssl_channel_credentials = ssl_channel_credentials
250 self._stubs: Dict[str, Callable] = {}
251
252 if api_mtls_endpoint:
253 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
254 if client_cert_source:
255 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
256
257 if isinstance(channel, aio.Channel):
258 # Ignore credentials if a channel was passed.
259 credentials = None
260 self._ignore_credentials = True
261 # If a channel was explicitly provided, set it.
262 self._grpc_channel = channel
263 self._ssl_channel_credentials = None
264 else:
265 if api_mtls_endpoint:
266 host = api_mtls_endpoint
267
268 # Create SSL credentials with client_cert_source or application
269 # default SSL credentials.
270 if client_cert_source:
271 cert, key = client_cert_source()
272 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
273 certificate_chain=cert, private_key=key
274 )
275 else:
276 self._ssl_channel_credentials = SslCredentials().ssl_credentials
277
278 else:
279 if client_cert_source_for_mtls and not ssl_channel_credentials:
280 cert, key = client_cert_source_for_mtls()
281 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
282 certificate_chain=cert, private_key=key
283 )
284
285 # The base transport sets the host, credentials and scopes
286 super().__init__(
287 host=host,
288 credentials=credentials,
289 credentials_file=credentials_file,
290 scopes=scopes,
291 quota_project_id=quota_project_id,
292 client_info=client_info,
293 always_use_jwt_access=always_use_jwt_access,
294 api_audience=api_audience,
295 )
296
297 if not self._grpc_channel:
298 # initialize with the provided callable or the default channel
299 channel_init = channel or type(self).create_channel
300 self._grpc_channel = channel_init(
301 self._host,
302 # use the credentials which are saved
303 credentials=self._credentials,
304 # Set ``credentials_file`` to ``None`` here as
305 # the credentials that we saved earlier should be used.
306 credentials_file=None,
307 scopes=self._scopes,
308 ssl_credentials=self._ssl_channel_credentials,
309 quota_project_id=quota_project_id,
310 options=[
311 ("grpc.max_send_message_length", -1),
312 ("grpc.max_receive_message_length", -1),
313 ("grpc.max_metadata_size", 4 * 1024 * 1024),
314 ("grpc.keepalive_time_ms", 30000),
315 ],
316 )
317
318 self._interceptor = _LoggingClientAIOInterceptor()
319 self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
320 self._logged_channel = self._grpc_channel
321 self._wrap_with_kind = (
322 "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
323 )
324 # Wrap messages. This must be done after self._logged_channel exists
325 self._prep_wrapped_messages(client_info)
326
327 @property
328 def grpc_channel(self) -> aio.Channel:
329 """Create the channel designed to connect to this service.
330
331 This property caches on the instance; repeated calls return
332 the same channel.
333 """
334 # Return the channel from cache.
335 return self._grpc_channel
336
337 @property
338 def create_topic(self) -> Callable[[pubsub.Topic], Awaitable[pubsub.Topic]]:
339 r"""Return a callable for the create topic method over gRPC.
340
341 Creates the given topic with the given name. See the [resource
342 name rules]
343 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
344
345 Returns:
346 Callable[[~.Topic],
347 Awaitable[~.Topic]]:
348 A function that, when called, will call the underlying RPC
349 on the server.
350 """
351 # Generate a "stub function" on-the-fly which will actually make
352 # the request.
353 # gRPC handles serialization and deserialization, so we just need
354 # to pass in the functions for each.
355 if "create_topic" not in self._stubs:
356 self._stubs["create_topic"] = self._logged_channel.unary_unary(
357 "/google.pubsub.v1.Publisher/CreateTopic",
358 request_serializer=pubsub.Topic.serialize,
359 response_deserializer=pubsub.Topic.deserialize,
360 )
361 return self._stubs["create_topic"]
362
363 @property
364 def update_topic(
365 self,
366 ) -> Callable[[pubsub.UpdateTopicRequest], Awaitable[pubsub.Topic]]:
367 r"""Return a callable for the update topic method over gRPC.
368
369 Updates an existing topic by updating the fields
370 specified in the update mask. Note that certain
371 properties of a topic are not modifiable.
372
373 Returns:
374 Callable[[~.UpdateTopicRequest],
375 Awaitable[~.Topic]]:
376 A function that, when called, will call the underlying RPC
377 on the server.
378 """
379 # Generate a "stub function" on-the-fly which will actually make
380 # the request.
381 # gRPC handles serialization and deserialization, so we just need
382 # to pass in the functions for each.
383 if "update_topic" not in self._stubs:
384 self._stubs["update_topic"] = self._logged_channel.unary_unary(
385 "/google.pubsub.v1.Publisher/UpdateTopic",
386 request_serializer=pubsub.UpdateTopicRequest.serialize,
387 response_deserializer=pubsub.Topic.deserialize,
388 )
389 return self._stubs["update_topic"]
390
391 @property
392 def publish(
393 self,
394 ) -> Callable[[pubsub.PublishRequest], Awaitable[pubsub.PublishResponse]]:
395 r"""Return a callable for the publish method over gRPC.
396
397 Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
398 the topic does not exist.
399
400 Returns:
401 Callable[[~.PublishRequest],
402 Awaitable[~.PublishResponse]]:
403 A function that, when called, will call the underlying RPC
404 on the server.
405 """
406 # Generate a "stub function" on-the-fly which will actually make
407 # the request.
408 # gRPC handles serialization and deserialization, so we just need
409 # to pass in the functions for each.
410 if "publish" not in self._stubs:
411 self._stubs["publish"] = self._logged_channel.unary_unary(
412 "/google.pubsub.v1.Publisher/Publish",
413 request_serializer=pubsub.PublishRequest.serialize,
414 response_deserializer=pubsub.PublishResponse.deserialize,
415 )
416 return self._stubs["publish"]
417
418 @property
419 def get_topic(self) -> Callable[[pubsub.GetTopicRequest], Awaitable[pubsub.Topic]]:
420 r"""Return a callable for the get topic method over gRPC.
421
422 Gets the configuration of a topic.
423
424 Returns:
425 Callable[[~.GetTopicRequest],
426 Awaitable[~.Topic]]:
427 A function that, when called, will call the underlying RPC
428 on the server.
429 """
430 # Generate a "stub function" on-the-fly which will actually make
431 # the request.
432 # gRPC handles serialization and deserialization, so we just need
433 # to pass in the functions for each.
434 if "get_topic" not in self._stubs:
435 self._stubs["get_topic"] = self._logged_channel.unary_unary(
436 "/google.pubsub.v1.Publisher/GetTopic",
437 request_serializer=pubsub.GetTopicRequest.serialize,
438 response_deserializer=pubsub.Topic.deserialize,
439 )
440 return self._stubs["get_topic"]
441
442 @property
443 def list_topics(
444 self,
445 ) -> Callable[[pubsub.ListTopicsRequest], Awaitable[pubsub.ListTopicsResponse]]:
446 r"""Return a callable for the list topics method over gRPC.
447
448 Lists matching topics.
449
450 Returns:
451 Callable[[~.ListTopicsRequest],
452 Awaitable[~.ListTopicsResponse]]:
453 A function that, when called, will call the underlying RPC
454 on the server.
455 """
456 # Generate a "stub function" on-the-fly which will actually make
457 # the request.
458 # gRPC handles serialization and deserialization, so we just need
459 # to pass in the functions for each.
460 if "list_topics" not in self._stubs:
461 self._stubs["list_topics"] = self._logged_channel.unary_unary(
462 "/google.pubsub.v1.Publisher/ListTopics",
463 request_serializer=pubsub.ListTopicsRequest.serialize,
464 response_deserializer=pubsub.ListTopicsResponse.deserialize,
465 )
466 return self._stubs["list_topics"]
467
468 @property
469 def list_topic_subscriptions(
470 self,
471 ) -> Callable[
472 [pubsub.ListTopicSubscriptionsRequest],
473 Awaitable[pubsub.ListTopicSubscriptionsResponse],
474 ]:
475 r"""Return a callable for the list topic subscriptions method over gRPC.
476
477 Lists the names of the attached subscriptions on this
478 topic.
479
480 Returns:
481 Callable[[~.ListTopicSubscriptionsRequest],
482 Awaitable[~.ListTopicSubscriptionsResponse]]:
483 A function that, when called, will call the underlying RPC
484 on the server.
485 """
486 # Generate a "stub function" on-the-fly which will actually make
487 # the request.
488 # gRPC handles serialization and deserialization, so we just need
489 # to pass in the functions for each.
490 if "list_topic_subscriptions" not in self._stubs:
491 self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary(
492 "/google.pubsub.v1.Publisher/ListTopicSubscriptions",
493 request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
494 response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
495 )
496 return self._stubs["list_topic_subscriptions"]
497
498 @property
499 def list_topic_snapshots(
500 self,
501 ) -> Callable[
502 [pubsub.ListTopicSnapshotsRequest], Awaitable[pubsub.ListTopicSnapshotsResponse]
503 ]:
504 r"""Return a callable for the list topic snapshots method over gRPC.
505
506 Lists the names of the snapshots on this topic. Snapshots are
507 used in
508 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
509 operations, which allow you to manage message acknowledgments in
510 bulk. That is, you can set the acknowledgment state of messages
511 in an existing subscription to the state captured by a snapshot.
512
513 Returns:
514 Callable[[~.ListTopicSnapshotsRequest],
515 Awaitable[~.ListTopicSnapshotsResponse]]:
516 A function that, when called, will call the underlying RPC
517 on the server.
518 """
519 # Generate a "stub function" on-the-fly which will actually make
520 # the request.
521 # gRPC handles serialization and deserialization, so we just need
522 # to pass in the functions for each.
523 if "list_topic_snapshots" not in self._stubs:
524 self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary(
525 "/google.pubsub.v1.Publisher/ListTopicSnapshots",
526 request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
527 response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
528 )
529 return self._stubs["list_topic_snapshots"]
530
531 @property
532 def delete_topic(
533 self,
534 ) -> Callable[[pubsub.DeleteTopicRequest], Awaitable[empty_pb2.Empty]]:
535 r"""Return a callable for the delete topic method over gRPC.
536
537 Deletes the topic with the given name. Returns ``NOT_FOUND`` if
538 the topic does not exist. After a topic is deleted, a new topic
539 may be created with the same name; this is an entirely new topic
540 with none of the old configuration or subscriptions. Existing
541 subscriptions to this topic are not deleted, but their ``topic``
542 field is set to ``_deleted-topic_``.
543
544 Returns:
545 Callable[[~.DeleteTopicRequest],
546 Awaitable[~.Empty]]:
547 A function that, when called, will call the underlying RPC
548 on the server.
549 """
550 # Generate a "stub function" on-the-fly which will actually make
551 # the request.
552 # gRPC handles serialization and deserialization, so we just need
553 # to pass in the functions for each.
554 if "delete_topic" not in self._stubs:
555 self._stubs["delete_topic"] = self._logged_channel.unary_unary(
556 "/google.pubsub.v1.Publisher/DeleteTopic",
557 request_serializer=pubsub.DeleteTopicRequest.serialize,
558 response_deserializer=empty_pb2.Empty.FromString,
559 )
560 return self._stubs["delete_topic"]
561
562 @property
563 def detach_subscription(
564 self,
565 ) -> Callable[
566 [pubsub.DetachSubscriptionRequest], Awaitable[pubsub.DetachSubscriptionResponse]
567 ]:
568 r"""Return a callable for the detach subscription method over gRPC.
569
570 Detaches a subscription from this topic. All messages retained
571 in the subscription are dropped. Subsequent ``Pull`` and
572 ``StreamingPull`` requests will return FAILED_PRECONDITION. If
573 the subscription is a push subscription, pushes to the endpoint
574 will stop.
575
576 Returns:
577 Callable[[~.DetachSubscriptionRequest],
578 Awaitable[~.DetachSubscriptionResponse]]:
579 A function that, when called, will call the underlying RPC
580 on the server.
581 """
582 # Generate a "stub function" on-the-fly which will actually make
583 # the request.
584 # gRPC handles serialization and deserialization, so we just need
585 # to pass in the functions for each.
586 if "detach_subscription" not in self._stubs:
587 self._stubs["detach_subscription"] = self._logged_channel.unary_unary(
588 "/google.pubsub.v1.Publisher/DetachSubscription",
589 request_serializer=pubsub.DetachSubscriptionRequest.serialize,
590 response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
591 )
592 return self._stubs["detach_subscription"]
593
594 def _prep_wrapped_messages(self, client_info):
595 """Precompute the wrapped methods, overriding the base class method to use async wrappers."""
596 self._wrapped_methods = {
597 self.create_topic: self._wrap_method(
598 self.create_topic,
599 default_retry=retries.AsyncRetry(
600 initial=0.1,
601 maximum=60.0,
602 multiplier=1.3,
603 predicate=retries.if_exception_type(
604 core_exceptions.ServiceUnavailable,
605 ),
606 deadline=60.0,
607 ),
608 default_timeout=60.0,
609 client_info=client_info,
610 ),
611 self.update_topic: self._wrap_method(
612 self.update_topic,
613 default_retry=retries.AsyncRetry(
614 initial=0.1,
615 maximum=60.0,
616 multiplier=1.3,
617 predicate=retries.if_exception_type(
618 core_exceptions.ServiceUnavailable,
619 ),
620 deadline=60.0,
621 ),
622 default_timeout=60.0,
623 client_info=client_info,
624 ),
625 self.publish: self._wrap_method(
626 self.publish,
627 default_retry=retries.AsyncRetry(
628 initial=0.1,
629 maximum=60.0,
630 multiplier=4,
631 predicate=retries.if_exception_type(
632 core_exceptions.Aborted,
633 core_exceptions.Cancelled,
634 core_exceptions.DeadlineExceeded,
635 core_exceptions.InternalServerError,
636 core_exceptions.ResourceExhausted,
637 core_exceptions.ServiceUnavailable,
638 core_exceptions.Unknown,
639 ),
640 deadline=60.0,
641 ),
642 default_timeout=60.0,
643 client_info=client_info,
644 ),
645 self.get_topic: self._wrap_method(
646 self.get_topic,
647 default_retry=retries.AsyncRetry(
648 initial=0.1,
649 maximum=60.0,
650 multiplier=1.3,
651 predicate=retries.if_exception_type(
652 core_exceptions.Aborted,
653 core_exceptions.ServiceUnavailable,
654 core_exceptions.Unknown,
655 ),
656 deadline=60.0,
657 ),
658 default_timeout=60.0,
659 client_info=client_info,
660 ),
661 self.list_topics: self._wrap_method(
662 self.list_topics,
663 default_retry=retries.AsyncRetry(
664 initial=0.1,
665 maximum=60.0,
666 multiplier=1.3,
667 predicate=retries.if_exception_type(
668 core_exceptions.Aborted,
669 core_exceptions.ServiceUnavailable,
670 core_exceptions.Unknown,
671 ),
672 deadline=60.0,
673 ),
674 default_timeout=60.0,
675 client_info=client_info,
676 ),
677 self.list_topic_subscriptions: self._wrap_method(
678 self.list_topic_subscriptions,
679 default_retry=retries.AsyncRetry(
680 initial=0.1,
681 maximum=60.0,
682 multiplier=1.3,
683 predicate=retries.if_exception_type(
684 core_exceptions.Aborted,
685 core_exceptions.ServiceUnavailable,
686 core_exceptions.Unknown,
687 ),
688 deadline=60.0,
689 ),
690 default_timeout=60.0,
691 client_info=client_info,
692 ),
693 self.list_topic_snapshots: self._wrap_method(
694 self.list_topic_snapshots,
695 default_retry=retries.AsyncRetry(
696 initial=0.1,
697 maximum=60.0,
698 multiplier=1.3,
699 predicate=retries.if_exception_type(
700 core_exceptions.Aborted,
701 core_exceptions.ServiceUnavailable,
702 core_exceptions.Unknown,
703 ),
704 deadline=60.0,
705 ),
706 default_timeout=60.0,
707 client_info=client_info,
708 ),
709 self.delete_topic: self._wrap_method(
710 self.delete_topic,
711 default_retry=retries.AsyncRetry(
712 initial=0.1,
713 maximum=60.0,
714 multiplier=1.3,
715 predicate=retries.if_exception_type(
716 core_exceptions.ServiceUnavailable,
717 ),
718 deadline=60.0,
719 ),
720 default_timeout=60.0,
721 client_info=client_info,
722 ),
723 self.detach_subscription: self._wrap_method(
724 self.detach_subscription,
725 default_retry=retries.AsyncRetry(
726 initial=0.1,
727 maximum=60.0,
728 multiplier=1.3,
729 predicate=retries.if_exception_type(
730 core_exceptions.ServiceUnavailable,
731 ),
732 deadline=60.0,
733 ),
734 default_timeout=60.0,
735 client_info=client_info,
736 ),
737 self.get_iam_policy: self._wrap_method(
738 self.get_iam_policy,
739 default_timeout=None,
740 client_info=client_info,
741 ),
742 self.set_iam_policy: self._wrap_method(
743 self.set_iam_policy,
744 default_timeout=None,
745 client_info=client_info,
746 ),
747 self.test_iam_permissions: self._wrap_method(
748 self.test_iam_permissions,
749 default_timeout=None,
750 client_info=client_info,
751 ),
752 }
753
754 def _wrap_method(self, func, *args, **kwargs):
755 if self._wrap_with_kind: # pragma: NO COVER
756 kwargs["kind"] = self.kind
757 return gapic_v1.method_async.wrap_method(func, *args, **kwargs)
758
759 def close(self):
760 return self._logged_channel.close()
761
762 @property
763 def kind(self) -> str:
764 return "grpc_asyncio"
765
766 @property
767 def set_iam_policy(
768 self,
769 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
770 r"""Return a callable for the set iam policy method over gRPC.
771 Sets the IAM access control policy on the specified
772 function. Replaces any existing policy.
773 Returns:
774 Callable[[~.SetIamPolicyRequest],
775 ~.Policy]:
776 A function that, when called, will call the underlying RPC
777 on the server.
778 """
779 # Generate a "stub function" on-the-fly which will actually make
780 # the request.
781 # gRPC handles serialization and deserialization, so we just need
782 # to pass in the functions for each.
783 if "set_iam_policy" not in self._stubs:
784 self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
785 "/google.iam.v1.IAMPolicy/SetIamPolicy",
786 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
787 response_deserializer=policy_pb2.Policy.FromString,
788 )
789 return self._stubs["set_iam_policy"]
790
791 @property
792 def get_iam_policy(
793 self,
794 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
795 r"""Return a callable for the get iam policy method over gRPC.
796 Gets the IAM access control policy for a function.
797 Returns an empty policy if the function exists and does
798 not have a policy set.
799 Returns:
800 Callable[[~.GetIamPolicyRequest],
801 ~.Policy]:
802 A function that, when called, will call the underlying RPC
803 on the server.
804 """
805 # Generate a "stub function" on-the-fly which will actually make
806 # the request.
807 # gRPC handles serialization and deserialization, so we just need
808 # to pass in the functions for each.
809 if "get_iam_policy" not in self._stubs:
810 self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
811 "/google.iam.v1.IAMPolicy/GetIamPolicy",
812 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
813 response_deserializer=policy_pb2.Policy.FromString,
814 )
815 return self._stubs["get_iam_policy"]
816
817 @property
818 def test_iam_permissions(
819 self,
820 ) -> Callable[
821 [iam_policy_pb2.TestIamPermissionsRequest],
822 iam_policy_pb2.TestIamPermissionsResponse,
823 ]:
824 r"""Return a callable for the test iam permissions method over gRPC.
825 Tests the specified permissions against the IAM access control
826 policy for a function. If the function does not exist, this will
827 return an empty set of permissions, not a NOT_FOUND error.
828 Returns:
829 Callable[[~.TestIamPermissionsRequest],
830 ~.TestIamPermissionsResponse]:
831 A function that, when called, will call the underlying RPC
832 on the server.
833 """
834 # Generate a "stub function" on-the-fly which will actually make
835 # the request.
836 # gRPC handles serialization and deserialization, so we just need
837 # to pass in the functions for each.
838 if "test_iam_permissions" not in self._stubs:
839 self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
840 "/google.iam.v1.IAMPolicy/TestIamPermissions",
841 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
842 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
843 )
844 return self._stubs["test_iam_permissions"]
845
846
# Only the transport class is exported by ``from <module> import *``.
__all__ = ("PublisherGrpcAsyncIOTransport",)