Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/adal/wstrust_response.py: 14%

111 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1#------------------------------------------------------------------------------ 

2# 

3# Copyright (c) Microsoft Corporation.  

4# All rights reserved. 

5#  

6# This code is licensed under the MIT License. 

7#  

8# Permission is hereby granted, free of charge, to any person obtaining a copy 

9# of this software and associated documentation files(the "Software"), to deal 

10# in the Software without restriction, including without limitation the rights 

11# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell 

12# copies of the Software, and to permit persons to whom the Software is 

13# furnished to do so, subject to the following conditions : 

14#  

15# The above copyright notice and this permission notice shall be included in 

16# all copies or substantial portions of the Software. 

17#  

18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE 

21# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

23# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 

24# THE SOFTWARE. 

25# 

26#------------------------------------------------------------------------------ 

27 

28try: 

29 from xml.etree import cElementTree as ET 

30except ImportError: 

31 from xml.etree import ElementTree as ET 

32import re 

33 

34from . import xmlutil 

35from . import log 

36from .adal_error import AdalError 

37from .constants import WSTrustVersion 

38 

39# Creates a log message that contains the RSTR scrubbed of the actual SAML assertion. 

def scrub_rstr_log_message(response_str):
    """Build a log-safe rendering of an RSTR with the SAML assertion removed.

    All line breaks are stripped from ``response_str`` first so that the
    assertion can be located with a single-line regular expression.  When an
    ``Assertion`` element is found after the first ``RequestedSecurityToken``
    occurrence, everything between its opening tag and its last closing tag
    is replaced by the text ``ASSERTION CONTENTS REDACTED``; the opening and
    closing tags themselves are kept exactly as they appeared.

    The namespace prefix on the ``Assertion`` element is optional, so both
    ``<saml:Assertion ...>`` and an un-prefixed ``<Assertion xmlns=...>``
    are redacted.

    :param response_str: the raw RSTR response text.
    :returns: ``'RSTR Response: '`` followed by the (possibly scrubbed)
        single-line response.
    """
    # group(1): the whole assertion element, group(2): its opening tag,
    # group(3): its closing tag.  The lookahead after "Assertion" stops the
    # pattern from matching longer element names such as "AssertionID".
    assertion_regex = (
        r'RequestedSecurityToken.*?'
        r'((<(?:[\w.\-]+:)?Assertion(?=[\s>])[^>]*>)'
        r'.*'
        r'(</(?:[\w.\-]+:)?Assertion\s*>))'
    )
    single_line_rstr = re.sub(r'(\r\n|\n|\r)', '', response_str)

    match = re.search(assertion_regex, single_line_rstr)
    if not match:
        # No Assertion was matched so just return the response_str as is.
        return 'RSTR Response: ' + single_line_rstr

    saml_assertion, start_tag, end_tag = match.group(1, 2, 3)
    # Re-use the closing tag that was actually present so the redacted
    # element keeps a matching start/end pair whatever prefix was used.
    scrubbed_rstr = single_line_rstr.replace(
        saml_assertion, start_tag + 'ASSERTION CONTENTS REDACTED' + end_tag)

    return 'RSTR Response: ' + scrubbed_rstr

57 

def findall_content(xml_string, tag):
    """
    Given a tag name without any prefix,
    this function returns a list of the raw content inside this tag as-is.

    >>> findall_content("<ns0:foo> what <bar> ever </bar> content </ns0:foo>", "foo")
    [' what <bar> ever </bar> content ']

    Only elements whose local name is exactly ``tag`` are considered
    (``"foo"`` does not match ``<foobar>``), an optional ``prefix:`` is
    accepted on both the opening and the closing tag, and self-closing
    elements (``<foo/>``) are skipped because they have no content.

    Limitation: the content is matched greedily, from the first opening tag
    to the last closing tag of that name, so several sibling elements with
    the same name are returned as one merged string.

    Motivation:

    Usually we would use XML parser to extract the data by xpath.
    However the ElementTree in Python will implicitly normalize the output
    by "hoisting" the inner inline namespaces into the outmost element.
    The result will be a semantically equivalent XML snippet,
    but not fully identical to the original one.
    While this effect shouldn't become a problem in all other cases,
    it does not seem to fully comply with Exclusive XML Canonicalization spec
    (https://www.w3.org/TR/xml-exc-c14n/), and void the SAML token signature.
    SAML signature algo needs the "XML -> C14N(XML) -> Signed(C14N(Xml))" order.

    The binary extension lxml is probably the canonical way to solve this
    (https://stackoverflow.com/questions/22959577/python-exclusive-xml-canonicalization-xml-exc-c14n)
    but here we use this workaround, based on Regex, to return raw content as-is.

    :param xml_string: the XML text to search.
    :param tag: element local name, without namespace prefix.
    :returns: list of raw inner-content strings (empty when nothing matches).
    """
    # \w+ is good enough for https://www.w3.org/TR/REC-xml/#NT-NameChar
    pattern = (
        # Opening tag: the name must be followed by whitespace (attributes)
        # or directly by ">"; the lookbehind rejects self-closing "/>".
        r"<(?:\w+:)?%(tag)s(?:\s[^>]*)?(?<!/)>"
        r"(.*)"
        # Closing tag: anchored on ">" so a longer name cannot match.
        r"</(?:\w+:)?%(tag)s\s*>"
    ) % {"tag": re.escape(tag)}  # escape so the tag is matched literally
    return re.findall(pattern, xml_string, re.DOTALL)

85 

86 

class WSTrustResponse(object):
    """Parser for a WS-Trust RequestSecurityTokenResponse (RSTR) SOAP message.

    Call :meth:`parse` after construction.  On success ``token`` holds the
    serialized security token and ``token_type`` the accompanying token type.
    When the response carries a SOAP fault, ``fault_message`` and
    ``error_code`` are populated and :meth:`parse` raises ``AdalError``.
    """

    def __init__(self, call_context, response, wstrust_version):
        """Store the raw response and log a scrubbed copy of it.

        :param call_context: mapping that provides at least ``'log_context'``.
        :param response: the raw RSTR response text (may be empty/None).
        :param wstrust_version: a ``WSTrustVersion`` value; selects the
            xpath expressions used by the fallback token parser.
        """
        self._log = log.Logger("WSTrustResponse", call_context['log_context'])
        self._call_context = call_context
        self._response = response
        self._dom = None        # parsed ElementTree root, only set while parse() runs
        self._parents = None    # child -> parent element map, only set while parse() runs
        self.error_code = None
        self.fault_message = None
        self.token_type = None
        self.token = None
        self._wstrust_version = wstrust_version

        if response:
            self._log.debug(scrub_rstr_log_message(response))

    # Sample error message
    #<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://www.w3.org/2005/08/addressing" xmlns:u="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
    #  <s:Header>
    #    <a:Action s:mustUnderstand="1">http://www.w3.org/2005/08/addressing/soap/fault</a:Action>
    #    <o:Security s:mustUnderstand="1" xmlns:o="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
    #      <u:Timestamp u:Id="_0">
    #        <u:Created>2013-07-30T00:32:21.989Z</u:Created>
    #        <u:Expires>2013-07-30T00:37:21.989Z</u:Expires>
    #      </u:Timestamp>
    #    </o:Security>
    #  </s:Header>
    #  <s:Body>
    #    <s:Fault>
    #      <s:Code>
    #        <s:Value>s:Sender</s:Value>
    #        <s:Subcode>
    #          <s:Value xmlns:a="http://docs.oasis-open.org/ws-sx/ws-trust/200512">a:RequestFailed</s:Value>
    #        </s:Subcode>
    #      </s:Code>
    #      <s:Reason>
    #        <s:Text xml:lang="en-US">MSIS3127: The specified request failed.</s:Text>
    #      </s:Reason>
    #    </s:Fault>
    #  </s:Body>
    #</s:Envelope>

    def _parse_error(self):
        """Extract SOAP fault information from the parsed DOM.

        Sets ``fault_message`` from ``s:Body/s:Fault/s:Reason/s:Text`` and
        ``error_code`` from the top-level ``s:Subcode/s:Value`` (with its
        namespace prefix removed when one is present).

        :returns: True when a non-empty fault message is set, else False.
        :raises AdalError: if more than one subcode value element is found.
        """
        fault_node = xmlutil.xpath_find(self._dom, 's:Body/s:Fault/s:Reason/s:Text')
        if fault_node:
            self.fault_message = fault_node[0].text

        error_found = bool(self.fault_message)

        # Subcode has minoccurs=0 and maxoccurs=1(default) according to the http://www.w3.org/2003/05/soap-envelope
        # Subcode may have another subcode as well. This is only targeting at top level subcode.
        # Subcode value may have different messages not always uses http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd.
        # text inside the value is not possible to select without prefix, so substring is necessary
        subnode = xmlutil.xpath_find(self._dom, 's:Body/s:Fault/s:Code/s:Subcode/s:Value')
        if len(subnode) > 1:
            raise AdalError("Found too many fault code values: {}".format(len(subnode)))

        if subnode:
            error_code = subnode[0].text
            # An empty <s:Value/> has no text; leave error_code untouched.
            if error_code:
                parts = error_code.split(':')
                # "a:RequestFailed" -> "RequestFailed"; a value without any
                # prefix is used as-is instead of raising IndexError.
                self.error_code = parts[1] if len(parts) > 1 else parts[0]

        return error_found

    def _parse_token(self):
        """Fallback token extraction based on the parsed DOM.

        Walks the ``TokenType`` nodes of the RSTR, looks up the sibling
        ``RequestedSecurityToken`` through the parent map and serializes its
        first child with ElementTree.  The first usable token wins.

        :raises AdalError: if there is no TokenType node, if a TokenType has
            more than one RequestedSecurityToken, or if no token was found.
        """
        if self._wstrust_version == WSTrustVersion.WSTRUST2005:
            token_type_nodes_xpath = 's:Body/t:RequestSecurityTokenResponse/t:TokenType'
            security_token_xpath = 't:RequestedSecurityToken'
        else:
            token_type_nodes_xpath = 's:Body/wst:RequestSecurityTokenResponseCollection/wst:RequestSecurityTokenResponse/wst:TokenType'
            security_token_xpath = 'wst:RequestedSecurityToken'

        token_type_nodes = xmlutil.xpath_find(self._dom, token_type_nodes_xpath)
        if not token_type_nodes:
            raise AdalError("No TokenType nodes found in RSTR")

        for node in token_type_nodes:
            if self.token:
                self._log.warn("Found more than one returned token. Using the first.")
                break

            token_type = xmlutil.find_element_text(node)
            if not token_type:
                self._log.warn("Could not find token type in RSTR token.")

            requested_token_node = xmlutil.xpath_find(self._parents[node], security_token_xpath)
            if len(requested_token_node) > 1:
                raise AdalError("Found too many RequestedSecurityToken nodes for token type: {}".format(token_type))

            if not requested_token_node:
                self._log.warn(
                    "Unable to find RequestsSecurityToken element associated with TokenType element: %(token_type)s",
                    {"token_type": token_type})
                continue

            token_container = requested_token_node[0]
            # The token is the first child of RequestedSecurityToken.  Use
            # len() (not truthiness) to detect an element without children,
            # and skip it instead of failing with IndexError.
            if len(token_container) == 0:
                self._log.warn(
                    "Unable to find token associated with TokenType element: %(token_type)s",
                    {"token_type": token_type})
                continue

            # Adjust namespaces (without this they are autogenerated) so this is understood
            # by the receiver. Then make a string repr of the element tree node.
            # See also http://blog.tomhennigan.co.uk/post/46945128556/elementtree-and-xmlns
            ET.register_namespace('saml', 'urn:oasis:names:tc:SAML:1.0:assertion')
            ET.register_namespace('ds', 'http://www.w3.org/2000/09/xmldsig#')

            self.token = ET.tostring(token_container[0])
            self.token_type = token_type

            self._log.info(
                "Found token of type: %(token_type)s",
                {"token_type": self.token_type})

        if self.token is None:
            raise AdalError("Unable to find any tokens in RSTR.")

    @staticmethod
    def _parse_token_by_re(raw_response):
        """Extract the token from the raw response text without an XML parser.

        The raw text is used so the token is returned exactly as received
        (see ``findall_content`` for the motivation).

        :param raw_response: the raw RSTR response text.
        :returns: ``(token_bytes, token_type)`` for the first
            RequestSecurityTokenResponse that has both a TokenType and a
            RequestedSecurityToken, or None when there is none.
        """
        for rstr in findall_content(raw_response, "RequestSecurityTokenResponse"):
            token_types = findall_content(rstr, "TokenType")
            tokens = findall_content(rstr, "RequestedSecurityToken")
            if token_types and tokens:
                # Historically, we use "us-ascii" encoding, but it should be "utf-8"
                # https://stackoverflow.com/questions/36658000/what-is-encoding-used-for-saml-conversations
                return tokens[0].encode('utf-8'), token_types[0]
        return None

    def parse(self):
        """Parse the stored response and populate ``token`` / ``token_type``.

        The regex-based extraction is tried first; the DOM-based parser is
        only used as a fallback.  The DOM and parent map are always released
        when this method returns or raises.

        :raises AdalError: if the response is empty, cannot be parsed as XML,
            contains a SOAP fault, or contains no token.
        """
        if not self._response:
            raise AdalError("Received empty RSTR response body.")

        try:
            self._dom = ET.fromstring(self._response)
        except Exception as exp:
            raise AdalError('Failed to parse RSTR in to DOM', exp)

        try:
            # ElementTree has no parent pointers; build a child -> parent map.
            self._parents = {c:p for p in self._dom.iter() for c in p}
            error_found = self._parse_error()
            if error_found:
                str_error_code = self.error_code or 'NONE'
                str_fault_message = self.fault_message or 'NONE'
                error_template = 'Server returned error in RSTR - ErrorCode: {} : FaultMessage: {}'
                raise AdalError(error_template.format(str_error_code, str_fault_message))

            token_found = self._parse_token_by_re(self._response)
            if token_found:
                self.token, self.token_type = token_found
            else:  # fallback to old logic
                self._parse_token()
        finally:
            self._dom = None
            self._parents = None

248