1# -*- coding: utf-8 -*- 
    2# Copyright 2025 Google LLC 
    3# 
    4# Licensed under the Apache License, Version 2.0 (the "License"); 
    5# you may not use this file except in compliance with the License. 
    6# You may obtain a copy of the License at 
    7# 
    8#     http://www.apache.org/licenses/LICENSE-2.0 
    9# 
    10# Unless required by applicable law or agreed to in writing, software 
    11# distributed under the License is distributed on an "AS IS" BASIS, 
    12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    13# See the License for the specific language governing permissions and 
    14# limitations under the License. 
    15# 
    16import abc 
    17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 
    18 
    19import google.api_core 
    20from google.api_core import exceptions as core_exceptions 
    21from google.api_core import gapic_v1 
    22from google.api_core import retry as retries 
    23import google.auth  # type: ignore 
    24from google.auth import credentials as ga_credentials  # type: ignore 
    25from google.oauth2 import service_account  # type: ignore 
    26import google.protobuf 
    27 
    28from google.cloud.iam_credentials_v1 import gapic_version as package_version 
    29from google.cloud.iam_credentials_v1.types import common 
    30 
# Default value for the transport's ``client_info`` argument; it carries this
# package's GAPIC version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)

# Record the protobuf runtime version only when ClientInfo exposes that
# attribute (presumably some google-api-core releases lack it -- TODO confirm).
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
    37 
    38 
    39class IAMCredentialsTransport(abc.ABC): 
    40    """Abstract transport class for IAMCredentials.""" 
    41 
    42    AUTH_SCOPES = ("https://www.googleapis.com/auth/cloud-platform",) 
    43 
    44    DEFAULT_HOST: str = "iamcredentials.googleapis.com" 
    45 
    46    def __init__( 
    47        self, 
    48        *, 
    49        host: str = DEFAULT_HOST, 
    50        credentials: Optional[ga_credentials.Credentials] = None, 
    51        credentials_file: Optional[str] = None, 
    52        scopes: Optional[Sequence[str]] = None, 
    53        quota_project_id: Optional[str] = None, 
    54        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 
    55        always_use_jwt_access: Optional[bool] = False, 
    56        api_audience: Optional[str] = None, 
    57        **kwargs, 
    58    ) -> None: 
    59        """Instantiate the transport. 
    60 
    61        Args: 
    62            host (Optional[str]): 
    63                 The hostname to connect to (default: 'iamcredentials.googleapis.com'). 
    64            credentials (Optional[google.auth.credentials.Credentials]): The 
    65                authorization credentials to attach to requests. These 
    66                credentials identify the application to the service; if none 
    67                are specified, the client will attempt to ascertain the 
    68                credentials from the environment. 
    69            credentials_file (Optional[str]): Deprecated. A file with credentials that can 
    70                be loaded with :func:`google.auth.load_credentials_from_file`. 
    71                This argument is mutually exclusive with credentials. This argument will be 
    72                removed in the next major version of this library. 
    73            scopes (Optional[Sequence[str]]): A list of scopes. 
    74            quota_project_id (Optional[str]): An optional project to use for billing 
    75                and quota. 
    76            client_info (google.api_core.gapic_v1.client_info.ClientInfo): 
    77                The client info used to send a user-agent string along with 
    78                API requests. If ``None``, then default info will be used. 
    79                Generally, you only need to set this if you're developing 
    80                your own client library. 
    81            always_use_jwt_access (Optional[bool]): Whether self signed JWT should 
    82                be used for service account credentials. 
    83        """ 
    84 
    85        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 
    86 
    87        # Save the scopes. 
    88        self._scopes = scopes 
    89        if not hasattr(self, "_ignore_credentials"): 
    90            self._ignore_credentials: bool = False 
    91 
    92        # If no credentials are provided, then determine the appropriate 
    93        # defaults. 
    94        if credentials and credentials_file: 
    95            raise core_exceptions.DuplicateCredentialArgs( 
    96                "'credentials_file' and 'credentials' are mutually exclusive" 
    97            ) 
    98 
    99        if credentials_file is not None: 
    100            credentials, _ = google.auth.load_credentials_from_file( 
    101                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 
    102            ) 
    103        elif credentials is None and not self._ignore_credentials: 
    104            credentials, _ = google.auth.default( 
    105                **scopes_kwargs, quota_project_id=quota_project_id 
    106            ) 
    107            # Don't apply audience if the credentials file passed from user. 
    108            if hasattr(credentials, "with_gdch_audience"): 
    109                credentials = credentials.with_gdch_audience( 
    110                    api_audience if api_audience else host 
    111                ) 
    112 
    113        # If the credentials are service account credentials, then always try to use self signed JWT. 
    114        if ( 
    115            always_use_jwt_access 
    116            and isinstance(credentials, service_account.Credentials) 
    117            and hasattr(service_account.Credentials, "with_always_use_jwt_access") 
    118        ): 
    119            credentials = credentials.with_always_use_jwt_access(True) 
    120 
    121        # Save the credentials. 
    122        self._credentials = credentials 
    123 
    124        # Save the hostname. Default to port 443 (HTTPS) if none is specified. 
    125        if ":" not in host: 
    126            host += ":443" 
    127        self._host = host 
    128 
    129    @property 
    130    def host(self): 
    131        return self._host 
    132 
    133    def _prep_wrapped_messages(self, client_info): 
    134        # Precompute the wrapped methods. 
    135        self._wrapped_methods = { 
    136            self.generate_access_token: gapic_v1.method.wrap_method( 
    137                self.generate_access_token, 
    138                default_retry=retries.Retry( 
    139                    initial=0.1, 
    140                    maximum=60.0, 
    141                    multiplier=1.3, 
    142                    predicate=retries.if_exception_type( 
    143                        core_exceptions.DeadlineExceeded, 
    144                        core_exceptions.ServiceUnavailable, 
    145                    ), 
    146                    deadline=60.0, 
    147                ), 
    148                default_timeout=60.0, 
    149                client_info=client_info, 
    150            ), 
    151            self.generate_id_token: gapic_v1.method.wrap_method( 
    152                self.generate_id_token, 
    153                default_retry=retries.Retry( 
    154                    initial=0.1, 
    155                    maximum=60.0, 
    156                    multiplier=1.3, 
    157                    predicate=retries.if_exception_type( 
    158                        core_exceptions.DeadlineExceeded, 
    159                        core_exceptions.ServiceUnavailable, 
    160                    ), 
    161                    deadline=60.0, 
    162                ), 
    163                default_timeout=60.0, 
    164                client_info=client_info, 
    165            ), 
    166            self.sign_blob: gapic_v1.method.wrap_method( 
    167                self.sign_blob, 
    168                default_retry=retries.Retry( 
    169                    initial=0.1, 
    170                    maximum=60.0, 
    171                    multiplier=1.3, 
    172                    predicate=retries.if_exception_type( 
    173                        core_exceptions.DeadlineExceeded, 
    174                        core_exceptions.ServiceUnavailable, 
    175                    ), 
    176                    deadline=60.0, 
    177                ), 
    178                default_timeout=60.0, 
    179                client_info=client_info, 
    180            ), 
    181            self.sign_jwt: gapic_v1.method.wrap_method( 
    182                self.sign_jwt, 
    183                default_retry=retries.Retry( 
    184                    initial=0.1, 
    185                    maximum=60.0, 
    186                    multiplier=1.3, 
    187                    predicate=retries.if_exception_type( 
    188                        core_exceptions.DeadlineExceeded, 
    189                        core_exceptions.ServiceUnavailable, 
    190                    ), 
    191                    deadline=60.0, 
    192                ), 
    193                default_timeout=60.0, 
    194                client_info=client_info, 
    195            ), 
    196        } 
    197 
    198    def close(self): 
    199        """Closes resources associated with the transport. 
    200 
    201        .. warning:: 
    202             Only call this method if the transport is NOT shared 
    203             with other clients - this may cause errors in other clients! 
    204        """ 
    205        raise NotImplementedError() 
    206 
    207    @property 
    208    def generate_access_token( 
    209        self, 
    210    ) -> Callable[ 
    211        [common.GenerateAccessTokenRequest], 
    212        Union[ 
    213            common.GenerateAccessTokenResponse, 
    214            Awaitable[common.GenerateAccessTokenResponse], 
    215        ], 
    216    ]: 
    217        raise NotImplementedError() 
    218 
    219    @property 
    220    def generate_id_token( 
    221        self, 
    222    ) -> Callable[ 
    223        [common.GenerateIdTokenRequest], 
    224        Union[ 
    225            common.GenerateIdTokenResponse, Awaitable[common.GenerateIdTokenResponse] 
    226        ], 
    227    ]: 
    228        raise NotImplementedError() 
    229 
    230    @property 
    231    def sign_blob( 
    232        self, 
    233    ) -> Callable[ 
    234        [common.SignBlobRequest], 
    235        Union[common.SignBlobResponse, Awaitable[common.SignBlobResponse]], 
    236    ]: 
    237        raise NotImplementedError() 
    238 
    239    @property 
    240    def sign_jwt( 
    241        self, 
    242    ) -> Callable[ 
    243        [common.SignJwtRequest], 
    244        Union[common.SignJwtResponse, Awaitable[common.SignJwtResponse]], 
    245    ]: 
    246        raise NotImplementedError() 
    247 
    248    @property 
    249    def kind(self) -> str: 
    250        raise NotImplementedError() 
    251 
    252 
# Public API of this module: only the abstract transport base class is
# exported by ``from ... import *``.
__all__ = ("IAMCredentialsTransport",)