Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/oauth2cli/authcode.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

202 statements  

1# Note: This docstring is also used by this script's command line help. 

2"""A one-stop helper for desktop app to acquire an authorization code. 

3 

4It starts a web server to listen redirect_uri, waiting for auth code. 

5It optionally opens a browser window to guide a human user to manually login. 

6After obtaining an auth code, the web server will automatically shut down. 

7""" 

8from collections import defaultdict 

9import logging 

10import os 

11import socket 

12import sys 

13from string import Template 

14import threading 

15import time 

16 

17try: # Python 3 

18 from http.server import HTTPServer, BaseHTTPRequestHandler 

19 from urllib.parse import urlparse, parse_qs, urlencode 

20 from html import escape 

21except ImportError: # Fall back to Python 2 

22 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler 

23 from urlparse import urlparse, parse_qs 

24 from urllib import urlencode 

25 from cgi import escape 

26 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31def obtain_auth_code(listen_port, auth_uri=None): # Historically only used in testing 

32 with AuthCodeReceiver(port=listen_port) as receiver: 

33 return receiver.get_auth_response( 

34 auth_uri=auth_uri, 

35 welcome_template="""<html><body> 

36 Open this link to <a href='$auth_uri'>Sign In</a> 

37 (You may want to use incognito window) 

38 <hr><a href='$abort_uri'>Abort</a> 

39 </body></html>""", 

40 ).get("code") 

41 

42 

43def _is_inside_docker(): 

44 try: 

45 with open("/proc/1/cgroup") as f: # https://stackoverflow.com/a/20012536/728675 

46 # Search keyword "/proc/pid/cgroup" in this link for the file format 

47 # https://man7.org/linux/man-pages/man7/cgroups.7.html 

48 for line in f.readlines(): 

49 cgroup_path = line.split(":", 2)[2].strip() 

50 if cgroup_path.strip() != "/": 

51 return True 

52 except IOError: 

53 pass # We are probably not running on Linux 

54 return os.path.exists("/.dockerenv") # Docker on Mac will run this line 

55 

56 

57def is_wsl(): 

58 # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364 

59 # Run `uname -a` to get 'release' without python 

60 # - WSL 1: '4.4.0-19041-Microsoft' 

61 # - WSL 2: '4.19.128-microsoft-standard' 

62 import platform 

63 uname = platform.uname() 

64 platform_name = getattr(uname, 'system', uname[0]).lower() 

65 release = getattr(uname, 'release', uname[2]).lower() 

66 return platform_name == 'linux' and 'microsoft' in release 

67 

68 

69def _browse(auth_uri, browser_name=None): # throws ImportError, webbrowser.Error 

70 """Browse uri with named browser. Default browser is customizable by $BROWSER""" 

71 try: 

72 parsed_uri = urlparse(auth_uri) 

73 if parsed_uri.scheme not in ("http", "https"): 

74 logger.warning("Invalid URI scheme for browser: %s", parsed_uri.scheme) 

75 return False 

76 except ValueError: 

77 logger.warning("Invalid URI: %s", auth_uri) 

78 return False 

79 if any(c in auth_uri for c in "\n\r\t"): 

80 logger.warning("Invalid characters in URI") 

81 return False 

82 

83 import webbrowser # Lazy import. Some distro may not have this. 

84 if browser_name: 

85 browser_opened = webbrowser.get(browser_name).open(auth_uri) 

86 else: 

87 # This one can survive BROWSER=nonexist, while get(None).open(...) can not 

88 browser_opened = webbrowser.open(auth_uri) 

89 

90 # In WSL which doesn't have www-browser, try launching browser with explorer.exe 

91 if not browser_opened and is_wsl(): 

92 import subprocess 

93 try: # Try wslview first, which is the recommended way on WSL 

94 # https://github.com/wslutilities/wslu 

95 exit_code = subprocess.call(['wslview', auth_uri]) 

96 browser_opened = exit_code == 0 

97 except FileNotFoundError: # wslview might not be installed 

98 pass 

99 if not browser_opened: 

100 try: 

101 # Fallback to explorer.exe as recommended for WSL 

102 # Note: explorer.exe returns 1 on success in some WSL environments 

103 exit_code = subprocess.call(['explorer.exe', auth_uri]) 

104 browser_opened = exit_code in (0, 1) 

105 except FileNotFoundError: 

106 pass 

107 return browser_opened 

108 

109 

110def _qs2kv(qs): 

111 """Flatten parse_qs()'s single-item lists into the item itself""" 

112 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v 

113 for k, v in qs.items()} 

114 

115 

116def _is_html(text): 

117 return text.startswith("<") # Good enough for our purpose 

118 

119 

120def _escape(key_value_pairs): 

121 return {k: escape(v) for k, v in key_value_pairs.items()} 

122 

123def _printify(text): 

124 # If an https request is sent to an http server, the text needs to be repr-ed 

125 return repr(text) if isinstance(text, str) and not text.isprintable() else text 

126 

127class _AuthCodeHandler(BaseHTTPRequestHandler): 

128 def do_GET(self): 

129 qs = parse_qs(urlparse(self.path).query) 

130 welcome_param = qs.get('welcome', [None])[0] 

131 error_param = qs.get('error', [None])[0] 

132 if welcome_param == 'true': # Useful in manual e2e tests 

133 self._send_full_response(self.server.welcome_page) 

134 elif error_param == 'abort': # Useful in manual e2e tests 

135 self._send_full_response("Authentication aborted", is_ok=False) 

136 elif qs: 

137 # GET request with auth code or error - reject for security (form_post only) 

138 self._send_full_response( 

139 "response_mode=query is not supported for authentication responses. " 

140 "This application operates in response_mode=form_post mode only.", 

141 is_ok=False) 

142 else: 

143 # IdP may have error scenarios that result in a parameter-less GET request 

144 self._send_full_response( 

145 "Authentication could not be completed. You can close this window and return to the application.", 

146 is_ok=False) 

147 # NOTE: Don't do self.server.shutdown() here. It'll halt the server. 

148 

149 def do_POST(self): # Handle form_post response where auth code is in body 

150 # For flexibility, we choose to not check self.path matching redirect_uri 

151 #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') 

152 content_length = int(self.headers.get('Content-Length', 0)) 

153 post_data = self.rfile.read(content_length).decode('utf-8') 

154 qs = parse_qs(post_data) 

155 if qs.get('code') or qs.get('error'): # So, it is an auth response 

156 self._process_auth_response(_qs2kv(qs)) 

157 else: 

158 self._send_full_response("Invalid POST request", is_ok=False) 

159 # NOTE: Don't do self.server.shutdown() here. It'll halt the server. 

160 

161 def _process_auth_response(self, auth_response): 

162 """Process the auth response from either GET or POST request.""" 

163 logger.debug("Got auth response: %s", auth_response) 

164 if self.server.auth_state and self.server.auth_state != auth_response.get("state"): 

165 # OAuth2 successful and error responses contain state when it was used 

166 # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 

167 self._send_full_response( # Possibly an attack 

168 "State mismatch. Waiting for next response... or you may abort.", is_ok=False) 

169 else: 

170 template = (self.server.success_template 

171 if "code" in auth_response else self.server.error_template) 

172 if _is_html(template.template): 

173 safe_data = _escape(auth_response) # Foiling an XSS attack 

174 else: 

175 safe_data = auth_response 

176 filled_data = defaultdict(str, safe_data) # So that missing keys will be empty string 

177 self._send_full_response(template.safe_substitute(**filled_data)) 

178 self.server.auth_response = auth_response # Set it now, after the response is likely sent 

179 

180 def _send_full_response(self, body, is_ok=True): 

181 self.send_response(200 if is_ok else 400) 

182 content_type = 'text/html' if _is_html(body) else 'text/plain' 

183 self.send_header('Content-type', content_type) 

184 self.end_headers() 

185 self.wfile.write(body.encode("utf-8")) 

186 

187 def log_message(self, format, *args): 

188 # To override the default log-to-stderr behavior 

189 logger.debug(format, *map(_printify, args)) 

190 

191 

192class _AuthCodeHttpServer(HTTPServer, object): 

193 def __init__(self, server_address, *args, **kwargs): 

194 _, port = server_address 

195 if port and (sys.platform == "win32" or is_wsl()): 

196 # The default allow_reuse_address is True. It works fine on non-Windows. 

197 # On Windows, it undesirably allows multiple servers listening on same port, 

198 # yet the second server would not receive any incoming request. 

199 # So, we need to turn it off. 

200 self.allow_reuse_address = False 

201 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs) 

202 

203 def handle_timeout(self): 

204 # It will be triggered when no request comes in self.timeout seconds. 

205 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout 

206 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server 

207 # We choose to not call self.server_close() here, 

208 # because it would cause a socket.error exception in handle_request(), 

209 # and likely end up the server being server_close() twice. 

210 

211 

212class _AuthCodeHttpServer6(_AuthCodeHttpServer): 

213 address_family = socket.AF_INET6 

214 

215 

216class AuthCodeReceiver(object): 

217 # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API 

218 def __init__(self, port=None, scheduled_actions=None): 

219 """Create a Receiver waiting for incoming auth response. 

220 

221 :param port: 

222 The local web server will listen at http://...:<port> 

223 You need to use the same port when you register with your app. 

224 If your Identity Provider supports dynamic port, you can use port=0 here. 

225 Port 0 means to use an arbitrary unused port, per this official example: 

226 https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins 

227 

228 :param scheduled_actions: 

229 For example, if the input is 

230 ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]`` 

231 then the receiver would call that lambda function after 

232 waiting the response for 10 seconds. 

233 """ 

234 address = "0.0.0.0" if _is_inside_docker() else "127.0.0.1" # Hardcode 

235 # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3): 

236 # * Clients should listen on the loopback network interface only. 

237 # (It is not recommended to use "" shortcut to bind all addr.) 

238 # * the use of localhost is NOT RECOMMENDED. 

239 # (Use) the loopback IP literal 

240 # rather than localhost avoids inadvertently listening on network 

241 # interfaces other than the loopback interface. 

242 # Note: 

243 # When this server physically listens to a specific IP (as it should), 

244 # you will still be able to specify your redirect_uri using either 

245 # IP (e.g. 127.0.0.1) or localhost, whichever matches your registration. 

246 self._scheduled_actions = sorted(scheduled_actions or []) # Make a copy 

247 Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer 

248 # TODO: But, it would treat "localhost" or "" as IPv4. 

249 # If pressed, we might just expose a family parameter to caller. 

250 self._server = Server((address, port or 0), _AuthCodeHandler) 

251 self._closing = False 

252 

253 def get_port(self): 

254 """The port this server actually listening to""" 

255 # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address 

256 return self._server.server_address[1] 

257 

258 def get_auth_response(self, timeout=None, **kwargs): 

259 """Wait and return the auth response. Raise RuntimeError when timeout. 

260 

261 :param str auth_uri: 

262 If provided, this function will try to open a local browser. 

263 Starting from 2026, the built-in http server will require response_mode=form_post. 

264 :param int timeout: In seconds. None means wait indefinitely. 

265 :param str state: 

266 You may provide the state you used in auth_uri, 

267 then we will use it to validate incoming response. 

268 :param str welcome_template: 

269 If provided, your end user will see it instead of the auth_uri. 

270 When present, it shall be a plaintext or html template following 

271 `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_, 

272 and include some of these placeholders: $auth_uri and $abort_uri. 

273 :param str success_template: 

274 The page will be displayed when authentication was largely successful. 

275 Placeholders can be any of these: 

276 https://tools.ietf.org/html/rfc6749#section-5.1 

277 :param str error_template: 

278 The page will be displayed when authentication encountered error. 

279 Placeholders can be any of these: 

280 https://tools.ietf.org/html/rfc6749#section-5.2 

281 :param callable auth_uri_callback: 

282 A function with the shape of lambda auth_uri: ... 

283 When a browser was unable to be launch, this function will be called, 

284 so that the app could tell user to manually visit the auth_uri. 

285 :param str browser_name: 

286 If you did 

287 ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))`` 

288 beforehand, you can pass in the name "xyz" to use that browser. 

289 The default value ``None`` means using default browser, 

290 which is customizable by env var $BROWSER. 

291 :return: 

292 The auth response of the first leg of Auth Code flow, 

293 typically {"code": "...", "state": "..."} or {"error": "...", ...} 

294 See https://tools.ietf.org/html/rfc6749#section-4.1.2 

295 and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse 

296 Returns None when the state was mismatched, or when timeout occurred. 

297 """ 

298 # Historically, the _get_auth_response() uses HTTPServer.handle_request(), 

299 # because its handle-and-retry logic is conceptually as easy as a while loop. 

300 # Also, handle_request() honors server.timeout setting, and CTRL+C simply works. 

301 # All those are true when running on Linux. 

302 # 

303 # However, the behaviors on Windows turns out to be different. 

304 # A socket server waiting for request would freeze the current thread. 

305 # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK. 

306 # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc 

307 # 

308 # The solution would need to somehow put the http server into its own thread. 

309 # This could be done by the pattern of ``http.server.test()`` which internally 

310 # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7). 

311 # Or create our own thread to wrap the HTTPServer.handle_request() inside. 

312 result = {} # A mutable object to be filled with thread's return value 

313 t = threading.Thread( 

314 target=self._get_auth_response, args=(result,), kwargs=kwargs) 

315 t.daemon = True # So that it won't prevent the main thread from exiting 

316 t.start() 

317 begin = time.time() 

318 while (time.time() - begin < timeout) if timeout else True: 

319 time.sleep(1) # Short detection interval to make happy path responsive 

320 if not t.is_alive(): # Then the thread has finished its job and exited 

321 break 

322 while (self._scheduled_actions 

323 and time.time() - begin > self._scheduled_actions[0][0]): 

324 _, callback = self._scheduled_actions.pop(0) 

325 callback() 

326 return result or None 

327 

328 def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, 

329 welcome_template=None, success_template=None, error_template=None, 

330 auth_uri_callback=None, 

331 browser_name=None, 

332 ): 

333 netloc = "http://localhost:{p}".format(p=self.get_port()) 

334 abort_uri = "{loc}?error=abort".format(loc=netloc) 

335 logger.debug("Abort by visit %s", abort_uri) 

336 

337 if auth_uri: 

338 # Note to maintainers: 

339 # Do not enforce response_mode=form_post by secretly hardcoding it here. 

340 # Just validate it here, so we won't surprise caller by changing their auth_uri behind the scene. 

341 params = parse_qs(urlparse(auth_uri).query) 

342 assert params.get('response_mode', [None])[0] == 'form_post', ( 

343 "The built-in http server supports HTTP POST only. " 

344 "The auth_uri must be built with response_mode=form_post") 

345 

346 self._server.welcome_page = Template(welcome_template or "").safe_substitute( 

347 auth_uri=auth_uri, abort_uri=abort_uri) 

348 if auth_uri: # Now attempt to open a local browser to visit it 

349 _uri = (netloc + "?welcome=true") if welcome_template else auth_uri 

350 logger.info("Open a browser on this device to visit: %s" % _uri) 

351 browser_opened = False 

352 try: 

353 browser_opened = _browse(_uri, browser_name=browser_name) 

354 except: # Had to use broad except, because the potential 

355 # webbrowser.Error is purposely undefined outside of _browse(). 

356 # Absorb and proceed. Because browser could be manually run elsewhere. 

357 logger.exception("_browse(...) unsuccessful") 

358 if not browser_opened: 

359 if not auth_uri_callback: 

360 logger.warning( 

361 "Found no browser in current environment. " 

362 "If this program is being run inside a container " 

363 "which either (1) has access to host network " 

364 "(i.e. started by `docker run --net=host -it ...`), " 

365 "or (2) published port {port} to host network " 

366 "(i.e. started by `docker run -p 127.0.0.1:{port}:{port} -it ...`), " 

367 "you can use browser on host to visit the following link. " 

368 "Otherwise, this auth attempt would either timeout " 

369 "(current timeout setting is {timeout}) " 

370 "or be aborted by CTRL+C. Auth URI: {auth_uri}".format( 

371 auth_uri=_uri, timeout=timeout, port=self.get_port())) 

372 else: # Then it is the auth_uri_callback()'s job to inform the user 

373 auth_uri_callback(_uri) 

374 

375 recommendation = "For your security: Do not share the contents of this page, the address bar, or take screenshots." # From MSRC 

376 self._server.success_template = Template(success_template or 

377 "Authentication complete. You can return to the application. Please close this browser tab.\n\n" + recommendation) 

378 self._server.error_template = Template(error_template or 

379 # Do NOT invent new placeholders in this template. Just use standard keys defined in OAuth2 RFC. 

380 # Otherwise there is no obvious canonical way for caller to know what placeholders are supported. 

381 # Besides, we have been using these standard keys for years. Changing now would break backward compatibility. 

382 "Authentication failed. $error: $error_description. ($error_uri).\n\n" + recommendation) 

383 

384 self._server.timeout = timeout # Otherwise its handle_timeout() won't work 

385 self._server.auth_response = {} # Shared with _AuthCodeHandler 

386 self._server.auth_state = state # So handler will check it before sending response 

387 while not self._closing: # Otherwise, the handle_request() attempt 

388 # would yield noisy ValueError trace 

389 # Derived from 

390 # https://docs.python.org/2/library/basehttpserver.html#more-examples 

391 self._server.handle_request() 

392 if self._server.auth_response: 

393 break 

394 result.update(self._server.auth_response) # Return via writable result param 

395 

396 def close(self): 

397 """Either call this eventually; or use the entire class as context manager""" 

398 self._closing = True 

399 self._server.server_close() 

400 

401 def __enter__(self): 

402 return self 

403 

404 def __exit__(self, exc_type, exc_val, exc_tb): 

405 self.close() 

406 

407# Note: Manually use or test this module by: 

408# python -m path.to.this.file -h 

409if __name__ == '__main__': 

410 import argparse, json 

411 from .oauth2 import Client 

412 logging.basicConfig(level=logging.INFO) 

413 p = parser = argparse.ArgumentParser( 

414 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 

415 description=__doc__ + "The auth code received will be shown at stdout.") 

416 p.add_argument( 

417 '--endpoint', help="The auth endpoint for your app.", 

418 default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize") 

419 p.add_argument('client_id', help="The client_id of your application") 

420 p.add_argument('--port', type=int, default=0, help="The port in redirect_uri") 

421 p.add_argument('--timeout', type=int, default=60, help="Timeout value, in second") 

422 p.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri") 

423 p.add_argument('--scope', default=None, help="The scope list") 

424 args = parser.parse_args() 

425 client = Client({"authorization_endpoint": args.endpoint}, args.client_id) 

426 with AuthCodeReceiver(port=args.port) as receiver: 

427 flow = client.initiate_auth_code_flow( 

428 scope=args.scope.split() if args.scope else None, 

429 redirect_uri="http://{h}:{p}".format(h=args.host, p=receiver.get_port()), 

430 ) 

431 print(json.dumps(receiver.get_auth_response( 

432 auth_uri=flow["auth_uri"], 

433 welcome_template= 

434 "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a>", 

435 error_template="<html>Oh no. $error</html>", 

436 success_template="Oh yeah. Got $code", 

437 timeout=args.timeout, 

438 state=flow["state"], # Optional 

439 ), indent=4))