Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/msal/cloudshell.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

56 statements  

1# Copyright (c) Microsoft Corporation. 

2# All rights reserved. 

3# 

4# This code is licensed under the MIT License. 

5 

6"""This module wraps Cloud Shell's IMDS-like interface inside an OAuth2-like helper""" 

7import base64 

8import json 

9import logging 

10import os 

11import time 

12try: # Python 2 

13 from urlparse import urlparse 

14except: # Python 3 

15 from urllib.parse import urlparse 

16from .oauth2cli.oidc import decode_part 

17 

18 

19logger = logging.getLogger(__name__) 

20 

21 

def _is_running_in_cloud_shell():
    """Tell whether the current process appears to run inside Cloud Shell.

    The decision is based solely on the ``AZUREPS_HOST_ENVIRONMENT``
    environment variable: its value must start with ``cloud-shell``.
    An unset variable is treated as an empty string, i.e. "not Cloud Shell".

    :return: True when the variable's value starts with "cloud-shell".
    """
    host_environment = os.environ.get("AZUREPS_HOST_ENVIRONMENT", "")
    return host_environment.startswith("cloud-shell")

24 

25 

def _scope_to_resource(scope):  # This is an experimental reasonable-effort approach
    """Convert one OAuth2 scope string into the resource value sent to Cloud Shell.

    Resolution order:

    1. A scope starting with one of the audiences listed below is mapped to
       that audience verbatim.
    2. A scope carrying a URL scheme is reduced to ``scheme://netloc``,
       plus a trailing slash when the scope's path begins with ``//``.
    3. A scope with neither scheme nor netloc (typically "GUID/scope")
       is reduced to the text before its first slash.
    4. Anything else is returned unchanged.

    :param scope: A single scope string.
    :return: The resource string derived from ``scope``.
    """
    known_audiences = (
        # Came from https://msazure.visualstudio.com/One/_git/compute-CloudShell?path=/src/images/agent/env/envconfig.PROD.json
        "https://analysis.windows.net/powerbi/api",
        # Cloud Shell accepts this one as-is
        "https://pas.windows.net/CheckMyAccess/Linux/.default",
    )
    matched = next(
        (audience for audience in known_audiences if scope.startswith(audience)),
        None)
    if matched is not None:
        return matched

    parsed = urlparse(scope)
    if parsed.scheme:
        # A double slash before the scope name means the resource itself ends with a slash.
        # https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#trailing-slash-and-default
        suffix = "/" if parsed.path.startswith("//") else ""
        return "{}://{}{}".format(parsed.scheme, parsed.netloc, suffix)
    if parsed.netloc:
        return scope  # There is not much else we can do here
    return parsed.path.split("/")[0]  # Typically the "GUID/scope" case

42 

43 

def _error_from_cloud_shell_response(resp_text):
    """Translate the body of a failed Cloud Shell response into an OAuth2-like error dict.

    The returned dict always contains ``error``, so that callers can reliably
    tell a failure from a success, and contains ``error_description`` when
    one is available. This helper never raises on a malformed body.
    """
    try:
        payload = json.loads(resp_text)
    except ValueError:  # Body is not JSON, e.g. it is empty or an HTML error page
        payload = None
    error = payload.get("error") if isinstance(payload, dict) else None
    if isinstance(error, dict):  # Shape: {"error": {"code": ..., "message": ...}}
        code, description = error.get("code"), error.get("message")
    elif error:  # OAuth2 shape: {"error": "...", "error_description": "..."}
        code, description = error, payload.get("error_description")
    else:  # Nothing usable was found, so surface the raw body for diagnosis
        code, description = None, resp_text
    return {k: v for k, v in {
        "error": code or "broker_error",  # Same code this module uses for its own errors
        "error_description": description,
        }.items() if v}


def _identity_from_jwt(access_token, client_id, default_username):
    """Best-effort derivation of ``(client_info, id_token_claims)`` from an access token.

    Returns None when the token does not consist of 3 dot-separated parts,
    or when its payload cannot be decoded into an object carrying the
    ``iss``, ``sub``, ``exp`` and ``iat`` claims. Both values are produced
    together or not at all, so that they never disagree with each other.
    """
    parts = access_token.split(".")
    if len(parts) != 3:  # Not a JWT
        return None
    try:
        # Data defined in https://docs.microsoft.com/en-us/azure/active-directory/develop/access-tokens#payload-claims
        jwt_payload = json.loads(decode_part(parts[1]))
        client_info = {
            # Mimic a real home_account_id,
            # so that this pseudo account and a real account would interop.
            "uid": jwt_payload.get("oid", "user"),
            "utid": jwt_payload.get("tid", "cloudshell"),
        }
        id_token_claims = {
            "iss": jwt_payload["iss"],
            "sub": jwt_payload["sub"],  # Could use oid instead
            "aud": client_id,
            "exp": jwt_payload["exp"],
            "iat": jwt_payload["iat"],
            "preferred_username": jwt_payload.get("preferred_username")  # V2
                or jwt_payload.get("unique_name")  # V1
                or default_username,
        }
    except (ValueError, KeyError, TypeError, AttributeError):
        # ValueError: the payload is not decodable JSON.
        # KeyError: a required claim is absent.
        # TypeError/AttributeError: the payload is not a JSON object.
        logger.debug("Unable to decode jwt payload: %s", parts[1])
        return None
    return client_info, id_token_claims


def _obtain_token(http_client, scopes, client_id=None, data=None):
    """Request a token from Cloud Shell's local endpoint and shape it like an OAuth2 response.

    :param http_client:
        An object whose ``post(url, data=..., headers=...)`` returns a
        response exposing ``status_code`` and ``text``.
    :param scopes:
        An iterable of scope strings. Each one is converted by
        ``_scope_to_resource()``; the results are space-joined and sent
        as the ``resource`` form field.
    :param client_id:
        Used as the ``aud`` claim of the generated ``id_token_claims``.
    :param data:
        Optional dict of extra form fields to send along. Its ``token_type``,
        if present, is the token type the caller expects (default "Bearer").
    :return:
        On success, a dict containing ``access_token``, ``expires_in``,
        ``token_type``, ``id_token_claims``, ``client_info`` and, when the
        endpoint provides one, ``refresh_token``.
        On failure, a dict containing ``error`` and, when available,
        ``error_description``.
    """
    resp = http_client.post(
        "http://localhost:50342/oauth2/token",
        data=dict(
            data or {},
            resource=" ".join(map(_scope_to_resource, scopes))),
        headers={"Metadata": "true"},
        )
    if resp.status_code >= 300:
        logger.debug("Cloud Shell IMDS error: %s", resp.text)
        return _error_from_cloud_shell_response(resp.text)
    imds_payload = json.loads(resp.text)
    BEARER = "Bearer"
    oauth2_response = {
        "access_token": imds_payload["access_token"],
        "expires_in": int(imds_payload["expires_in"]),
        "token_type": imds_payload.get("token_type", BEARER),
    }
    expected_token_type = (data or {}).get("token_type", BEARER)
    if oauth2_response["token_type"] != expected_token_type:
        return {  # Generate a normal error (rather than an intrusive exception)
            "error": "broker_error",
            "error_description": "token_type {} is not supported by this version of Azure Portal".format(
                expected_token_type),
        }

    # The following default values are useful in SSH Cert scenario
    client_info = {  # Default value, in case the real value will be unavailable
        "uid": "user",
        "utid": "cloudshell",
    }
    now = time.time()
    preferred_username = "currentuser@cloudshell"
    id_token_claims = {  # First 5 claims are required per OIDC
        "iss": "cloudshell",
        "sub": "user",
        "aud": client_id,
        "exp": now + 3600,
        "iat": now,
        "preferred_username": preferred_username,  # Useful as MSAL account's username
    }

    # Probably a JWT. Use it to derive client_info and id token when possible.
    derived = _identity_from_jwt(
        imds_payload["access_token"], client_id, preferred_username)
    if derived:
        client_info, id_token_claims = derived

    oauth2_response["id_token_claims"] = id_token_claims
    oauth2_response["client_info"] = base64.b64encode(
        # Mimic a client_info, so that MSAL would create an account
        json.dumps(client_info).encode("utf-8")).decode("utf-8")
    oauth2_response["id_token_claims"]["tid"] = client_info["utid"]  # TBD

    # Note: Decided to not surface the response's "resource" back as "scope",
    # because that would cause the downstream OAuth2 code path to
    # cache the token with a different scope and not hit it later.
    if imds_payload.get("refresh_token"):
        oauth2_response["refresh_token"] = imds_payload["refresh_token"]
    return oauth2_response

126