Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/cloudshell.py: 29%

52 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:20 +0000

1# Copyright (c) Microsoft Corporation. 

2# All rights reserved. 

3# 

4# This code is licensed under the MIT License. 

5 

6"""This module wraps Cloud Shell's IMDS-like interface inside an OAuth2-like helper""" 

7import base64 

8import json 

9import logging 

10import os 

11import time 

12try: # Python 2 

13 from urlparse import urlparse 

14except: # Python 3 

15 from urllib.parse import urlparse 

16from .oauth2cli.oidc import decode_part 

17 

18 

19logger = logging.getLogger(__name__) 

20 

21 

22def _is_running_in_cloud_shell(): 

23 return os.environ.get("AZUREPS_HOST_ENVIRONMENT", "").startswith("cloud-shell") 

24 

25 

26def _scope_to_resource(scope): # This is an experimental reasonable-effort approach 

27 cloud_shell_supported_audiences = [ 

28 "https://analysis.windows.net/powerbi/api", # Came from https://msazure.visualstudio.com/One/_git/compute-CloudShell?path=/src/images/agent/env/envconfig.PROD.json 

29 "https://pas.windows.net/CheckMyAccess/Linux/.default", # Cloud Shell accepts it as-is 

30 ] 

31 for a in cloud_shell_supported_audiences: 

32 if scope.startswith(a): 

33 return a 

34 u = urlparse(scope) 

35 if u.scheme: 

36 return "{}://{}".format(u.scheme, u.netloc) 

37 return scope # There is no much else we can do here 

38 

39 

def _parse_cloud_shell_error(text):
    """Convert the body of a failed Cloud Shell token response into an error dict.

    Recognized body shapes:

    * ``{"error": {"code": ..., "message": ...}}`` -- ``code`` and ``message``
      are mapped to ``error`` and ``error_description``.
    * ``{"error": "<code>", "error_description": ...}`` -- passed through.
    * Anything else (not JSON, or JSON that is not an object) -- reported as a
      generic ``cloud_shell_error`` with the raw body as its description.

    Keys whose value is empty are omitted, so the result may be an empty dict.
    """
    try:
        payload = json.loads(text)
    except (TypeError, ValueError):  # Body is missing or is not valid JSON
        payload = None
    if not isinstance(payload, dict):
        candidates = {"error": "cloud_shell_error", "error_description": text}
    else:
        cs_error = payload.get("error", {})
        if isinstance(cs_error, dict):
            candidates = {
                "error": cs_error.get("code"),
                "error_description": cs_error.get("message"),
            }
        else:  # "error" is a plain value, e.g. a string error code
            candidates = {
                "error": cs_error,
                "error_description": payload.get("error_description"),
            }
    return {k: v for k, v in candidates.items() if v}


def _obtain_token(http_client, scopes, client_id=None, data=None):
    """Request a token from Cloud Shell's local endpoint and shape it like an OAuth2 response.

    :param http_client:
        An object whose ``post(url, data=..., headers=...)`` returns a response
        exposing ``status_code`` and ``text``.
    :param scopes:
        An iterable of scope strings. Each one is converted by
        ``_scope_to_resource()`` and the results are joined with spaces to
        form the ``resource`` field of the request.
    :param client_id:
        Used as the ``aud`` claim of the synthesized ``id_token_claims``.
    :param dict data:
        Optional extra form fields for the request. Its ``token_type``
        (default ``"Bearer"``) is the token type the caller expects back.
    :return:
        On success, a dict with ``access_token``, ``expires_in``,
        ``token_type``, ``id_token_claims``, ``client_info`` and, when the
        endpoint supplied one, ``refresh_token``.
        On failure, a dict carrying ``error`` and/or ``error_description``
        (it can be empty when the endpoint gave no usable error details).
    """
    resp = http_client.post(
        "http://localhost:50342/oauth2/token",
        data=dict(
            data or {},
            resource=" ".join(map(_scope_to_resource, scopes))),
        headers={"Metadata": "true"},
        )
    if resp.status_code >= 300:
        logger.debug("Cloud Shell IMDS error: %s", resp.text)
        # Parse defensively: the error body is not guaranteed to be JSON,
        # nor to nest its details under an "error" object.
        return _parse_cloud_shell_error(resp.text)
    imds_payload = json.loads(resp.text)
    BEARER = "Bearer"
    oauth2_response = {
        "access_token": imds_payload["access_token"],
        "expires_in": int(imds_payload["expires_in"]),
        "token_type": imds_payload.get("token_type", BEARER),
    }
    expected_token_type = (data or {}).get("token_type", BEARER)
    if oauth2_response["token_type"] != expected_token_type:
        return {  # Generate a normal error (rather than an intrusive exception)
            "error": "broker_error",
            "error_description": "token_type {} is not supported by this version of Azure Portal".format(
                expected_token_type),
        }
    parts = imds_payload["access_token"].split(".")

    # The following default values are useful in SSH Cert scenario
    client_info = {  # Default value, in case the real value will be unavailable
        "uid": "user",
        "utid": "cloudshell",
    }
    now = time.time()
    preferred_username = "currentuser@cloudshell"
    id_token_claims = {  # First 5 claims are required per OIDC
        "iss": "cloudshell",
        "sub": "user",
        "aud": client_id,
        "exp": now + 3600,
        "iat": now,
        "preferred_username": preferred_username,  # Useful as MSAL account's username
    }

    if len(parts) == 3:  # Probably a JWT. Use it to derive client_info and id token.
        jwt_payload = None
        try:
            # Data defined in https://docs.microsoft.com/en-us/azure/active-directory/develop/access-tokens#payload-claims
            jwt_payload = json.loads(decode_part(parts[1]))
        except ValueError:
            pass
        if isinstance(jwt_payload, dict):
            client_info = {
                # Mimic a real home_account_id,
                # so that this pseudo account and a real account would interop.
                "uid": jwt_payload.get("oid", "user"),
                "utid": jwt_payload.get("tid", "cloudshell"),
            }
            # Override the placeholders claim by claim, so that a token which
            # lacks one of these claims keeps the placeholder for that claim
            # instead of aborting the whole call with a KeyError.
            for claim in ("iss", "sub", "exp", "iat"):  # "sub" could use oid instead
                if claim in jwt_payload:
                    id_token_claims[claim] = jwt_payload[claim]
            id_token_claims["preferred_username"] = (
                jwt_payload.get("preferred_username")  # V2
                or jwt_payload.get("unique_name")  # V1
                or preferred_username)
        else:  # Undecodable, or decoded into something other than a JSON object
            logger.debug("Unable to decode jwt payload: %s", parts[1])
    oauth2_response["id_token_claims"] = id_token_claims
    oauth2_response["client_info"] = base64.b64encode(
        # Mimic a client_info, so that MSAL would create an account
        json.dumps(client_info).encode("utf-8")).decode("utf-8")
    oauth2_response["id_token_claims"]["tid"] = client_info["utid"]  # TBD

    ## Note: Decided to not surface resource back as scope,
    ## because they would cause the downstream OAuth2 code path to
    ## cache the token with a different scope and won't hit them later.
    #if imds_payload.get("resource"):
    #    oauth2_response["scope"] = imds_payload["resource"]
    if imds_payload.get("refresh_token"):
        oauth2_response["refresh_token"] = imds_payload["refresh_token"]
    return oauth2_response

122