Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/oauth2/challenges.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

138 statements  

1# Copyright 2021 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" Challenges for reauthentication. 

16""" 

17 

18import abc 

19import base64 

20import getpass 

21import sys 

22 

23from google.auth import _helpers 

24from google.auth import exceptions 

25from google.oauth2 import webauthn_handler_factory 

26from google.oauth2.webauthn_types import ( 

27 AuthenticationExtensionsClientInputs, 

28 GetRequest, 

29 PublicKeyCredentialDescriptor, 

30) 

31 

32 

33REAUTH_ORIGIN = "https://accounts.google.com" 

34SAML_CHALLENGE_MESSAGE = ( 

35 "Please run `gcloud auth login` to complete reauthentication with SAML." 

36) 

37WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout 

38 

39 

40def get_user_password(text): 

41 """Get password from user. 

42 

43 Override this function with a different logic if you are using this library 

44 outside a CLI. 

45 

46 Args: 

47 text (str): message for the password prompt. 

48 

49 Returns: 

50 str: password string. 

51 """ 

52 return getpass.getpass(text) 

53 

54 

55class ReauthChallenge(metaclass=abc.ABCMeta): 

56 """Base class for reauth challenges.""" 

57 

58 @property 

59 @abc.abstractmethod 

60 def name(self): # pragma: NO COVER 

61 """Returns the name of the challenge.""" 

62 raise NotImplementedError("name property must be implemented") 

63 

64 @property 

65 @abc.abstractmethod 

66 def is_locally_eligible(self): # pragma: NO COVER 

67 """Returns true if a challenge is supported locally on this machine.""" 

68 raise NotImplementedError("is_locally_eligible property must be implemented") 

69 

70 @abc.abstractmethod 

71 def obtain_challenge_input(self, metadata): # pragma: NO COVER 

72 """Performs logic required to obtain credentials and returns it. 

73 

74 Args: 

75 metadata (Mapping): challenge metadata returned in the 'challenges' field in 

76 the initial reauth request. Includes the 'challengeType' field 

77 and other challenge-specific fields. 

78 

79 Returns: 

80 response that will be send to the reauth service as the content of 

81 the 'proposalResponse' field in the request body. Usually a dict 

82 with the keys specific to the challenge. For example, 

83 ``{'credential': password}`` for password challenge. 

84 """ 

85 raise NotImplementedError("obtain_challenge_input method must be implemented") 

86 

87 

88class PasswordChallenge(ReauthChallenge): 

89 """Challenge that asks for user's password.""" 

90 

91 @property 

92 def name(self): 

93 return "PASSWORD" 

94 

95 @property 

96 def is_locally_eligible(self): 

97 return True 

98 

99 @_helpers.copy_docstring(ReauthChallenge) 

100 def obtain_challenge_input(self, unused_metadata): 

101 passwd = get_user_password("Please enter your password:") 

102 if not passwd: 

103 passwd = " " # avoid the server crashing in case of no password :D 

104 return {"credential": passwd} 

105 

106 

107class SecurityKeyChallenge(ReauthChallenge): 

108 """Challenge that asks for user's security key touch.""" 

109 

110 @property 

111 def name(self): 

112 return "SECURITY_KEY" 

113 

114 @property 

115 def is_locally_eligible(self): 

116 return True 

117 

118 @_helpers.copy_docstring(ReauthChallenge) 

119 def obtain_challenge_input(self, metadata): 

120 # Check if there is an available Webauthn Handler, if not use pyu2f 

121 try: 

122 factory = webauthn_handler_factory.WebauthnHandlerFactory() 

123 webauthn_handler = factory.get_handler() 

124 if webauthn_handler is not None: 

125 sys.stderr.write("Please insert and touch your security key\n") 

126 return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) 

127 except Exception: 

128 # Attempt pyu2f if exception in webauthn flow 

129 pass 

130 

131 try: 

132 import pyu2f.convenience.authenticator # type: ignore 

133 import pyu2f.errors # type: ignore 

134 import pyu2f.model # type: ignore 

135 except ImportError: 

136 raise exceptions.ReauthFailError( 

137 "pyu2f dependency is required to use Security key reauth feature. " 

138 "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`." 

139 ) 

140 sk = metadata["securityKey"] 

141 challenges = sk["challenges"] 

142 # Read both 'applicationId' and 'relyingPartyId', if they are the same, use 

143 # applicationId, if they are different, use relyingPartyId first and retry 

144 # with applicationId 

145 application_id = sk["applicationId"] 

146 relying_party_id = sk["relyingPartyId"] 

147 

148 if application_id != relying_party_id: 

149 application_parameters = [relying_party_id, application_id] 

150 else: 

151 application_parameters = [application_id] 

152 

153 challenge_data = [] 

154 for c in challenges: 

155 kh = c["keyHandle"].encode("ascii") 

156 key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) 

157 challenge = c["challenge"].encode("ascii") 

158 challenge = base64.urlsafe_b64decode(challenge) 

159 challenge_data.append({"key": key, "challenge": challenge}) 

160 

161 # Track number of tries to suppress error message until all application_parameters 

162 # are tried. 

163 tries = 0 

164 for app_id in application_parameters: 

165 try: 

166 tries += 1 

167 api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( 

168 REAUTH_ORIGIN 

169 ) 

170 response = api.Authenticate( 

171 app_id, challenge_data, print_callback=sys.stderr.write 

172 ) 

173 return {"securityKey": response} 

174 except pyu2f.errors.U2FError as e: 

175 if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: 

176 # Only show error if all app_ids have been tried 

177 if tries == len(application_parameters): 

178 sys.stderr.write("Ineligible security key.\n") 

179 return None 

180 continue 

181 if e.code == pyu2f.errors.U2FError.TIMEOUT: 

182 sys.stderr.write( 

183 "Timed out while waiting for security key touch.\n" 

184 ) 

185 else: 

186 raise e 

187 except pyu2f.errors.PluginError as e: 

188 sys.stderr.write("Plugin error: {}.\n".format(e)) 

189 continue 

190 except pyu2f.errors.NoDeviceFoundError: 

191 sys.stderr.write("No security key found.\n") 

192 return None 

193 

194 def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): 

195 sk = metadata.get("securityKey") 

196 if sk is None: 

197 raise exceptions.InvalidValue("securityKey is None") 

198 challenges = sk.get("challenges") 

199 application_id = sk.get("applicationId") 

200 relying_party_id = sk.get("relyingPartyId") 

201 if challenges is None or len(challenges) < 1: 

202 raise exceptions.InvalidValue("challenges is None or empty") 

203 if application_id is None: 

204 raise exceptions.InvalidValue("application_id is None") 

205 if relying_party_id is None: 

206 raise exceptions.InvalidValue("relying_party_id is None") 

207 

208 allow_credentials = [] 

209 for challenge in challenges: 

210 kh = challenge.get("keyHandle") 

211 if kh is None: 

212 raise exceptions.InvalidValue("keyHandle is None") 

213 key_handle = self._unpadded_urlsafe_b64recode(kh) 

214 allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle)) 

215 

216 extension = AuthenticationExtensionsClientInputs(appid=application_id) 

217 

218 challenge = challenges[0].get("challenge") 

219 if challenge is None: 

220 raise exceptions.InvalidValue("challenge is None") 

221 

222 get_request = GetRequest( 

223 origin=REAUTH_ORIGIN, 

224 rpid=relying_party_id, 

225 challenge=self._unpadded_urlsafe_b64recode(challenge), 

226 timeout_ms=WEBAUTHN_TIMEOUT_MS, 

227 allow_credentials=allow_credentials, 

228 user_verification="required", 

229 extensions=extension, 

230 ) 

231 

232 try: 

233 get_response = webauthn_handler.get(get_request) 

234 except Exception as e: 

235 sys.stderr.write("Webauthn Error: {}.\n".format(e)) 

236 raise e 

237 

238 response = { 

239 "clientData": get_response.response.client_data_json, 

240 "authenticatorData": get_response.response.authenticator_data, 

241 "signatureData": get_response.response.signature, 

242 "applicationId": application_id, 

243 "keyHandle": get_response.id, 

244 "securityKeyReplyType": 2, 

245 } 

246 return {"securityKey": response} 

247 

248 def _unpadded_urlsafe_b64recode(self, s): 

249 """Converts standard b64 encoded string to url safe b64 encoded string 

250 with no padding.""" 

251 b = base64.urlsafe_b64decode(s) 

252 return base64.urlsafe_b64encode(b).decode().rstrip("=") 

253 

254 

255class SamlChallenge(ReauthChallenge): 

256 """Challenge that asks the users to browse to their ID Providers. 

257 

258 Currently SAML challenge is not supported. When obtaining the challenge 

259 input, exception will be raised to instruct the users to run 

260 `gcloud auth login` for reauthentication. 

261 """ 

262 

263 @property 

264 def name(self): 

265 return "SAML" 

266 

267 @property 

268 def is_locally_eligible(self): 

269 return True 

270 

271 def obtain_challenge_input(self, metadata): 

272 # Magic Arch has not fully supported returning a proper dedirect URL 

273 # for programmatic SAML users today. So we error our here and request 

274 # users to use gcloud to complete a login. 

275 raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE) 

276 

277 

278AVAILABLE_CHALLENGES = { 

279 challenge.name: challenge 

280 for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()] 

281}