Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/user_realm.py: 27%

89 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1#------------------------------------------------------------------------------ 

2# 

3# Copyright (c) Microsoft Corporation.  

4# All rights reserved. 

5#  

6# This code is licensed under the MIT License. 

7#  

8# Permission is hereby granted, free of charge, to any person obtaining a copy 

9# of this software and associated documentation files(the "Software"), to deal 

10# in the Software without restriction, including without limitation the rights 

11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell 

12# copies of the Software, and to permit persons to whom the Software is 

13# furnished to do so, subject to the following conditions : 

14#  

15# The above copyright notice and this permission notice shall be included in 

16# all copies or substantial portions of the Software. 

17#  

18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE 

21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 

24# THE SOFTWARE. 

25# 

26#------------------------------------------------------------------------------ 

27import json 

28 

29try: 

30 from urllib.parse import quote, urlencode 

31 from urllib.parse import urlunparse 

32except ImportError: 

33 from urllib import quote, urlencode #pylint: disable=no-name-in-module 

34 from urlparse import urlunparse #pylint: disable=import-error 

35 

36import requests 

37 

38from . import constants 

39from . import log 

40from . import util 

41from .adal_error import AdalError 

42 

# Path appended to the authority URL for user realm discovery; the literal
# '<user>' placeholder is replaced with the URL-quoted user principal.
USER_REALM_PATH_TEMPLATE = 'common/UserRealm/<user>'

# Module-level aliases for the lookup tables declared on constants.UserRealm.
# They are used below to validate the 'account_type' and
# 'federation_protocol' fields of a discovery response.
ACCOUNT_TYPE = constants.UserRealm.account_type
FEDERATION_PROTOCOL_TYPE = constants.UserRealm.federation_protocol_type

47 

48 

class UserRealm(object):
    """Discover the realm (account type / federation settings) of a user.

    Call :meth:`discover` to query the authority's user realm endpoint.
    On success the parsed result is stored on the instance attributes
    ``account_type``, ``federation_protocol``, ``federation_metadata_url``,
    ``federation_active_auth_url`` and ``cloud_audience_urn``; all of them
    are ``None`` until a response has been parsed.
    """

    def __init__(self, call_context, user_principle, authority_url):
        """Store the discovery inputs and reset all result attributes.

        :param call_context: mapping that must contain ``'log_context'``;
            ``'proxies'`` and ``'verify_ssl'`` are read from it (if present)
            when the HTTP request is issued.
        :param user_principle: user identifier that is URL-quoted into the
            discovery path.
        :param authority_url: authority URL; passed to ``util.copy_url`` when
            the discovery URL is built.
        """
        self._log = log.Logger("UserRealm", call_context['log_context'])
        self._call_context = call_context
        self.api_version = '1.0'
        self.federation_protocol = None
        self.account_type = None
        self.federation_metadata_url = None
        self.federation_active_auth_url = None
        self.cloud_audience_urn = None
        self._user_principle = user_principle
        self._authority_url = authority_url

    def _get_user_realm_url(self):
        """Build the user realm discovery URL for the configured user.

        The authority URL's path is replaced with ``USER_REALM_PATH_TEMPLATE``
        (with the quoted user principal substituted in) and its query is
        replaced with ``api-version=<self.api_version>``.

        :returns: the result of ``util.copy_url`` on the rebuilt URL; callers
            in this class use its ``geturl()`` method.
        """
        url_components = list(util.copy_url(self._authority_url))
        url_encoded_user = quote(self._user_principle, safe='~()*!.\'')
        # Index 2 is the path slot and index 4 the query slot of the
        # component sequence consumed by urlunparse.
        url_components[2] = '/' + USER_REALM_PATH_TEMPLATE.replace('<user>', url_encoded_user)

        user_realm_query = {'api-version': self.api_version}
        url_components[4] = urlencode(user_realm_query)
        return util.copy_url(urlunparse(url_components))

    @staticmethod
    def _validate_constant_value(value_dic, value, case_sensitive=False):
        """Return ``value`` if it is one of ``value_dic``'s values, else False.

        :param value_dic: mapping whose *values* form the allowed set.
        :param value: candidate value; any falsy value is rejected.
        :param case_sensitive: when False (the default) the candidate is
            lower-cased before the lookup, so the lower-cased form is what
            gets returned on a match.
        :returns: the (possibly lower-cased) value on a match, otherwise
            ``False``.
        """
        if not value:
            return False

        if not case_sensitive:
            try:
                value = value.lower()
            except AttributeError:
                # Non-string input (e.g. a JSON number or list) cannot be
                # case-folded; compare it as-is so it is reported as an
                # unrecognised value instead of crashing the caller.
                pass

        return value if value in value_dic.values() else False

    @staticmethod
    def _validate_account_type(account_type):
        """Validate ``account_type`` against ``ACCOUNT_TYPE`` (case-insensitive)."""
        return UserRealm._validate_constant_value(ACCOUNT_TYPE, account_type)

    @staticmethod
    def _validate_federation_protocol(protocol):
        """Validate ``protocol`` against ``FEDERATION_PROTOCOL_TYPE`` (case-insensitive)."""
        return UserRealm._validate_constant_value(FEDERATION_PROTOCOL_TYPE, protocol)

    def _log_parsed_response(self):
        """Emit a debug log entry summarising the parsed discovery result."""
        self._log.debug(
            'UserRealm response:\n'
            ' AccountType: %(account_type)s\n'
            ' FederationProtocol: %(federation_protocol)s\n'
            ' FederationMetatdataUrl: %(federation_metadata_url)s\n'
            ' FederationActiveAuthUrl: %(federation_active_auth_url)s',
            {
                "account_type": self.account_type,
                "federation_protocol": self.federation_protocol,
                "federation_metadata_url": self.federation_metadata_url,
                "federation_active_auth_url": self.federation_active_auth_url,
            })

    def _parse_discovery_response(self, body):
        """Parse a discovery response body and populate the result attributes.

        :param body: response body text, expected to be a JSON object with an
            ``account_type`` key and, for federated accounts,
            ``federation_protocol``, ``federation_metadata_url`` and
            ``federation_active_auth_url`` keys (``cloud_audience_urn`` is
            optional).
        :raises ValueError: if ``body`` is not valid JSON (logged, then
            re-raised unchanged).
        :raises KeyError: if a required key is missing from the response.
        :raises AdalError: if the account type or federation protocol is not
            one of the recognised constant values.
        """
        self._log.debug("Discovery response:\n %(discovery_response)s",
                        {"discovery_response": body})

        try:
            response = json.loads(body)
        except ValueError:
            self._log.info(
                "Parsing realm discovery response JSON failed for body: %(body)s",
                {"body": body})
            raise

        raw_account_type = response['account_type']
        account_type = UserRealm._validate_account_type(raw_account_type)
        if not account_type:
            # Report the value the server actually sent; the validator's
            # result is always False on this path and carries no information.
            # A unicode literal keeps non-ASCII values formattable on Python 2.
            raise AdalError(u'Cannot parse account_type: {}'.format(raw_account_type))
        self.account_type = account_type

        if self.account_type == ACCOUNT_TYPE['Federated']:
            raw_protocol = response['federation_protocol']
            protocol = UserRealm._validate_federation_protocol(raw_protocol)

            if not protocol:
                raise AdalError(u'Cannot parse federation protocol: {}'.format(raw_protocol))

            self.federation_protocol = protocol
            self.federation_metadata_url = response['federation_metadata_url']
            self.federation_active_auth_url = response['federation_active_auth_url']
            self.cloud_audience_urn = response.get('cloud_audience_urn', "urn:federation:MicrosoftOnline")

        self._log_parsed_response()

    def discover(self):
        """Query the user realm endpoint and parse its response.

        Issues an HTTP GET (``Accept: application/json``) to the URL from
        :meth:`_get_user_realm_url`, using the ``proxies`` and ``verify_ssl``
        entries of the call context when present.

        :raises requests.exceptions.HTTPError: if the server answers 429.
        :raises AdalError: if ``util.is_http_success`` rejects the status
            code; the error carries the response text and, when the body is
            JSON, the decoded error payload.
        :raises ValueError, KeyError, AdalError: propagated from
            :meth:`_parse_discovery_response` for a malformed success body.
        """
        options = util.create_request_options(self, {'headers': {'Accept': 'application/json'}})
        user_realm_url = self._get_user_realm_url()
        self._log.debug("Performing user realm discovery at: %(user_realm_url)s",
                        {"user_realm_url": user_realm_url.geturl()})

        operation = 'User Realm Discovery'
        resp = requests.get(user_realm_url.geturl(), headers=options['headers'],
                            proxies=self._call_context.get('proxies', None),
                            verify=self._call_context.get('verify_ssl', None))
        util.log_return_correlation_id(self._log, operation, resp)

        if resp.status_code == 429:
            resp.raise_for_status()  # Will raise requests.exceptions.HTTPError

        if not util.is_http_success(resp.status_code):
            return_error_string = u"{} request returned http error: {}".format(operation,
                                                                               resp.status_code)
            error_response = ""
            if resp.text:
                return_error_string = u"{} and server response: {}".format(return_error_string, resp.text)
                try:
                    error_response = resp.json()
                except ValueError:
                    # Body is not JSON; the raw text is already included in
                    # the error string, so the payload stays empty.
                    pass

            raise AdalError(return_error_string, error_response)

        self._parse_discovery_response(resp.text)