Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/mex.py: 17%

172 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1#------------------------------------------------------------------------------ 

2# 

3# Copyright (c) Microsoft Corporation.  

4# All rights reserved. 

5#  

6# This code is licensed under the MIT License. 

7#  

8# Permission is hereby granted, free of charge, to any person obtaining a copy 

9# of this software and associated documentation files(the "Software"), to deal 

10# in the Software without restriction, including without limitation the rights 

11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell 

12# copies of the Software, and to permit persons to whom the Software is 

13# furnished to do so, subject to the following conditions : 

14#  

15# The above copyright notice and this permission notice shall be included in 

16# all copies or substantial portions of the Software. 

17#  

18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE 

21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 

24# THE SOFTWARE. 

25# 

26#------------------------------------------------------------------------------ 

27 

28try: 

29 from urllib.parse import urlparse 

30except ImportError: 

31 from urlparse import urlparse # pylint: disable=import-error 

32 

33try: 

34 from xml.etree import cElementTree as ET 

35except ImportError: 

36 from xml.etree import ElementTree as ET 

37 

38import requests 

39 

40from . import log 

41from . import util 

42from . import xmlutil 

43from .constants import XmlNamespaces, WSTrustVersion 

44from .adal_error import AdalError 

45 

46TRANSPORT_BINDING_XPATH = 'wsp:ExactlyOne/wsp:All/sp:TransportBinding' 

47TRANSPORT_BINDING_2005_XPATH = 'wsp:ExactlyOne/wsp:All/sp2005:TransportBinding' #pylint: disable=invalid-name 

48 

49SOAP_ACTION_XPATH = 'wsdl:operation/soap12:operation' 

50RST_SOAP_ACTION_13 = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue' 

51RST_SOAP_ACTION_2005 = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue' #pylint: disable=invalid-name 

52SOAP_TRANSPORT_XPATH = 'soap12:binding' 

53SOAP_HTTP_TRANSPORT_VALUE = 'http://schemas.xmlsoap.org/soap/http' 

54 

55PORT_XPATH = 'wsdl:service/wsdl:port' 

56ADDRESS_XPATH = 'wsa10:EndpointReference/wsa10:Address' 

57 

58def _url_is_secure(endpoint_url): 

59 parsed = urlparse(endpoint_url) 

60 return parsed.scheme == 'https' 

61 

62class Mex(object): 

63 

64 def __init__(self, call_context, url): 

65 

66 self._log = log.Logger("MEX", call_context.get('log_context')) 

67 self._call_context = call_context 

68 self._url = url 

69 self._dom = None 

70 self._parents = None 

71 self._mex_doc = None 

72 self.username_password_policy = {} 

73 self._log.debug("Mex created with url: %(mex_url)s", 

74 {"mex_url": self._url}) 

75 

76 def discover(self): 

77 options = util.create_request_options(self, {'headers': {'Content-Type': 'application/soap+xml'}}) 

78 

79 try: 

80 operation = "Mex Get" 

81 resp = requests.get(self._url, headers=options['headers'], 

82 verify=self._call_context.get('verify_ssl', None), 

83 proxies=self._call_context.get('proxies', None)) 

84 util.log_return_correlation_id(self._log, operation, resp) 

85 except Exception: 

86 self._log.exception( 

87 "%(operation)s request failed", {"operation": operation}) 

88 raise 

89 

90 if resp.status_code == 429: 

91 resp.raise_for_status() # Will raise requests.exceptions.HTTPError 

92 if not util.is_http_success(resp.status_code): 

93 return_error_string = u"{} request returned http error: {}".format(operation, resp.status_code) 

94 error_response = "" 

95 if resp.text: 

96 return_error_string = u"{} and server response: {}".format(return_error_string, resp.text) 

97 try: 

98 error_response = resp.json() 

99 except ValueError: 

100 pass 

101 raise AdalError(return_error_string, error_response) 

102 else: 

103 try: 

104 self._mex_doc = resp.text 

105 #options = {'errorHandler':self._log.error} 

106 self._dom = ET.fromstring(self._mex_doc) 

107 self._parents = {c:p for p in self._dom.iter() for c in p} 

108 self._parse() 

109 except Exception: 

110 self._log.info('Failed to parse mex response in to DOM') 

111 raise 

112 

113 def _check_policy(self, policy_node): 

114 policy_id = policy_node.attrib["{{{}}}Id".format(XmlNamespaces.namespaces['wsu'])] 

115 

116 # Try with Transport Binding XPath 

117 transport_binding_nodes = xmlutil.xpath_find(policy_node, TRANSPORT_BINDING_XPATH) 

118 

119 # If unsuccessful, try again with 2005 XPath 

120 if not transport_binding_nodes: 

121 transport_binding_nodes = xmlutil.xpath_find(policy_node, TRANSPORT_BINDING_2005_XPATH) 

122 

123 # If we did not find any binding, this is potentially bad. 

124 if not transport_binding_nodes: 

125 self._log.debug( 

126 "Potential policy did not match required transport binding: %(policy_id)s", 

127 {"policy_id": policy_id}) 

128 else: 

129 self._log.debug("Found matching policy id: %(policy_id)s", 

130 {"policy_id": policy_id}) 

131 

132 return policy_id 

133 

134 def _select_username_password_polices(self, xpath): 

135 

136 policies = {} 

137 username_token_nodes = xmlutil.xpath_find(self._dom, xpath) 

138 if not username_token_nodes: 

139 self._log.warn("No username token policy nodes found.") 

140 return 

141 

142 for node in username_token_nodes: 

143 policy_node = self._parents[self._parents[self._parents[self._parents[self._parents[self._parents[self._parents[node]]]]]]] 

144 policy_id = self._check_policy(policy_node) 

145 if policy_id: 

146 id_ref = '#' + policy_id 

147 policies[id_ref] = {policy_id:id_ref} 

148 

149 return policies if policies else None 

150 

151 def _check_soap_action_and_transport(self, binding_node): 

152 

153 soap_action = "" 

154 soap_transport = "" 

155 name = binding_node.get('name') 

156 

157 soap_transport_attributes = "" 

158 soap_action_attributes = xmlutil.xpath_find(binding_node, SOAP_ACTION_XPATH)[0].attrib['soapAction'] 

159 

160 if soap_action_attributes: 

161 soap_action = soap_action_attributes 

162 soap_transport_attributes = xmlutil.xpath_find(binding_node, SOAP_TRANSPORT_XPATH)[0].attrib['transport'] 

163 

164 if soap_transport_attributes: 

165 soap_transport = soap_transport_attributes 

166 

167 if soap_transport == SOAP_HTTP_TRANSPORT_VALUE: 

168 if soap_action == RST_SOAP_ACTION_13: 

169 self._log.debug( 

170 'found binding matching Action and Transport: %(binding_node)s', 

171 {"binding_node": name}) 

172 return WSTrustVersion.WSTRUST13 

173 elif soap_action == RST_SOAP_ACTION_2005: 

174 self._log.debug( 

175 'found binding matching Action and Transport: %(binding_node)s', 

176 {"binding_node": name}) 

177 return WSTrustVersion.WSTRUST2005 

178 

179 self._log.debug( 

180 'binding node did not match soap Action or Transport: %(binding_node)s', 

181 {"binding_node": name}) 

182 return WSTrustVersion.UNDEFINED 

183 

184 def _get_matching_bindings(self, policies): 

185 

186 bindings = {} 

187 binding_policy_ref_nodes = xmlutil.xpath_find(self._dom, 'wsdl:binding/wsp:PolicyReference') 

188 

189 for node in binding_policy_ref_nodes: 

190 uri = node.get('URI') 

191 policy = policies.get(uri) 

192 if policy: 

193 binding_node = self._parents[node] 

194 binding_name = binding_node.get('name') 

195 

196 version = self._check_soap_action_and_transport(binding_node) 

197 if version != WSTrustVersion.UNDEFINED: 

198 bindings[binding_name] = { 

199 'url': uri, 

200 'version': version 

201 } 

202 

203 return bindings if bindings else None 

204 

205 def _get_ports_for_policy_bindings(self, bindings, policies): 

206 

207 port_nodes = xmlutil.xpath_find(self._dom, PORT_XPATH) 

208 if not port_nodes: 

209 self._log.warn("No ports found") 

210 

211 for node in port_nodes: 

212 binding_id = node.get('binding') 

213 binding_id = binding_id.split(':')[-1] 

214 

215 trust_policy = bindings.get(binding_id) 

216 if trust_policy: 

217 binding_policy = policies.get(trust_policy.get('url')) 

218 if binding_policy and not binding_policy.get('url', None): 

219 binding_policy['version'] = trust_policy['version'] 

220 address_node = node.find(ADDRESS_XPATH, XmlNamespaces.namespaces) 

221 if address_node is None: 

222 raise AdalError("No address nodes on port") 

223 

224 address = xmlutil.find_element_text(address_node) 

225 if _url_is_secure(address): 

226 binding_policy['url'] = address 

227 else: 

228 self._log.warn( 

229 "Skipping insecure endpoint: %(mex_endpoint)s", 

230 {"mex_endpoint": address}) 

231 

232 def _select_single_matching_policy(self, policies): 

233 

234 matching_policies = [p for p in policies.values() if p.get('url')] 

235 if not matching_policies: 

236 self._log.warn("No policies found with a url.") 

237 return 

238 

239 wstrust13_policy = None 

240 wstrust2005_policy = None 

241 for policy in matching_policies: 

242 version = policy.get('version', None) 

243 if version == WSTrustVersion.WSTRUST13: 

244 wstrust13_policy = policy 

245 elif version == WSTrustVersion.WSTRUST2005: 

246 wstrust2005_policy = policy 

247 

248 if wstrust13_policy is None and wstrust2005_policy is None: 

249 self._log.warn('No policies found for either wstrust13 or wstrust2005') 

250 

251 self.username_password_policy = wstrust13_policy or wstrust2005_policy 

252 

253 def _parse(self): 

254 policies = self._select_username_password_polices( 

255 'wsp:Policy/wsp:ExactlyOne/wsp:All/sp:SignedEncryptedSupportingTokens/wsp:Policy/sp:UsernameToken/wsp:Policy/sp:WssUsernameToken10') 

256 

257 xpath2005 = 'wsp:Policy/wsp:ExactlyOne/wsp:All/sp2005:SignedSupportingTokens/wsp:Policy/sp2005:UsernameToken/wsp:Policy/sp2005:WssUsernameToken10' 

258 if policies: 

259 policies2005 = self._select_username_password_polices(xpath2005) 

260 if policies2005: 

261 policies.update(policies2005) 

262 else: 

263 policies = self._select_username_password_polices(xpath2005) 

264 

265 if not policies: 

266 raise AdalError("No matching policies.") 

267 

268 

269 bindings = self._get_matching_bindings(policies) 

270 if not bindings: 

271 raise AdalError("No matching bindings.") 

272 

273 self._get_ports_for_policy_bindings(bindings, policies) 

274 self._select_single_matching_policy(policies) 

275 

276 if not self._url: 

277 raise AdalError("No ws-trust endpoints match requirements.")