1# Note: This docstring is also used by this script's command line help.
2"""A one-stop helper for desktop app to acquire an authorization code.
3
4It starts a web server to listen redirect_uri, waiting for auth code.
5It optionally opens a browser window to guide a human user to manually login.
6After obtaining an auth code, the web server will automatically shut down.
7"""
8from collections import defaultdict
9import logging
10import os
11import socket
12import sys
13from string import Template
14import threading
15import time
16
17try: # Python 3
18 from http.server import HTTPServer, BaseHTTPRequestHandler
19 from urllib.parse import urlparse, parse_qs, urlencode
20 from html import escape
21except ImportError: # Fall back to Python 2
22 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
23 from urlparse import urlparse, parse_qs
24 from urllib import urlencode
25 from cgi import escape
26
27
28logger = logging.getLogger(__name__)
29
30
def obtain_auth_code(listen_port, auth_uri=None):  # Historically only used in testing
    """Listen on ``listen_port`` and return the auth code received, if any.

    :param int listen_port: The local port the temporary web server listens on.
    :param str auth_uri:
        Optional. When provided, a browser is pointed to a welcome page
        which links to this uri.
    :return:
        The ``code`` value of the auth response, or None when no response
        was obtained or the response carried no code (e.g. an error response).
    """
    with AuthCodeReceiver(port=listen_port) as receiver:
        auth_response = receiver.get_auth_response(
            auth_uri=auth_uri,
            welcome_template="""<html><body>
                Open this link to <a href='$auth_uri'>Sign In</a>
                (You may want to use incognito window)
                <hr><a href='$abort_uri'>Abort</a>
                </body></html>""",
            )
    # get_auth_response() returns None (not an empty dict) when nothing arrived,
    # so guard against it rather than crash with AttributeError.
    return (auth_response or {}).get("code")
41
42
43def _is_inside_docker():
44 try:
45 with open("/proc/1/cgroup") as f: # https://stackoverflow.com/a/20012536/728675
46 # Search keyword "/proc/pid/cgroup" in this link for the file format
47 # https://man7.org/linux/man-pages/man7/cgroups.7.html
48 for line in f.readlines():
49 cgroup_path = line.split(":", 2)[2].strip()
50 if cgroup_path.strip() != "/":
51 return True
52 except IOError:
53 pass # We are probably not running on Linux
54 return os.path.exists("/.dockerenv") # Docker on Mac will run this line
55
56
def is_wsl():
    """Tell whether we are running on Linux with a Microsoft-flavored kernel release.

    "Official" way of detecting WSL:
    https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
    Run `uname -a` to get 'release' without python

    - WSL 1: '4.4.0-19041-Microsoft'
    - WSL 2: '4.19.128-microsoft-standard'
    """
    import platform
    info = platform.uname()
    # Fall back to positional access when the named attributes are absent
    system = getattr(info, 'system', info[0])
    kernel_release = getattr(info, 'release', info[2])
    if system.lower() != 'linux':
        return False
    return 'microsoft' in kernel_release.lower()
67
68
69def _browse(auth_uri, browser_name=None): # throws ImportError, webbrowser.Error
70 """Browse uri with named browser. Default browser is customizable by $BROWSER"""
71 try:
72 parsed_uri = urlparse(auth_uri)
73 if parsed_uri.scheme not in ("http", "https"):
74 logger.warning("Invalid URI scheme for browser: %s", parsed_uri.scheme)
75 return False
76 except ValueError:
77 logger.warning("Invalid URI: %s", auth_uri)
78 return False
79 if any(c in auth_uri for c in "\n\r\t"):
80 logger.warning("Invalid characters in URI")
81 return False
82
83 import webbrowser # Lazy import. Some distro may not have this.
84 if browser_name:
85 browser_opened = webbrowser.get(browser_name).open(auth_uri)
86 else:
87 # This one can survive BROWSER=nonexist, while get(None).open(...) can not
88 browser_opened = webbrowser.open(auth_uri)
89
90 # In WSL which doesn't have www-browser, try launching browser with explorer.exe
91 if not browser_opened and is_wsl():
92 import subprocess
93 try: # Try wslview first, which is the recommended way on WSL
94 # https://github.com/wslutilities/wslu
95 exit_code = subprocess.call(['wslview', auth_uri])
96 browser_opened = exit_code == 0
97 except FileNotFoundError: # wslview might not be installed
98 pass
99 if not browser_opened:
100 try:
101 # Fallback to explorer.exe as recommended for WSL
102 # Note: explorer.exe returns 1 on success in some WSL environments
103 exit_code = subprocess.call(['explorer.exe', auth_uri])
104 browser_opened = exit_code in (0, 1)
105 except FileNotFoundError:
106 pass
107 return browser_opened
108
109
110def _qs2kv(qs):
111 """Flatten parse_qs()'s single-item lists into the item itself"""
112 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v
113 for k, v in qs.items()}
114
115
116def _is_html(text):
117 return text.startswith("<") # Good enough for our purpose
118
119
120def _escape(key_value_pairs):
121 return {k: escape(v) for k, v in key_value_pairs.items()}
122
123def _printify(text):
124 # If an https request is sent to an http server, the text needs to be repr-ed
125 return repr(text) if isinstance(text, str) and not text.isprintable() else text
126
class _AuthCodeHandler(BaseHTTPRequestHandler):
    """Handle the requests arriving at the local auth code web server.

    It reads ``welcome_page``, ``auth_state``, ``success_template`` and
    ``error_template`` from ``self.server``, and stores an accepted
    auth response into ``self.server.auth_response``.
    """

    def do_GET(self):
        """Serve the welcome/abort pages. Reject any other GET with a 400 page."""
        qs = parse_qs(urlparse(self.path).query)
        welcome_param = qs.get('welcome', [None])[0]
        error_param = qs.get('error', [None])[0]
        if welcome_param == 'true':  # Useful in manual e2e tests
            self._send_full_response(self.server.welcome_page)
        elif error_param == 'abort':  # Useful in manual e2e tests
            self._send_full_response("Authentication aborted", is_ok=False)
        elif qs:
            # GET request with auth code or error - reject for security (form_post only)
            self._send_full_response(
                "response_mode=query is not supported for authentication responses. "
                "This application operates in response_mode=form_post mode only.",
                is_ok=False)
        else:
            # IdP may have error scenarios that result in a parameter-less GET request
            self._send_full_response(
                "Authentication could not be completed. You can close this window and return to the application.",
                is_ok=False)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def do_POST(self):  # Handle form_post response where auth code is in body
        """Parse the form-encoded body, and process it if it is an auth response."""
        # For flexibility, we choose to not check self.path matching redirect_uri
        #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:  # read(-1) would otherwise wait until EOF
                raise ValueError("Negative Content-Length")
            post_data = self.rfile.read(content_length).decode('utf-8')
        except ValueError:  # Malformed length, or (UnicodeDecodeError) malformed body
            self._send_full_response("Invalid POST request", is_ok=False)
            return
        qs = parse_qs(post_data)
        if qs.get('code') or qs.get('error'):  # So, it is an auth response
            self._process_auth_response(_qs2kv(qs))
        else:
            self._send_full_response("Invalid POST request", is_ok=False)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def _process_auth_response(self, auth_response):
        """Validate the state of an auth response, render a page, then record it.

        :param dict auth_response: The flattened parameters of the POST body.
        """
        logger.debug("Got auth response: %s", auth_response)
        if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
            # OAuth2 successful and error responses contain state when it was used
            # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
            self._send_full_response(  # Possibly an attack
                "State mismatch. Waiting for next response... or you may abort.", is_ok=False)
        else:
            template = (self.server.success_template
                if "code" in auth_response else self.server.error_template)
            if _is_html(template.template):
                safe_data = _escape(auth_response)  # Foiling an XSS attack
            else:
                safe_data = auth_response
            filled_data = defaultdict(str, safe_data)  # So that missing keys will be empty string
            # The mapping must be passed as-is. Unpacking it by ** would create
            # a plain dict, and then the missing placeholders would be left
            # unsubstituted (e.g. a literal "$error_uri") rather than be empty.
            self._send_full_response(template.safe_substitute(filled_data))
            self.server.auth_response = auth_response  # Set it now, after the response is likely sent

    def _send_full_response(self, body, is_ok=True):
        """Send status line (200 or 400), content type header, and the utf-8 encoded body."""
        self.send_response(200 if is_ok else 400)
        content_type = 'text/html' if _is_html(body) else 'text/plain'
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def log_message(self, format, *args):
        # To override the default log-to-stderr behavior
        logger.debug(format, *map(_printify, args))
190
191
192class _AuthCodeHttpServer(HTTPServer, object):
193 def __init__(self, server_address, *args, **kwargs):
194 _, port = server_address
195 if port and (sys.platform == "win32" or is_wsl()):
196 # The default allow_reuse_address is True. It works fine on non-Windows.
197 # On Windows, it undesirably allows multiple servers listening on same port,
198 # yet the second server would not receive any incoming request.
199 # So, we need to turn it off.
200 self.allow_reuse_address = False
201 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs)
202
203 def handle_timeout(self):
204 # It will be triggered when no request comes in self.timeout seconds.
205 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
206 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server
207 # We choose to not call self.server_close() here,
208 # because it would cause a socket.error exception in handle_request(),
209 # and likely end up the server being server_close() twice.
210
211
class _AuthCodeHttpServer6(_AuthCodeHttpServer):
    """The same server as _AuthCodeHttpServer, with its address family set to IPv6."""
    address_family = socket.AF_INET6
214
215
class AuthCodeReceiver(object):
    """Run a local web server which waits for the auth response of an auth code flow.

    Call :func:`close` eventually, or use an instance as a context manager.
    """
    # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
    def __init__(self, port=None, scheduled_actions=None):
        """Create a Receiver waiting for incoming auth response.

        :param port:
            The local web server will listen at http://...:<port>
            You need to use the same port when you register with your app.
            If your Identity Provider supports dynamic port, you can use port=0 here.
            Port 0 means to use an arbitrary unused port, per this official example:
            https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins

        :param scheduled_actions:
            For example, if the input is
            ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]``
            then the receiver would call that lambda function after
            waiting the response for 10 seconds.
        """
        address = "0.0.0.0" if _is_inside_docker() else "127.0.0.1"  # Hardcode
        # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
        #   * Clients should listen on the loopback network interface only.
        #     (It is not recommended to use "" shortcut to bind all addr.)
        #   * the use of localhost is NOT RECOMMENDED.
        #     (Use) the loopback IP literal
        #     rather than localhost avoids inadvertently listening on network
        #     interfaces other than the loopback interface.
        # Note:
        #   When this server physically listens to a specific IP (as it should),
        #   you will still be able to specify your redirect_uri using either
        #   IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.

        # Make a copy, ordered by delay only. Comparing whole (delay, callback)
        # tuples would attempt to compare the callbacks when two delays tie,
        # and that raises TypeError.
        self._scheduled_actions = sorted(
            scheduled_actions or [], key=lambda action: action[0])
        Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
            # TODO: But, it would treat "localhost" or "" as IPv4.
            # If pressed, we might just expose a family parameter to caller.
        self._server = Server((address, port or 0), _AuthCodeHandler)
        self._closing = False

    def get_port(self):
        """The port this server actually listening to"""
        # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
        return self._server.server_address[1]

    def get_auth_response(self, timeout=None, **kwargs):
        """Wait and return the auth response. Return None when timeout.

        :param str auth_uri:
            If provided, this function will try to open a local browser.
            Starting from 2026, the built-in http server will require response_mode=form_post.
        :param int timeout: In seconds. None means wait indefinitely.
        :param str state:
            You may provide the state you used in auth_uri,
            then we will use it to validate incoming response.
        :param str welcome_template:
            If provided, your end user will see it instead of the auth_uri.
            When present, it shall be a plaintext or html template following
            `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
            and include some of these placeholders: $auth_uri and $abort_uri.
        :param str success_template:
            The page will be displayed when authentication was largely successful.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.1
        :param str error_template:
            The page will be displayed when authentication encountered error.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.2
        :param callable auth_uri_callback:
            A function with the shape of lambda auth_uri: ...
            When a browser was unable to be launched, this function will be called,
            so that the app could tell user to manually visit the auth_uri.
        :param str browser_name:
            If you did
            ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))``
            beforehand, you can pass in the name "xyz" to use that browser.
            The default value ``None`` means using default browser,
            which is customizable by env var $BROWSER.
        :return:
            The auth response of the first leg of Auth Code flow,
            typically {"code": "...", "state": "..."} or {"error": "...", ...}
            See https://tools.ietf.org/html/rfc6749#section-4.1.2
            and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
            Returns None when the state was mismatched, or when timeout occurred.
        """
        # Historically, the _get_auth_response() uses HTTPServer.handle_request(),
        # because its handle-and-retry logic is conceptually as easy as a while loop.
        # Also, handle_request() honors server.timeout setting, and CTRL+C simply works.
        # All those are true when running on Linux.
        #
        # However, the behaviors on Windows turns out to be different.
        # A socket server waiting for request would freeze the current thread.
        # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK.
        # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc
        #
        # The solution would need to somehow put the http server into its own thread.
        # This could be done by the pattern of ``http.server.test()`` which internally
        # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7).
        # Or create our own thread to wrap the HTTPServer.handle_request() inside.
        result = {}  # A mutable object to be filled with thread's return value
        t = threading.Thread(
            target=self._get_auth_response, args=(result,), kwargs=kwargs)
        t.daemon = True  # So that it won't prevent the main thread from exiting
        t.start()
        begin = time.time()
        while (time.time() - begin < timeout) if timeout else True:
            # Wait for at most 1 second per round, but return as soon as
            # the thread exits, to make happy path responsive
            t.join(1)
            if not t.is_alive():  # Then the thread has finished its job and exited
                break
            while (self._scheduled_actions
                    and time.time() - begin > self._scheduled_actions[0][0]):
                _, callback = self._scheduled_actions.pop(0)
                callback()
        return result or None

    def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
            welcome_template=None, success_template=None, error_template=None,
            auth_uri_callback=None,
            browser_name=None,
            ):
        """Serve requests until an auth response arrives, then copy it into ``result``.

        This is the body of the worker thread started by get_auth_response(),
        whose docstring describes the other parameters.

        :param dict result: A writable dict which receives the auth response.
        """
        netloc = "http://localhost:{p}".format(p=self.get_port())
        abort_uri = "{loc}?error=abort".format(loc=netloc)
        logger.debug("Abort by visit %s", abort_uri)

        if auth_uri:
            # Note to maintainers:
            # Do not enforce response_mode=form_post by secretly hardcoding it here.
            # Just validate it here, so we won't surprise caller by changing their auth_uri behind the scene.
            params = parse_qs(urlparse(auth_uri).query)
            assert params.get('response_mode', [None])[0] == 'form_post', (
                "The built-in http server supports HTTP POST only. "
                "The auth_uri must be built with response_mode=form_post")

        self._server.welcome_page = Template(welcome_template or "").safe_substitute(
            auth_uri=auth_uri, abort_uri=abort_uri)
        if auth_uri:  # Now attempt to open a local browser to visit it
            _uri = (netloc + "?welcome=true") if welcome_template else auth_uri
            logger.info("Open a browser on this device to visit: %s", _uri)
            browser_opened = False
            try:
                browser_opened = _browse(_uri, browser_name=browser_name)
            except Exception:  # Had to use broad except, because the potential
                    # webbrowser.Error is purposely undefined outside of _browse().
                    # Yet it is not a bare except, so that KeyboardInterrupt
                    # and SystemExit would not be absorbed.
                # Absorb and proceed. Because browser could be manually run elsewhere.
                logger.exception("_browse(...) unsuccessful")
            if not browser_opened:
                if not auth_uri_callback:
                    logger.warning(
                        "Found no browser in current environment. "
                        "If this program is being run inside a container "
                        "which either (1) has access to host network "
                        "(i.e. started by `docker run --net=host -it ...`), "
                        "or (2) published port {port} to host network "
                        "(i.e. started by `docker run -p 127.0.0.1:{port}:{port} -it ...`), "
                        "you can use browser on host to visit the following link. "
                        "Otherwise, this auth attempt would either timeout "
                        "(current timeout setting is {timeout}) "
                        "or be aborted by CTRL+C. Auth URI: {auth_uri}".format(
                            auth_uri=_uri, timeout=timeout, port=self.get_port()))
                else:  # Then it is the auth_uri_callback()'s job to inform the user
                    auth_uri_callback(_uri)

        recommendation = "For your security: Do not share the contents of this page, the address bar, or take screenshots."  # From MSRC
        self._server.success_template = Template(success_template or
            "Authentication complete. You can return to the application. Please close this browser tab.\n\n" + recommendation)
        self._server.error_template = Template(error_template or
            # Do NOT invent new placeholders in this template. Just use standard keys defined in OAuth2 RFC.
            # Otherwise there is no obvious canonical way for caller to know what placeholders are supported.
            # Besides, we have been using these standard keys for years. Changing now would break backward compatibility.
            "Authentication failed. $error: $error_description. ($error_uri).\n\n" + recommendation)

        self._server.timeout = timeout  # Otherwise its handle_timeout() won't work
        self._server.auth_response = {}  # Shared with _AuthCodeHandler
        self._server.auth_state = state  # So handler will check it before sending response
        while not self._closing:  # Otherwise, the handle_request() attempt
                                  # would yield noisy ValueError trace
            # Derived from
            # https://docs.python.org/2/library/basehttpserver.html#more-examples
            self._server.handle_request()
            if self._server.auth_response:
                break
        result.update(self._server.auth_response)  # Return via writable result param

    def close(self):
        """Either call this eventually; or use the entire class as context manager"""
        self._closing = True
        self._server.server_close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
406
407# Note: Manually use or test this module by:
408# python -m path.to.this.file -h
if __name__ == '__main__':
    # A manual test driver: run an auth code flow, then print the auth response as json
    import argparse
    import json
    from .oauth2 import Client
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=__doc__ + "The auth code received will be shown at stdout.")
    parser.add_argument(
        '--endpoint', help="The auth endpoint for your app.",
        default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
    parser.add_argument('client_id', help="The client_id of your application")
    parser.add_argument('--port', type=int, default=0, help="The port in redirect_uri")
    parser.add_argument('--timeout', type=int, default=60, help="Timeout value, in second")
    parser.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri")
    parser.add_argument('--scope', default=None, help="The scope list")
    args = parser.parse_args()
    client = Client({"authorization_endpoint": args.endpoint}, args.client_id)
    with AuthCodeReceiver(port=args.port) as receiver:
        # The redirect_uri carries the port which the receiver actually listens to
        redirect_uri = "http://{h}:{p}".format(h=args.host, p=receiver.get_port())
        scope = args.scope.split() if args.scope else None
        flow = client.initiate_auth_code_flow(scope=scope, redirect_uri=redirect_uri)
        auth_response = receiver.get_auth_response(
            auth_uri=flow["auth_uri"],
            welcome_template=
                "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a>",
            error_template="<html>Oh no. $error</html>",
            success_template="Oh yeah. Got $code",
            timeout=args.timeout,
            state=flow["state"],  # Optional
            )
        print(json.dumps(auth_response, indent=4))