Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/oauth2cli/authcode.py: 24%
139 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
1# Note: This docstring is also used by this script's command line help.
2"""A one-stop helper for desktop app to acquire an authorization code.
4It starts a web server to listen redirect_uri, waiting for auth code.
5It optionally opens a browser window to guide a human user to manually login.
6After obtaining an auth code, the web server will automatically shut down.
7"""
8import logging
9import socket
10import sys
11from string import Template
12import threading
13import time
15try: # Python 3
16 from http.server import HTTPServer, BaseHTTPRequestHandler
17 from urllib.parse import urlparse, parse_qs, urlencode
18except ImportError: # Fall back to Python 2
19 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
20 from urlparse import urlparse, parse_qs
21 from urllib import urlencode
24logger = logging.getLogger(__name__)
27def obtain_auth_code(listen_port, auth_uri=None): # Historically only used in testing
28 with AuthCodeReceiver(port=listen_port) as receiver:
29 return receiver.get_auth_response(
30 auth_uri=auth_uri,
31 welcome_template="""<html><body>
32 Open this link to <a href='$auth_uri'>Sign In</a>
33 (You may want to use incognito window)
34 <hr><a href='$abort_uri'>Abort</a>
35 </body></html>""",
36 ).get("code")
39def is_wsl():
40 # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
41 # Run `uname -a` to get 'release' without python
42 # - WSL 1: '4.4.0-19041-Microsoft'
43 # - WSL 2: '4.19.128-microsoft-standard'
44 import platform
45 uname = platform.uname()
46 platform_name = getattr(uname, 'system', uname[0]).lower()
47 release = getattr(uname, 'release', uname[2]).lower()
48 return platform_name == 'linux' and 'microsoft' in release
51def _browse(auth_uri, browser_name=None): # throws ImportError, webbrowser.Error
52 """Browse uri with named browser. Default browser is customizable by $BROWSER"""
53 import webbrowser # Lazy import. Some distro may not have this.
54 if browser_name:
55 browser_opened = webbrowser.get(browser_name).open(auth_uri)
56 else:
57 # This one can survive BROWSER=nonexist, while get(None).open(...) can not
58 browser_opened = webbrowser.open(auth_uri)
60 # In WSL which doesn't have www-browser, try launching browser with PowerShell
61 if not browser_opened and is_wsl():
62 try:
63 import subprocess
64 # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe
65 # Ampersand (&) should be quoted
66 exit_code = subprocess.call(
67 ['powershell.exe', '-NoProfile', '-Command', 'Start-Process "{}"'.format(auth_uri)])
68 browser_opened = exit_code == 0
69 except FileNotFoundError: # WSL might be too old
70 pass
71 return browser_opened
74def _qs2kv(qs):
75 """Flatten parse_qs()'s single-item lists into the item itself"""
76 return {k: v[0] if isinstance(v, list) and len(v) == 1 else v
77 for k, v in qs.items()}
80class _AuthCodeHandler(BaseHTTPRequestHandler):
81 def do_GET(self):
82 # For flexibility, we choose to not check self.path matching redirect_uri
83 #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
84 qs = parse_qs(urlparse(self.path).query)
85 if qs.get('code') or qs.get("error"): # So, it is an auth response
86 self.server.auth_response = _qs2kv(qs)
87 logger.debug("Got auth response: %s", self.server.auth_response)
88 template = (self.server.success_template
89 if "code" in qs else self.server.error_template)
90 self._send_full_response(
91 template.safe_substitute(**self.server.auth_response))
92 # NOTE: Don't do self.server.shutdown() here. It'll halt the server.
93 else:
94 self._send_full_response(self.server.welcome_page)
96 def _send_full_response(self, body, is_ok=True):
97 self.send_response(200 if is_ok else 400)
98 content_type = 'text/html' if body.startswith('<') else 'text/plain'
99 self.send_header('Content-type', content_type)
100 self.end_headers()
101 self.wfile.write(body.encode("utf-8"))
103 def log_message(self, format, *args):
104 logger.debug(format, *args) # To override the default log-to-stderr behavior
107class _AuthCodeHttpServer(HTTPServer, object):
108 def __init__(self, server_address, *args, **kwargs):
109 _, port = server_address
110 if port and (sys.platform == "win32" or is_wsl()):
111 # The default allow_reuse_address is True. It works fine on non-Windows.
112 # On Windows, it undesirably allows multiple servers listening on same port,
113 # yet the second server would not receive any incoming request.
114 # So, we need to turn it off.
115 self.allow_reuse_address = False
116 super(_AuthCodeHttpServer, self).__init__(server_address, *args, **kwargs)
118 def handle_timeout(self):
119 # It will be triggered when no request comes in self.timeout seconds.
120 # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
121 raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server
122 # We choose to not call self.server_close() here,
123 # because it would cause a socket.error exception in handle_request(),
124 # and likely end up the server being server_close() twice.
127class _AuthCodeHttpServer6(_AuthCodeHttpServer):
128 address_family = socket.AF_INET6
131class AuthCodeReceiver(object):
132 # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
133 def __init__(self, port=None, scheduled_actions=None):
134 """Create a Receiver waiting for incoming auth response.
136 :param port:
137 The local web server will listen at http://...:<port>
138 You need to use the same port when you register with your app.
139 If your Identity Provider supports dynamic port, you can use port=0 here.
140 Port 0 means to use an arbitrary unused port, per this official example:
141 https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins
143 :param scheduled_actions:
144 For example, if the input is
145 ``[(10, lambda: print("Got stuck during sign in? Call 800-000-0000"))]``
146 then the receiver would call that lambda function after
147 waiting the response for 10 seconds.
148 """
149 address = "127.0.0.1" # Hardcode, for now, Not sure what to expose, yet.
150 # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
151 # * Clients should listen on the loopback network interface only.
152 # (It is not recommended to use "" shortcut to bind all addr.)
153 # * the use of localhost is NOT RECOMMENDED.
154 # (Use) the loopback IP literal
155 # rather than localhost avoids inadvertently listening on network
156 # interfaces other than the loopback interface.
157 # Note:
158 # When this server physically listens to a specific IP (as it should),
159 # you will still be able to specify your redirect_uri using either
160 # IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.
161 self._scheduled_actions = sorted(scheduled_actions or []) # Make a copy
162 Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
163 # TODO: But, it would treat "localhost" or "" as IPv4.
164 # If pressed, we might just expose a family parameter to caller.
165 self._server = Server((address, port or 0), _AuthCodeHandler)
166 self._closing = False
168 def get_port(self):
169 """The port this server actually listening to"""
170 # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
171 return self._server.server_address[1]
173 def get_auth_response(self, timeout=None, **kwargs):
174 """Wait and return the auth response. Raise RuntimeError when timeout.
176 :param str auth_uri:
177 If provided, this function will try to open a local browser.
178 :param int timeout: In seconds. None means wait indefinitely.
179 :param str state:
180 You may provide the state you used in auth_uri,
181 then we will use it to validate incoming response.
182 :param str welcome_template:
183 If provided, your end user will see it instead of the auth_uri.
184 When present, it shall be a plaintext or html template following
185 `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
186 and include some of these placeholders: $auth_uri and $abort_uri.
187 :param str success_template:
188 The page will be displayed when authentication was largely successful.
189 Placeholders can be any of these:
190 https://tools.ietf.org/html/rfc6749#section-5.1
191 :param str error_template:
192 The page will be displayed when authentication encountered error.
193 Placeholders can be any of these:
194 https://tools.ietf.org/html/rfc6749#section-5.2
195 :param callable auth_uri_callback:
196 A function with the shape of lambda auth_uri: ...
197 When a browser was unable to be launch, this function will be called,
198 so that the app could tell user to manually visit the auth_uri.
199 :param str browser_name:
200 If you did
201 ``webbrowser.register("xyz", None, BackgroundBrowser("/path/to/browser"))``
202 beforehand, you can pass in the name "xyz" to use that browser.
203 The default value ``None`` means using default browser,
204 which is customizable by env var $BROWSER.
205 :return:
206 The auth response of the first leg of Auth Code flow,
207 typically {"code": "...", "state": "..."} or {"error": "...", ...}
208 See https://tools.ietf.org/html/rfc6749#section-4.1.2
209 and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
210 Returns None when the state was mismatched, or when timeout occurred.
211 """
212 # Historically, the _get_auth_response() uses HTTPServer.handle_request(),
213 # because its handle-and-retry logic is conceptually as easy as a while loop.
214 # Also, handle_request() honors server.timeout setting, and CTRL+C simply works.
215 # All those are true when running on Linux.
216 #
217 # However, the behaviors on Windows turns out to be different.
218 # A socket server waiting for request would freeze the current thread.
219 # Neither timeout nor CTRL+C would work. End user would have to do CTRL+BREAK.
220 # https://stackoverflow.com/questions/1364173/stopping-python-using-ctrlc
221 #
222 # The solution would need to somehow put the http server into its own thread.
223 # This could be done by the pattern of ``http.server.test()`` which internally
224 # use ``ThreadingHTTPServer.serve_forever()`` (only available in Python 3.7).
225 # Or create our own thread to wrap the HTTPServer.handle_request() inside.
226 result = {} # A mutable object to be filled with thread's return value
227 t = threading.Thread(
228 target=self._get_auth_response, args=(result,), kwargs=kwargs)
229 t.daemon = True # So that it won't prevent the main thread from exiting
230 t.start()
231 begin = time.time()
232 while (time.time() - begin < timeout) if timeout else True:
233 time.sleep(1) # Short detection interval to make happy path responsive
234 if not t.is_alive(): # Then the thread has finished its job and exited
235 break
236 while (self._scheduled_actions
237 and time.time() - begin > self._scheduled_actions[0][0]):
238 _, callback = self._scheduled_actions.pop(0)
239 callback()
240 return result or None
242 def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
243 welcome_template=None, success_template=None, error_template=None,
244 auth_uri_callback=None,
245 browser_name=None,
246 ):
247 welcome_uri = "http://localhost:{p}".format(p=self.get_port())
248 abort_uri = "{loc}?error=abort".format(loc=welcome_uri)
249 logger.debug("Abort by visit %s", abort_uri)
250 self._server.welcome_page = Template(welcome_template or "").safe_substitute(
251 auth_uri=auth_uri, abort_uri=abort_uri)
252 if auth_uri: # Now attempt to open a local browser to visit it
253 _uri = welcome_uri if welcome_template else auth_uri
254 logger.info("Open a browser on this device to visit: %s" % _uri)
255 browser_opened = False
256 try:
257 browser_opened = _browse(_uri, browser_name=browser_name)
258 except: # Had to use broad except, because the potential
259 # webbrowser.Error is purposely undefined outside of _browse().
260 # Absorb and proceed. Because browser could be manually run elsewhere.
261 logger.exception("_browse(...) unsuccessful")
262 if not browser_opened:
263 if not auth_uri_callback:
264 logger.warning(
265 "Found no browser in current environment. "
266 "If this program is being run inside a container "
267 "which has access to host network "
268 "(i.e. started by `docker run --net=host -it ...`), "
269 "you can use browser on host to visit the following link. "
270 "Otherwise, this auth attempt would either timeout "
271 "(current timeout setting is {timeout}) "
272 "or be aborted by CTRL+C. Auth URI: {auth_uri}".format(
273 auth_uri=_uri, timeout=timeout))
274 else: # Then it is the auth_uri_callback()'s job to inform the user
275 auth_uri_callback(_uri)
277 self._server.success_template = Template(success_template or
278 "Authentication completed. You can close this window now.")
279 self._server.error_template = Template(error_template or
280 "Authentication failed. $error: $error_description. ($error_uri)")
282 self._server.timeout = timeout # Otherwise its handle_timeout() won't work
283 self._server.auth_response = {} # Shared with _AuthCodeHandler
284 while not self._closing: # Otherwise, the handle_request() attempt
285 # would yield noisy ValueError trace
286 # Derived from
287 # https://docs.python.org/2/library/basehttpserver.html#more-examples
288 self._server.handle_request()
289 if self._server.auth_response:
290 if state and state != self._server.auth_response.get("state"):
291 logger.debug("State mismatch. Ignoring this noise.")
292 else:
293 break
294 result.update(self._server.auth_response) # Return via writable result param
296 def close(self):
297 """Either call this eventually; or use the entire class as context manager"""
298 self._closing = True
299 self._server.server_close()
301 def __enter__(self):
302 return self
304 def __exit__(self, exc_type, exc_val, exc_tb):
305 self.close()
307# Note: Manually use or test this module by:
308# python -m path.to.this.file -h
309if __name__ == '__main__':
310 import argparse, json
311 from .oauth2 import Client
312 logging.basicConfig(level=logging.INFO)
313 p = parser = argparse.ArgumentParser(
314 formatter_class=argparse.ArgumentDefaultsHelpFormatter,
315 description=__doc__ + "The auth code received will be shown at stdout.")
316 p.add_argument(
317 '--endpoint', help="The auth endpoint for your app.",
318 default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
319 p.add_argument('client_id', help="The client_id of your application")
320 p.add_argument('--port', type=int, default=0, help="The port in redirect_uri")
321 p.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri")
322 p.add_argument('--scope', default=None, help="The scope list")
323 args = parser.parse_args()
324 client = Client({"authorization_endpoint": args.endpoint}, args.client_id)
325 with AuthCodeReceiver(port=args.port) as receiver:
326 flow = client.initiate_auth_code_flow(
327 scope=args.scope.split() if args.scope else None,
328 redirect_uri="http://{h}:{p}".format(h=args.host, p=receiver.get_port()),
329 )
330 print(json.dumps(receiver.get_auth_response(
331 auth_uri=flow["auth_uri"],
332 welcome_template=
333 "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a",
334 error_template="Oh no. $error",
335 success_template="Oh yeah. Got $code",
336 timeout=60,
337 state=flow["state"], # Optional
338 ), indent=4))