Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/oauth2cli/authcode.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1# Note: This docstring is also used by this script's command line help. 

2"""A one-stop helper for desktop app to acquire an authorization code. 

3 

4It starts a web server to listen redirect_uri, waiting for auth code. 

5It optionally opens a browser window to guide a human user to manually login. 

6After obtaining an auth code, the web server will automatically shut down. 

7""" 

8from collections import defaultdict 

9import logging 

10import os 

11import socket 

12import sys 

13from string import Template 

14import threading 

15import time 

16 

17try: # Python 3 

18 from http.server import HTTPServer, BaseHTTPRequestHandler 

19 from urllib.parse import urlparse, parse_qs, urlencode 

20 from html import escape 

21except ImportError: # Fall back to Python 2 

22 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler 

23 from urlparse import urlparse, parse_qs 

24 from urllib import urlencode 

25 from cgi import escape 

26 

27 

28logger = logging.getLogger(__name__) 

29 

30 

def obtain_auth_code(listen_port, auth_uri=None):  # Historically only used in testing
    """Listen on ``listen_port`` and return the authorization code received.

    :param listen_port: Port passed to ``AuthCodeReceiver(port=...)``.
    :param auth_uri:
        Optional authorization URI, passed through to
        ``AuthCodeReceiver.get_auth_response()``.
    :return:
        The "code" value of the auth response. None when the response carries
        no "code", or when no auth response was obtained at all.
    """
    with AuthCodeReceiver(port=listen_port) as receiver:
        auth_response = receiver.get_auth_response(
            auth_uri=auth_uri,
            welcome_template="""<html><body>
                Open this link to <a href='$auth_uri'>Sign In</a>
                (You may want to use incognito window)
                <hr><a href='$abort_uri'>Abort</a>
                </body></html>""",
            )
    # get_auth_response() yields None (not an empty dict) when nothing arrived,
    # so guard before looking up the code.
    return (auth_response or {}).get("code")

41 

42 

43def _is_inside_docker(): 

44 try: 

45 with open("/proc/1/cgroup") as f: # https://stackoverflow.com/a/20012536/728675 

46 # Search keyword "/proc/pid/cgroup" in this link for the file format 

47 # https://man7.org/linux/man-pages/man7/cgroups.7.html 

48 for line in f.readlines(): 

49 cgroup_path = line.split(":", 2)[2].strip() 

50 if cgroup_path.strip() != "/": 

51 return True 

52 except IOError: 

53 pass # We are probably not running on Linux 

54 return os.path.exists("/.dockerenv") # Docker on Mac will run this line 

55 

56 

def is_wsl():
    """Tell whether we are running on Windows Subsystem for Linux.

    The check is: the platform is Linux and its kernel release mentions
    "microsoft" (case-insensitively).
    """
    # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
    # Run `uname -a` to get 'release' without python
    #   - WSL 1: '4.4.0-19041-Microsoft'
    #   - WSL 2: '4.19.128-microsoft-standard'
    import platform
    info = platform.uname()
    # Fall back to positional access when the named fields are unavailable
    system_name = getattr(info, 'system', info[0])
    kernel_release = getattr(info, 'release', info[2])
    if system_name.lower() != 'linux':
        return False
    return 'microsoft' in kernel_release.lower()

67 

68 

def _browse(auth_uri, browser_name=None):  # throws ImportError, webbrowser.Error
    """Browse uri with named browser. Default browser is customizable by $BROWSER

    :param auth_uri: The URI to open.
    :param browser_name:
        Optional name understood by ``webbrowser.get()``.
        A falsy value means the default browser.
    :return: A truthy value when a browser was launched, falsy otherwise.
    """
    import webbrowser  # Lazy import. Some distro may not have this.
    if browser_name:
        browser_opened = webbrowser.get(browser_name).open(auth_uri)
    else:
        # This one can survive BROWSER=nonexist, while get(None).open(...) can not
        browser_opened = webbrowser.open(auth_uri)

    # In WSL which doesn't have www-browser, try launching browser with PowerShell
    if not browser_opened and is_wsl():
        import subprocess
        # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe
        # The text after -Command is parsed as PowerShell script, so the URI
        # must be quoted. A single-quoted literal is used (with embedded single
        # quotes doubled) because, unlike a double-quoted string, it does not
        # expand `$(...)`, `$var` or backtick sequences found in the URI.
        # Ampersand (&) is harmless inside the quotes.
        quoted_uri = "'{}'".format(auth_uri.replace("'", "''"))
        try:
            exit_code = subprocess.call(
                ['powershell.exe', '-NoProfile', '-Command', 'Start-Process ' + quoted_uri])
            browser_opened = exit_code == 0
        except OSError:  # powershell.exe can not be run. WSL might be too old.
            # OSError rather than FileNotFoundError, because the latter name
            # does not exist on Python 2, which this module still falls back to.
            pass
    return browser_opened

90 

91 

92def _qs2kv(qs): 

93 """Flatten parse_qs()'s single-item lists into the item itself""" 

94 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v 

95 for k, v in qs.items()} 

96 

97 

98def _is_html(text): 

99 return text.startswith("<") # Good enough for our purpose 

100 

101 

102def _escape(key_value_pairs): 

103 return {k: escape(v) for k, v in key_value_pairs.items()} 

104 

105 

106def _printify(text): 

107 # If an https request is sent to an http server, the text needs to be repr-ed 

108 return repr(text) if isinstance(text, str) and not text.isprintable() else text 

109 

110 

class _AuthCodeHandler(BaseHTTPRequestHandler):
    """Request handler of the local web server that receives the auth response.

    It reads ``welcome_page``, ``auth_state``, ``success_template`` and
    ``error_template`` from ``self.server``, and stores an accepted auth
    response into ``self.server.auth_response``.
    """

    def do_GET(self):
        """Serve the welcome and abort pages. Auth responses sent by GET are rejected."""
        qs = parse_qs(urlparse(self.path).query)
        welcome_param = qs.get('welcome', [None])[0]
        error_param = qs.get('error', [None])[0]
        if welcome_param == 'true':  # Useful in manual e2e tests
            self._send_full_response(self.server.welcome_page)
        elif error_param == 'abort':  # Useful in manual e2e tests
            self._send_full_response("Authentication aborted", is_ok=False)
        elif qs:
            # GET request with auth code or error - reject for security (form_post only)
            self._send_full_response(
                "response_mode=query is not supported for authentication responses. "
                "This application operates in response_mode=form_post mode only.",
                is_ok=False)
        else:
            # IdP may have error scenarios that result in a parameter-less GET request
            self._send_full_response(
                "Authentication could not be completed. You can close this window and return to the application.",
                is_ok=False)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def do_POST(self):  # Handle form_post response where auth code is in body
        """Parse the form-encoded body and process it when it is an auth response.

        A body that can not be read (non-numeric or negative Content-Length,
        or bytes that are not valid utf-8) is answered with "Invalid POST request".
        """
        # For flexibility, we choose to not check self.path matching redirect_uri
        #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:
                # A negative size would turn rfile.read() into read-until-EOF
                raise ValueError("Negative Content-Length")
            post_data = self.rfile.read(content_length).decode('utf-8')
        except ValueError:  # UnicodeDecodeError is a ValueError too
            self._send_full_response("Invalid POST request", is_ok=False)
            return
        qs = parse_qs(post_data)
        if qs.get('code') or qs.get('error'):  # So, it is an auth response
            self._process_auth_response(_qs2kv(qs))
        else:
            self._send_full_response("Invalid POST request", is_ok=False)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def _process_auth_response(self, auth_response):
        """Validate the state of an auth response, render a page, and record it.

        :param dict auth_response:
            The flattened key-value pairs of the auth response.
            It is only stored into ``self.server.auth_response``
            when the state check passes.
        """
        logger.debug("Got auth response: %s", auth_response)
        if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
            # OAuth2 successful and error responses contain state when it was used
            # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
            self._send_full_response(  # Possibly an attack
                "State mismatch. Waiting for next response... or you may abort.", is_ok=False)
        else:
            template = (self.server.success_template
                if "code" in auth_response else self.server.error_template)
            if _is_html(template.template):
                safe_data = _escape(auth_response)  # Foiling an XSS attack
            else:
                safe_data = auth_response
            filled_data = defaultdict(str, safe_data)  # So that missing keys will be empty string
            # The mapping is passed as-is. Expanding it by ** would hand over
            # a plain dict, and the empty-string default would be lost.
            self._send_full_response(template.safe_substitute(filled_data))
            self.server.auth_response = auth_response  # Set it now, after the response is likely sent

    def _send_full_response(self, body, is_ok=True):
        """Send status line (200 when is_ok, otherwise 400), headers, and the utf-8 encoded body."""
        self.send_response(200 if is_ok else 400)
        content_type = 'text/html' if _is_html(body) else 'text/plain'
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def log_message(self, format, *args):
        # To override the default log-to-stderr behavior
        logger.debug(format, *map(_printify, args))

174 

175 

176class _AuthCodeHttpServer(HTTPServer, object): 

177 def __init__(self, server_address, *args, **kwargs): 

178 _, port = server_address 

179 if port and (sys.platform == "win32" or is_wsl()): 

180 # The default allow_reuse_address is True. It works fine on non-Windows. 

181 # On Windows, it undesirably allows multiple servers listening on same port, 

182 # yet the second server would not receive any incoming request. 

183 # So, we need to turn it off. 

184 self.allow_reuse_address = False 

185 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs) 

186 

187 def handle_timeout(self): 

188 # It will be triggered when no request comes in self.timeout seconds. 

189 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout 

190 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server 

191 # We choose to not call self.server_close() here, 

192 # because it would cause a socket.error exception in handle_request(), 

193 # and likely end up the server being server_close() twice. 

194 

195 

class _AuthCodeHttpServer6(_AuthCodeHttpServer):
    """Same as ``_AuthCodeHttpServer``, except its socket uses the IPv6 address family."""

    address_family = socket.AF_INET6

198 

199 

class AuthCodeReceiver(object):
    # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
    def __init__(self, port=None, scheduled_actions=None):
        """Create a Receiver waiting for incoming auth response.

        :param port:
            The local web server will listen at http://...:<port>
            You need to use the same port when you register with your app.
            If your Identity Provider supports dynamic port, you can use port=0 here.
            Port 0 means to use an arbitrary unused port, per this official example:
            https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins

        :param scheduled_actions:
            For example, if the input is
            ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]``
            then the receiver would call that lambda function after
            waiting the response for 10 seconds.
        """
        address = "0.0.0.0" if _is_inside_docker() else "127.0.0.1"  # Hardcode
            # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
            #   * Clients should listen on the loopback network interface only.
            #     (It is not recommended to use "" shortcut to bind all addr.)
            #   * the use of localhost is NOT RECOMMENDED.
            #     (Use) the loopback IP literal
            #     rather than localhost avoids inadvertently listening on network
            #     interfaces other than the loopback interface.
            # Note:
            #   When this server physically listens to a specific IP (as it should),
            #   you will still be able to specify your redirect_uri using either
            #   IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.
        # Make a copy, ordered by delay only. Sorting the bare tuples would
        # fall through to comparing the callbacks when two delays are equal,
        # which raises TypeError on Python 3.
        self._scheduled_actions = sorted(
            scheduled_actions or [], key=lambda action: action[0])
        Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
            # TODO: But, it would treat "localhost" or "" as IPv4.
            # If pressed, we might just expose a family parameter to caller.
        self._server = Server((address, port or 0), _AuthCodeHandler)
        self._closing = False

    def get_port(self):
        """The port this server actually listening to"""
        # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
        return self._server.server_address[1]

    def get_auth_response(self, timeout=None, **kwargs):
        """Wait and return the auth response. Return None when timeout.

        The waiting happens in a background thread. If that thread ends without
        obtaining an auth response (for instance because it raised an exception),
        this method returns None too.

        :param str auth_uri:
            If provided, this function will try to open a local browser.
            Starting from 2026, the built-in http server will require response_mode=form_post.
        :param int timeout: In seconds. None means wait indefinitely.
        :param str state:
            You may provide the state you used in auth_uri,
            then we will use it to validate incoming response.
        :param str welcome_template:
            If provided, your end user will see it instead of the auth_uri.
            When present, it shall be a plaintext or html template following
            `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
            and include some of these placeholders: $auth_uri and $abort_uri.
        :param str success_template:
            The page will be displayed when authentication was largely successful.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.1
        :param str error_template:
            The page will be displayed when authentication encountered error.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.2
        :param callable auth_uri_callback:
            A function with the shape of lambda auth_uri: ...
            When a browser was unable to be launch, this function will be called,
            so that the app could tell user to manually visit the auth_uri.
        :param str browser_name:
            If you did
            ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))``
            beforehand, you can pass in the name "xyz" to use that browser.
            The default value ``None`` means using default browser,
            which is customizable by env var $BROWSER.
        :return:
            The auth response of the first leg of Auth Code flow,
            typically {"code": "...", "state": "..."} or {"error": "...", ...}
            See https://tools.ietf.org/html/rfc6749#section-4.1.2
            and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
            Returns None when no auth response was obtained, i.e. timeout occurred.
        """
        # Historically, the _get_auth_response() uses HTTPServer.handle_request(),
        # because its handle-and-retry logic is conceptually as easy as a while loop.
        # Also, handle_request() honors server.timeout setting, and CTRL+C simply works.
        # All those are true when running on Linux.
        #
        # However, the behaviors on Windows turns out to be different.
        # A socket server waiting for request would freeze the current thread.
        # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK.
        # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc
        #
        # The solution would need to somehow put the http server into its own thread.
        # This could be done by the pattern of ``http.server.test()`` which internally
        # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7).
        # Or create our own thread to wrap the HTTPServer.handle_request() inside.
        result = {}  # A mutable object to be filled with thread's return value
        # NOTE: timeout is enforced by the loop below. It is not part of kwargs,
        # so the worker's own timeout parameter keeps its default of None.
        t = threading.Thread(
            target=self._get_auth_response, args=(result,), kwargs=kwargs)
        t.daemon = True  # So that it won't prevent the main thread from exiting
        t.start()
        begin = time.time()
        while (time.time() - begin < timeout) if timeout else True:
            time.sleep(1)  # Short detection interval to make happy path responsive
            if not t.is_alive():  # Then the thread has finished its job and exited
                break
            while (self._scheduled_actions
                    and time.time() - begin > self._scheduled_actions[0][0]):
                _, callback = self._scheduled_actions.pop(0)
                callback()
        return result or None

    def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
            welcome_template=None, success_template=None, error_template=None,
            auth_uri_callback=None,
            browser_name=None,
            ):
        """Run in the worker thread: serve requests until an auth response is accepted.

        :param dict result: It will be updated with the auth response, as the return channel.
        :raises ValueError: When auth_uri is given without response_mode=form_post.

        The other parameters are described in :func:`get_auth_response`.
        """
        netloc = "http://localhost:{p}".format(p=self.get_port())
        abort_uri = "{loc}?error=abort".format(loc=netloc)
        logger.debug("Abort by visit %s", abort_uri)

        if auth_uri:
            # Note to maintainers:
            # Do not enforce response_mode=form_post by secretly hardcoding it here.
            # Just validate it here, so we won't surprise caller by changing their auth_uri behind the scene.
            params = parse_qs(urlparse(auth_uri).query)
            if params.get('response_mode', [None])[0] != 'form_post':
                # An explicit raise, because an assert would vanish under python -O
                raise ValueError(
                    "The built-in http server supports HTTP POST only. "
                    "The auth_uri must be built with response_mode=form_post")

        self._server.welcome_page = Template(welcome_template or "").safe_substitute(
            auth_uri=auth_uri, abort_uri=abort_uri)
        if auth_uri:  # Now attempt to open a local browser to visit it
            _uri = (netloc + "?welcome=true") if welcome_template else auth_uri
            logger.info("Open a browser on this device to visit: %s", _uri)
            browser_opened = False
            try:
                browser_opened = _browse(_uri, browser_name=browser_name)
            except Exception:  # Had to use broad except, because the potential
                    # webbrowser.Error is purposely undefined outside of _browse().
                    # Still, SystemExit and KeyboardInterrupt are let through.
                # Absorb and proceed. Because browser could be manually run elsewhere.
                logger.exception("_browse(...) unsuccessful")
            if not browser_opened:
                if not auth_uri_callback:
                    logger.warning(
                        "Found no browser in current environment. "
                        "If this program is being run inside a container "
                        "which either (1) has access to host network "
                        "(i.e. started by `docker run --net=host -it ...`), "
                        "or (2) published port {port} to host network "
                        "(i.e. started by `docker run -p 127.0.0.1:{port}:{port} -it ...`), "
                        "you can use browser on host to visit the following link. "
                        "Otherwise, this auth attempt would either timeout "
                        "(current timeout setting is {timeout}) "
                        "or be aborted by CTRL+C. Auth URI: {auth_uri}".format(
                            auth_uri=_uri, timeout=timeout, port=self.get_port()))
                else:  # Then it is the auth_uri_callback()'s job to inform the user
                    auth_uri_callback(_uri)

        recommendation = "For your security: Do not share the contents of this page, the address bar, or take screenshots."  # From MSRC
        self._server.success_template = Template(success_template or
            "Authentication complete. You can return to the application. Please close this browser tab.\n\n" + recommendation)
        self._server.error_template = Template(error_template or
            # Do NOT invent new placeholders in this template. Just use standard keys defined in OAuth2 RFC.
            # Otherwise there is no obvious canonical way for caller to know what placeholders are supported.
            # Besides, we have been using these standard keys for years. Changing now would break backward compatibility.
            "Authentication failed. $error: $error_description. ($error_uri).\n\n" + recommendation)

        self._server.timeout = timeout  # Otherwise its handle_timeout() won't work
        self._server.auth_response = {}  # Shared with _AuthCodeHandler
        self._server.auth_state = state  # So handler will check it before sending response
        while not self._closing:  # Otherwise, the handle_request() attempt
                                  # would yield noisy ValueError trace
            # Derived from
            # https://docs.python.org/2/library/basehttpserver.html#more-examples
            self._server.handle_request()
            if self._server.auth_response:
                break
        result.update(self._server.auth_response)  # Return via writable result param

    def close(self):
        """Either call this eventually; or use the entire class as context manager"""
        self._closing = True
        self._server.server_close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

390 

# Note: Manually use or test this module by:
#       python -m path.to.this.file -h
if __name__ == '__main__':
    import argparse
    import json
    from .oauth2 import Client
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=__doc__ + "The auth code received will be shown at stdout.")
    parser.add_argument(
        '--endpoint', help="The auth endpoint for your app.",
        default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
    parser.add_argument('client_id', help="The client_id of your application")
    parser.add_argument('--port', type=int, default=0, help="The port in redirect_uri")
    parser.add_argument('--timeout', type=int, default=60, help="Timeout value, in second")
    parser.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri")
    parser.add_argument('--scope', default=None, help="The scope list")
    args = parser.parse_args()
    client = Client({"authorization_endpoint": args.endpoint}, args.client_id)
    with AuthCodeReceiver(port=args.port) as receiver:
        # The redirect_uri carries the port the receiver actually listens to
        redirect_uri = "http://{h}:{p}".format(h=args.host, p=receiver.get_port())
        scope = args.scope.split() if args.scope else None
        flow = client.initiate_auth_code_flow(scope=scope, redirect_uri=redirect_uri)
        auth_response = receiver.get_auth_response(
            auth_uri=flow["auth_uri"],
            welcome_template=
                "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a>",
            error_template="<html>Oh no. $error</html>",
            success_template="Oh yeah. Got $code",
            timeout=args.timeout,
            state=flow["state"],  # Optional
            )
        print(json.dumps(auth_response, indent=4))