Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/pubsub_v1/services/publisher/transports/grpc_asyncio.py: 43%
111 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import warnings
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
19from google.api_core import gapic_v1
20from google.api_core import grpc_helpers_async
21from google.auth import credentials as ga_credentials # type: ignore
22from google.auth.transport.grpc import SslCredentials # type: ignore
24import grpc # type: ignore
25from grpc.experimental import aio # type: ignore
27from google.iam.v1 import iam_policy_pb2 # type: ignore
28from google.iam.v1 import policy_pb2 # type: ignore
29from google.protobuf import empty_pb2 # type: ignore
30from google.pubsub_v1.types import pubsub
31from .base import PublisherTransport, DEFAULT_CLIENT_INFO
32from .grpc import PublisherGrpcTransport
35class PublisherGrpcAsyncIOTransport(PublisherTransport):
36 """gRPC AsyncIO backend transport for Publisher.
38 The service that an application uses to manipulate topics,
39 and to send messages to a topic.
41 This class defines the same methods as the primary client, so the
42 primary client can load the underlying transport implementation
43 and call it.
45 It sends protocol buffers over the wire using gRPC (which is built on
46 top of HTTP/2); the ``grpcio`` package must be installed.
47 """
49 _grpc_channel: aio.Channel
50 _stubs: Dict[str, Callable] = {}
52 @classmethod
53 def create_channel(
54 cls,
55 host: str = "pubsub.googleapis.com",
56 credentials: Optional[ga_credentials.Credentials] = None,
57 credentials_file: Optional[str] = None,
58 scopes: Optional[Sequence[str]] = None,
59 quota_project_id: Optional[str] = None,
60 **kwargs,
61 ) -> aio.Channel:
62 """Create and return a gRPC AsyncIO channel object.
63 Args:
64 host (Optional[str]): The host for the channel to use.
65 credentials (Optional[~.Credentials]): The
66 authorization credentials to attach to requests. These
67 credentials identify this application to the service. If
68 none are specified, the client will attempt to ascertain
69 the credentials from the environment.
70 credentials_file (Optional[str]): A file with credentials that can
71 be loaded with :func:`google.auth.load_credentials_from_file`.
72 This argument is ignored if ``channel`` is provided.
73 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
74 service. These are only used when credentials are not specified and
75 are passed to :func:`google.auth.default`.
76 quota_project_id (Optional[str]): An optional project to use for billing
77 and quota.
78 kwargs (Optional[dict]): Keyword arguments, which are passed to the
79 channel creation.
80 Returns:
81 aio.Channel: A gRPC AsyncIO channel object.
82 """
84 return grpc_helpers_async.create_channel(
85 host,
86 credentials=credentials,
87 credentials_file=credentials_file,
88 quota_project_id=quota_project_id,
89 default_scopes=cls.AUTH_SCOPES,
90 scopes=scopes,
91 default_host=cls.DEFAULT_HOST,
92 **kwargs,
93 )
95 def __init__(
96 self,
97 *,
98 host: str = "pubsub.googleapis.com",
99 credentials: Optional[ga_credentials.Credentials] = None,
100 credentials_file: Optional[str] = None,
101 scopes: Optional[Sequence[str]] = None,
102 channel: Optional[aio.Channel] = None,
103 api_mtls_endpoint: Optional[str] = None,
104 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
105 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
106 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
107 quota_project_id: Optional[str] = None,
108 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
109 always_use_jwt_access: Optional[bool] = False,
110 api_audience: Optional[str] = None,
111 ) -> None:
112 """Instantiate the transport.
114 Args:
115 host (Optional[str]):
116 The hostname to connect to.
117 credentials (Optional[google.auth.credentials.Credentials]): The
118 authorization credentials to attach to requests. These
119 credentials identify the application to the service; if none
120 are specified, the client will attempt to ascertain the
121 credentials from the environment.
122 This argument is ignored if ``channel`` is provided.
123 credentials_file (Optional[str]): A file with credentials that can
124 be loaded with :func:`google.auth.load_credentials_from_file`.
125 This argument is ignored if ``channel`` is provided.
126 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
127 service. These are only used when credentials are not specified and
128 are passed to :func:`google.auth.default`.
129 channel (Optional[aio.Channel]): A ``Channel`` instance through
130 which to make calls.
131 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
132 If provided, it overrides the ``host`` argument and tries to create
133 a mutual TLS channel with client SSL credentials from
134 ``client_cert_source`` or application default SSL credentials.
135 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
136 Deprecated. A callback to provide client SSL certificate bytes and
137 private key bytes, both in PEM format. It is ignored if
138 ``api_mtls_endpoint`` is None.
139 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
140 for the grpc channel. It is ignored if ``channel`` is provided.
141 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
142 A callback to provide client certificate bytes and private key bytes,
143 both in PEM format. It is used to configure a mutual TLS channel. It is
144 ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
145 quota_project_id (Optional[str]): An optional project to use for billing
146 and quota.
147 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
148 The client info used to send a user-agent string along with
149 API requests. If ``None``, then default info will be used.
150 Generally, you only need to set this if you're developing
151 your own client library.
152 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
153 be used for service account credentials.
155 Raises:
156 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
157 creation failed for any reason.
158 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
159 and ``credentials_file`` are passed.
160 """
161 self._grpc_channel = None
162 self._ssl_channel_credentials = ssl_channel_credentials
163 self._stubs: Dict[str, Callable] = {}
165 if api_mtls_endpoint:
166 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
167 if client_cert_source:
168 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
170 if channel:
171 # Ignore credentials if a channel was passed.
172 credentials = False
173 # If a channel was explicitly provided, set it.
174 self._grpc_channel = channel
175 self._ssl_channel_credentials = None
176 else:
177 if api_mtls_endpoint:
178 host = api_mtls_endpoint
180 # Create SSL credentials with client_cert_source or application
181 # default SSL credentials.
182 if client_cert_source:
183 cert, key = client_cert_source()
184 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
185 certificate_chain=cert, private_key=key
186 )
187 else:
188 self._ssl_channel_credentials = SslCredentials().ssl_credentials
190 else:
191 if client_cert_source_for_mtls and not ssl_channel_credentials:
192 cert, key = client_cert_source_for_mtls()
193 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
194 certificate_chain=cert, private_key=key
195 )
197 # The base transport sets the host, credentials and scopes
198 super().__init__(
199 host=host,
200 credentials=credentials,
201 credentials_file=credentials_file,
202 scopes=scopes,
203 quota_project_id=quota_project_id,
204 client_info=client_info,
205 always_use_jwt_access=always_use_jwt_access,
206 api_audience=api_audience,
207 )
209 if not self._grpc_channel:
210 self._grpc_channel = type(self).create_channel(
211 self._host,
212 # use the credentials which are saved
213 credentials=self._credentials,
214 # Set ``credentials_file`` to ``None`` here as
215 # the credentials that we saved earlier should be used.
216 credentials_file=None,
217 scopes=self._scopes,
218 ssl_credentials=self._ssl_channel_credentials,
219 quota_project_id=quota_project_id,
220 options=[
221 ("grpc.max_send_message_length", -1),
222 ("grpc.max_receive_message_length", -1),
223 ("grpc.max_metadata_size", 4 * 1024 * 1024),
224 ("grpc.keepalive_time_ms", 30000),
225 ],
226 )
228 # Wrap messages. This must be done after self._grpc_channel exists
229 self._prep_wrapped_messages(client_info)
231 @property
232 def grpc_channel(self) -> aio.Channel:
233 """Create the channel designed to connect to this service.
235 This property caches on the instance; repeated calls return
236 the same channel.
237 """
238 # Return the channel from cache.
239 return self._grpc_channel
241 @property
242 def create_topic(self) -> Callable[[pubsub.Topic], Awaitable[pubsub.Topic]]:
243 r"""Return a callable for the create topic method over gRPC.
245 Creates the given topic with the given name. See the [resource
246 name rules]
247 (https://cloud.google.com/pubsub/docs/admin#resource_names).
249 Returns:
250 Callable[[~.Topic],
251 Awaitable[~.Topic]]:
252 A function that, when called, will call the underlying RPC
253 on the server.
254 """
255 # Generate a "stub function" on-the-fly which will actually make
256 # the request.
257 # gRPC handles serialization and deserialization, so we just need
258 # to pass in the functions for each.
259 if "create_topic" not in self._stubs:
260 self._stubs["create_topic"] = self.grpc_channel.unary_unary(
261 "/google.pubsub.v1.Publisher/CreateTopic",
262 request_serializer=pubsub.Topic.serialize,
263 response_deserializer=pubsub.Topic.deserialize,
264 )
265 return self._stubs["create_topic"]
267 @property
268 def update_topic(
269 self,
270 ) -> Callable[[pubsub.UpdateTopicRequest], Awaitable[pubsub.Topic]]:
271 r"""Return a callable for the update topic method over gRPC.
273 Updates an existing topic. Note that certain
274 properties of a topic are not modifiable.
276 Returns:
277 Callable[[~.UpdateTopicRequest],
278 Awaitable[~.Topic]]:
279 A function that, when called, will call the underlying RPC
280 on the server.
281 """
282 # Generate a "stub function" on-the-fly which will actually make
283 # the request.
284 # gRPC handles serialization and deserialization, so we just need
285 # to pass in the functions for each.
286 if "update_topic" not in self._stubs:
287 self._stubs["update_topic"] = self.grpc_channel.unary_unary(
288 "/google.pubsub.v1.Publisher/UpdateTopic",
289 request_serializer=pubsub.UpdateTopicRequest.serialize,
290 response_deserializer=pubsub.Topic.deserialize,
291 )
292 return self._stubs["update_topic"]
294 @property
295 def publish(
296 self,
297 ) -> Callable[[pubsub.PublishRequest], Awaitable[pubsub.PublishResponse]]:
298 r"""Return a callable for the publish method over gRPC.
300 Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
301 the topic does not exist.
303 Returns:
304 Callable[[~.PublishRequest],
305 Awaitable[~.PublishResponse]]:
306 A function that, when called, will call the underlying RPC
307 on the server.
308 """
309 # Generate a "stub function" on-the-fly which will actually make
310 # the request.
311 # gRPC handles serialization and deserialization, so we just need
312 # to pass in the functions for each.
313 if "publish" not in self._stubs:
314 self._stubs["publish"] = self.grpc_channel.unary_unary(
315 "/google.pubsub.v1.Publisher/Publish",
316 request_serializer=pubsub.PublishRequest.serialize,
317 response_deserializer=pubsub.PublishResponse.deserialize,
318 )
319 return self._stubs["publish"]
321 @property
322 def get_topic(self) -> Callable[[pubsub.GetTopicRequest], Awaitable[pubsub.Topic]]:
323 r"""Return a callable for the get topic method over gRPC.
325 Gets the configuration of a topic.
327 Returns:
328 Callable[[~.GetTopicRequest],
329 Awaitable[~.Topic]]:
330 A function that, when called, will call the underlying RPC
331 on the server.
332 """
333 # Generate a "stub function" on-the-fly which will actually make
334 # the request.
335 # gRPC handles serialization and deserialization, so we just need
336 # to pass in the functions for each.
337 if "get_topic" not in self._stubs:
338 self._stubs["get_topic"] = self.grpc_channel.unary_unary(
339 "/google.pubsub.v1.Publisher/GetTopic",
340 request_serializer=pubsub.GetTopicRequest.serialize,
341 response_deserializer=pubsub.Topic.deserialize,
342 )
343 return self._stubs["get_topic"]
345 @property
346 def list_topics(
347 self,
348 ) -> Callable[[pubsub.ListTopicsRequest], Awaitable[pubsub.ListTopicsResponse]]:
349 r"""Return a callable for the list topics method over gRPC.
351 Lists matching topics.
353 Returns:
354 Callable[[~.ListTopicsRequest],
355 Awaitable[~.ListTopicsResponse]]:
356 A function that, when called, will call the underlying RPC
357 on the server.
358 """
359 # Generate a "stub function" on-the-fly which will actually make
360 # the request.
361 # gRPC handles serialization and deserialization, so we just need
362 # to pass in the functions for each.
363 if "list_topics" not in self._stubs:
364 self._stubs["list_topics"] = self.grpc_channel.unary_unary(
365 "/google.pubsub.v1.Publisher/ListTopics",
366 request_serializer=pubsub.ListTopicsRequest.serialize,
367 response_deserializer=pubsub.ListTopicsResponse.deserialize,
368 )
369 return self._stubs["list_topics"]
371 @property
372 def list_topic_subscriptions(
373 self,
374 ) -> Callable[
375 [pubsub.ListTopicSubscriptionsRequest],
376 Awaitable[pubsub.ListTopicSubscriptionsResponse],
377 ]:
378 r"""Return a callable for the list topic subscriptions method over gRPC.
380 Lists the names of the attached subscriptions on this
381 topic.
383 Returns:
384 Callable[[~.ListTopicSubscriptionsRequest],
385 Awaitable[~.ListTopicSubscriptionsResponse]]:
386 A function that, when called, will call the underlying RPC
387 on the server.
388 """
389 # Generate a "stub function" on-the-fly which will actually make
390 # the request.
391 # gRPC handles serialization and deserialization, so we just need
392 # to pass in the functions for each.
393 if "list_topic_subscriptions" not in self._stubs:
394 self._stubs["list_topic_subscriptions"] = self.grpc_channel.unary_unary(
395 "/google.pubsub.v1.Publisher/ListTopicSubscriptions",
396 request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
397 response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
398 )
399 return self._stubs["list_topic_subscriptions"]
401 @property
402 def list_topic_snapshots(
403 self,
404 ) -> Callable[
405 [pubsub.ListTopicSnapshotsRequest], Awaitable[pubsub.ListTopicSnapshotsResponse]
406 ]:
407 r"""Return a callable for the list topic snapshots method over gRPC.
409 Lists the names of the snapshots on this topic. Snapshots are
410 used in
411 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
412 operations, which allow you to manage message acknowledgments in
413 bulk. That is, you can set the acknowledgment state of messages
414 in an existing subscription to the state captured by a snapshot.
416 Returns:
417 Callable[[~.ListTopicSnapshotsRequest],
418 Awaitable[~.ListTopicSnapshotsResponse]]:
419 A function that, when called, will call the underlying RPC
420 on the server.
421 """
422 # Generate a "stub function" on-the-fly which will actually make
423 # the request.
424 # gRPC handles serialization and deserialization, so we just need
425 # to pass in the functions for each.
426 if "list_topic_snapshots" not in self._stubs:
427 self._stubs["list_topic_snapshots"] = self.grpc_channel.unary_unary(
428 "/google.pubsub.v1.Publisher/ListTopicSnapshots",
429 request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
430 response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
431 )
432 return self._stubs["list_topic_snapshots"]
434 @property
435 def delete_topic(
436 self,
437 ) -> Callable[[pubsub.DeleteTopicRequest], Awaitable[empty_pb2.Empty]]:
438 r"""Return a callable for the delete topic method over gRPC.
440 Deletes the topic with the given name. Returns ``NOT_FOUND`` if
441 the topic does not exist. After a topic is deleted, a new topic
442 may be created with the same name; this is an entirely new topic
443 with none of the old configuration or subscriptions. Existing
444 subscriptions to this topic are not deleted, but their ``topic``
445 field is set to ``_deleted-topic_``.
447 Returns:
448 Callable[[~.DeleteTopicRequest],
449 Awaitable[~.Empty]]:
450 A function that, when called, will call the underlying RPC
451 on the server.
452 """
453 # Generate a "stub function" on-the-fly which will actually make
454 # the request.
455 # gRPC handles serialization and deserialization, so we just need
456 # to pass in the functions for each.
457 if "delete_topic" not in self._stubs:
458 self._stubs["delete_topic"] = self.grpc_channel.unary_unary(
459 "/google.pubsub.v1.Publisher/DeleteTopic",
460 request_serializer=pubsub.DeleteTopicRequest.serialize,
461 response_deserializer=empty_pb2.Empty.FromString,
462 )
463 return self._stubs["delete_topic"]
465 @property
466 def detach_subscription(
467 self,
468 ) -> Callable[
469 [pubsub.DetachSubscriptionRequest], Awaitable[pubsub.DetachSubscriptionResponse]
470 ]:
471 r"""Return a callable for the detach subscription method over gRPC.
473 Detaches a subscription from this topic. All messages retained
474 in the subscription are dropped. Subsequent ``Pull`` and
475 ``StreamingPull`` requests will return FAILED_PRECONDITION. If
476 the subscription is a push subscription, pushes to the endpoint
477 will stop.
479 Returns:
480 Callable[[~.DetachSubscriptionRequest],
481 Awaitable[~.DetachSubscriptionResponse]]:
482 A function that, when called, will call the underlying RPC
483 on the server.
484 """
485 # Generate a "stub function" on-the-fly which will actually make
486 # the request.
487 # gRPC handles serialization and deserialization, so we just need
488 # to pass in the functions for each.
489 if "detach_subscription" not in self._stubs:
490 self._stubs["detach_subscription"] = self.grpc_channel.unary_unary(
491 "/google.pubsub.v1.Publisher/DetachSubscription",
492 request_serializer=pubsub.DetachSubscriptionRequest.serialize,
493 response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
494 )
495 return self._stubs["detach_subscription"]
497 @property
498 def set_iam_policy(
499 self,
500 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
501 r"""Return a callable for the set iam policy method over gRPC.
502 Sets the IAM access control policy on the specified
503 function. Replaces any existing policy.
504 Returns:
505 Callable[[~.SetIamPolicyRequest],
506 Awaitable[~.Policy]]:
507 A function that, when called, will call the underlying RPC
508 on the server.
509 """
510 # Generate a "stub function" on-the-fly which will actually make
511 # the request.
512 # gRPC handles serialization and deserialization, so we just need
513 # to pass in the functions for each.
514 if "set_iam_policy" not in self._stubs:
515 self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary(
516 "/google.iam.v1.IAMPolicy/SetIamPolicy",
517 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
518 response_deserializer=policy_pb2.Policy.FromString,
519 )
520 return self._stubs["set_iam_policy"]
522 @property
523 def get_iam_policy(
524 self,
525 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], Awaitable[policy_pb2.Policy]]:
526 r"""Return a callable for the get iam policy method over gRPC.
527 Gets the IAM access control policy for a function.
528 Returns an empty policy if the function exists and does
529 not have a policy set.
530 Returns:
531 Callable[[~.GetIamPolicyRequest],
532 Awaitable[~.Policy]]:
533 A function that, when called, will call the underlying RPC
534 on the server.
535 """
536 # Generate a "stub function" on-the-fly which will actually make
537 # the request.
538 # gRPC handles serialization and deserialization, so we just need
539 # to pass in the functions for each.
540 if "get_iam_policy" not in self._stubs:
541 self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary(
542 "/google.iam.v1.IAMPolicy/GetIamPolicy",
543 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
544 response_deserializer=policy_pb2.Policy.FromString,
545 )
546 return self._stubs["get_iam_policy"]
548 @property
549 def test_iam_permissions(
550 self,
551 ) -> Callable[
552 [iam_policy_pb2.TestIamPermissionsRequest],
553 Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
554 ]:
555 r"""Return a callable for the test iam permissions method over gRPC.
556 Tests the specified permissions against the IAM access control
557 policy for a function. If the function does not exist, this will
558 return an empty set of permissions, not a NOT_FOUND error.
559 Returns:
560 Callable[[~.TestIamPermissionsRequest],
561 Awaitable[~.TestIamPermissionsResponse]]:
562 A function that, when called, will call the underlying RPC
563 on the server.
564 """
565 # Generate a "stub function" on-the-fly which will actually make
566 # the request.
567 # gRPC handles serialization and deserialization, so we just need
568 # to pass in the functions for each.
569 if "test_iam_permissions" not in self._stubs:
570 self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary(
571 "/google.iam.v1.IAMPolicy/TestIamPermissions",
572 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
573 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
574 )
575 return self._stubs["test_iam_permissions"]
577 def close(self):
578 return self.grpc_channel.close()
581__all__ = ("PublisherGrpcAsyncIOTransport",)