1# Note: This docstring is also used by this script's command line help.
2"""A one-stop helper for desktop app to acquire an authorization code.
3
4It starts a web server to listen redirect_uri, waiting for auth code.
5It optionally opens a browser window to guide a human user to manually login.
6After obtaining an auth code, the web server will automatically shut down.
7"""
8from collections import defaultdict
9import logging
10import os
11import socket
12import sys
13from string import Template
14import threading
15import time
16
17try: # Python 3
18 from http.server import HTTPServer, BaseHTTPRequestHandler
19 from urllib.parse import urlparse, parse_qs, urlencode
20 from html import escape
21except ImportError: # Fall back to Python 2
22 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
23 from urlparse import urlparse, parse_qs
24 from urllib import urlencode
25 from cgi import escape
26
27
28logger = logging.getLogger(__name__)
29
30
def obtain_auth_code(listen_port, auth_uri=None):  # Historically only used in testing
    """Start a temporary local web server and return the auth code it receives.

    :param int listen_port: The local port the web server will listen on.
    :param str auth_uri:
        Optional. It is forwarded to ``AuthCodeReceiver.get_auth_response()``
        and rendered as the "Sign In" link of the welcome page.
    :return:
        The value of "code" in the auth response. None when the auth response
        contains no code, or when no auth response was obtained at all.
    """
    with AuthCodeReceiver(port=listen_port) as receiver:
        auth_response = receiver.get_auth_response(
            auth_uri=auth_uri,
            welcome_template="""<html><body>
 Open this link to <a href='$auth_uri'>Sign In</a>
 (You may want to use incognito window)
 <hr><a href='$abort_uri'>Abort</a>
 </body></html>""",
        )
    # get_auth_response() returns None rather than an empty dict when nothing
    # was received, so guard against calling .get() on None.
    return (auth_response or {}).get("code")
41
42
43def _is_inside_docker():
44 try:
45 with open("/proc/1/cgroup") as f: # https://stackoverflow.com/a/20012536/728675
46 # Search keyword "/proc/pid/cgroup" in this link for the file format
47 # https://man7.org/linux/man-pages/man7/cgroups.7.html
48 for line in f.readlines():
49 cgroup_path = line.split(":", 2)[2].strip()
50 if cgroup_path.strip() != "/":
51 return True
52 except IOError:
53 pass # We are probably not running on Linux
54 return os.path.exists("/.dockerenv") # Docker on Mac will run this line
55
56
def is_wsl():
    """Tell whether the current process runs inside Windows Subsystem for Linux."""
    # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
    # The kernel release (which `uname -a` also shows without python) looks like:
    # - WSL 1: '4.4.0-19041-Microsoft'
    # - WSL 2: '4.19.128-microsoft-standard'
    import platform
    uname_info = platform.uname()
    # Use positional access as the fallback when the named fields are absent
    system_name = getattr(uname_info, 'system', uname_info[0])
    kernel_release = getattr(uname_info, 'release', uname_info[2])
    if system_name.lower() != 'linux':
        return False
    return 'microsoft' in kernel_release.lower()
67
68
def _browse(auth_uri, browser_name=None):  # throws ImportError, webbrowser.Error
    """Browse uri with named browser. Default browser is customizable by $BROWSER

    :param str auth_uri: The uri to be opened.
    :param str browser_name:
        Optional name of a browser known to the ``webbrowser`` module.
    :return: A truthy value when a browser was launched, falsy otherwise.
    """
    import webbrowser  # Lazy import. Some distro may not have this.
    if browser_name:
        browser_opened = webbrowser.get(browser_name).open(auth_uri)
    else:
        # This one can survive BROWSER=nonexist, while get(None).open(...) can not
        browser_opened = webbrowser.open(auth_uri)

    # In WSL which doesn't have www-browser, try launching browser with PowerShell
    if not browser_opened and is_wsl():
        import subprocess
        # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe
        # The uri is passed as a single-quoted PowerShell literal, so that
        # ampersand (&), dollar sign, backtick and double quote are all taken
        # verbatim. An embedded single quote is escaped by doubling it.
        command = "Start-Process '{}'".format(auth_uri.replace("'", "''"))
        try:
            exit_code = subprocess.call(
                ['powershell.exe', '-NoProfile', '-Command', command])
            browser_opened = exit_code == 0
        except OSError:  # powershell.exe is absent (WSL might be too old) or not runnable
            # OSError is the base of FileNotFoundError, and is also available in Python 2
            pass
    return browser_opened
90
91
92def _qs2kv(qs):
93 """Flatten parse_qs()'s single-item lists into the item itself"""
94 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v
95 for k, v in qs.items()}
96
97
98def _is_html(text):
99 return text.startswith("<") # Good enough for our purpose
100
101
102def _escape(key_value_pairs):
103 return {k: escape(v) for k, v in key_value_pairs.items()}
104
105
106def _printify(text):
107 # If an https request is sent to an http server, the text needs to be repr-ed
108 return repr(text) if isinstance(text, str) and not text.isprintable() else text
109
110
class _AuthCodeHandler(BaseHTTPRequestHandler):
    """Request handler of the local web server which receives the auth response.

    GET requests only serve the welcome page and the abort link.
    The actual auth response is accepted from a POST body only.

    This handler reads ``welcome_page``, ``auth_state``, ``success_template``
    and ``error_template`` from ``self.server``,
    and stores the accepted auth response into ``self.server.auth_response``.
    """

    def do_GET(self):
        """Serve the welcome page or the abort page. Reject everything else."""
        qs = parse_qs(urlparse(self.path).query)
        welcome_param = qs.get('welcome', [None])[0]
        error_param = qs.get('error', [None])[0]
        if welcome_param == 'true':  # Useful in manual e2e tests
            self._send_full_response(self.server.welcome_page)
        elif error_param == 'abort':  # Useful in manual e2e tests
            self._send_full_response("Authentication aborted", is_ok=False)
        elif qs:
            # GET request with auth code or error - reject for security (form_post only)
            self._send_full_response(
                "response_mode=query is not supported for authentication responses. "
                "This application operates in response_mode=form_post mode only.",
                is_ok=False)
        else:
            # IdP may have error scenarios that result in a parameter-less GET request
            self._send_full_response(
                "Authentication could not be completed. You can close this window and return to the application.",
                is_ok=False)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def do_POST(self):  # Handle form_post response where auth code is in body
        """Parse the form-encoded body, and process it when it is an auth response."""
        # For flexibility, we choose to not check self.path matching redirect_uri
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:  # read(negative) would block until EOF
                raise ValueError("Negative Content-Length")
            post_data = self.rfile.read(content_length).decode('utf-8')
        except ValueError:  # Bad Content-Length, or UnicodeDecodeError (its subclass)
            self._send_full_response("Invalid POST request", is_ok=False)
            return
        qs = parse_qs(post_data)
        if qs.get('code') or qs.get('error'):  # So, it is an auth response
            self._process_auth_response(_qs2kv(qs))
        else:
            self._send_full_response("Invalid POST request", is_ok=False)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def _process_auth_response(self, auth_response):
        """Validate state, render the result page, then record the auth response.

        :param dict auth_response: The flattened key-value pairs of the POST body.
        """
        logger.debug("Got auth response: %s", auth_response)
        if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
            # OAuth2 successful and error responses contain state when it was used
            # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
            self._send_full_response(  # Possibly an attack
                "State mismatch. Waiting for next response... or you may abort.", is_ok=False)
        else:
            template = (self.server.success_template
                if "code" in auth_response else self.server.error_template)
            if _is_html(template.template):
                safe_data = _escape(auth_response)  # Foiling an XSS attack
            else:
                safe_data = auth_response
            filled_data = defaultdict(str, safe_data)  # So that missing keys will be empty string
            # The mapping must be passed positionally. Unpacking it by ** would
            # turn it into a plain dict and lose the empty-string default.
            self._send_full_response(template.safe_substitute(filled_data))
            self.server.auth_response = auth_response  # Set it now, after the response is likely sent

    def _send_full_response(self, body, is_ok=True):
        """Send status line, content type header and the utf-8 encoded body.

        :param str body: It is served as text/html when it starts with "<", otherwise text/plain.
        :param bool is_ok: True results in HTTP 200, False results in HTTP 400.
        """
        self.send_response(200 if is_ok else 400)
        content_type = 'text/html' if _is_html(body) else 'text/plain'
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def log_message(self, format, *args):
        # To override the default log-to-stderr behavior
        logger.debug(format, *map(_printify, args))
174
175
176class _AuthCodeHttpServer(HTTPServer, object):
177 def __init__(self, server_address, *args, **kwargs):
178 _, port = server_address
179 if port and (sys.platform == "win32" or is_wsl()):
180 # The default allow_reuse_address is True. It works fine on non-Windows.
181 # On Windows, it undesirably allows multiple servers listening on same port,
182 # yet the second server would not receive any incoming request.
183 # So, we need to turn it off.
184 self.allow_reuse_address = False
185 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs)
186
187 def handle_timeout(self):
188 # It will be triggered when no request comes in self.timeout seconds.
189 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
190 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server
191 # We choose to not call self.server_close() here,
192 # because it would cause a socket.error exception in handle_request(),
193 # and likely end up the server being server_close() twice.
194
195
class _AuthCodeHttpServer6(_AuthCodeHttpServer):
    """Same server as its parent class, except that it uses IPv6 sockets."""

    # The socket server base class creates its socket with this address family
    address_family = socket.AF_INET6
198
199
class AuthCodeReceiver(object):
    """Own a local web server and wait on it for the incoming auth response.

    It can be used as a context manager, which closes the server on exit.
    """
    # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API

    def __init__(self, port=None, scheduled_actions=None):
        """Create a Receiver waiting for incoming auth response.

        :param port:
            The local web server will listen at http://...:<port>
            You need to use the same port when you register with your app.
            If your Identity Provider supports dynamic port, you can use port=0 here.
            Port 0 means to use an arbitrary unused port, per this official example:
            https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins

        :param scheduled_actions:
            For example, if the input is
            ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]``
            then the receiver would call that lambda function after
            waiting the response for 10 seconds.
        """
        address = "0.0.0.0" if _is_inside_docker() else "127.0.0.1"  # Hardcode
        # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
        #   * Clients should listen on the loopback network interface only.
        #     (It is not recommended to use "" shortcut to bind all addr.)
        #   * the use of localhost is NOT RECOMMENDED.
        #     (Use) the loopback IP literal
        #     rather than localhost avoids inadvertently listening on network
        #     interfaces other than the loopback interface.
        # Note:
        #   When this server physically listens to a specific IP (as it should),
        #   you will still be able to specify your redirect_uri using either
        #   IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.

        # Make a sorted copy. Sort by the delay only: comparing whole tuples
        # would fall through to comparing the callbacks when two delays are
        # equal, and functions are not orderable in Python 3.
        self._scheduled_actions = sorted(
            scheduled_actions or [], key=lambda action: action[0])
        Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
            # TODO: But, it would treat "localhost" or "" as IPv4.
            # If pressed, we might just expose a family parameter to caller.
        self._server = Server((address, port or 0), _AuthCodeHandler)
        self._closing = False

    def get_port(self):
        """The port this server actually listening to"""
        # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
        return self._server.server_address[1]

    def get_auth_response(self, timeout=None, **kwargs):
        """Wait and return the auth response. Return None when timeout.

        :param str auth_uri:
            If provided, this function will try to open a local browser.
            Starting from 2026, the built-in http server will require response_mode=form_post.
        :param int timeout: In seconds. None means wait indefinitely.
        :param str state:
            You may provide the state you used in auth_uri,
            then we will use it to validate incoming response.
        :param str welcome_template:
            If provided, your end user will see it instead of the auth_uri.
            When present, it shall be a plaintext or html template following
            `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
            and include some of these placeholders: $auth_uri and $abort_uri.
        :param str success_template:
            The page will be displayed when authentication was largely successful.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.1
        :param str error_template:
            The page will be displayed when authentication encountered error.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.2
        :param callable auth_uri_callback:
            A function with the shape of lambda auth_uri: ...
            When a browser was unable to be launch, this function will be called,
            so that the app could tell user to manually visit the auth_uri.
        :param str browser_name:
            If you did
            ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))``
            beforehand, you can pass in the name "xyz" to use that browser.
            The default value ``None`` means using default browser,
            which is customizable by env var $BROWSER.
        :return:
            The auth response of the first leg of Auth Code flow,
            typically {"code": "...", "state": "..."} or {"error": "...", ...}
            See https://tools.ietf.org/html/rfc6749#section-4.1.2
            and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
            Returns None when no acceptable auth response arrived,
            for example when timeout occurred.
        """
        # Historically, the _get_auth_response() uses HTTPServer.handle_request(),
        # because its handle-and-retry logic is conceptually as easy as a while loop.
        # Also, handle_request() honors server.timeout setting, and CTRL+C simply works.
        # All those are true when running on Linux.
        #
        # However, the behaviors on Windows turns out to be different.
        # A socket server waiting for request would freeze the current thread.
        # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK.
        # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc
        #
        # The solution would need to somehow put the http server into its own thread.
        # This could be done by the pattern of ``http.server.test()`` which internally
        # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7).
        # Or create our own thread to wrap the HTTPServer.handle_request() inside.
        #
        # Note that the timeout is enforced by the loop below. It is not part of
        # kwargs, therefore it is not forwarded to the worker thread.
        result = {}  # A mutable object to be filled with thread's return value
        t = threading.Thread(
            target=self._get_auth_response, args=(result,), kwargs=kwargs)
        t.daemon = True  # So that it won't prevent the main thread from exiting
        t.start()
        begin = time.time()
        while (time.time() - begin < timeout) if timeout else True:
            time.sleep(1)  # Short detection interval to make happy path responsive
            if not t.is_alive():  # Then the thread has finished its job and exited
                break
            while (self._scheduled_actions
                    and time.time() - begin > self._scheduled_actions[0][0]):
                _, callback = self._scheduled_actions.pop(0)
                callback()
        return result or None

    def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
            welcome_template=None, success_template=None, error_template=None,
            auth_uri_callback=None,
            browser_name=None,
            ):
        """Serve requests until an auth response arrives, then copy it into ``result``.

        This is the target of the worker thread started by get_auth_response().
        An exception raised here ends that thread with ``result`` left empty,
        which get_auth_response() reports to its caller as None.
        """
        netloc = "http://localhost:{p}".format(p=self.get_port())
        abort_uri = "{loc}?error=abort".format(loc=netloc)
        logger.debug("Abort by visit %s", abort_uri)

        if auth_uri:
            # Note to maintainers:
            # Do not enforce response_mode=form_post by secretly hardcoding it here.
            # Just validate it here, so we won't surprise caller by changing their auth_uri behind the scene.
            params = parse_qs(urlparse(auth_uri).query)
            assert params.get('response_mode', [None])[0] == 'form_post', (
                "The built-in http server supports HTTP POST only. "
                "The auth_uri must be built with response_mode=form_post")

        self._server.welcome_page = Template(welcome_template or "").safe_substitute(
            auth_uri=auth_uri, abort_uri=abort_uri)
        if auth_uri:  # Now attempt to open a local browser to visit it
            _uri = (netloc + "?welcome=true") if welcome_template else auth_uri
            logger.info("Open a browser on this device to visit: %s", _uri)
            browser_opened = False
            try:
                browser_opened = _browse(_uri, browser_name=browser_name)
            except Exception:  # Had to use broad except, because the potential
                    # webbrowser.Error is purposely undefined outside of _browse().
                # Absorb and proceed. Because browser could be manually run elsewhere.
                logger.exception("_browse(...) unsuccessful")
            if not browser_opened:
                if not auth_uri_callback:
                    logger.warning(
                        "Found no browser in current environment. "
                        "If this program is being run inside a container "
                        "which either (1) has access to host network "
                        "(i.e. started by `docker run --net=host -it ...`), "
                        "or (2) published port {port} to host network "
                        "(i.e. started by `docker run -p 127.0.0.1:{port}:{port} -it ...`), "
                        "you can use browser on host to visit the following link. "
                        "Otherwise, this auth attempt would either timeout "
                        "(current timeout setting is {timeout}) "
                        "or be aborted by CTRL+C. Auth URI: {auth_uri}".format(
                            auth_uri=_uri, timeout=timeout, port=self.get_port()))
                else:  # Then it is the auth_uri_callback()'s job to inform the user
                    auth_uri_callback(_uri)

        recommendation = "For your security: Do not share the contents of this page, the address bar, or take screenshots."  # From MSRC
        self._server.success_template = Template(success_template or
            "Authentication complete. You can return to the application. Please close this browser tab.\n\n" + recommendation)
        self._server.error_template = Template(error_template or
            # Do NOT invent new placeholders in this template. Just use standard keys defined in OAuth2 RFC.
            # Otherwise there is no obvious canonical way for caller to know what placeholders are supported.
            # Besides, we have been using these standard keys for years. Changing now would break backward compatibility.
            "Authentication failed. $error: $error_description. ($error_uri).\n\n" + recommendation)

        self._server.timeout = timeout  # Otherwise its handle_timeout() won't work
        self._server.auth_response = {}  # Shared with _AuthCodeHandler
        self._server.auth_state = state  # So handler will check it before sending response
        while not self._closing:  # Otherwise, the handle_request() attempt
                                  # would yield noisy ValueError trace
            # Derived from
            # https://docs.python.org/2/library/basehttpserver.html#more-examples
            self._server.handle_request()
            if self._server.auth_response:
                break
        result.update(self._server.auth_response)  # Return via writable result param

    def close(self):
        """Either call this eventually; or use the entire class as context manager"""
        self._closing = True
        self._server.server_close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
390
391# Note: Manually use or test this module by:
392# python -m path.to.this.file -h
if __name__ == '__main__':
    # Command line entry point: run one auth code flow and print its auth response
    import argparse
    import json
    from .oauth2 import Client

    logging.basicConfig(level=logging.INFO)
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=__doc__ + "The auth code received will be shown at stdout.")
    cli.add_argument(
        '--endpoint', help="The auth endpoint for your app.",
        default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
    cli.add_argument('client_id', help="The client_id of your application")
    cli.add_argument('--port', type=int, default=0, help="The port in redirect_uri")
    cli.add_argument('--timeout', type=int, default=60, help="Timeout value, in second")
    cli.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri")
    cli.add_argument('--scope', default=None, help="The scope list")
    args = cli.parse_args()

    client = Client({"authorization_endpoint": args.endpoint}, args.client_id)
    requested_scope = args.scope.split() if args.scope else None
    with AuthCodeReceiver(port=args.port) as receiver:
        # The redirect_uri carries the port which the receiver actually listens on
        redirect_uri = "http://{h}:{p}".format(h=args.host, p=receiver.get_port())
        flow = client.initiate_auth_code_flow(
            scope=requested_scope,
            redirect_uri=redirect_uri,
            )
        auth_response = receiver.get_auth_response(
            auth_uri=flow["auth_uri"],
            welcome_template=
                "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a>",
            error_template="<html>Oh no. $error</html>",
            success_template="Oh yeah. Got $code",
            timeout=args.timeout,
            state=flow["state"],  # Optional
            )
        print(json.dumps(auth_response, indent=4))