1# Copyright 2021 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15""" Challenges for reauthentication. 
    16""" 
    17 
    18import abc 
    19import base64 
    20import getpass 
    21import sys 
    22 
    23from google.auth import _helpers 
    24from google.auth import exceptions 
    25from google.oauth2 import webauthn_handler_factory 
    26from google.oauth2.webauthn_types import ( 
    27    AuthenticationExtensionsClientInputs, 
    28    GetRequest, 
    29    PublicKeyCredentialDescriptor, 
    30) 
    31 
    32 
    33REAUTH_ORIGIN = "https://accounts.google.com" 
    34SAML_CHALLENGE_MESSAGE = ( 
    35    "Please run `gcloud auth login` to complete reauthentication with SAML." 
    36) 
    37WEBAUTHN_TIMEOUT_MS = 120000  # Two minute timeout 
    38 
    39 
    40def get_user_password(text): 
    41    """Get password from user. 
    42 
    43    Override this function with a different logic if you are using this library 
    44    outside a CLI. 
    45 
    46    Args: 
    47        text (str): message for the password prompt. 
    48 
    49    Returns: 
    50        str: password string. 
    51    """ 
    52    return getpass.getpass(text) 
    53 
    54 
    55class ReauthChallenge(metaclass=abc.ABCMeta): 
    56    """Base class for reauth challenges.""" 
    57 
    58    @property 
    59    @abc.abstractmethod 
    60    def name(self):  # pragma: NO COVER 
    61        """Returns the name of the challenge.""" 
    62        raise NotImplementedError("name property must be implemented") 
    63 
    64    @property 
    65    @abc.abstractmethod 
    66    def is_locally_eligible(self):  # pragma: NO COVER 
    67        """Returns true if a challenge is supported locally on this machine.""" 
    68        raise NotImplementedError("is_locally_eligible property must be implemented") 
    69 
    70    @abc.abstractmethod 
    71    def obtain_challenge_input(self, metadata):  # pragma: NO COVER 
    72        """Performs logic required to obtain credentials and returns it. 
    73 
    74        Args: 
    75            metadata (Mapping): challenge metadata returned in the 'challenges' field in 
    76                the initial reauth request. Includes the 'challengeType' field 
    77                and other challenge-specific fields. 
    78 
    79        Returns: 
    80            response that will be send to the reauth service as the content of 
    81            the 'proposalResponse' field in the request body. Usually a dict 
    82            with the keys specific to the challenge. For example, 
    83            ``{'credential': password}`` for password challenge. 
    84        """ 
    85        raise NotImplementedError("obtain_challenge_input method must be implemented") 
    86 
    87 
    88class PasswordChallenge(ReauthChallenge): 
    89    """Challenge that asks for user's password.""" 
    90 
    91    @property 
    92    def name(self): 
    93        return "PASSWORD" 
    94 
    95    @property 
    96    def is_locally_eligible(self): 
    97        return True 
    98 
    99    @_helpers.copy_docstring(ReauthChallenge) 
    100    def obtain_challenge_input(self, unused_metadata): 
    101        passwd = get_user_password("Please enter your password:") 
    102        if not passwd: 
    103            passwd = " "  # avoid the server crashing in case of no password :D 
    104        return {"credential": passwd} 
    105 
    106 
    107class SecurityKeyChallenge(ReauthChallenge): 
    108    """Challenge that asks for user's security key touch.""" 
    109 
    110    @property 
    111    def name(self): 
    112        return "SECURITY_KEY" 
    113 
    114    @property 
    115    def is_locally_eligible(self): 
    116        return True 
    117 
    118    @_helpers.copy_docstring(ReauthChallenge) 
    119    def obtain_challenge_input(self, metadata): 
    120        # Check if there is an available Webauthn Handler, if not use pyu2f 
    121        try: 
    122            factory = webauthn_handler_factory.WebauthnHandlerFactory() 
    123            webauthn_handler = factory.get_handler() 
    124            if webauthn_handler is not None: 
    125                sys.stderr.write("Please insert and touch your security key\n") 
    126                return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) 
    127        except Exception: 
    128            # Attempt pyu2f if exception in webauthn flow 
    129            pass 
    130 
    131        try: 
    132            import pyu2f.convenience.authenticator  # type: ignore 
    133            import pyu2f.errors  # type: ignore 
    134            import pyu2f.model  # type: ignore 
    135        except ImportError: 
    136            raise exceptions.ReauthFailError( 
    137                "pyu2f dependency is required to use Security key reauth feature. " 
    138                "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`." 
    139            ) 
    140        sk = metadata["securityKey"] 
    141        challenges = sk["challenges"] 
    142        # Read both 'applicationId' and 'relyingPartyId', if they are the same, use 
    143        # applicationId, if they are different, use relyingPartyId first and retry 
    144        # with applicationId 
    145        application_id = sk["applicationId"] 
    146        relying_party_id = sk["relyingPartyId"] 
    147 
    148        if application_id != relying_party_id: 
    149            application_parameters = [relying_party_id, application_id] 
    150        else: 
    151            application_parameters = [application_id] 
    152 
    153        challenge_data = [] 
    154        for c in challenges: 
    155            kh = c["keyHandle"].encode("ascii") 
    156            key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) 
    157            challenge = c["challenge"].encode("ascii") 
    158            challenge = base64.urlsafe_b64decode(challenge) 
    159            challenge_data.append({"key": key, "challenge": challenge}) 
    160 
    161        # Track number of tries to suppress error message until all application_parameters 
    162        # are tried. 
    163        tries = 0 
    164        for app_id in application_parameters: 
    165            try: 
    166                tries += 1 
    167                api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( 
    168                    REAUTH_ORIGIN 
    169                ) 
    170                response = api.Authenticate( 
    171                    app_id, challenge_data, print_callback=sys.stderr.write 
    172                ) 
    173                return {"securityKey": response} 
    174            except pyu2f.errors.U2FError as e: 
    175                if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: 
    176                    # Only show error if all app_ids have been tried 
    177                    if tries == len(application_parameters): 
    178                        sys.stderr.write("Ineligible security key.\n") 
    179                        return None 
    180                    continue 
    181                if e.code == pyu2f.errors.U2FError.TIMEOUT: 
    182                    sys.stderr.write( 
    183                        "Timed out while waiting for security key touch.\n" 
    184                    ) 
    185                else: 
    186                    raise e 
    187            except pyu2f.errors.PluginError as e: 
    188                sys.stderr.write("Plugin error: {}.\n".format(e)) 
    189                continue 
    190            except pyu2f.errors.NoDeviceFoundError: 
    191                sys.stderr.write("No security key found.\n") 
    192            return None 
    193 
    194    def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): 
    195        sk = metadata.get("securityKey") 
    196        if sk is None: 
    197            raise exceptions.InvalidValue("securityKey is None") 
    198        challenges = sk.get("challenges") 
    199        application_id = sk.get("applicationId") 
    200        relying_party_id = sk.get("relyingPartyId") 
    201        if challenges is None or len(challenges) < 1: 
    202            raise exceptions.InvalidValue("challenges is None or empty") 
    203        if application_id is None: 
    204            raise exceptions.InvalidValue("application_id is None") 
    205        if relying_party_id is None: 
    206            raise exceptions.InvalidValue("relying_party_id is None") 
    207 
    208        allow_credentials = [] 
    209        for challenge in challenges: 
    210            kh = challenge.get("keyHandle") 
    211            if kh is None: 
    212                raise exceptions.InvalidValue("keyHandle is None") 
    213            key_handle = self._unpadded_urlsafe_b64recode(kh) 
    214            allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle)) 
    215 
    216        extension = AuthenticationExtensionsClientInputs(appid=application_id) 
    217 
    218        challenge = challenges[0].get("challenge") 
    219        if challenge is None: 
    220            raise exceptions.InvalidValue("challenge is None") 
    221 
    222        get_request = GetRequest( 
    223            origin=REAUTH_ORIGIN, 
    224            rpid=relying_party_id, 
    225            challenge=self._unpadded_urlsafe_b64recode(challenge), 
    226            timeout_ms=WEBAUTHN_TIMEOUT_MS, 
    227            allow_credentials=allow_credentials, 
    228            user_verification="required", 
    229            extensions=extension, 
    230        ) 
    231 
    232        try: 
    233            get_response = webauthn_handler.get(get_request) 
    234        except Exception as e: 
    235            sys.stderr.write("Webauthn Error: {}.\n".format(e)) 
    236            raise e 
    237 
    238        response = { 
    239            "clientData": get_response.response.client_data_json, 
    240            "authenticatorData": get_response.response.authenticator_data, 
    241            "signatureData": get_response.response.signature, 
    242            "applicationId": application_id, 
    243            "keyHandle": get_response.id, 
    244            "securityKeyReplyType": 2, 
    245        } 
    246        return {"securityKey": response} 
    247 
    248    def _unpadded_urlsafe_b64recode(self, s): 
    249        """Converts standard b64 encoded string to url safe b64 encoded string 
    250        with no padding.""" 
    251        b = base64.urlsafe_b64decode(s) 
    252        return base64.urlsafe_b64encode(b).decode().rstrip("=") 
    253 
    254 
    255class SamlChallenge(ReauthChallenge): 
    256    """Challenge that asks the users to browse to their ID Providers. 
    257 
    258    Currently SAML challenge is not supported. When obtaining the challenge 
    259    input, exception will be raised to instruct the users to run 
    260    `gcloud auth login` for reauthentication. 
    261    """ 
    262 
    263    @property 
    264    def name(self): 
    265        return "SAML" 
    266 
    267    @property 
    268    def is_locally_eligible(self): 
    269        return True 
    270 
    271    def obtain_challenge_input(self, metadata): 
    272        # Magic Arch has not fully supported returning a proper dedirect URL 
    273        # for programmatic SAML users today. So we error our here and request 
    274        # users to use gcloud to complete a login. 
    275        raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE) 
    276 
    277 
    278AVAILABLE_CHALLENGES = { 
    279    challenge.name: challenge 
    280    for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()] 
    281}