Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/oauth2cli/authcode.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

164 statements  

1# Note: This docstring is also used by this script's command line help. 

2"""A one-stop helper for desktop app to acquire an authorization code. 

3 

4It starts a web server to listen redirect_uri, waiting for auth code. 

5It optionally opens a browser window to guide a human user to manually login. 

6After obtaining an auth code, the web server will automatically shut down. 

7""" 

8import logging 

9import os 

10import socket 

11import sys 

12from string import Template 

13import threading 

14import time 

15 

16try: # Python 3 

17 from http.server import HTTPServer, BaseHTTPRequestHandler 

18 from urllib.parse import urlparse, parse_qs, urlencode 

19 from html import escape 

20except ImportError: # Fall back to Python 2 

21 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler 

22 from urlparse import urlparse, parse_qs 

23 from urllib import urlencode 

24 from cgi import escape 

25 

26 

27logger = logging.getLogger(__name__) 

28 

29 

30def obtain_auth_code(listen_port, auth_uri=None): # Historically only used in testing 

31 with AuthCodeReceiver(port=listen_port) as receiver: 

32 return receiver.get_auth_response( 

33 auth_uri=auth_uri, 

34 welcome_template="""<html><body> 

35 Open this link to <a href='$auth_uri'>Sign In</a> 

36 (You may want to use incognito window) 

37 <hr><a href='$abort_uri'>Abort</a> 

38 </body></html>""", 

39 ).get("code") 

40 

41 

42def _is_inside_docker(): 

43 try: 

44 with open("/proc/1/cgroup") as f: # https://stackoverflow.com/a/20012536/728675 

45 # Search keyword "/proc/pid/cgroup" in this link for the file format 

46 # https://man7.org/linux/man-pages/man7/cgroups.7.html 

47 for line in f.readlines(): 

48 cgroup_path = line.split(":", 2)[2].strip() 

49 if cgroup_path.strip() != "/": 

50 return True 

51 except IOError: 

52 pass # We are probably not running on Linux 

53 return os.path.exists("/.dockerenv") # Docker on Mac will run this line 

54 

55 

56def is_wsl(): 

57 # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364 

58 # Run `uname -a` to get 'release' without python 

59 # - WSL 1: '4.4.0-19041-Microsoft' 

60 # - WSL 2: '4.19.128-microsoft-standard' 

61 import platform 

62 uname = platform.uname() 

63 platform_name = getattr(uname, 'system', uname[0]).lower() 

64 release = getattr(uname, 'release', uname[2]).lower() 

65 return platform_name == 'linux' and 'microsoft' in release 

66 

67 

68def _browse(auth_uri, browser_name=None): # throws ImportError, webbrowser.Error 

69 """Browse uri with named browser. Default browser is customizable by $BROWSER""" 

70 import webbrowser # Lazy import. Some distro may not have this. 

71 if browser_name: 

72 browser_opened = webbrowser.get(browser_name).open(auth_uri) 

73 else: 

74 # This one can survive BROWSER=nonexist, while get(None).open(...) can not 

75 browser_opened = webbrowser.open(auth_uri) 

76 

77 # In WSL which doesn't have www-browser, try launching browser with PowerShell 

78 if not browser_opened and is_wsl(): 

79 try: 

80 import subprocess 

81 # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe 

82 # Ampersand (&) should be quoted 

83 exit_code = subprocess.call( 

84 ['powershell.exe', '-NoProfile', '-Command', 'Start-Process "{}"'.format(auth_uri)]) 

85 browser_opened = exit_code == 0 

86 except FileNotFoundError: # WSL might be too old 

87 pass 

88 return browser_opened 

89 

90 

91def _qs2kv(qs): 

92 """Flatten parse_qs()'s single-item lists into the item itself""" 

93 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v 

94 for k, v in qs.items()} 

95 

96 

97def _is_html(text): 

98 return text.startswith("<") # Good enough for our purpose 

99 

100 

101def _escape(key_value_pairs): 

102 return {k: escape(v) for k, v in key_value_pairs.items()} 

103 

104 

105def _printify(text): 

106 # If an https request is sent to an http server, the text needs to be repr-ed 

107 return repr(text) if isinstance(text, str) and not text.isprintable() else text 

108 

109 

110class _AuthCodeHandler(BaseHTTPRequestHandler): 

111 def do_GET(self): 

112 # For flexibility, we choose to not check self.path matching redirect_uri 

113 #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') 

114 qs = parse_qs(urlparse(self.path).query) 

115 if qs.get('code') or qs.get("error"): # So, it is an auth response 

116 auth_response = _qs2kv(qs) 

117 logger.debug("Got auth response: %s", auth_response) 

118 if self.server.auth_state and self.server.auth_state != auth_response.get("state"): 

119 # OAuth2 successful and error responses contain state when it was used 

120 # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 

121 self._send_full_response("State mismatch") # Possibly an attack 

122 else: 

123 template = (self.server.success_template 

124 if "code" in qs else self.server.error_template) 

125 if _is_html(template.template): 

126 safe_data = _escape(auth_response) # Foiling an XSS attack 

127 else: 

128 safe_data = auth_response 

129 self._send_full_response(template.safe_substitute(**safe_data)) 

130 self.server.auth_response = auth_response # Set it now, after the response is likely sent 

131 else: 

132 self._send_full_response(self.server.welcome_page) 

133 # NOTE: Don't do self.server.shutdown() here. It'll halt the server. 

134 

135 def _send_full_response(self, body, is_ok=True): 

136 self.send_response(200 if is_ok else 400) 

137 content_type = 'text/html' if _is_html(body) else 'text/plain' 

138 self.send_header('Content-type', content_type) 

139 self.end_headers() 

140 self.wfile.write(body.encode("utf-8")) 

141 

142 def log_message(self, format, *args): 

143 # To override the default log-to-stderr behavior 

144 logger.debug(format, *map(_printify, args)) 

145 

146 

147class _AuthCodeHttpServer(HTTPServer, object): 

148 def __init__(self, server_address, *args, **kwargs): 

149 _, port = server_address 

150 if port and (sys.platform == "win32" or is_wsl()): 

151 # The default allow_reuse_address is True. It works fine on non-Windows. 

152 # On Windows, it undesirably allows multiple servers listening on same port, 

153 # yet the second server would not receive any incoming request. 

154 # So, we need to turn it off. 

155 self.allow_reuse_address = False 

156 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs) 

157 

158 def handle_timeout(self): 

159 # It will be triggered when no request comes in self.timeout seconds. 

160 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout 

161 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server 

162 # We choose to not call self.server_close() here, 

163 # because it would cause a socket.error exception in handle_request(), 

164 # and likely end up the server being server_close() twice. 

165 

166 

167class _AuthCodeHttpServer6(_AuthCodeHttpServer): 

168 address_family = socket.AF_INET6 

169 

170 

171class AuthCodeReceiver(object): 

172 # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API 

173 def __init__(self, port=None, scheduled_actions=None): 

174 """Create a Receiver waiting for incoming auth response. 

175 

176 :param port: 

177 The local web server will listen at http://...:<port> 

178 You need to use the same port when you register with your app. 

179 If your Identity Provider supports dynamic port, you can use port=0 here. 

180 Port 0 means to use an arbitrary unused port, per this official example: 

181 https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins 

182 

183 :param scheduled_actions: 

184 For example, if the input is 

185 ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]`` 

186 then the receiver would call that lambda function after 

187 waiting the response for 10 seconds. 

188 """ 

189 address = "0.0.0.0" if _is_inside_docker() else "127.0.0.1" # Hardcode 

190 # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3): 

191 # * Clients should listen on the loopback network interface only. 

192 # (It is not recommended to use "" shortcut to bind all addr.) 

193 # * the use of localhost is NOT RECOMMENDED. 

194 # (Use) the loopback IP literal 

195 # rather than localhost avoids inadvertently listening on network 

196 # interfaces other than the loopback interface. 

197 # Note: 

198 # When this server physically listens to a specific IP (as it should), 

199 # you will still be able to specify your redirect_uri using either 

200 # IP (e.g. 127.0.0.1) or localhost, whichever matches your registration. 

201 self._scheduled_actions = sorted(scheduled_actions or []) # Make a copy 

202 Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer 

203 # TODO: But, it would treat "localhost" or "" as IPv4. 

204 # If pressed, we might just expose a family parameter to caller. 

205 self._server = Server((address, port or 0), _AuthCodeHandler) 

206 self._closing = False 

207 

208 def get_port(self): 

209 """The port this server actually listening to""" 

210 # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address 

211 return self._server.server_address[1] 

212 

213 def get_auth_response(self, timeout=None, **kwargs): 

214 """Wait and return the auth response. Raise RuntimeError when timeout. 

215 

216 :param str auth_uri: 

217 If provided, this function will try to open a local browser. 

218 :param int timeout: In seconds. None means wait indefinitely. 

219 :param str state: 

220 You may provide the state you used in auth_uri, 

221 then we will use it to validate incoming response. 

222 :param str welcome_template: 

223 If provided, your end user will see it instead of the auth_uri. 

224 When present, it shall be a plaintext or html template following 

225 `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_, 

226 and include some of these placeholders: $auth_uri and $abort_uri. 

227 :param str success_template: 

228 The page will be displayed when authentication was largely successful. 

229 Placeholders can be any of these: 

230 https://tools.ietf.org/html/rfc6749#section-5.1 

231 :param str error_template: 

232 The page will be displayed when authentication encountered error. 

233 Placeholders can be any of these: 

234 https://tools.ietf.org/html/rfc6749#section-5.2 

235 :param callable auth_uri_callback: 

236 A function with the shape of lambda auth_uri: ... 

237 When a browser was unable to be launch, this function will be called, 

238 so that the app could tell user to manually visit the auth_uri. 

239 :param str browser_name: 

240 If you did 

241 ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))`` 

242 beforehand, you can pass in the name "xyz" to use that browser. 

243 The default value ``None`` means using default browser, 

244 which is customizable by env var $BROWSER. 

245 :return: 

246 The auth response of the first leg of Auth Code flow, 

247 typically {"code": "...", "state": "..."} or {"error": "...", ...} 

248 See https://tools.ietf.org/html/rfc6749#section-4.1.2 

249 and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse 

250 Returns None when the state was mismatched, or when timeout occurred. 

251 """ 

252 # Historically, the _get_auth_response() uses HTTPServer.handle_request(), 

253 # because its handle-and-retry logic is conceptually as easy as a while loop. 

254 # Also, handle_request() honors server.timeout setting, and CTRL+C simply works. 

255 # All those are true when running on Linux. 

256 # 

257 # However, the behaviors on Windows turns out to be different. 

258 # A socket server waiting for request would freeze the current thread. 

259 # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK. 

260 # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc 

261 # 

262 # The solution would need to somehow put the http server into its own thread. 

263 # This could be done by the pattern of ``http.server.test()`` which internally 

264 # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7). 

265 # Or create our own thread to wrap the HTTPServer.handle_request() inside. 

266 result = {} # A mutable object to be filled with thread's return value 

267 t = threading.Thread( 

268 target=self._get_auth_response, args=(result,), kwargs=kwargs) 

269 t.daemon = True # So that it won't prevent the main thread from exiting 

270 t.start() 

271 begin = time.time() 

272 while (time.time() - begin < timeout) if timeout else True: 

273 time.sleep(1) # Short detection interval to make happy path responsive 

274 if not t.is_alive(): # Then the thread has finished its job and exited 

275 break 

276 while (self._scheduled_actions 

277 and time.time() - begin > self._scheduled_actions[0][0]): 

278 _, callback = self._scheduled_actions.pop(0) 

279 callback() 

280 return result or None 

281 

282 def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, 

283 welcome_template=None, success_template=None, error_template=None, 

284 auth_uri_callback=None, 

285 browser_name=None, 

286 ): 

287 welcome_uri = "http://localhost:{p}".format(p=self.get_port()) 

288 abort_uri = "{loc}?error=abort".format(loc=welcome_uri) 

289 logger.debug("Abort by visit %s", abort_uri) 

290 self._server.welcome_page = Template(welcome_template or "").safe_substitute( 

291 auth_uri=auth_uri, abort_uri=abort_uri) 

292 if auth_uri: # Now attempt to open a local browser to visit it 

293 _uri = welcome_uri if welcome_template else auth_uri 

294 logger.info("Open a browser on this device to visit: %s" % _uri) 

295 browser_opened = False 

296 try: 

297 browser_opened = _browse(_uri, browser_name=browser_name) 

298 except: # Had to use broad except, because the potential 

299 # webbrowser.Error is purposely undefined outside of _browse(). 

300 # Absorb and proceed. Because browser could be manually run elsewhere. 

301 logger.exception("_browse(...) unsuccessful") 

302 if not browser_opened: 

303 if not auth_uri_callback: 

304 logger.warning( 

305 "Found no browser in current environment. " 

306 "If this program is being run inside a container " 

307 "which either (1) has access to host network " 

308 "(i.e. started by `docker run --net=host -it ...`), " 

309 "or (2) published port {port} to host network " 

310 "(i.e. started by `docker run -p 127.0.0.1:{port}:{port} -it ...`), " 

311 "you can use browser on host to visit the following link. " 

312 "Otherwise, this auth attempt would either timeout " 

313 "(current timeout setting is {timeout}) " 

314 "or be aborted by CTRL+C. Auth URI: {auth_uri}".format( 

315 auth_uri=_uri, timeout=timeout, port=self.get_port())) 

316 else: # Then it is the auth_uri_callback()'s job to inform the user 

317 auth_uri_callback(_uri) 

318 

319 self._server.success_template = Template(success_template or 

320 "Authentication completed. You can close this window now.") 

321 self._server.error_template = Template(error_template or 

322 "Authentication failed. $error: $error_description. ($error_uri)") 

323 

324 self._server.timeout = timeout # Otherwise its handle_timeout() won't work 

325 self._server.auth_response = {} # Shared with _AuthCodeHandler 

326 self._server.auth_state = state # So handler will check it before sending response 

327 while not self._closing: # Otherwise, the handle_request() attempt 

328 # would yield noisy ValueError trace 

329 # Derived from 

330 # https://docs.python.org/2/library/basehttpserver.html#more-examples 

331 self._server.handle_request() 

332 if self._server.auth_response: 

333 break 

334 result.update(self._server.auth_response) # Return via writable result param 

335 

336 def close(self): 

337 """Either call this eventually; or use the entire class as context manager""" 

338 self._closing = True 

339 self._server.server_close() 

340 

341 def __enter__(self): 

342 return self 

343 

344 def __exit__(self, exc_type, exc_val, exc_tb): 

345 self.close() 

346 

347# Note: Manually use or test this module by: 

348# python -m path.to.this.file -h 

349if __name__ == '__main__': 

350 import argparse, json 

351 from .oauth2 import Client 

352 logging.basicConfig(level=logging.INFO) 

353 p = parser = argparse.ArgumentParser( 

354 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 

355 description=__doc__ + "The auth code received will be shown at stdout.") 

356 p.add_argument( 

357 '--endpoint', help="The auth endpoint for your app.", 

358 default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize") 

359 p.add_argument('client_id', help="The client_id of your application") 

360 p.add_argument('--port', type=int, default=0, help="The port in redirect_uri") 

361 p.add_argument('--timeout', type=int, default=60, help="Timeout value, in second") 

362 p.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri") 

363 p.add_argument('--scope', default=None, help="The scope list") 

364 args = parser.parse_args() 

365 client = Client({"authorization_endpoint": args.endpoint}, args.client_id) 

366 with AuthCodeReceiver(port=args.port) as receiver: 

367 flow = client.initiate_auth_code_flow( 

368 scope=args.scope.split() if args.scope else None, 

369 redirect_uri="http://{h}:{p}".format(h=args.host, p=receiver.get_port()), 

370 ) 

371 print(json.dumps(receiver.get_auth_response( 

372 auth_uri=flow["auth_uri"], 

373 welcome_template= 

374 "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a", 

375 error_template="<html>Oh no. $error</html>", 

376 success_template="Oh yeah. Got $code", 

377 timeout=args.timeout, 

378 state=flow["state"], # Optional 

379 ), indent=4))