1# Copyright 2016 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0 
    16 
    17This module implements the JWT Profile for OAuth 2.0 Authorization Grants 
    18as defined by `RFC 7523`_ with particular support for how this RFC is 
    19implemented in Google's infrastructure. Google refers to these credentials 
    20as *Service Accounts*. 
    21 
    22Service accounts are used for server-to-server communication, such as 
    23interactions between a web application server and a Google service. The 
    24service account belongs to your application instead of to an individual end 
    25user. In contrast to other OAuth 2.0 profiles, no users are involved and your 
    26application "acts" as the service account. 
    27 
    28Typically an application uses a service account when the application uses 
    29Google APIs to work with its own data rather than a user's data. For example, 
    30an application that uses Google Cloud Datastore for data persistence would use 
    31a service account to authenticate its calls to the Google Cloud Datastore API. 
    32However, an application that needs to access a user's Drive documents would 
    33use the normal OAuth 2.0 profile. 
    34 
    35Additionally, Google Apps domain administrators can grant service accounts 
    36`domain-wide delegation`_ authority to access user data on behalf of users in 
    37the domain. 
    38 
    39This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used 
    40in place of the usual authorization token returned during the standard 
    41OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as 
    42the acquired access token is used as the bearer token when making requests 
    43using these credentials. 
    44 
    45This profile differs from normal OAuth 2.0 profile because no user consent 
    46step is required. The use of the private key allows this profile to assert 
    47identity directly. 
    48 
    49This profile also differs from the :mod:`google.auth.jwt` authentication 
    50because the JWT credentials use the JWT directly as the bearer token. This 
    51profile instead only uses the JWT to obtain an OAuth 2.0 access token. The 
    52obtained OAuth 2.0 access token is used as the bearer token. 
    53 
    54Domain-wide delegation 
    55---------------------- 
    56 
    57Domain-wide delegation allows a service account to access user data on 
    58behalf of any user in a Google Apps domain without consent from the user. 
    59For example, an application that uses the Google Calendar API to add events to 
    60the calendars of all users in a Google Apps domain would use a service account 
    61to access the Google Calendar API on behalf of users. 
    62 
    63The Google Apps administrator must explicitly authorize the service account to 
    64do this. This authorization step is referred to as "delegating domain-wide 
    65authority" to a service account. 
    66 
You can use domain-wide delegation by creating a set of credentials with a
    68specific subject using :meth:`~Credentials.with_subject`. 
    69 
    70.. _RFC 7523: https://tools.ietf.org/html/rfc7523 
    71""" 
    72 
    73import copy 
    74import datetime 
    75 
    76from google.auth import _helpers 
    77from google.auth import _service_account_info 
    78from google.auth import credentials 
    79from google.auth import exceptions 
    80from google.auth import iam 
    81from google.auth import jwt 
    82from google.auth import metrics 
    83from google.oauth2 import _client 
    84 
    85_DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds 
    86_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" 
    87_TRUST_BOUNDARY_LOOKUP_ENDPOINT = ( 
    88    "https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations" 
    89) 
    90 
    91 
    92class Credentials( 
    93    credentials.Signing, 
    94    credentials.Scoped, 
    95    credentials.CredentialsWithQuotaProject, 
    96    credentials.CredentialsWithTokenUri, 
    97    credentials.CredentialsWithTrustBoundary, 
    98): 
    99    """Service account credentials 
    100 
    101    Usually, you'll create these credentials with one of the helper 
    102    constructors. To create credentials using a Google service account 
    103    private key JSON file:: 
    104 
    105        credentials = service_account.Credentials.from_service_account_file( 
    106            'service-account.json') 
    107 
    108    Or if you already have the service account file loaded:: 
    109 
    110        service_account_info = json.load(open('service_account.json')) 
    111        credentials = service_account.Credentials.from_service_account_info( 
    112            service_account_info) 
    113 
    114    Both helper methods pass on arguments to the constructor, so you can 
    115    specify additional scopes and a subject if necessary:: 
    116 
    117        credentials = service_account.Credentials.from_service_account_file( 
    118            'service-account.json', 
    119            scopes=['email'], 
    120            subject='user@example.com') 
    121 
    122    The credentials are considered immutable. If you want to modify the scopes 
    123    or the subject used for delegation, use :meth:`with_scopes` or 
    124    :meth:`with_subject`:: 
    125 
    126        scoped_credentials = credentials.with_scopes(['email']) 
    127        delegated_credentials = credentials.with_subject(subject) 
    128 
    129    To add a quota project, use :meth:`with_quota_project`:: 
    130 
    131        credentials = credentials.with_quota_project('myproject-123') 
    132    """ 
    133 
    134    def __init__( 
    135        self, 
    136        signer, 
    137        service_account_email, 
    138        token_uri, 
    139        scopes=None, 
    140        default_scopes=None, 
    141        subject=None, 
    142        project_id=None, 
    143        quota_project_id=None, 
    144        additional_claims=None, 
    145        always_use_jwt_access=False, 
    146        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 
    147        trust_boundary=None, 
    148    ): 
    149        """ 
    150        Args: 
    151            signer (google.auth.crypt.Signer): The signer used to sign JWTs. 
    152            service_account_email (str): The service account's email. 
    153            scopes (Sequence[str]): User-defined scopes to request during the 
    154                authorization grant. 
    155            default_scopes (Sequence[str]): Default scopes passed by a 
    156                Google client library. Use 'scopes' for user-defined scopes. 
    157            token_uri (str): The OAuth 2.0 Token URI. 
    158            subject (str): For domain-wide delegation, the email address of the 
    159                user to for which to request delegated access. 
    160            project_id  (str): Project ID associated with the service account 
    161                credential. 
    162            quota_project_id (Optional[str]): The project ID used for quota and 
    163                billing. 
    164            additional_claims (Mapping[str, str]): Any additional claims for 
    165                the JWT assertion used in the authorization grant. 
    166            always_use_jwt_access (Optional[bool]): Whether self signed JWT should 
    167                be always used. 
    168            universe_domain (str): The universe domain. The default 
    169                universe domain is googleapis.com. For default value self 
    170                signed jwt is used for token refresh. 
    171            trust_boundary (Mapping[str,str]): A credential trust boundary. 
    172 
    173        .. note:: Typically one of the helper constructors 
    174            :meth:`from_service_account_file` or 
    175            :meth:`from_service_account_info` are used instead of calling the 
    176            constructor directly. 
    177        """ 
    178        super(Credentials, self).__init__() 
    179 
    180        self._cred_file_path = None 
    181        self._scopes = scopes 
    182        self._default_scopes = default_scopes 
    183        self._signer = signer 
    184        self._service_account_email = service_account_email 
    185        self._subject = subject 
    186        self._project_id = project_id 
    187        self._quota_project_id = quota_project_id 
    188        self._token_uri = token_uri 
    189        self._always_use_jwt_access = always_use_jwt_access 
    190        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN 
    191 
    192        if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 
    193            self._always_use_jwt_access = True 
    194 
    195        self._jwt_credentials = None 
    196 
    197        if additional_claims is not None: 
    198            self._additional_claims = additional_claims 
    199        else: 
    200            self._additional_claims = {} 
    201        self._trust_boundary = trust_boundary 
    202 
    203    @classmethod 
    204    def _from_signer_and_info(cls, signer, info, **kwargs): 
    205        """Creates a Credentials instance from a signer and service account 
    206        info. 
    207 
    208        Args: 
    209            signer (google.auth.crypt.Signer): The signer used to sign JWTs. 
    210            info (Mapping[str, str]): The service account info. 
    211            kwargs: Additional arguments to pass to the constructor. 
    212 
    213        Returns: 
    214            google.auth.jwt.Credentials: The constructed credentials. 
    215 
    216        Raises: 
    217            ValueError: If the info is not in the expected format. 
    218        """ 
    219        return cls( 
    220            signer, 
    221            service_account_email=info["client_email"], 
    222            token_uri=info["token_uri"], 
    223            project_id=info.get("project_id"), 
    224            universe_domain=info.get( 
    225                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN 
    226            ), 
    227            trust_boundary=info.get("trust_boundary"), 
    228            **kwargs, 
    229        ) 
    230 
    231    @classmethod 
    232    def from_service_account_info(cls, info, **kwargs): 
    233        """Creates a Credentials instance from parsed service account info. 
    234 
    235        Args: 
    236            info (Mapping[str, str]): The service account info in Google 
    237                format. 
    238            kwargs: Additional arguments to pass to the constructor. 
    239 
    240        Returns: 
    241            google.auth.service_account.Credentials: The constructed 
    242                credentials. 
    243 
    244        Raises: 
    245            ValueError: If the info is not in the expected format. 
    246        """ 
    247        signer = _service_account_info.from_dict( 
    248            info, require=["client_email", "token_uri"] 
    249        ) 
    250        return cls._from_signer_and_info(signer, info, **kwargs) 
    251 
    252    @classmethod 
    253    def from_service_account_file(cls, filename, **kwargs): 
    254        """Creates a Credentials instance from a service account json file. 
    255 
    256        Args: 
    257            filename (str): The path to the service account json file. 
    258            kwargs: Additional arguments to pass to the constructor. 
    259 
    260        Returns: 
    261            google.auth.service_account.Credentials: The constructed 
    262                credentials. 
    263        """ 
    264        info, signer = _service_account_info.from_filename( 
    265            filename, require=["client_email", "token_uri"] 
    266        ) 
    267        return cls._from_signer_and_info(signer, info, **kwargs) 
    268 
    269    @property 
    270    def service_account_email(self): 
    271        """The service account email.""" 
    272        return self._service_account_email 
    273 
    274    @property 
    275    def project_id(self): 
    276        """Project ID associated with this credential.""" 
    277        return self._project_id 
    278 
    279    @property 
    280    def requires_scopes(self): 
    281        """Checks if the credentials requires scopes. 
    282 
    283        Returns: 
    284            bool: True if there are no scopes set otherwise False. 
    285        """ 
    286        return True if not self._scopes else False 
    287 
    288    def _make_copy(self): 
    289        cred = self.__class__( 
    290            self._signer, 
    291            service_account_email=self._service_account_email, 
    292            scopes=copy.copy(self._scopes), 
    293            default_scopes=copy.copy(self._default_scopes), 
    294            token_uri=self._token_uri, 
    295            subject=self._subject, 
    296            project_id=self._project_id, 
    297            quota_project_id=self._quota_project_id, 
    298            additional_claims=self._additional_claims.copy(), 
    299            always_use_jwt_access=self._always_use_jwt_access, 
    300            universe_domain=self._universe_domain, 
    301            trust_boundary=self._trust_boundary, 
    302        ) 
    303        cred._cred_file_path = self._cred_file_path 
    304        return cred 
    305 
    306    @_helpers.copy_docstring(credentials.Scoped) 
    307    def with_scopes(self, scopes, default_scopes=None): 
    308        cred = self._make_copy() 
    309        cred._scopes = scopes 
    310        cred._default_scopes = default_scopes 
    311        return cred 
    312 
    313    def with_always_use_jwt_access(self, always_use_jwt_access): 
    314        """Create a copy of these credentials with the specified always_use_jwt_access value. 
    315 
    316        Args: 
    317            always_use_jwt_access (bool): Whether always use self signed JWT or not. 
    318 
    319        Returns: 
    320            google.auth.service_account.Credentials: A new credentials 
    321                instance. 
    322        Raises: 
    323            google.auth.exceptions.InvalidValue: If the universe domain is not 
    324                default and always_use_jwt_access is False. 
    325        """ 
    326        cred = self._make_copy() 
    327        if ( 
    328            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 
    329            and not always_use_jwt_access 
    330        ): 
    331            raise exceptions.InvalidValue( 
    332                "always_use_jwt_access should be True for non-default universe domain" 
    333            ) 
    334        cred._always_use_jwt_access = always_use_jwt_access 
    335        return cred 
    336 
    337    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 
    338    def with_universe_domain(self, universe_domain): 
    339        cred = self._make_copy() 
    340        cred._universe_domain = universe_domain 
    341        if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: 
    342            cred._always_use_jwt_access = True 
    343        return cred 
    344 
    345    def with_subject(self, subject): 
    346        """Create a copy of these credentials with the specified subject. 
    347 
    348        Args: 
    349            subject (str): The subject claim. 
    350 
    351        Returns: 
    352            google.auth.service_account.Credentials: A new credentials 
    353                instance. 
    354        """ 
    355        cred = self._make_copy() 
    356        cred._subject = subject 
    357        return cred 
    358 
    359    def with_claims(self, additional_claims): 
    360        """Returns a copy of these credentials with modified claims. 
    361 
    362        Args: 
    363            additional_claims (Mapping[str, str]): Any additional claims for 
    364                the JWT payload. This will be merged with the current 
    365                additional claims. 
    366 
    367        Returns: 
    368            google.auth.service_account.Credentials: A new credentials 
    369                instance. 
    370        """ 
    371        new_additional_claims = copy.deepcopy(self._additional_claims) 
    372        new_additional_claims.update(additional_claims or {}) 
    373        cred = self._make_copy() 
    374        cred._additional_claims = new_additional_claims 
    375        return cred 
    376 
    377    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 
    378    def with_quota_project(self, quota_project_id): 
    379        cred = self._make_copy() 
    380        cred._quota_project_id = quota_project_id 
    381        return cred 
    382 
    383    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 
    384    def with_token_uri(self, token_uri): 
    385        cred = self._make_copy() 
    386        cred._token_uri = token_uri 
    387        return cred 
    388 
    389    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 
    390    def with_trust_boundary(self, trust_boundary): 
    391        cred = self._make_copy() 
    392        cred._trust_boundary = trust_boundary 
    393        return cred 
    394 
    395    def _make_authorization_grant_assertion(self): 
    396        """Create the OAuth 2.0 assertion. 
    397 
    398        This assertion is used during the OAuth 2.0 grant to acquire an 
    399        access token. 
    400 
    401        Returns: 
    402            bytes: The authorization grant assertion. 
    403        """ 
    404        now = _helpers.utcnow() 
    405        lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) 
    406        expiry = now + lifetime 
    407 
    408        payload = { 
    409            "iat": _helpers.datetime_to_secs(now), 
    410            "exp": _helpers.datetime_to_secs(expiry), 
    411            # The issuer must be the service account email. 
    412            "iss": self._service_account_email, 
    413            # The audience must be the auth token endpoint's URI 
    414            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, 
    415            "scope": _helpers.scopes_to_string(self._scopes or ()), 
    416        } 
    417 
    418        payload.update(self._additional_claims) 
    419 
    420        # The subject can be a user email for domain-wide delegation. 
    421        if self._subject: 
    422            payload.setdefault("sub", self._subject) 
    423 
    424        token = jwt.encode(self._signer, payload) 
    425 
    426        return token 
    427 
    428    def _use_self_signed_jwt(self): 
    429        # Since domain wide delegation doesn't work with self signed JWT. If 
    430        # subject exists, then we should not use self signed JWT. 
    431        return self._subject is None and self._jwt_credentials is not None 
    432 
    433    def _metric_header_for_usage(self): 
    434        if self._use_self_signed_jwt(): 
    435            return metrics.CRED_TYPE_SA_JWT 
    436        return metrics.CRED_TYPE_SA_ASSERTION 
    437 
    438    @_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary) 
    439    def _refresh_token(self, request): 
    440        if self._always_use_jwt_access and not self._jwt_credentials: 
    441            # If self signed jwt should be used but jwt credential is not 
    442            # created, try to create one with scopes 
    443            self._create_self_signed_jwt(None) 
    444 
    445        if ( 
    446            self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN 
    447            and self._subject 
    448        ): 
    449            raise exceptions.RefreshError( 
    450                "domain wide delegation is not supported for non-default universe domain" 
    451            ) 
    452 
    453        if self._use_self_signed_jwt(): 
    454            self._jwt_credentials.refresh(request) 
    455            self.token = self._jwt_credentials.token.decode() 
    456            self.expiry = self._jwt_credentials.expiry 
    457        else: 
    458            assertion = self._make_authorization_grant_assertion() 
    459            access_token, expiry, _ = _client.jwt_grant( 
    460                request, self._token_uri, assertion 
    461            ) 
    462            self.token = access_token 
    463            self.expiry = expiry 
    464 
    465    def _create_self_signed_jwt(self, audience): 
    466        """Create a self-signed JWT from the credentials if requirements are met. 
    467 
    468        Args: 
    469            audience (str): The service URL. ``https://[API_ENDPOINT]/`` 
    470        """ 
    471        # https://google.aip.dev/auth/4111 
    472        if self._always_use_jwt_access: 
    473            if self._scopes: 
    474                additional_claims = {"scope": " ".join(self._scopes)} 
    475                if ( 
    476                    self._jwt_credentials is None 
    477                    or self._jwt_credentials.additional_claims != additional_claims 
    478                ): 
    479                    self._jwt_credentials = jwt.Credentials.from_signing_credentials( 
    480                        self, None, additional_claims=additional_claims 
    481                    ) 
    482            elif audience: 
    483                if ( 
    484                    self._jwt_credentials is None 
    485                    or self._jwt_credentials._audience != audience 
    486                ): 
    487 
    488                    self._jwt_credentials = jwt.Credentials.from_signing_credentials( 
    489                        self, audience 
    490                    ) 
    491            elif self._default_scopes: 
    492                additional_claims = {"scope": " ".join(self._default_scopes)} 
    493                if ( 
    494                    self._jwt_credentials is None 
    495                    or additional_claims != self._jwt_credentials.additional_claims 
    496                ): 
    497                    self._jwt_credentials = jwt.Credentials.from_signing_credentials( 
    498                        self, None, additional_claims=additional_claims 
    499                    ) 
    500        elif not self._scopes and audience: 
    501            self._jwt_credentials = jwt.Credentials.from_signing_credentials( 
    502                self, audience 
    503            ) 
    504 
    505    def _build_trust_boundary_lookup_url(self): 
    506        """Builds and returns the URL for the trust boundary lookup API. 
    507 
    508        This method constructs the specific URL for the IAM Credentials API's 
    509        `allowedLocations` endpoint, using the credential's universe domain 
    510        and service account email. 
    511 
    512        Raises: 
    513            ValueError: If `self.service_account_email` is None or an empty 
    514                string, as it's required to form the URL. 
    515 
    516        Returns: 
    517            str: The URL for the trust boundary lookup endpoint. 
    518        """ 
    519        if not self.service_account_email: 
    520            raise ValueError( 
    521                "Service account email is required to build the trust boundary lookup URL." 
    522            ) 
    523        return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format( 
    524            self._universe_domain, self._service_account_email 
    525        ) 
    526 
    527    @_helpers.copy_docstring(credentials.Signing) 
    528    def sign_bytes(self, message): 
    529        return self._signer.sign(message) 
    530 
    531    @property  # type: ignore 
    532    @_helpers.copy_docstring(credentials.Signing) 
    533    def signer(self): 
    534        return self._signer 
    535 
    536    @property  # type: ignore 
    537    @_helpers.copy_docstring(credentials.Signing) 
    538    def signer_email(self): 
    539        return self._service_account_email 
    540 
    541    @_helpers.copy_docstring(credentials.Credentials) 
    542    def get_cred_info(self): 
    543        if self._cred_file_path: 
    544            return { 
    545                "credential_source": self._cred_file_path, 
    546                "credential_type": "service account credentials", 
    547                "principal": self.service_account_email, 
    548            } 
    549        return None 
    550 
    551 
    552class IDTokenCredentials( 
    553    credentials.Signing, 
    554    credentials.CredentialsWithQuotaProject, 
    555    credentials.CredentialsWithTokenUri, 
    556): 
    557    """Open ID Connect ID Token-based service account credentials. 
    558 
    559    These credentials are largely similar to :class:`.Credentials`, but instead 
    560    of using an OAuth 2.0 Access Token as the bearer token, they use an Open 
    561    ID Connect ID Token as the bearer token. These credentials are useful when 
    562    communicating to services that require ID Tokens and can not accept access 
    563    tokens. 
    564 
    565    Usually, you'll create these credentials with one of the helper 
    566    constructors. To create credentials using a Google service account 
    567    private key JSON file:: 
    568 
    569        credentials = ( 
    570            service_account.IDTokenCredentials.from_service_account_file( 
    571                'service-account.json')) 
    572 
    573 
    574    Or if you already have the service account file loaded:: 
    575 
    576        service_account_info = json.load(open('service_account.json')) 
    577        credentials = ( 
    578            service_account.IDTokenCredentials.from_service_account_info( 
    579                service_account_info)) 
    580 
    581 
    582    Both helper methods pass on arguments to the constructor, so you can 
    583    specify additional scopes and a subject if necessary:: 
    584 
    585        credentials = ( 
    586            service_account.IDTokenCredentials.from_service_account_file( 
    587                'service-account.json', 
    588                scopes=['email'], 
    589                subject='user@example.com')) 
    590 
    591 
    592    The credentials are considered immutable. If you want to modify the scopes 
    593    or the subject used for delegation, use :meth:`with_scopes` or 
    594    :meth:`with_subject`:: 
    595 
    596        scoped_credentials = credentials.with_scopes(['email']) 
    597        delegated_credentials = credentials.with_subject(subject) 
    598 
    599    """ 
    600 
    def __init__(
        self,
        signer,
        service_account_email,
        token_uri,
        target_audience,
        additional_claims=None,
        quota_project_id=None,
        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
    ):
        """
        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            service_account_email (str): The service account's email.
            token_uri (str): The OAuth 2.0 Token URI.
            target_audience (str): The intended audience for these credentials,
                used when requesting the ID Token. The ID Token's ``aud`` claim
                will be set to this string.
            additional_claims (Mapping[str, str]): Any additional claims for
                the JWT assertion used in the authorization grant.
            quota_project_id (Optional[str]): The project ID used for quota and billing.
            universe_domain (str): The universe domain. The default
                universe domain is googleapis.com; a falsy value falls back
                to that default. For a non-default universe domain the IAM
                ID token endpoint is used for token refresh. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint.

        .. note:: Typically one of the helper constructors
            :meth:`from_service_account_file` or
            :meth:`from_service_account_info` are used instead of calling the
            constructor directly.
        """
        super(IDTokenCredentials, self).__init__()

        default_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
        resolved_domain = universe_domain if universe_domain else default_domain

        self._signer = signer
        self._service_account_email = service_account_email
        self._token_uri = token_uri
        self._target_audience = target_audience
        self._quota_project_id = quota_project_id

        self._universe_domain = resolved_domain
        # Point the IAM ID token endpoint at the resolved universe domain.
        self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
            "googleapis.com", resolved_domain
        )
        # The IAM endpoint is only switched on for non-default universes.
        self._use_iam_endpoint = resolved_domain != default_domain

        self._additional_claims = (
            {} if additional_claims is None else additional_claims
        )
    656 
    @classmethod
    def _from_signer_and_info(cls, signer, info, **kwargs):
        """Builds an instance of this class from a signer and service account
        info.

        Args:
            signer (google.auth.crypt.Signer): The signer used to sign JWTs.
            info (Mapping[str, str]): The service account info. Must contain
                ``client_email`` and ``token_uri``.
            kwargs: Additional arguments to pass to the constructor. Explicit
                ``service_account_email`` and ``token_uri`` values win over
                the ones in ``info``, whereas a ``universe_domain`` present
                in ``info`` replaces any value passed here.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            KeyError: If ``client_email`` or ``token_uri`` is missing from
                the info.
        """
        # Both required keys are looked up unconditionally, in this order,
        # so a missing key fails even when the caller supplied an override.
        client_email = info["client_email"]
        kwargs.setdefault("service_account_email", client_email)
        token_endpoint = info["token_uri"]
        kwargs.setdefault("token_uri", token_endpoint)

        if "universe_domain" in info:
            kwargs["universe_domain"] = info["universe_domain"]

        return cls(signer, **kwargs)
    678 
    @classmethod
    def from_service_account_info(cls, info, **kwargs):
        """Builds credentials from already-parsed service account info.

        Args:
            info (Mapping[str, str]): The service account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.

        Raises:
            ValueError: If the info is not in the expected format.
        """
        required_keys = ["client_email", "token_uri"]
        key_signer = _service_account_info.from_dict(info, require=required_keys)
        return cls._from_signer_and_info(key_signer, info, **kwargs)
    699 
    @classmethod
    def from_service_account_file(cls, filename, **kwargs):
        """Builds credentials from a service account json file on disk.

        Args:
            filename (str): The path to the service account json file.
                ``client_email`` and ``token_uri`` are requested as required
                fields when the file is loaded.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.service_account.IDTokenCredentials: The constructed
                credentials.
        """
        required_fields = ["client_email", "token_uri"]
        account_info, key_signer = _service_account_info.from_filename(
            filename, require=required_fields
        )
        return cls._from_signer_and_info(key_signer, account_info, **kwargs)
    716 
    def _make_copy(self):
        """Creates a new instance of the same class with matching settings.

        The additional claims mapping is shallow-copied so that the new
        instance does not share it with this one.

        Returns:
            A new credentials instance of ``self.__class__``.
        """
        init_kwargs = {
            "service_account_email": self._service_account_email,
            "token_uri": self._token_uri,
            "target_audience": self._target_audience,
            "additional_claims": self._additional_claims.copy(),
            "quota_project_id": self.quota_project_id,
            "universe_domain": self._universe_domain,
        }
        duplicate = self.__class__(self._signer, **init_kwargs)
        # The constructor takes no _use_iam_endpoint argument, so the flag
        # has to be carried over by hand.
        duplicate._use_iam_endpoint = self._use_iam_endpoint
        return duplicate
    730 
    def with_target_audience(self, target_audience):
        """Returns a copy of these credentials bound to a new target audience.

        The target audience is set on the copy, not on this instance.

        Args:
            target_audience (str): The intended audience for these
                credentials, used when requesting the ID Token.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.
        """
        retargeted = self._make_copy()
        retargeted._target_audience = target_audience
        return retargeted
    746 
    def _with_use_iam_endpoint(self, use_iam_endpoint):
        """Returns a copy of these credentials with use_iam_endpoint applied.

        Args:
            use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
                be used instead of the token_uri. Note that
                iam.serviceAccountTokenCreator role is required to use the IAM
                endpoint. This feature is currently experimental and subject
                to change without notice.

        Returns:
            google.auth.service_account.IDTokenCredentials: A new credentials
                instance.

        Raises:
            google.auth.exceptions.InvalidValue: If the universe domain is not
                default and use_iam_endpoint is False.
        """
        cred = self._make_copy()

        # Outside the default universe the IAM endpoint cannot be switched off.
        outside_default_universe = (
            cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
        )
        if outside_default_universe and not use_iam_endpoint:
            raise exceptions.InvalidValue(
                "use_iam_endpoint should be True for non-default universe domain"
            )

        cred._use_iam_endpoint = use_iam_endpoint
        return cred
    774 
    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # Work on a copy so this instance keeps its own quota project.
        clone = self._make_copy()
        clone._quota_project_id = quota_project_id
        return clone
    780 
    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
    def with_token_uri(self, token_uri):
        # Work on a copy so this instance keeps its own token URI.
        clone = self._make_copy()
        clone._token_uri = token_uri
        return clone
    786 
    def _make_authorization_grant_assertion(self):
        """Builds the signed JWT assertion for the OAuth 2.0 grant.

        The assertion is exchanged during the OAuth 2.0 grant for an ID
        token. Entries in the additional claims are merged in last, so they
        override the standard claims on key collisions.

        Returns:
            bytes: The authorization grant assertion.
        """
        issued_at = _helpers.utcnow()
        expires_at = issued_at + datetime.timedelta(
            seconds=_DEFAULT_TOKEN_LIFETIME_SECS
        )

        claims = {
            "iat": _helpers.datetime_to_secs(issued_at),
            "exp": _helpers.datetime_to_secs(expires_at),
            # The service account email acts as the issuer.
            "iss": self.service_account_email,
            # The audience is the auth token endpoint's URI.
            "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
            # Names the service the resulting ID token is meant for.
            "target_audience": self._target_audience,
        }
        claims.update(self._additional_claims)

        return jwt.encode(self._signer, claims)
    817 
    def _refresh_with_iam_endpoint(self, request):
        """Obtains an ID token through the IAM generateIdToken endpoint.

        It works as follows:

        1. A self signed jwt is created with
        https://www.googleapis.com/auth/iam as its scope.

        2. The self signed jwt is then used as the access token for a POST
        request to the IAM generateIdToken endpoint. The request body is:
            {
                "audience": self._target_audience,
                "includeEmail": "true",
                "useEmailAzp": "true",
            }

        If the request is successful, it returns {"token":"the ID token"},
        from which the ID token is extracted and its expiry computed. The
        results are stored on ``self.token`` and ``self.expiry``.
        """
        iam_scope_claims = {"scope": "https://www.googleapis.com/auth/iam"}
        self_signed = jwt.Credentials.from_signing_credentials(
            self,
            None,
            additional_claims=iam_scope_claims,
        )
        self_signed.refresh(request)

        # The self signed token is decoded to text before being handed over.
        bearer_token = self_signed.token.decode()
        self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
            request,
            self._iam_id_token_endpoint,
            self.signer_email,
            self._target_audience,
            bearer_token,
            self._universe_domain,
        )
    852 
    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # IAM path: the helper stores the token and expiry itself.
        if self._use_iam_endpoint:
            self._refresh_with_iam_endpoint(request)
            return

        # Default path: exchange a signed assertion at the token URI.
        grant_assertion = self._make_authorization_grant_assertion()
        id_token, expires_at, _ = _client.id_token_jwt_grant(
            request, self._token_uri, grant_assertion
        )
        self.token, self.expiry = id_token, expires_at
    864 
    @property
    def service_account_email(self):
        """The service account email.

        Read-only view of the email stored on this instance; the same value
        is used as the issuer claim of the grant assertion and as the signer
        email.
        """
        return self._service_account_email
    869 
    @_helpers.copy_docstring(credentials.Signing)
    def sign_bytes(self, message):
        # No docstring here: presumably the decorator supplies it from
        # credentials.Signing. Signing is delegated to the stored signer.
        return self._signer.sign(message)
    873 
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer(self):
        # No docstring here: presumably the decorator supplies it from
        # credentials.Signing. Exposes the signer object held by this instance.
        return self._signer
    878 
    @property  # type: ignore
    @_helpers.copy_docstring(credentials.Signing)
    def signer_email(self):
        # No docstring here: presumably the decorator supplies it from
        # credentials.Signing. The signer email is the service account email.
        return self._service_account_email