1# Copyright 2022 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""External Account Authorized User Credentials. 
    16This module provides credentials based on OAuth 2.0 access and refresh tokens. 
    17These credentials usually access resources on behalf of a user (resource 
    18owner). 
    19 
    20Specifically, these are sourced using external identities via Workforce Identity Federation. 
    21 
    22Obtaining the initial access and refresh token can be done through the Google Cloud CLI. 
    23 
    24Example credential: 
    25{ 
    26  "type": "external_account_authorized_user", 
    27  "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID", 
    28  "refresh_token": "refreshToken", 
    29  "token_url": "https://sts.googleapis.com/v1/oauth/token", 
    30  "token_info_url": "https://sts.googleapis.com/v1/instrospect", 
    31  "client_id": "clientId", 
    32  "client_secret": "clientSecret" 
    33} 
    34""" 
    35 
    36import datetime 
    37import io 
    38import json 
    39 
    40from google.auth import _helpers 
    41from google.auth import credentials 
    42from google.auth import exceptions 
    43from google.oauth2 import sts 
    44from google.oauth2 import utils 
    45 
    46_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user" 
    47 
    48 
    49class Credentials( 
    50    credentials.CredentialsWithQuotaProject, 
    51    credentials.ReadOnlyScoped, 
    52    credentials.CredentialsWithTokenUri, 
    53): 
    54    """Credentials for External Account Authorized Users. 
    55 
    56    This is used to instantiate Credentials for exchanging refresh tokens from 
    57    authorized users for Google access token and authorizing requests to Google 
    58    APIs. 
    59 
    60    The credentials are considered immutable. If you want to modify the 
    61    quota project, use `with_quota_project` and if you want to modify the token 
    62    uri, use `with_token_uri`. 
    63 
    64    **IMPORTANT**: 
    65    This class does not validate the credential configuration. A security 
    66    risk occurs when a credential configuration configured with malicious urls 
    67    is used. 
    68    When the credential configuration is accepted from an 
    69    untrusted source, you should validate it before using. 
    70    Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.""" 
    71 
    72    def __init__( 
    73        self, 
    74        token=None, 
    75        expiry=None, 
    76        refresh_token=None, 
    77        audience=None, 
    78        client_id=None, 
    79        client_secret=None, 
    80        token_url=None, 
    81        token_info_url=None, 
    82        revoke_url=None, 
    83        scopes=None, 
    84        quota_project_id=None, 
    85        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, 
    86    ): 
    87        """Instantiates a external account authorized user credentials object. 
    88 
    89        Args: 
    90        token (str): The OAuth 2.0 access token. Can be None if refresh information 
    91            is provided. 
    92        expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access 
    93            token. 
    94        refresh_token (str): The optional OAuth 2.0 refresh token. If specified, 
    95            credentials can be refreshed. 
    96        audience (str): The optional STS audience which contains the resource name for the workforce 
    97            pool and the provider identifier in that pool. 
    98        client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as 
    99            None if the token can not be refreshed. 
    100        client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be 
    101            left as None if the token can not be refreshed. 
    102        token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for 
    103            refresh, can be left as None if the token can not be refreshed. 
    104        token_info_url (str): The optional STS endpoint URL for token introspection. 
    105        revoke_url (str): The optional STS endpoint URL for revoking tokens. 
    106        quota_project_id (str): The optional project ID used for quota and billing. 
    107            This project may be different from the project used to 
    108            create the credentials. 
    109        universe_domain (Optional[str]): The universe domain. The default value 
    110            is googleapis.com. 
    111 
    112        Returns: 
    113            google.auth.external_account_authorized_user.Credentials: The 
    114                constructed credentials. 
    115        """ 
    116        super(Credentials, self).__init__() 
    117 
    118        self.token = token 
    119        self.expiry = expiry 
    120        self._audience = audience 
    121        self._refresh_token = refresh_token 
    122        self._token_url = token_url 
    123        self._token_info_url = token_info_url 
    124        self._client_id = client_id 
    125        self._client_secret = client_secret 
    126        self._revoke_url = revoke_url 
    127        self._quota_project_id = quota_project_id 
    128        self._scopes = scopes 
    129        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN 
    130        self._cred_file_path = None 
    131 
    132        if not self.valid and not self.can_refresh: 
    133            raise exceptions.InvalidOperation( 
    134                "Token should be created with fields to make it valid (`token` and " 
    135                "`expiry`), or fields to allow it to refresh (`refresh_token`, " 
    136                "`token_url`, `client_id`, `client_secret`)." 
    137            ) 
    138 
    139        self._client_auth = None 
    140        if self._client_id: 
    141            self._client_auth = utils.ClientAuthentication( 
    142                utils.ClientAuthType.basic, self._client_id, self._client_secret 
    143            ) 
    144        self._sts_client = sts.Client(self._token_url, self._client_auth) 
    145 
    146    @property 
    147    def info(self): 
    148        """Generates the serializable dictionary representation of the current 
    149        credentials. 
    150 
    151        Returns: 
    152            Mapping: The dictionary representation of the credentials. This is the 
    153                reverse of the "from_info" method defined in this class. It is 
    154                useful for serializing the current credentials so it can deserialized 
    155                later. 
    156        """ 
    157        config_info = self.constructor_args() 
    158        config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE) 
    159        if config_info["expiry"]: 
    160            config_info["expiry"] = config_info["expiry"].isoformat() + "Z" 
    161 
    162        return {key: value for key, value in config_info.items() if value is not None} 
    163 
    164    def constructor_args(self): 
    165        return { 
    166            "audience": self._audience, 
    167            "refresh_token": self._refresh_token, 
    168            "token_url": self._token_url, 
    169            "token_info_url": self._token_info_url, 
    170            "client_id": self._client_id, 
    171            "client_secret": self._client_secret, 
    172            "token": self.token, 
    173            "expiry": self.expiry, 
    174            "revoke_url": self._revoke_url, 
    175            "scopes": self._scopes, 
    176            "quota_project_id": self._quota_project_id, 
    177            "universe_domain": self._universe_domain, 
    178        } 
    179 
    180    @property 
    181    def scopes(self): 
    182        """Optional[str]: The OAuth 2.0 permission scopes.""" 
    183        return self._scopes 
    184 
    185    @property 
    186    def requires_scopes(self): 
    187        """ False: OAuth 2.0 credentials have their scopes set when 
    188        the initial token is requested and can not be changed.""" 
    189        return False 
    190 
    191    @property 
    192    def client_id(self): 
    193        """Optional[str]: The OAuth 2.0 client ID.""" 
    194        return self._client_id 
    195 
    196    @property 
    197    def client_secret(self): 
    198        """Optional[str]: The OAuth 2.0 client secret.""" 
    199        return self._client_secret 
    200 
    201    @property 
    202    def audience(self): 
    203        """Optional[str]: The STS audience which contains the resource name for the 
    204            workforce pool and the provider identifier in that pool.""" 
    205        return self._audience 
    206 
    207    @property 
    208    def refresh_token(self): 
    209        """Optional[str]: The OAuth 2.0 refresh token.""" 
    210        return self._refresh_token 
    211 
    212    @property 
    213    def token_url(self): 
    214        """Optional[str]: The STS token exchange endpoint for refresh.""" 
    215        return self._token_url 
    216 
    217    @property 
    218    def token_info_url(self): 
    219        """Optional[str]: The STS endpoint for token info.""" 
    220        return self._token_info_url 
    221 
    222    @property 
    223    def revoke_url(self): 
    224        """Optional[str]: The STS endpoint for token revocation.""" 
    225        return self._revoke_url 
    226 
    227    @property 
    228    def is_user(self): 
    229        """ True: This credential always represents a user.""" 
    230        return True 
    231 
    232    @property 
    233    def can_refresh(self): 
    234        return all( 
    235            (self._refresh_token, self._token_url, self._client_id, self._client_secret) 
    236        ) 
    237 
    238    def get_project_id(self, request=None): 
    239        """Retrieves the project ID corresponding to the workload identity or workforce pool. 
    240        For workforce pool credentials, it returns the project ID corresponding to 
    241        the workforce_pool_user_project. 
    242 
    243        When not determinable, None is returned. 
    244 
    245        Args: 
    246            request (google.auth.transport.requests.Request): Request object. 
    247                Unused here, but passed from _default.default(). 
    248 
    249        Return: 
    250          str: project ID is not determinable for this credential type so it returns None 
    251        """ 
    252 
    253        return None 
    254 
    255    def to_json(self, strip=None): 
    256        """Utility function that creates a JSON representation of this 
    257        credential. 
    258        Args: 
    259            strip (Sequence[str]): Optional list of members to exclude from the 
    260                                   generated JSON. 
    261        Returns: 
    262            str: A JSON representation of this instance. When converted into 
    263            a dictionary, it can be passed to from_info() 
    264            to create a new instance. 
    265        """ 
    266        strip = strip if strip else [] 
    267        return json.dumps({k: v for (k, v) in self.info.items() if k not in strip}) 
    268 
    269    def refresh(self, request): 
    270        """Refreshes the access token. 
    271 
    272        Args: 
    273            request (google.auth.transport.Request): The object used to make 
    274                HTTP requests. 
    275 
    276        Raises: 
    277            google.auth.exceptions.RefreshError: If the credentials could 
    278                not be refreshed. 
    279        """ 
    280        if not self.can_refresh: 
    281            raise exceptions.RefreshError( 
    282                "The credentials do not contain the necessary fields need to " 
    283                "refresh the access token. You must specify refresh_token, " 
    284                "token_url, client_id, and client_secret." 
    285            ) 
    286 
    287        now = _helpers.utcnow() 
    288        response_data = self._make_sts_request(request) 
    289 
    290        self.token = response_data.get("access_token") 
    291 
    292        lifetime = datetime.timedelta(seconds=response_data.get("expires_in")) 
    293        self.expiry = now + lifetime 
    294 
    295        if "refresh_token" in response_data: 
    296            self._refresh_token = response_data["refresh_token"] 
    297 
    298    def _make_sts_request(self, request): 
    299        return self._sts_client.refresh_token(request, self._refresh_token) 
    300 
    301    @_helpers.copy_docstring(credentials.Credentials) 
    302    def get_cred_info(self): 
    303        if self._cred_file_path: 
    304            return { 
    305                "credential_source": self._cred_file_path, 
    306                "credential_type": "external account authorized user credentials", 
    307            } 
    308        return None 
    309 
    310    def _make_copy(self): 
    311        kwargs = self.constructor_args() 
    312        cred = self.__class__(**kwargs) 
    313        cred._cred_file_path = self._cred_file_path 
    314        return cred 
    315 
    316    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 
    317    def with_quota_project(self, quota_project_id): 
    318        cred = self._make_copy() 
    319        cred._quota_project_id = quota_project_id 
    320        return cred 
    321 
    322    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) 
    323    def with_token_uri(self, token_uri): 
    324        cred = self._make_copy() 
    325        cred._token_url = token_uri 
    326        return cred 
    327 
    328    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) 
    329    def with_universe_domain(self, universe_domain): 
    330        cred = self._make_copy() 
    331        cred._universe_domain = universe_domain 
    332        return cred 
    333 
    334    @classmethod 
    335    def from_info(cls, info, **kwargs): 
    336        """Creates a Credentials instance from parsed external account info. 
    337 
    338        **IMPORTANT**: 
    339        This method does not validate the credential configuration. A security 
    340        risk occurs when a credential configuration configured with malicious urls 
    341        is used. 
    342        When the credential configuration is accepted from an 
    343        untrusted source, you should validate it before using with this method. 
    344        Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 
    345 
    346        Args: 
    347            info (Mapping[str, str]): The external account info in Google 
    348                format. 
    349            kwargs: Additional arguments to pass to the constructor. 
    350 
    351        Returns: 
    352            google.auth.external_account_authorized_user.Credentials: The 
    353                constructed credentials. 
    354 
    355        Raises: 
    356            ValueError: For invalid parameters. 
    357        """ 
    358        expiry = info.get("expiry") 
    359        if expiry: 
    360            expiry = datetime.datetime.strptime( 
    361                expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S" 
    362            ) 
    363        return cls( 
    364            audience=info.get("audience"), 
    365            refresh_token=info.get("refresh_token"), 
    366            token_url=info.get("token_url"), 
    367            token_info_url=info.get("token_info_url"), 
    368            client_id=info.get("client_id"), 
    369            client_secret=info.get("client_secret"), 
    370            token=info.get("token"), 
    371            expiry=expiry, 
    372            revoke_url=info.get("revoke_url"), 
    373            quota_project_id=info.get("quota_project_id"), 
    374            scopes=info.get("scopes"), 
    375            universe_domain=info.get( 
    376                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN 
    377            ), 
    378            **kwargs 
    379        ) 
    380 
    381    @classmethod 
    382    def from_file(cls, filename, **kwargs): 
    383        """Creates a Credentials instance from an external account json file. 
    384 
    385        **IMPORTANT**: 
    386        This method does not validate the credential configuration. A security 
    387        risk occurs when a credential configuration configured with malicious urls 
    388        is used. 
    389        When the credential configuration is accepted from an 
    390        untrusted source, you should validate it before using with this method. 
    391        Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. 
    392 
    393        Args: 
    394            filename (str): The path to the external account json file. 
    395            kwargs: Additional arguments to pass to the constructor. 
    396 
    397        Returns: 
    398            google.auth.external_account_authorized_user.Credentials: The 
    399                constructed credentials. 
    400        """ 
    401        with io.open(filename, "r", encoding="utf-8") as json_file: 
    402            data = json.load(json_file) 
    403            return cls.from_info(data, **kwargs)