Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/msal/authority.py: 36%

80 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:13 +0000

1import json 

2try: 

3 from urllib.parse import urlparse 

4except ImportError: # Fall back to Python 2 

5 from urlparse import urlparse 

6import logging 

7 

8from .exceptions import MsalServiceError 

9 

10 

logger = logging.getLogger(__name__)

# Login hosts of the individual clouds. The values were copied from
# https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-national-cloud#azure-ad-authentication-endpoints
AZURE_US_GOVERNMENT = "login.microsoftonline.us"
AZURE_CHINA = "login.chinacloudapi.cn"
AZURE_PUBLIC = "login.microsoftonline.com"

WORLD_WIDE = 'login.microsoftonline.com'  # There was an alias login.windows.net

# Hosts for which Authority skips the instance discovery round trip.
WELL_KNOWN_AUTHORITY_HOSTS = {
    WORLD_WIDE,
    AZURE_CHINA,
    'login-us.microsoftonline.com',
    AZURE_US_GOVERNMENT,
}

# Domain suffixes that mark an authority as B2C (matched as ".<suffix>").
WELL_KNOWN_B2C_HOSTS = [
    "b2clogin.com",
    "b2clogin.cn",
    "b2clogin.us",
    "b2clogin.de",
]

32 

33 

class AuthorityBuilder(object):
    """Assemble an authority URL out of an instance (host) and a tenant."""

    def __init__(self, instance, tenant):
        """A helper to save caller from doing string concatenation.

        Usage is documented in :func:`application.ClientApplication.__init__`.

        :param instance: Host part; trailing slashes are dropped.
        :param tenant: Tenant part; leading and trailing slashes are dropped.
        """
        host = instance.rstrip("/")
        tenant_segment = tenant.strip("/")
        self._instance, self._tenant = host, tenant_segment

    def __str__(self):
        """Return the authority as an ``https://<instance>/<tenant>`` string."""
        pieces = (self._instance, self._tenant)
        return "https://{}/{}".format(*pieces)

45 

46 

class Authority(object):
    """This class represents an (already-validated) authority.

    Once constructed, it contains members named "*_endpoint" for this instance.
    TODO: It will also cache the previously-validated authority instances.
    """
    # Hosts whose user realm endpoint answered 404. Stored on the class,
    # so the knowledge is shared by every Authority instance.
    _domains_without_user_realm_discovery = set([])

    @property
    def http_client(self):  # Obsolete. We will remove this eventually
        """Return the http client given to the constructor (deprecated)."""
        # Imported here so that this property does not rely on a
        # module-level "import warnings" being present.
        import warnings
        warnings.warn(
            "authority.http_client might be removed in MSAL Python 1.21+", DeprecationWarning)
        return self._http_client

    def __init__(
            self, authority_url, http_client,
            validate_authority=True,
            instance_discovery=None,
            ):
        """Creates an authority instance, and also validates it.

        :param authority_url:
            The authority, as a string or as an :class:`AuthorityBuilder`.
        :param http_client:
            The object whose ``get()`` is used for all discovery requests.
        :param validate_authority:
            The Authority validation process actually checks two parts:
            instance (a.k.a. host) and tenant. We always do a tenant discovery.
            This parameter only controls whether an instance discovery will be
            performed.
        :raises ValueError:
            When the url is malformed, when instance discovery reports
            ``invalid_instance``, or when tenant discovery fails.
        """
        # :param instance_discovery:
        #    By default, the known-to-Microsoft validation will use an
        #    instance discovery endpoint located at ``login.microsoftonline.com``.
        #    You can customize the endpoint by providing a url as a string.
        #    Or you can turn this behavior off by passing in a False here.
        self._http_client = http_client
        if isinstance(authority_url, AuthorityBuilder):
            authority_url = str(authority_url)
        authority, self.instance, tenant = canonicalize(authority_url)
        self.is_adfs = tenant.lower() == 'adfs'
        parts = authority.path.split('/')
        self._is_b2c = any(
            self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS
            ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_"))
        self._is_known_to_developer = self.is_adfs or self._is_b2c or not validate_authority
        is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS
        instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format(  # Note: This URL seemingly returns V1 endpoint only
            WORLD_WIDE  # Historically using WORLD_WIDE. Could use self.instance too
                # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
                # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
            ) if instance_discovery in (None, True) else instance_discovery
        if instance_discovery_endpoint and not (
                is_known_to_microsoft or self._is_known_to_developer):
            # Unknown host: ask the instance discovery endpoint to vouch for it
            payload = _instance_discovery(
                "https://{}{}/oauth2/v2.0/authorize".format(
                    self.instance, authority.path),
                self._http_client,
                instance_discovery_endpoint)
            if payload.get("error") == "invalid_instance":
                raise ValueError(
                    "invalid_instance: "
                    "The authority you provided, %s, is not whitelisted. "
                    "If it is indeed your legit customized domain name, "
                    "you can turn off this check by passing in "
                    "validate_authority=False"
                    % authority_url)
            tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
        else:
            tenant_discovery_endpoint = (
                'https://{}:{}{}{}/.well-known/openid-configuration'.format(
                    self.instance,
                    443 if authority.port is None else authority.port,
                    authority.path,  # In B2C scenario, it is "/tenant/policy"
                    # Reuse the case-insensitive flag computed above, so that
                    # e.g. "/ADFS" is not given the AAD v2 suffix.
                    "" if self.is_adfs else "/v2.0"  # the AAD v2 endpoint
                    ))
        try:
            openid_config = tenant_discovery(
                tenant_discovery_endpoint,
                self._http_client)
        except ValueError:
            raise ValueError(
                "Unable to get authority configuration for {}. "
                "Authority would typically be in a format of "
                "https://login.microsoftonline.com/your_tenant "
                "Also please double check your tenant name or GUID is correct.".format(
                authority_url))
        logger.debug("openid_config = %s", openid_config)
        self.authorization_endpoint = openid_config['authorization_endpoint']
        self.token_endpoint = openid_config['token_endpoint']
        self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
        _, _, self.tenant = canonicalize(self.token_endpoint)  # Usually a GUID

    def user_realm_discovery(self, username, correlation_id=None, response=None):
        """Look up the user realm information of ``username``.

        :param username: Inserted into the user realm url as-is.
        :param correlation_id: Sent as the ``client-request-id`` header.
        :param response:
            When truthy, it is used instead of performing the http request.
        :return:
            The decoded json response, or an empty dict when this host is
            known (or has just been found) to answer 404.
        """
        # It will typically return a dict containing "ver", "account_type",
        # "federation_protocol", "cloud_audience_urn",
        # "federation_metadata_url", "federation_active_auth_url", etc.
        if self.instance not in self.__class__._domains_without_user_realm_discovery:
            resp = response or self._http_client.get(
                "https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
                    netloc=self.instance, username=username),
                headers={'Accept': 'application/json',
                         'client-request-id': correlation_id},)
            if resp.status_code != 404:
                resp.raise_for_status()
                return json.loads(resp.text)
            # A 404 is remembered, so this host will not be queried again
            self.__class__._domains_without_user_realm_discovery.add(self.instance)
        return {}  # This can guide the caller to fall back normal ROPC flow

151 

152 

def canonicalize(authority_url):
    """Parse and sanity-check an authority url.

    :param authority_url: An https url with a host and at least one path segment.
    :return: A tuple of (url_parsed_result, hostname_in_lowercase, tenant),
        where tenant is the first path segment.
    :raises ValueError: When the scheme is not https, when the url has no
        host, or when the first path segment is missing or empty.
    """
    authority = urlparse(authority_url)
    parts = authority.path.split("/")
    # A host-less url such as "https:///tenant" parses with hostname None.
    # Reject it here instead of handing None to the caller as the instance.
    if (authority.scheme != "https" or not authority.hostname
            or len(parts) < 2 or not parts[1]):
        raise ValueError(
            "Your given address (%s) should consist of "
            "an https url with a minimum of one segment in a path: e.g. "
            "https://login.microsoftonline.com/<tenant> "
            "or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
            % authority_url)
    return authority, authority.hostname, parts[1]

165 

def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs):
    """Query the instance discovery endpoint about an authorization endpoint.

    :param url: Sent as the ``authorization_endpoint`` query parameter.
    :param http_client: Its ``get()`` performs the request.
    :param instance_discovery_endpoint: The url to send the request to.
    :param kwargs: Forwarded to ``http_client.get()`` untouched.
    :return: The response text decoded as json.
    """
    query = {'authorization_endpoint': url, 'api-version': '1.0'}
    response = http_client.get(
        instance_discovery_endpoint, params=query, **kwargs)
    return json.loads(response.text)

172 

def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
    """Fetch the Openid Configuration from the given endpoint.

    :param tenant_discovery_endpoint: The url to request.
    :param http_client: Its ``get()`` performs the request.
    :param kwargs: Forwarded to ``http_client.get()`` untouched.
    :return: The decoded configuration, which contains at least
        ``authorization_endpoint`` and ``token_endpoint``.
    :raises ValueError: On a 200 response that cannot be decoded or lacks
        one of those two entries, and on any 4xx response.
    :raises RuntimeError: On any other status, unless
        ``resp.raise_for_status()`` raises something first.
    """
    resp = http_client.get(tenant_discovery_endpoint, **kwargs)
    status = resp.status_code

    if status == 200:
        config = json.loads(resp.text)  # It could raise ValueError
        required = ('authorization_endpoint', 'token_endpoint')
        if not all(key in config for key in required):
            raise ValueError("OIDC Discovery does not provide enough information")
        return config  # Happy path

    if status >= 400 and status < 500:
        # Nonexist tenant would hit this path
        # e.g. https://login.microsoftonline.com/nonexist_tenant/v2.0/.well-known/openid-configuration
        reason = resp.text  # Expose it as-is b/c OIDC defines no error response format
        raise ValueError(
            "OIDC Discovery endpoint rejects our request. Error: {}".format(reason))

    # Transient network error would hit this path
    resp.raise_for_status()
    # A fallback here, in case resp.raise_for_status() is no-op
    details = (resp.status_code, resp.text)
    raise RuntimeError("Unable to complete OIDC Discovery: %d, %s" % details)

192