1# Note: This docstring is also used by this script's command line help.
2"""A one-stop helper for desktop app to acquire an authorization code.
3
4It starts a web server to listen redirect_uri, waiting for auth code.
5It optionally opens a browser window to guide a human user to manually login.
6After obtaining an auth code, the web server will automatically shut down.
7"""
8import logging
9import os
10import socket
11import sys
12from string import Template
13import threading
14import time
15
16try: # Python 3
17 from http.server import HTTPServer, BaseHTTPRequestHandler
18 from urllib.parse import urlparse, parse_qs, urlencode
19 from html import escape
20except ImportError: # Fall back to Python 2
21 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
22 from urlparse import urlparse, parse_qs
23 from urllib import urlencode
24 from cgi import escape
25
26
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
28
29
def obtain_auth_code(listen_port, auth_uri=None):  # Historically only used in testing
    """Receive an auth response on ``listen_port`` and return its auth code.

    :param listen_port: The port handed to the underlying AuthCodeReceiver.
    :param auth_uri:
        Optional. It is forwarded to the receiver and rendered as the
        "Sign In" link of the welcome page.
    :return:
        The "code" value of the auth response, or None when the response
        has no "code" or when no auth response was obtained at all.
    """
    welcome_template = """<html><body>
 Open this link to <a href='$auth_uri'>Sign In</a>
 (You may want to use incognito window)
 <hr><a href='$abort_uri'>Abort</a>
 </body></html>"""
    with AuthCodeReceiver(port=listen_port) as receiver:
        auth_response = receiver.get_auth_response(
            auth_uri=auth_uri, welcome_template=welcome_template)
    # get_auth_response() returns None (not a dict) when nothing was received,
    # so guard the lookup instead of raising AttributeError
    return (auth_response or {}).get("code")
40
41
42def _is_inside_docker():
43 try:
44 with open("/proc/1/cgroup") as f: # https://stackoverflow.com/a/20012536/728675
45 # Search keyword "/proc/pid/cgroup" in this link for the file format
46 # https://man7.org/linux/man-pages/man7/cgroups.7.html
47 for line in f.readlines():
48 cgroup_path = line.split(":", 2)[2].strip()
49 if cgroup_path.strip() != "/":
50 return True
51 except IOError:
52 pass # We are probably not running on Linux
53 return os.path.exists("/.dockerenv") # Docker on Mac will run this line
54
55
def is_wsl():
    """Tell whether the current platform is Windows Subsystem for Linux.

    "Official" way of detecting WSL:
    https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364

    Run `uname -a` to get 'release' without python
      - WSL 1: '4.4.0-19041-Microsoft'
      - WSL 2: '4.19.128-microsoft-standard'

    :return: True when the system is Linux and its release mentions "microsoft".
    """
    import platform
    info = platform.uname()
    # Prefer the named fields; fall back to positions when they are absent
    system = getattr(info, 'system', info[0])
    kernel_release = getattr(info, 'release', info[2])
    if system.lower() != 'linux':
        return False
    return 'microsoft' in kernel_release.lower()
66
67
def _browse(auth_uri, browser_name=None):  # throws ImportError, webbrowser.Error
    """Browse uri with named browser. Default browser is customizable by $BROWSER

    :param auth_uri: The URI to open.
    :param browser_name:
        Optional. When truthy, the browser is looked up by ``webbrowser.get()``;
        otherwise ``webbrowser.open()`` picks the browser.
    :return: A truthy value when a browser was launched, falsy otherwise.
    """
    import webbrowser  # Lazy import. Some distro may not have this.
    if browser_name:
        browser_opened = webbrowser.get(browser_name).open(auth_uri)
    else:
        # This one can survive BROWSER=nonexist, while get(None).open(...) can not
        browser_opened = webbrowser.open(auth_uri)

    # In WSL which doesn't have www-browser, try launching browser with PowerShell
    if not browser_opened and is_wsl():
        import subprocess
        # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe
        # Ampersand (&) should be quoted
        # NOTE(review): auth_uri is interpolated verbatim between double quotes;
        # confirm callers never pass a URI containing an unencoded double quote.
        command = [
            'powershell.exe', '-NoProfile', '-Command',
            'Start-Process "{}"'.format(auth_uri)]
        try:
            exit_code = subprocess.call(command)
        except OSError:
            # powershell.exe could not be run; WSL might be too old.
            # OSError is caught rather than FileNotFoundError, because the
            # latter is its subclass and does not exist on Python 2.
            pass
        else:
            browser_opened = exit_code == 0
    return browser_opened
89
90
91def _qs2kv(qs):
92 """Flatten parse_qs()'s single-item lists into the item itself"""
93 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v
94 for k, v in qs.items()}
95
96
97def _is_html(text):
98 return text.startswith("<") # Good enough for our purpose
99
100
101def _escape(key_value_pairs):
102 return {k: escape(v) for k, v in key_value_pairs.items()}
103
104
105def _printify(text):
106 # If an https request is sent to an http server, the text needs to be repr-ed
107 return repr(text) if isinstance(text, str) and not text.isprintable() else text
108
109
class _AuthCodeHandler(BaseHTTPRequestHandler):
    """Handle GET requests arriving at the local redirect_uri listener.

    It reads ``auth_state``, ``success_template``, ``error_template`` and
    ``welcome_page`` from ``self.server``, and stores an accepted auth response
    into ``self.server.auth_response``.
    """

    def do_GET(self):
        """Answer an auth response with a result page, anything else with the welcome page."""
        # For flexibility, we choose to not check self.path matching redirect_uri
        #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
        qs = parse_qs(urlparse(self.path).query)
        if qs.get('code') or qs.get("error"):  # So, it is an auth response
            auth_response = _qs2kv(qs)
            logger.debug("Got auth response: %s", auth_response)
            if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
                # OAuth2 successful and error responses contain state when it was used
                # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
                self._send_full_response("State mismatch")  # Possibly an attack
            else:
                template = (self.server.success_template
                    if "code" in qs else self.server.error_template)
                if _is_html(template.template):
                    safe_data = _escape(auth_response)  # Foiling an XSS attack
                else:
                    safe_data = auth_response
                # The keys come from the incoming query string, so hand them over
                # as a mapping rather than as keyword argument names
                self._send_full_response(template.safe_substitute(safe_data))
                self.server.auth_response = auth_response  # Set it now, after the response is likely sent
        else:
            self._send_full_response(self.server.welcome_page)
        # NOTE: Don't do self.server.shutdown() here. It'll halt the server.

    def _send_full_response(self, body, is_ok=True):
        """Send status line, headers and ``body`` as one complete response.

        :param str body: Sent as text/html when it begins with "<", else as text/plain.
        :param bool is_ok: Status code is 200 when true, 400 otherwise.
        """
        self.send_response(200 if is_ok else 400)
        content_type = 'text/html' if _is_html(body) else 'text/plain'
        # The body is always encoded as UTF-8 below, so declare that charset
        # to keep non-ASCII template text from being misinterpreted
        self.send_header('Content-type', content_type + '; charset=utf-8')
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def log_message(self, format, *args):
        """Route the per-request log line to this module's logger at debug level."""
        # To override the default log-to-stderr behavior
        logger.debug(format, *map(_printify, args))
145
146
147class _AuthCodeHttpServer(HTTPServer, object):
148 def __init__(self, server_address, *args, **kwargs):
149 _, port = server_address
150 if port and (sys.platform == "win32" or is_wsl()):
151 # The default allow_reuse_address is True. It works fine on non-Windows.
152 # On Windows, it undesirably allows multiple servers listening on same port,
153 # yet the second server would not receive any incoming request.
154 # So, we need to turn it off.
155 self.allow_reuse_address = False
156 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs)
157
158 def handle_timeout(self):
159 # It will be triggered when no request comes in self.timeout seconds.
160 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
161 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server
162 # We choose to not call self.server_close() here,
163 # because it would cause a socket.error exception in handle_request(),
164 # and likely end up the server being server_close() twice.
165
166
class _AuthCodeHttpServer6(_AuthCodeHttpServer):
    # Same server as its base class, except that its socket uses the IPv6 address family
    address_family = socket.AF_INET6
169
170
class AuthCodeReceiver(object):
    # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
    def __init__(self, port=None, scheduled_actions=None):
        """Create a Receiver waiting for incoming auth response.

        :param port:
            The local web server will listen at http://...:<port>
            You need to use the same port when you register with your app.
            If your Identity Provider supports dynamic port, you can use port=0 here.
            Port 0 means to use an arbitrary unused port, per this official example:
            https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins

        :param scheduled_actions:
            For example, if the input is
            ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]``
            then the receiver would call that lambda function after
            waiting the response for 10 seconds.
            Actions sharing the same delay are called in their given order.
        """
        address = "0.0.0.0" if _is_inside_docker() else "127.0.0.1"  # Hardcode
        # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
        #   * Clients should listen on the loopback network interface only.
        #     (It is not recommended to use "" shortcut to bind all addr.)
        #   * the use of localhost is NOT RECOMMENDED.
        #     (Use) the loopback IP literal
        #     rather than localhost avoids inadvertently listening on network
        #     interfaces other than the loopback interface.
        # Note:
        #   When this server physically listens to a specific IP (as it should),
        #   you will still be able to specify your redirect_uri using either
        #   IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.

        # Sort by delay only. Sorting the (delay, callback) tuples themselves
        # would compare the callbacks whenever two delays are equal,
        # and functions are not orderable on Python 3 (TypeError).
        # sorted() also makes a copy, so the caller's list is left untouched.
        self._scheduled_actions = sorted(
            scheduled_actions or [], key=lambda action: action[0])
        Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
            # TODO: But, it would treat "localhost" or "" as IPv4.
            # If pressed, we might just expose a family parameter to caller.
        self._server = Server((address, port or 0), _AuthCodeHandler)
        self._closing = False

    def get_port(self):
        """The port this server actually listening to"""
        # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
        return self._server.server_address[1]

    def get_auth_response(self, timeout=None, **kwargs):
        """Wait and return the auth response. Return None when timeout.

        :param str auth_uri:
            If provided, this function will try to open a local browser.
        :param int timeout:
            In seconds. None (or any other falsy value) means wait indefinitely.
            The wait is checked about once per second.
        :param str state:
            You may provide the state you used in auth_uri,
            then we will use it to validate incoming response.
        :param str welcome_template:
            If provided, your end user will see it instead of the auth_uri.
            When present, it shall be a plaintext or html template following
            `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
            and include some of these placeholders: $auth_uri and $abort_uri.
        :param str success_template:
            The page will be displayed when authentication was largely successful.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.1
        :param str error_template:
            The page will be displayed when authentication encountered error.
            Placeholders can be any of these:
            https://tools.ietf.org/html/rfc6749#section-5.2
        :param callable auth_uri_callback:
            A function with the shape of lambda auth_uri: ...
            When a browser was unable to be launch, this function will be called,
            so that the app could tell user to manually visit the auth_uri.
        :param str browser_name:
            If you did
            ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))``
            beforehand, you can pass in the name "xyz" to use that browser.
            The default value ``None`` means using default browser,
            which is customizable by env var $BROWSER.
        :return:
            The auth response of the first leg of Auth Code flow,
            typically {"code": "...", "state": "..."} or {"error": "...", ...}
            See https://tools.ietf.org/html/rfc6749#section-4.1.2
            and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
            Returns None when no auth response was accepted before the wait
            ended. A response whose state mismatches is not accepted.
        """
        # Historically, the _get_auth_response() uses HTTPServer.handle_request(),
        # because its handle-and-retry logic is conceptually as easy as a while loop.
        # Also, handle_request() honors server.timeout setting, and CTRL+C simply works.
        # All those are true when running on Linux.
        #
        # However, the behaviors on Windows turns out to be different.
        # A socket server waiting for request would freeze the current thread.
        # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK.
        # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc
        #
        # The solution would need to somehow put the http server into its own thread.
        # This could be done by the pattern of ``http.server.test()`` which internally
        # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7).
        # Or create our own thread to wrap the HTTPServer.handle_request() inside.
        result = {}  # A mutable object to be filled with thread's return value
        t = threading.Thread(
            target=self._get_auth_response, args=(result,), kwargs=kwargs)
        t.daemon = True  # So that it won't prevent the main thread from exiting
        t.start()
        begin = time.time()
        while (time.time() - begin < timeout) if timeout else True:
            time.sleep(1)  # Short detection interval to make happy path responsive
            if not t.is_alive():  # Then the thread has finished its job and exited
                break
            while (self._scheduled_actions
                    and time.time() - begin > self._scheduled_actions[0][0]):
                _, callback = self._scheduled_actions.pop(0)
                callback()
        return result or None

    def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
            welcome_template=None, success_template=None, error_template=None,
            auth_uri_callback=None,
            browser_name=None,
            ):
        """Serve requests until an auth response is accepted or close() is called.

        This is the thread body started by get_auth_response().
        The accepted auth response, if any, is copied into ``result``.
        """
        welcome_uri = "http://localhost:{p}".format(p=self.get_port())
        abort_uri = "{loc}?error=abort".format(loc=welcome_uri)
        logger.debug("Abort by visit %s", abort_uri)
        self._server.welcome_page = Template(welcome_template or "").safe_substitute(
            auth_uri=auth_uri, abort_uri=abort_uri)
        if auth_uri:  # Now attempt to open a local browser to visit it
            _uri = welcome_uri if welcome_template else auth_uri
            logger.info("Open a browser on this device to visit: %s", _uri)
            browser_opened = False
            try:
                browser_opened = _browse(_uri, browser_name=browser_name)
            except Exception:  # Had to use broad except, because the potential
                    # webbrowser.Error is purposely undefined outside of _browse().
                    # Still, do not swallow KeyboardInterrupt or SystemExit.
                # Absorb and proceed. Because browser could be manually run elsewhere.
                logger.exception("_browse(...) unsuccessful")
            if not browser_opened:
                if not auth_uri_callback:
                    logger.warning(
                        "Found no browser in current environment. "
                        "If this program is being run inside a container "
                        "which either (1) has access to host network "
                        "(i.e. started by `docker run --net=host -it ...`), "
                        "or (2) published port {port} to host network "
                        "(i.e. started by `docker run -p 127.0.0.1:{port}:{port} -it ...`), "
                        "you can use browser on host to visit the following link. "
                        "Otherwise, this auth attempt would either timeout "
                        "(current timeout setting is {timeout}) "
                        "or be aborted by CTRL+C. Auth URI: {auth_uri}".format(
                            auth_uri=_uri, timeout=timeout, port=self.get_port()))
                else:  # Then it is the auth_uri_callback()'s job to inform the user
                    auth_uri_callback(_uri)

        self._server.success_template = Template(success_template or
            "Authentication completed. You can close this window now.")
        self._server.error_template = Template(error_template or
            "Authentication failed. $error: $error_description. ($error_uri)")

        self._server.timeout = timeout  # Otherwise its handle_timeout() won't work
        self._server.auth_response = {}  # Shared with _AuthCodeHandler
        self._server.auth_state = state  # So handler will check it before sending response
        while not self._closing:  # Otherwise, the handle_request() attempt
                                  # would yield noisy ValueError trace
            # Derived from
            # https://docs.python.org/2/library/basehttpserver.html#more-examples
            self._server.handle_request()
            if self._server.auth_response:
                break
        result.update(self._server.auth_response)  # Return via writable result param

    def close(self):
        """Either call this eventually; or use the entire class as context manager"""
        self._closing = True
        self._server.server_close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
346
347# Note: Manually use or test this module by:
348# python -m path.to.this.file -h
if __name__ == '__main__':
    # Command line entry point: start an auth code flow against the given
    # endpoint, wait for the auth response, and print it to stdout as JSON.
    import argparse
    import json
    from .oauth2 import Client
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=__doc__ + "The auth code received will be shown at stdout.")
    parser.add_argument(
        '--endpoint', help="The auth endpoint for your app.",
        default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
    parser.add_argument('client_id', help="The client_id of your application")
    parser.add_argument('--port', type=int, default=0, help="The port in redirect_uri")
    parser.add_argument('--timeout', type=int, default=60, help="Timeout value, in second")
    parser.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri")
    parser.add_argument('--scope', default=None, help="The scope list")
    args = parser.parse_args()
    client = Client({"authorization_endpoint": args.endpoint}, args.client_id)
    with AuthCodeReceiver(port=args.port) as receiver:
        flow = client.initiate_auth_code_flow(
            scope=args.scope.split() if args.scope else None,
            redirect_uri="http://{h}:{p}".format(h=args.host, p=receiver.get_port()),
            )
        print(json.dumps(receiver.get_auth_response(
            auth_uri=flow["auth_uri"],
            # The closing tag of the Abort link used to be the malformed "</a"
            welcome_template=
                "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a>",
            error_template="<html>Oh no. $error</html>",
            success_template="Oh yeah. Got $code",
            timeout=args.timeout,
            state=flow["state"],  # Optional
            ), indent=4))