1# -*- coding: utf-8 -*- 
    2# Copyright 2025 Google LLC 
    3# 
    4# Licensed under the Apache License, Version 2.0 (the "License"); 
    5# you may not use this file except in compliance with the License. 
    6# You may obtain a copy of the License at 
    7# 
    8#     http://www.apache.org/licenses/LICENSE-2.0 
    9# 
    10# Unless required by applicable law or agreed to in writing, software 
    11# distributed under the License is distributed on an "AS IS" BASIS, 
    12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    13# See the License for the specific language governing permissions and 
    14# limitations under the License. 
    15# 
    16import json 
    17import logging as std_logging 
    18import pickle 
    19import warnings 
    20from typing import Callable, Dict, Optional, Sequence, Tuple, Union 
    21 
    22from google.api_core import grpc_helpers 
    23from google.api_core import gapic_v1 
    24import google.auth  # type: ignore 
    25from google.auth import credentials as ga_credentials  # type: ignore 
    26from google.auth.transport.grpc import SslCredentials  # type: ignore 
    27from google.protobuf.json_format import MessageToJson 
    28import google.protobuf.message 
    29 
    30import grpc  # type: ignore 
    31import proto  # type: ignore 
    32 
    33from google.iam.v1 import iam_policy_pb2  # type: ignore 
    34from google.iam.v1 import policy_pb2  # type: ignore 
    35from google.protobuf import empty_pb2  # type: ignore 
    36from google.pubsub_v1.types import pubsub 
    37from .base import PublisherTransport, DEFAULT_CLIENT_INFO 
    38 
# ``google.api_core.client_logging`` is treated as an optional dependency:
# the import is attempted once at module load and the outcome is recorded in
# ``CLIENT_LOGGING_SUPPORTED``. The logging interceptor below checks this flag
# before building any log record, so a failed import simply disables
# request/response logging.
try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger; the interceptor emits its DEBUG records through it.
_LOGGER = std_logging.getLogger(__name__)
    47 
    48 
    49class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER 
    50    def intercept_unary_unary(self, continuation, client_call_details, request): 
    51        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 
    52            std_logging.DEBUG 
    53        ) 
    54        if logging_enabled:  # pragma: NO COVER 
    55            request_metadata = client_call_details.metadata 
    56            if isinstance(request, proto.Message): 
    57                request_payload = type(request).to_json(request) 
    58            elif isinstance(request, google.protobuf.message.Message): 
    59                request_payload = MessageToJson(request) 
    60            else: 
    61                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" 
    62 
    63            request_metadata = { 
    64                key: value.decode("utf-8") if isinstance(value, bytes) else value 
    65                for key, value in request_metadata 
    66            } 
    67            grpc_request = { 
    68                "payload": request_payload, 
    69                "requestMethod": "grpc", 
    70                "metadata": dict(request_metadata), 
    71            } 
    72            _LOGGER.debug( 
    73                f"Sending request for {client_call_details.method}", 
    74                extra={ 
    75                    "serviceName": "google.pubsub.v1.Publisher", 
    76                    "rpcName": str(client_call_details.method), 
    77                    "request": grpc_request, 
    78                    "metadata": grpc_request["metadata"], 
    79                }, 
    80            ) 
    81        response = continuation(client_call_details, request) 
    82        if logging_enabled:  # pragma: NO COVER 
    83            response_metadata = response.trailing_metadata() 
    84            # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples 
    85            metadata = ( 
    86                dict([(k, str(v)) for k, v in response_metadata]) 
    87                if response_metadata 
    88                else None 
    89            ) 
    90            result = response.result() 
    91            if isinstance(result, proto.Message): 
    92                response_payload = type(result).to_json(result) 
    93            elif isinstance(result, google.protobuf.message.Message): 
    94                response_payload = MessageToJson(result) 
    95            else: 
    96                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" 
    97            grpc_response = { 
    98                "payload": response_payload, 
    99                "metadata": metadata, 
    100                "status": "OK", 
    101            } 
    102            _LOGGER.debug( 
    103                f"Received response for {client_call_details.method}.", 
    104                extra={ 
    105                    "serviceName": "google.pubsub.v1.Publisher", 
    106                    "rpcName": client_call_details.method, 
    107                    "response": grpc_response, 
    108                    "metadata": grpc_response["metadata"], 
    109                }, 
    110            ) 
    111        return response 
    112 
    113 
    114class PublisherGrpcTransport(PublisherTransport): 
    115    """gRPC backend transport for Publisher. 
    116 
    117    The service that an application uses to manipulate topics, 
    118    and to send messages to a topic. 
    119 
    120    This class defines the same methods as the primary client, so the 
    121    primary client can load the underlying transport implementation 
    122    and call it. 
    123 
    124    It sends protocol buffers over the wire using gRPC (which is built on 
    125    top of HTTP/2); the ``grpcio`` package must be installed. 
    126    """ 
    127 
    128    _stubs: Dict[str, Callable] 
    129 
    130    def __init__( 
    131        self, 
    132        *, 
    133        host: str = "pubsub.googleapis.com", 
    134        credentials: Optional[ga_credentials.Credentials] = None, 
    135        credentials_file: Optional[str] = None, 
    136        scopes: Optional[Sequence[str]] = None, 
    137        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None, 
    138        api_mtls_endpoint: Optional[str] = None, 
    139        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 
    140        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None, 
    141        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 
    142        quota_project_id: Optional[str] = None, 
    143        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 
    144        always_use_jwt_access: Optional[bool] = False, 
    145        api_audience: Optional[str] = None, 
    146    ) -> None: 
    147        """Instantiate the transport. 
    148 
    149        Args: 
    150            host (Optional[str]): 
    151                 The hostname to connect to (default: 'pubsub.googleapis.com'). 
    152            credentials (Optional[google.auth.credentials.Credentials]): The 
    153                authorization credentials to attach to requests. These 
    154                credentials identify the application to the service; if none 
    155                are specified, the client will attempt to ascertain the 
    156                credentials from the environment. 
    157                This argument is ignored if a ``channel`` instance is provided. 
    158            credentials_file (Optional[str]): A file with credentials that can 
    159                be loaded with :func:`google.auth.load_credentials_from_file`. 
    160                This argument is ignored if a ``channel`` instance is provided. 
    161            scopes (Optional(Sequence[str])): A list of scopes. This argument is 
    162                ignored if a ``channel`` instance is provided. 
    163            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]): 
    164                A ``Channel`` instance through which to make calls, or a Callable 
    165                that constructs and returns one. If set to None, ``self.create_channel`` 
    166                is used to create the channel. If a Callable is given, it will be called 
    167                with the same arguments as used in ``self.create_channel``. 
    168            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint. 
    169                If provided, it overrides the ``host`` argument and tries to create 
    170                a mutual TLS channel with client SSL credentials from 
    171                ``client_cert_source`` or application default SSL credentials. 
    172            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]): 
    173                Deprecated. A callback to provide client SSL certificate bytes and 
    174                private key bytes, both in PEM format. It is ignored if 
    175                ``api_mtls_endpoint`` is None. 
    176            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials 
    177                for the grpc channel. It is ignored if a ``channel`` instance is provided. 
    178            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]): 
    179                A callback to provide client certificate bytes and private key bytes, 
    180                both in PEM format. It is used to configure a mutual TLS channel. It is 
    181                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided. 
    182            quota_project_id (Optional[str]): An optional project to use for billing 
    183                and quota. 
    184            client_info (google.api_core.gapic_v1.client_info.ClientInfo): 
    185                The client info used to send a user-agent string along with 
    186                API requests. If ``None``, then default info will be used. 
    187                Generally, you only need to set this if you're developing 
    188                your own client library. 
    189            always_use_jwt_access (Optional[bool]): Whether self signed JWT should 
    190                be used for service account credentials. 
    191 
    192        Raises: 
    193          google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport 
    194              creation failed for any reason. 
    195          google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 
    196              and ``credentials_file`` are passed. 
    197        """ 
    198        self._grpc_channel = None 
    199        self._ssl_channel_credentials = ssl_channel_credentials 
    200        self._stubs: Dict[str, Callable] = {} 
    201 
    202        if api_mtls_endpoint: 
    203            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) 
    204        if client_cert_source: 
    205            warnings.warn("client_cert_source is deprecated", DeprecationWarning) 
    206 
    207        if isinstance(channel, grpc.Channel): 
    208            # Ignore credentials if a channel was passed. 
    209            credentials = None 
    210            self._ignore_credentials = True 
    211            # If a channel was explicitly provided, set it. 
    212            self._grpc_channel = channel 
    213            self._ssl_channel_credentials = None 
    214 
    215        else: 
    216            if api_mtls_endpoint: 
    217                host = api_mtls_endpoint 
    218 
    219                # Create SSL credentials with client_cert_source or application 
    220                # default SSL credentials. 
    221                if client_cert_source: 
    222                    cert, key = client_cert_source() 
    223                    self._ssl_channel_credentials = grpc.ssl_channel_credentials( 
    224                        certificate_chain=cert, private_key=key 
    225                    ) 
    226                else: 
    227                    self._ssl_channel_credentials = SslCredentials().ssl_credentials 
    228 
    229            else: 
    230                if client_cert_source_for_mtls and not ssl_channel_credentials: 
    231                    cert, key = client_cert_source_for_mtls() 
    232                    self._ssl_channel_credentials = grpc.ssl_channel_credentials( 
    233                        certificate_chain=cert, private_key=key 
    234                    ) 
    235 
    236        # The base transport sets the host, credentials and scopes 
    237        super().__init__( 
    238            host=host, 
    239            credentials=credentials, 
    240            credentials_file=credentials_file, 
    241            scopes=scopes, 
    242            quota_project_id=quota_project_id, 
    243            client_info=client_info, 
    244            always_use_jwt_access=always_use_jwt_access, 
    245            api_audience=api_audience, 
    246        ) 
    247 
    248        if not self._grpc_channel: 
    249            # initialize with the provided callable or the default channel 
    250            channel_init = channel or type(self).create_channel 
    251            self._grpc_channel = channel_init( 
    252                self._host, 
    253                # use the credentials which are saved 
    254                credentials=self._credentials, 
    255                # Set ``credentials_file`` to ``None`` here as 
    256                # the credentials that we saved earlier should be used. 
    257                credentials_file=None, 
    258                scopes=self._scopes, 
    259                ssl_credentials=self._ssl_channel_credentials, 
    260                quota_project_id=quota_project_id, 
    261                options=[ 
    262                    ("grpc.max_send_message_length", -1), 
    263                    ("grpc.max_receive_message_length", -1), 
    264                    ("grpc.max_metadata_size", 4 * 1024 * 1024), 
    265                    ("grpc.keepalive_time_ms", 30000), 
    266                ], 
    267            ) 
    268 
    269        self._interceptor = _LoggingClientInterceptor() 
    270        self._logged_channel = grpc.intercept_channel( 
    271            self._grpc_channel, self._interceptor 
    272        ) 
    273 
    274        # Wrap messages. This must be done after self._logged_channel exists 
    275        self._prep_wrapped_messages(client_info) 
    276 
    277    @classmethod 
    278    def create_channel( 
    279        cls, 
    280        host: str = "pubsub.googleapis.com", 
    281        credentials: Optional[ga_credentials.Credentials] = None, 
    282        credentials_file: Optional[str] = None, 
    283        scopes: Optional[Sequence[str]] = None, 
    284        quota_project_id: Optional[str] = None, 
    285        **kwargs, 
    286    ) -> grpc.Channel: 
    287        """Create and return a gRPC channel object. 
    288        Args: 
    289            host (Optional[str]): The host for the channel to use. 
    290            credentials (Optional[~.Credentials]): The 
    291                authorization credentials to attach to requests. These 
    292                credentials identify this application to the service. If 
    293                none are specified, the client will attempt to ascertain 
    294                the credentials from the environment. 
    295            credentials_file (Optional[str]): A file with credentials that can 
    296                be loaded with :func:`google.auth.load_credentials_from_file`. 
    297                This argument is mutually exclusive with credentials. 
    298            scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 
    299                service. These are only used when credentials are not specified and 
    300                are passed to :func:`google.auth.default`. 
    301            quota_project_id (Optional[str]): An optional project to use for billing 
    302                and quota. 
    303            kwargs (Optional[dict]): Keyword arguments, which are passed to the 
    304                channel creation. 
    305        Returns: 
    306            grpc.Channel: A gRPC channel object. 
    307 
    308        Raises: 
    309            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 
    310              and ``credentials_file`` are passed. 
    311        """ 
    312 
    313        return grpc_helpers.create_channel( 
    314            host, 
    315            credentials=credentials, 
    316            credentials_file=credentials_file, 
    317            quota_project_id=quota_project_id, 
    318            default_scopes=cls.AUTH_SCOPES, 
    319            scopes=scopes, 
    320            default_host=cls.DEFAULT_HOST, 
    321            **kwargs, 
    322        ) 
    323 
    324    @property 
    325    def grpc_channel(self) -> grpc.Channel: 
    326        """Return the channel designed to connect to this service.""" 
    327        return self._grpc_channel 
    328 
    329    @property 
    330    def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]: 
    331        r"""Return a callable for the create topic method over gRPC. 
    332 
    333        Creates the given topic with the given name. See the [resource 
    334        name rules] 
    335        (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 
    336 
    337        Returns: 
    338            Callable[[~.Topic], 
    339                    ~.Topic]: 
    340                A function that, when called, will call the underlying RPC 
    341                on the server. 
    342        """ 
    343        # Generate a "stub function" on-the-fly which will actually make 
    344        # the request. 
    345        # gRPC handles serialization and deserialization, so we just need 
    346        # to pass in the functions for each. 
    347        if "create_topic" not in self._stubs: 
    348            self._stubs["create_topic"] = self._logged_channel.unary_unary( 
    349                "/google.pubsub.v1.Publisher/CreateTopic", 
    350                request_serializer=pubsub.Topic.serialize, 
    351                response_deserializer=pubsub.Topic.deserialize, 
    352            ) 
    353        return self._stubs["create_topic"] 
    354 
    355    @property 
    356    def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]: 
    357        r"""Return a callable for the update topic method over gRPC. 
    358 
    359        Updates an existing topic by updating the fields 
    360        specified in the update mask. Note that certain 
    361        properties of a topic are not modifiable. 
    362 
    363        Returns: 
    364            Callable[[~.UpdateTopicRequest], 
    365                    ~.Topic]: 
    366                A function that, when called, will call the underlying RPC 
    367                on the server. 
    368        """ 
    369        # Generate a "stub function" on-the-fly which will actually make 
    370        # the request. 
    371        # gRPC handles serialization and deserialization, so we just need 
    372        # to pass in the functions for each. 
    373        if "update_topic" not in self._stubs: 
    374            self._stubs["update_topic"] = self._logged_channel.unary_unary( 
    375                "/google.pubsub.v1.Publisher/UpdateTopic", 
    376                request_serializer=pubsub.UpdateTopicRequest.serialize, 
    377                response_deserializer=pubsub.Topic.deserialize, 
    378            ) 
    379        return self._stubs["update_topic"] 
    380 
    381    @property 
    382    def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]: 
    383        r"""Return a callable for the publish method over gRPC. 
    384 
    385        Adds one or more messages to the topic. Returns ``NOT_FOUND`` if 
    386        the topic does not exist. 
    387 
    388        Returns: 
    389            Callable[[~.PublishRequest], 
    390                    ~.PublishResponse]: 
    391                A function that, when called, will call the underlying RPC 
    392                on the server. 
    393        """ 
    394        # Generate a "stub function" on-the-fly which will actually make 
    395        # the request. 
    396        # gRPC handles serialization and deserialization, so we just need 
    397        # to pass in the functions for each. 
    398        if "publish" not in self._stubs: 
    399            self._stubs["publish"] = self._logged_channel.unary_unary( 
    400                "/google.pubsub.v1.Publisher/Publish", 
    401                request_serializer=pubsub.PublishRequest.serialize, 
    402                response_deserializer=pubsub.PublishResponse.deserialize, 
    403            ) 
    404        return self._stubs["publish"] 
    405 
    406    @property 
    407    def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]: 
    408        r"""Return a callable for the get topic method over gRPC. 
    409 
    410        Gets the configuration of a topic. 
    411 
    412        Returns: 
    413            Callable[[~.GetTopicRequest], 
    414                    ~.Topic]: 
    415                A function that, when called, will call the underlying RPC 
    416                on the server. 
    417        """ 
    418        # Generate a "stub function" on-the-fly which will actually make 
    419        # the request. 
    420        # gRPC handles serialization and deserialization, so we just need 
    421        # to pass in the functions for each. 
    422        if "get_topic" not in self._stubs: 
    423            self._stubs["get_topic"] = self._logged_channel.unary_unary( 
    424                "/google.pubsub.v1.Publisher/GetTopic", 
    425                request_serializer=pubsub.GetTopicRequest.serialize, 
    426                response_deserializer=pubsub.Topic.deserialize, 
    427            ) 
    428        return self._stubs["get_topic"] 
    429 
    430    @property 
    431    def list_topics( 
    432        self, 
    433    ) -> Callable[[pubsub.ListTopicsRequest], pubsub.ListTopicsResponse]: 
    434        r"""Return a callable for the list topics method over gRPC. 
    435 
    436        Lists matching topics. 
    437 
    438        Returns: 
    439            Callable[[~.ListTopicsRequest], 
    440                    ~.ListTopicsResponse]: 
    441                A function that, when called, will call the underlying RPC 
    442                on the server. 
    443        """ 
    444        # Generate a "stub function" on-the-fly which will actually make 
    445        # the request. 
    446        # gRPC handles serialization and deserialization, so we just need 
    447        # to pass in the functions for each. 
    448        if "list_topics" not in self._stubs: 
    449            self._stubs["list_topics"] = self._logged_channel.unary_unary( 
    450                "/google.pubsub.v1.Publisher/ListTopics", 
    451                request_serializer=pubsub.ListTopicsRequest.serialize, 
    452                response_deserializer=pubsub.ListTopicsResponse.deserialize, 
    453            ) 
    454        return self._stubs["list_topics"] 
    455 
    456    @property 
    457    def list_topic_subscriptions( 
    458        self, 
    459    ) -> Callable[ 
    460        [pubsub.ListTopicSubscriptionsRequest], pubsub.ListTopicSubscriptionsResponse 
    461    ]: 
    462        r"""Return a callable for the list topic subscriptions method over gRPC. 
    463 
    464        Lists the names of the attached subscriptions on this 
    465        topic. 
    466 
    467        Returns: 
    468            Callable[[~.ListTopicSubscriptionsRequest], 
    469                    ~.ListTopicSubscriptionsResponse]: 
    470                A function that, when called, will call the underlying RPC 
    471                on the server. 
    472        """ 
    473        # Generate a "stub function" on-the-fly which will actually make 
    474        # the request. 
    475        # gRPC handles serialization and deserialization, so we just need 
    476        # to pass in the functions for each. 
    477        if "list_topic_subscriptions" not in self._stubs: 
    478            self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary( 
    479                "/google.pubsub.v1.Publisher/ListTopicSubscriptions", 
    480                request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize, 
    481                response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize, 
    482            ) 
    483        return self._stubs["list_topic_subscriptions"] 
    484 
    485    @property 
    486    def list_topic_snapshots( 
    487        self, 
    488    ) -> Callable[ 
    489        [pubsub.ListTopicSnapshotsRequest], pubsub.ListTopicSnapshotsResponse 
    490    ]: 
    491        r"""Return a callable for the list topic snapshots method over gRPC. 
    492 
    493        Lists the names of the snapshots on this topic. Snapshots are 
    494        used in 
    495        `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 
    496        operations, which allow you to manage message acknowledgments in 
    497        bulk. That is, you can set the acknowledgment state of messages 
    498        in an existing subscription to the state captured by a snapshot. 
    499 
    500        Returns: 
    501            Callable[[~.ListTopicSnapshotsRequest], 
    502                    ~.ListTopicSnapshotsResponse]: 
    503                A function that, when called, will call the underlying RPC 
    504                on the server. 
    505        """ 
    506        # Generate a "stub function" on-the-fly which will actually make 
    507        # the request. 
    508        # gRPC handles serialization and deserialization, so we just need 
    509        # to pass in the functions for each. 
    510        if "list_topic_snapshots" not in self._stubs: 
    511            self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary( 
    512                "/google.pubsub.v1.Publisher/ListTopicSnapshots", 
    513                request_serializer=pubsub.ListTopicSnapshotsRequest.serialize, 
    514                response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize, 
    515            ) 
    516        return self._stubs["list_topic_snapshots"] 
    517 
    518    @property 
    519    def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]: 
    520        r"""Return a callable for the delete topic method over gRPC. 
    521 
    522        Deletes the topic with the given name. Returns ``NOT_FOUND`` if 
    523        the topic does not exist. After a topic is deleted, a new topic 
    524        may be created with the same name; this is an entirely new topic 
    525        with none of the old configuration or subscriptions. Existing 
    526        subscriptions to this topic are not deleted, but their ``topic`` 
    527        field is set to ``_deleted-topic_``. 
    528 
    529        Returns: 
    530            Callable[[~.DeleteTopicRequest], 
    531                    ~.Empty]: 
    532                A function that, when called, will call the underlying RPC 
    533                on the server. 
    534        """ 
    535        # Generate a "stub function" on-the-fly which will actually make 
    536        # the request. 
    537        # gRPC handles serialization and deserialization, so we just need 
    538        # to pass in the functions for each. 
    539        if "delete_topic" not in self._stubs: 
    540            self._stubs["delete_topic"] = self._logged_channel.unary_unary( 
    541                "/google.pubsub.v1.Publisher/DeleteTopic", 
    542                request_serializer=pubsub.DeleteTopicRequest.serialize, 
    543                response_deserializer=empty_pb2.Empty.FromString, 
    544            ) 
    545        return self._stubs["delete_topic"] 
    546 
    547    @property 
    548    def detach_subscription( 
    549        self, 
    550    ) -> Callable[ 
    551        [pubsub.DetachSubscriptionRequest], pubsub.DetachSubscriptionResponse 
    552    ]: 
    553        r"""Return a callable for the detach subscription method over gRPC. 
    554 
    555        Detaches a subscription from this topic. All messages retained 
    556        in the subscription are dropped. Subsequent ``Pull`` and 
    557        ``StreamingPull`` requests will return FAILED_PRECONDITION. If 
    558        the subscription is a push subscription, pushes to the endpoint 
    559        will stop. 
    560 
    561        Returns: 
    562            Callable[[~.DetachSubscriptionRequest], 
    563                    ~.DetachSubscriptionResponse]: 
    564                A function that, when called, will call the underlying RPC 
    565                on the server. 
    566        """ 
    567        # Generate a "stub function" on-the-fly which will actually make 
    568        # the request. 
    569        # gRPC handles serialization and deserialization, so we just need 
    570        # to pass in the functions for each. 
    571        if "detach_subscription" not in self._stubs: 
    572            self._stubs["detach_subscription"] = self._logged_channel.unary_unary( 
    573                "/google.pubsub.v1.Publisher/DetachSubscription", 
    574                request_serializer=pubsub.DetachSubscriptionRequest.serialize, 
    575                response_deserializer=pubsub.DetachSubscriptionResponse.deserialize, 
    576            ) 
    577        return self._stubs["detach_subscription"] 
    578 
    579    def close(self): 
    580        self._logged_channel.close() 
    581 
    582    @property 
    583    def set_iam_policy( 
    584        self, 
    585    ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]: 
    586        r"""Return a callable for the set iam policy method over gRPC. 
    587        Sets the IAM access control policy on the specified 
    588        function. Replaces any existing policy. 
    589        Returns: 
    590            Callable[[~.SetIamPolicyRequest], 
    591                    ~.Policy]: 
    592                A function that, when called, will call the underlying RPC 
    593                on the server. 
    594        """ 
    595        # Generate a "stub function" on-the-fly which will actually make 
    596        # the request. 
    597        # gRPC handles serialization and deserialization, so we just need 
    598        # to pass in the functions for each. 
    599        if "set_iam_policy" not in self._stubs: 
    600            self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( 
    601                "/google.iam.v1.IAMPolicy/SetIamPolicy", 
    602                request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, 
    603                response_deserializer=policy_pb2.Policy.FromString, 
    604            ) 
    605        return self._stubs["set_iam_policy"] 
    606 
    607    @property 
    608    def get_iam_policy( 
    609        self, 
    610    ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]: 
    611        r"""Return a callable for the get iam policy method over gRPC. 
    612        Gets the IAM access control policy for a function. 
    613        Returns an empty policy if the function exists and does 
    614        not have a policy set. 
    615        Returns: 
    616            Callable[[~.GetIamPolicyRequest], 
    617                    ~.Policy]: 
    618                A function that, when called, will call the underlying RPC 
    619                on the server. 
    620        """ 
    621        # Generate a "stub function" on-the-fly which will actually make 
    622        # the request. 
    623        # gRPC handles serialization and deserialization, so we just need 
    624        # to pass in the functions for each. 
    625        if "get_iam_policy" not in self._stubs: 
    626            self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( 
    627                "/google.iam.v1.IAMPolicy/GetIamPolicy", 
    628                request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, 
    629                response_deserializer=policy_pb2.Policy.FromString, 
    630            ) 
    631        return self._stubs["get_iam_policy"] 
    632 
    633    @property 
    634    def test_iam_permissions( 
    635        self, 
    636    ) -> Callable[ 
    637        [iam_policy_pb2.TestIamPermissionsRequest], 
    638        iam_policy_pb2.TestIamPermissionsResponse, 
    639    ]: 
    640        r"""Return a callable for the test iam permissions method over gRPC. 
    641        Tests the specified permissions against the IAM access control 
    642        policy for a function. If the function does not exist, this will 
    643        return an empty set of permissions, not a NOT_FOUND error. 
    644        Returns: 
    645            Callable[[~.TestIamPermissionsRequest], 
    646                    ~.TestIamPermissionsResponse]: 
    647                A function that, when called, will call the underlying RPC 
    648                on the server. 
    649        """ 
    650        # Generate a "stub function" on-the-fly which will actually make 
    651        # the request. 
    652        # gRPC handles serialization and deserialization, so we just need 
    653        # to pass in the functions for each. 
    654        if "test_iam_permissions" not in self._stubs: 
    655            self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( 
    656                "/google.iam.v1.IAMPolicy/TestIamPermissions", 
    657                request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, 
    658                response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, 
    659            ) 
    660        return self._stubs["test_iam_permissions"] 
    661 
    662    @property 
    663    def kind(self) -> str: 
    664        return "grpc" 
    665 
    666 
# Only the transport class is exported; the logging interceptor stays private.
__all__ = ("PublisherGrpcTransport",)