1# -*- coding: utf-8 -*- 
    2# Copyright 2025 Google LLC 
    3# 
    4# Licensed under the Apache License, Version 2.0 (the "License"); 
    5# you may not use this file except in compliance with the License. 
    6# You may obtain a copy of the License at 
    7# 
    8#     http://www.apache.org/licenses/LICENSE-2.0 
    9# 
    10# Unless required by applicable law or agreed to in writing, software 
    11# distributed under the License is distributed on an "AS IS" BASIS, 
    12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    13# See the License for the specific language governing permissions and 
    14# limitations under the License. 
    15# 
    16import json 
    17import logging as std_logging 
    18import pickle 
    19import warnings 
    20from typing import Callable, Dict, Optional, Sequence, Tuple, Union 
    21 
    22from google.api_core import grpc_helpers 
    23from google.api_core import gapic_v1 
    24import google.auth  # type: ignore 
    25from google.auth import credentials as ga_credentials  # type: ignore 
    26from google.auth.transport.grpc import SslCredentials  # type: ignore 
    27from google.protobuf.json_format import MessageToJson 
    28import google.protobuf.message 
    29 
    30import grpc  # type: ignore 
    31import proto  # type: ignore 
    32 
    33from google.iam.v1 import iam_policy_pb2  # type: ignore 
    34from google.iam.v1 import policy_pb2  # type: ignore 
    35from google.protobuf import empty_pb2  # type: ignore 
    36from google.pubsub_v1.types import pubsub 
    37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO 
    38 
# Optional dependency probe: ``google.api_core.client_logging`` may not be
# importable (presumably it is absent from older google-api-core releases --
# TODO confirm). The flag records whether the import succeeded and is read by
# ``_LoggingClientInterceptor`` before it emits any debug records.
try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger used for the request/response debug records.
_LOGGER = std_logging.getLogger(__name__)
    47 
    48 
    49class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER 
    50    def intercept_unary_unary(self, continuation, client_call_details, request): 
    51        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 
    52            std_logging.DEBUG 
    53        ) 
    54        if logging_enabled:  # pragma: NO COVER 
    55            request_metadata = client_call_details.metadata 
    56            if isinstance(request, proto.Message): 
    57                request_payload = type(request).to_json(request) 
    58            elif isinstance(request, google.protobuf.message.Message): 
    59                request_payload = MessageToJson(request) 
    60            else: 
    61                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" 
    62 
    63            request_metadata = { 
    64                key: value.decode("utf-8") if isinstance(value, bytes) else value 
    65                for key, value in request_metadata 
    66            } 
    67            grpc_request = { 
    68                "payload": request_payload, 
    69                "requestMethod": "grpc", 
    70                "metadata": dict(request_metadata), 
    71            } 
    72            _LOGGER.debug( 
    73                f"Sending request for {client_call_details.method}", 
    74                extra={ 
    75                    "serviceName": "google.pubsub.v1.Subscriber", 
    76                    "rpcName": str(client_call_details.method), 
    77                    "request": grpc_request, 
    78                    "metadata": grpc_request["metadata"], 
    79                }, 
    80            ) 
    81        response = continuation(client_call_details, request) 
    82        if logging_enabled:  # pragma: NO COVER 
    83            response_metadata = response.trailing_metadata() 
    84            # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples 
    85            metadata = ( 
    86                dict([(k, str(v)) for k, v in response_metadata]) 
    87                if response_metadata 
    88                else None 
    89            ) 
    90            result = response.result() 
    91            if isinstance(result, proto.Message): 
    92                response_payload = type(result).to_json(result) 
    93            elif isinstance(result, google.protobuf.message.Message): 
    94                response_payload = MessageToJson(result) 
    95            else: 
    96                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" 
    97            grpc_response = { 
    98                "payload": response_payload, 
    99                "metadata": metadata, 
    100                "status": "OK", 
    101            } 
    102            _LOGGER.debug( 
    103                f"Received response for {client_call_details.method}.", 
    104                extra={ 
    105                    "serviceName": "google.pubsub.v1.Subscriber", 
    106                    "rpcName": client_call_details.method, 
    107                    "response": grpc_response, 
    108                    "metadata": grpc_response["metadata"], 
    109                }, 
    110            ) 
    111        return response 
    112 
    113 
class SubscriberGrpcTransport(SubscriberTransport):
    """gRPC backend transport for Subscriber.

    The service that an application uses to manipulate subscriptions and
    to consume messages from a subscription via the ``Pull`` method or
    by establishing a bi-directional stream using the ``StreamingPull``
    method.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # Cache of RPC stub callables keyed by method name; each RPC property
    # creates its entry on first access and returns the cached one afterwards.
    _stubs: Dict[str, Callable]
    131 
    def __init__(
        self,
        *,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Set up the gRPC transport and its underlying channel.

        Args:
            host (Optional[str]):
                 Hostname to connect to (default: 'pubsub.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials attached to requests; they identify
                the application to the service. When omitted, the client
                tries to ascertain credentials from the environment.
                Ignored when a ``channel`` instance is supplied.
            credentials_file (Optional[str]): Path of a credentials file
                loadable with :func:`google.auth.load_credentials_from_file`.
                Ignored when a ``channel`` instance is supplied.
            scopes (Optional(Sequence[str])): List of scopes. Ignored when a
                ``channel`` instance is supplied.
            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
                Either a ``Channel`` instance to send calls through, or a
                Callable that builds and returns one. With ``None``,
                ``self.create_channel`` builds the channel. A Callable is
                invoked with the same arguments ``self.create_channel``
                would receive.
            api_mtls_endpoint (Optional[str]): Deprecated. Mutual TLS
                endpoint. When given, it replaces ``host`` and a mutual TLS
                channel is attempted using client SSL credentials from
                ``client_cert_source`` or the application default SSL
                credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. Callback returning the client SSL certificate
                bytes and private key bytes, both PEM encoded. Ignored when
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL
                credentials for the gRPC channel. Ignored when a ``channel``
                instance is supplied.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Callback returning client certificate bytes and private key
                bytes, both PEM encoded, used to configure a mutual TLS
                channel. Ignored when a ``channel`` instance or
                ``ssl_channel_credentials`` is supplied.
            quota_project_id (Optional[str]): Optional project used for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info used to send a user-agent string with API
                requests. If ``None``, default info is used. Generally only
                needed when developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Passed through unchanged to the
                base transport initializer.

        Raises:
          google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
              creation failed for any reason.
          google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        # The legacy mTLS arguments are still honoured but flagged.
        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # A ready-made channel is used as given; credentials and SSL
            # settings passed alongside it are dropped.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
            self._ignore_credentials = True
            credentials = None
        elif api_mtls_endpoint:
            # Deprecated path: the mTLS endpoint replaces the host, and SSL
            # credentials come from the callback or the application default.
            host = api_mtls_endpoint
            if client_cert_source:
                cert_pem, key_pem = client_cert_source()
                self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=cert_pem, private_key=key_pem
                )
            else:
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Explicit ``ssl_channel_credentials`` take precedence over the
            # certificate callback, so the callback is only consulted here.
            cert_pem, key_pem = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_pem, private_key=key_pem
            )

        # The base transport sets the host, credentials and scopes.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # No channel instance was supplied: build one with the caller's
            # factory, falling back to this class's ``create_channel``.
            channel_factory = channel or type(self).create_channel
            channel_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
                ("grpc.max_metadata_size", 4 * 1024 * 1024),
                ("grpc.keepalive_time_ms", 30000),
            ]
            self._grpc_channel = channel_factory(
                self._host,
                # Use the credentials saved by the base transport;
                # ``credentials_file`` is therefore passed as ``None``.
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=channel_options,
            )

        # Stubs are created on the intercepted channel so calls pass through
        # the logging interceptor.
        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Wrap messages. This must be done after self._logged_channel exists.
        self._prep_wrapped_messages(client_info)
    278 
    @classmethod
    def create_channel(
        cls,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Build a gRPC channel for this service.

        Delegates to ``grpc_helpers.create_channel``, adding the class-level
        ``AUTH_SCOPES`` and ``DEFAULT_HOST`` as the default scopes and host.

        Args:
            host (Optional[str]): Host the channel should use.
            credentials (Optional[~.Credentials]): Authorization credentials
                attached to requests; they identify this application to the
                service. When omitted, the client tries to ascertain
                credentials from the environment.
            credentials_file (Optional[str]): Path of a credentials file
                loadable with :func:`google.auth.load_credentials_from_file`.
                Mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): Optional list of scopes needed
                for this service. Only used when credentials are not
                specified; passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): Optional project used for
                billing and quota.
            kwargs (Optional[dict]): Extra keyword arguments forwarded to the
                channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        new_channel = grpc_helpers.create_channel(
            host,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            **kwargs,
        )
        return new_channel
    325 
    @property
    def grpc_channel(self) -> grpc.Channel:
        """Channel this transport uses to reach the service.

        This is the channel stored in ``_grpc_channel``, not the intercepted
        ``_logged_channel`` wrapper.
        """
        underlying_channel = self._grpc_channel
        return underlying_channel
    330 
    @property
    def create_subscription(
        self,
    ) -> Callable[[pubsub.Subscription], pubsub.Subscription]:
        r"""Expose the ``CreateSubscription`` RPC as a callable.

        The RPC creates a subscription to a given topic. See the [resource
        name rules]
        (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
        It returns ``ALREADY_EXISTS`` when the subscription already exists
        and ``NOT_FOUND`` when the corresponding topic doesn't exist.

        When the request carries no name, the server assigns a random name
        for the subscription on the same project as the topic, conforming
        to the [resource name format]
        (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
        The generated name is populated in the returned Subscription
        object. Note that for REST API requests, you must specify a name
        in the request.

        Returns:
            Callable[[~.Subscription],
                    ~.Subscription]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "create_subscription"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/CreateSubscription",
            request_serializer=pubsub.Subscription.serialize,
            response_deserializer=pubsub.Subscription.deserialize,
        )
        return self._stubs[stub_name]
    368 
    @property
    def get_subscription(
        self,
    ) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]:
        r"""Expose the ``GetSubscription`` RPC as a callable.

        The RPC gets the configuration details of a subscription.

        Returns:
            Callable[[~.GetSubscriptionRequest],
                    ~.Subscription]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "get_subscription"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/GetSubscription",
            request_serializer=pubsub.GetSubscriptionRequest.serialize,
            response_deserializer=pubsub.Subscription.deserialize,
        )
        return self._stubs[stub_name]
    394 
    @property
    def update_subscription(
        self,
    ) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]:
        r"""Expose the ``UpdateSubscription`` RPC as a callable.

        The RPC updates an existing subscription by updating the fields
        named in the update mask. Certain properties of a subscription,
        such as its topic, are not modifiable.

        Returns:
            Callable[[~.UpdateSubscriptionRequest],
                    ~.Subscription]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "update_subscription"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/UpdateSubscription",
            request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
            response_deserializer=pubsub.Subscription.deserialize,
        )
        return self._stubs[stub_name]
    423 
    @property
    def list_subscriptions(
        self,
    ) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]:
        r"""Expose the ``ListSubscriptions`` RPC as a callable.

        The RPC lists matching subscriptions.

        Returns:
            Callable[[~.ListSubscriptionsRequest],
                    ~.ListSubscriptionsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "list_subscriptions"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/ListSubscriptions",
            request_serializer=pubsub.ListSubscriptionsRequest.serialize,
            response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
        )
        return self._stubs[stub_name]
    449 
    @property
    def delete_subscription(
        self,
    ) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]:
        r"""Expose the ``DeleteSubscription`` RPC as a callable.

        The RPC deletes an existing subscription. All messages retained in
        the subscription are immediately dropped, and calls to ``Pull``
        after deletion will return ``NOT_FOUND``. Once a subscription is
        deleted, a new one may be created with the same name, but the new
        one has no association with the old subscription or its topic
        unless the same topic is specified.

        Returns:
            Callable[[~.DeleteSubscriptionRequest],
                    ~.Empty]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "delete_subscription"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/DeleteSubscription",
            request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
            response_deserializer=empty_pb2.Empty.FromString,
        )
        return self._stubs[stub_name]
    480 
    @property
    def modify_ack_deadline(
        self,
    ) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]:
        r"""Expose the ``ModifyAckDeadline`` RPC as a callable.

        The RPC modifies the ack deadline for a specific message. It is
        useful for signalling that the subscriber needs more time to
        process a message, or for making the message available for
        redelivery if processing was interrupted. It does not modify the
        subscription-level ``ackDeadlineSeconds`` used for subsequent
        messages.

        Returns:
            Callable[[~.ModifyAckDeadlineRequest],
                    ~.Empty]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "modify_ack_deadline"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
            request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
            response_deserializer=empty_pb2.Empty.FromString,
        )
        return self._stubs[stub_name]
    511 
    @property
    def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]:
        r"""Expose the ``Acknowledge`` RPC as a callable.

        The RPC acknowledges the messages associated with the ``ack_ids``
        in the ``AcknowledgeRequest``. The Pub/Sub system can then remove
        the relevant messages from the subscription.

        Acknowledging a message whose ack deadline has expired may
        succeed, but such a message may be redelivered later.
        Acknowledging a message more than once will not result in an
        error.

        Returns:
            Callable[[~.AcknowledgeRequest],
                    ~.Empty]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "acknowledge"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/Acknowledge",
            request_serializer=pubsub.AcknowledgeRequest.serialize,
            response_deserializer=empty_pb2.Empty.FromString,
        )
        return self._stubs[stub_name]
    542 
    @property
    def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]:
        r"""Expose the ``Pull`` RPC as a callable.

        The RPC pulls messages from the server.

        Returns:
            Callable[[~.PullRequest],
                    ~.PullResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "pull"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/Pull",
            request_serializer=pubsub.PullRequest.serialize,
            response_deserializer=pubsub.PullResponse.deserialize,
        )
        return self._stubs[stub_name]
    566 
    @property
    def streaming_pull(
        self,
    ) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]:
        r"""Expose the ``StreamingPull`` RPC as a callable.

        The RPC establishes a stream with the server, which sends messages
        down to the client. The client streams acknowledgments and ack
        deadline modifications back to the server. The server will close
        the stream and return the status on any error. The server may
        close the stream with status ``UNAVAILABLE`` to reassign
        server-side resources, in which case the client should
        re-establish the stream. Flow control can be achieved by
        configuring the underlying RPC channel.

        Returns:
            Callable[[~.StreamingPullRequest],
                    ~.StreamingPullResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "streaming_pull"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the bidirectional-streaming stub. gRPC
        # performs serialization and deserialization itself, so only the
        # two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.stream_stream(
            "/google.pubsub.v1.Subscriber/StreamingPull",
            request_serializer=pubsub.StreamingPullRequest.serialize,
            response_deserializer=pubsub.StreamingPullResponse.deserialize,
        )
        return self._stubs[stub_name]
    599 
    @property
    def modify_push_config(
        self,
    ) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]:
        r"""Expose the ``ModifyPushConfig`` RPC as a callable.

        The RPC modifies the ``PushConfig`` for a specified subscription.

        It may be used to change a push subscription to a pull one
        (signified by an empty ``PushConfig``) or vice versa, or to change
        the endpoint URL and other attributes of a push subscription.
        Messages will accumulate for delivery continuously through the
        call regardless of changes to the ``PushConfig``.

        Returns:
            Callable[[~.ModifyPushConfigRequest],
                    ~.Empty]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "modify_push_config"
        # Reuse the stub built by an earlier access, if there is one.
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # First access: create the stub. gRPC performs serialization and
        # deserialization itself, so only the two codec functions are given.
        self._stubs[stub_name] = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/ModifyPushConfig",
            request_serializer=pubsub.ModifyPushConfigRequest.serialize,
            response_deserializer=empty_pb2.Empty.FromString,
        )
        return self._stubs[stub_name]
    631 
    @property
    def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]:
        r"""gRPC callable for the ``GetSnapshot`` RPC.

        The RPC fetches the configuration details of a snapshot.
        Snapshots are used in
        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
        operations, which manage message acknowledgments in bulk: the
        acknowledgment state of messages in an existing subscription
        can be set to the state captured by a snapshot.

        Returns:
            Callable[[~.GetSnapshotRequest],
                    ~.Snapshot]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stubs = self._stubs

        # Build the stub only on first access; later accesses hit the cache.
        if "get_snapshot" not in stubs:
            # gRPC takes care of (de)serialization given these functions.
            make_stub = self._logged_channel.unary_unary
            stubs["get_snapshot"] = make_stub(
                "/google.pubsub.v1.Subscriber/GetSnapshot",
                request_serializer=pubsub.GetSnapshotRequest.serialize,
                response_deserializer=pubsub.Snapshot.deserialize,
            )

        return stubs["get_snapshot"]
    660 
    @property
    def list_snapshots(
        self,
    ) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]:
        r"""gRPC callable for the ``ListSnapshots`` RPC.

        The RPC lists the existing snapshots. Snapshots are used in
        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
        operations, which manage message acknowledgments in bulk: the
        acknowledgment state of messages in an existing subscription
        can be set to the state captured by a snapshot.

        Returns:
            Callable[[~.ListSnapshotsRequest],
                    ~.ListSnapshotsResponse]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stub_key = "list_snapshots"

        # Cached from a previous access: nothing to build.
        if stub_key in self._stubs:
            return self._stubs[stub_key]

        # Lazily create the stub; gRPC applies the supplied serializer
        # and deserializer to requests and responses.
        stub = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/ListSnapshots",
            request_serializer=pubsub.ListSnapshotsRequest.serialize,
            response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
        )
        self._stubs[stub_key] = stub
        return stub
    690 
    @property
    def create_snapshot(
        self,
    ) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]:
        r"""gRPC callable for the ``CreateSnapshot`` RPC.

        The RPC creates a snapshot from the requested subscription.
        Snapshots are used in
        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
        operations, which manage message acknowledgments in bulk: the
        acknowledgment state of messages in an existing subscription
        can be set to the state captured by a snapshot.

        Server-side outcomes described for this RPC:

        - ``ALREADY_EXISTS`` if the snapshot already exists.
        - ``NOT_FOUND`` if the requested subscription doesn't exist.
        - ``FAILED_PRECONDITION`` if the backlog in the subscription is
          too old -- and the resulting snapshot would expire in less
          than 1 hour. See also the ``Snapshot.expire_time`` field.

        If the request carries no name, the server assigns a random
        name for the snapshot on the same project as the subscription,
        conforming to the [resource name format]
        (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
        The generated name is populated in the returned Snapshot
        object. Note that for REST API requests, a name must be
        specified in the request.

        Returns:
            Callable[[~.CreateSnapshotRequest],
                    ~.Snapshot]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stubs = self._stubs

        # Only the first access pays the cost of building the stub.
        if "create_snapshot" not in stubs:
            # gRPC drives (de)serialization through these two functions.
            make_stub = self._logged_channel.unary_unary
            stubs["create_snapshot"] = make_stub(
                "/google.pubsub.v1.Subscriber/CreateSnapshot",
                request_serializer=pubsub.CreateSnapshotRequest.serialize,
                response_deserializer=pubsub.Snapshot.deserialize,
            )

        return stubs["create_snapshot"]
    734 
    @property
    def update_snapshot(
        self,
    ) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]:
        r"""gRPC callable for the ``UpdateSnapshot`` RPC.

        The RPC updates an existing snapshot, changing the fields named
        in the update mask. Snapshots are used in
        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
        operations, which manage message acknowledgments in bulk: the
        acknowledgment state of messages in an existing subscription
        can be set to the state captured by a snapshot.

        Returns:
            Callable[[~.UpdateSnapshotRequest],
                    ~.Snapshot]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stub_key = "update_snapshot"

        # Hand back the cached stub when one exists.
        if stub_key in self._stubs:
            return self._stubs[stub_key]

        # Otherwise create it on the fly; gRPC uses the given functions
        # to serialize requests and deserialize responses.
        stub = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/UpdateSnapshot",
            request_serializer=pubsub.UpdateSnapshotRequest.serialize,
            response_deserializer=pubsub.Snapshot.deserialize,
        )
        self._stubs[stub_key] = stub
        return stub
    765 
    @property
    def delete_snapshot(
        self,
    ) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]:
        r"""gRPC callable for the ``DeleteSnapshot`` RPC.

        The RPC removes an existing snapshot. Snapshots are used in
        [Seek] (https://cloud.google.com/pubsub/docs/replay-overview)
        operations, which manage message acknowledgments in bulk: the
        acknowledgment state of messages in an existing subscription
        can be set to the state captured by a snapshot.

        Deleting a snapshot immediately drops all messages retained in
        it. Afterwards a new snapshot may be created under the same
        name, but it has no association with the old snapshot or its
        subscription, unless the same subscription is specified.

        Returns:
            Callable[[~.DeleteSnapshotRequest],
                    ~.Empty]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stubs = self._stubs

        # Create the stub lazily, once; subsequent accesses reuse it.
        if "delete_snapshot" not in stubs:
            # gRPC performs (de)serialization with the functions passed in.
            make_stub = self._logged_channel.unary_unary
            stubs["delete_snapshot"] = make_stub(
                "/google.pubsub.v1.Subscriber/DeleteSnapshot",
                request_serializer=pubsub.DeleteSnapshotRequest.serialize,
                response_deserializer=empty_pb2.Empty.FromString,
            )

        return stubs["delete_snapshot"]
    800 
    @property
    def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]:
        r"""gRPC callable for the ``Seek`` RPC.

        The RPC seeks an existing subscription to a point in time or to
        a given snapshot, whichever the request provides. Snapshots are
        used in [Seek]
        (https://cloud.google.com/pubsub/docs/replay-overview)
        operations, which manage message acknowledgments in bulk: the
        acknowledgment state of messages in an existing subscription
        can be set to the state captured by a snapshot. Note that the
        subscription and the snapshot must both be on the same topic.

        Returns:
            Callable[[~.SeekRequest],
                    ~.SeekResponse]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stub_key = "seek"

        # A previous access may already have built the stub.
        if stub_key in self._stubs:
            return self._stubs[stub_key]

        # Build it now; gRPC handles (de)serialization using the
        # functions supplied here.
        stub = self._logged_channel.unary_unary(
            "/google.pubsub.v1.Subscriber/Seek",
            request_serializer=pubsub.SeekRequest.serialize,
            response_deserializer=pubsub.SeekResponse.deserialize,
        )
        self._stubs[stub_key] = stub
        return stub
    832 
    def close(self):
        """Close the logged channel this transport issues its RPCs on."""
        channel = self._logged_channel
        channel.close()
    835 
    @property
    def set_iam_policy(
        self,
    ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
        r"""gRPC callable for the IAM ``SetIamPolicy`` RPC.

        The RPC sets the IAM access control policy on the specified
        resource, replacing any existing policy.

        Returns:
            Callable[[~.SetIamPolicyRequest],
                    ~.Policy]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stubs = self._stubs

        # Build the stub on first access only.
        if "set_iam_policy" not in stubs:
            # gRPC performs (de)serialization with the functions passed in.
            make_stub = self._logged_channel.unary_unary
            stubs["set_iam_policy"] = make_stub(
                "/google.iam.v1.IAMPolicy/SetIamPolicy",
                request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
                response_deserializer=policy_pb2.Policy.FromString,
            )

        return stubs["set_iam_policy"]
    860 
    @property
    def get_iam_policy(
        self,
    ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
        r"""gRPC callable for the IAM ``GetIamPolicy`` RPC.

        The RPC gets the IAM access control policy for a resource. An
        empty policy is returned if the resource exists and does not
        have a policy set.

        Returns:
            Callable[[~.GetIamPolicyRequest],
                    ~.Policy]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stub_key = "get_iam_policy"

        # Serve the cached stub when available.
        if stub_key in self._stubs:
            return self._stubs[stub_key]

        # Lazily create the stub; gRPC applies the supplied serializer
        # and deserializer to requests and responses.
        stub = self._logged_channel.unary_unary(
            "/google.iam.v1.IAMPolicy/GetIamPolicy",
            request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
        self._stubs[stub_key] = stub
        return stub
    886 
    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        iam_policy_pb2.TestIamPermissionsResponse,
    ]:
        r"""gRPC callable for the IAM ``TestIamPermissions`` RPC.

        The RPC tests the specified permissions against the IAM access
        control policy for a resource. If the resource does not exist,
        an empty set of permissions is returned, not a NOT_FOUND error.

        Returns:
            Callable[[~.TestIamPermissionsRequest],
                    ~.TestIamPermissionsResponse]:
                The stub which, when invoked, performs the RPC against
                the server.
        """
        stubs = self._stubs

        # Create the stub lazily, once; subsequent accesses reuse it.
        if "test_iam_permissions" not in stubs:
            # gRPC drives (de)serialization through these two functions.
            make_stub = self._logged_channel.unary_unary
            stubs["test_iam_permissions"] = make_stub(
                "/google.iam.v1.IAMPolicy/TestIamPermissions",
                request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
                response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
            )

        return stubs["test_iam_permissions"]
    915 
    @property
    def kind(self) -> str:
        """Short identifier of this transport flavour; always ``"grpc"``."""
        transport_kind = "grpc"
        return transport_kind
    919 
    920 
# Names exported by ``from <this module> import *``: only the gRPC
# transport class is part of the module's public surface.
__all__ = ("SubscriberGrpcTransport",)