Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/pipeline/policies/_utils.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

106 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import datetime 

27import email.utils 

28import urllib.parse 

29from typing import Optional, Set, cast, Union, Tuple 

30from urllib.parse import urlparse 

31 

32from azure.core.pipeline.transport import ( 

33 HttpResponse as LegacyHttpResponse, 

34 AsyncHttpResponse as LegacyAsyncHttpResponse, 

35 HttpRequest as LegacyHttpRequest, 

36) 

37from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest 

38 

39 

40from ...utils._utils import case_insensitive_dict, CaseInsensitiveSet 

41from .. import PipelineResponse 

42 

43 

# Any response object a policy may encounter: the azure.core.rest response types
# and the legacy azure.core.pipeline.transport ones, in sync and async flavours.
AllHttpResponseType = Union[
    HttpResponse,
    LegacyHttpResponse,
    AsyncHttpResponse,
    LegacyAsyncHttpResponse,
]

# Any request object a policy may encounter: the azure.core.rest request type or the legacy one.
HTTPRequestType = Union[
    HttpRequest,
    LegacyHttpRequest,
]

46 

47 

48def _parse_http_date(text: str) -> datetime.datetime: 

49 """Parse a HTTP date format into datetime. 

50 

51 :param str text: Text containing a date in HTTP format 

52 :rtype: datetime.datetime 

53 :return: The parsed datetime 

54 """ 

55 parsed_date = email.utils.parsedate_tz(text) 

56 if not parsed_date: 

57 raise ValueError("Invalid HTTP date") 

58 tz_offset = cast(int, parsed_date[9]) # Look at the code, tz_offset is always an int, at worst 0 

59 return datetime.datetime(*parsed_date[:6], tzinfo=datetime.timezone(datetime.timedelta(seconds=tz_offset))) 

60 

61 

def parse_retry_after(retry_after: str) -> float:
    """Turn a Retry-After header value into a non-negative number of seconds.

    The value is first read as a number of seconds; if that fails it is read as an
    HTTP date and the time remaining until that date is returned.

    :param str retry_after: Retry-After header
    :rtype: float
    :return: Value of Retry-After in seconds, never below zero.
    """
    try:
        seconds: float = float(retry_after)
    except ValueError:
        # Not a number, so treat the value as an HTTP date and measure the time left until it.
        target = _parse_http_date(retry_after)
        current = datetime.datetime.now(target.tzinfo)
        seconds = (target - current).total_seconds()
    # A negative delay (e.g. a date in the past) is clamped to zero.
    return max(0, seconds)

77 

78 

def get_retry_after(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[float]:
    """Read the retry delay, in seconds, from the headers of a pipeline response.

    The "retry-after" header takes precedence. Otherwise "retry-after-ms" and then
    "x-ms-retry-after-ms" are consulted, and their value is divided by 1000.

    :param response: The PipelineResponse object
    :type response: ~azure.core.pipeline.PipelineResponse
    :return: Value of Retry-After in seconds, or None when no such header is set.
    :rtype: float or None
    """
    response_headers = case_insensitive_dict(response.http_response.headers)

    in_seconds = response_headers.get("retry-after")
    if in_seconds:
        return parse_retry_after(in_seconds)

    for header_name in ("retry-after-ms", "x-ms-retry-after-ms"):
        in_milliseconds = response_headers.get(header_name)
        if in_milliseconds:
            return parse_retry_after(in_milliseconds) / 1000.0

    return None

97 

98 

def get_domain(url: str) -> str:
    """Return the lower-cased network location (netloc) component of a URL.

    :param str url: The url.
    :rtype: str
    :return: The domain of the url.
    """
    network_location = urlparse(url).netloc
    return str(network_location).lower()

107 

108 

def get_challenge_parameter(headers, challenge_scheme: str, challenge_parameter: str) -> Optional[str]:
    """
    Looks up a parameter of a given challenge scheme in the WWW-Authenticate response header.

    Scheme names and parameter keys are compared case-insensitively.

    :param dict[str, str] headers: The response headers to parse.
    :param str challenge_scheme: The challenge scheme containing the challenge parameter, e.g., "Bearer".
    :param str challenge_parameter: The parameter key name to search for.
    :return: The value of the parameter name if found.
    :rtype: str or None
    """
    remaining = headers.get("WWW-Authenticate")
    if not remaining:
        return None

    # Walk the header one challenge at a time; each helper call hands back the unparsed remainder.
    next_challenge = get_next_challenge(remaining)
    while next_challenge:
        found_scheme, remaining = next_challenge
        if found_scheme.lower() == challenge_scheme.lower():
            # Walk the key-value pairs of the matching scheme until the requested key shows up.
            next_parameter = get_next_parameter(remaining)
            while next_parameter:
                name, value, remaining = next_parameter
                if name.lower() == challenge_parameter.lower():
                    return value
                next_parameter = get_next_parameter(remaining)
        next_challenge = get_next_challenge(remaining)

    return None

145 

146 

def get_next_challenge(header_value: str) -> Optional[Tuple[str, str]]:
    """
    Splits the leading challenge scheme off a challenge header value.

    Leading spaces are skipped, and the scheme is everything up to the first space after them.

    :param str header_value: The header value which will be sliced to remove the first parsed challenge key.
    :return: The parsed challenge scheme and the remaining header value, or None when no space
        follows the first token.
    :rtype: tuple[str, str] or None
    """
    scheme, delimiter, remainder = header_value.lstrip(" ").partition(" ")
    if not delimiter:
        # No space after the first token, so there is no scheme/remainder pair to return.
        return None
    return scheme, remainder

165 

166 

def get_next_parameter(header_value: str, separator: str = "=") -> Optional[Tuple[str, str, str]]:
    """
    Extracts the next key-value parameter from a challenge header value.

    Leading spaces and commas are skipped. The value may be a quoted string (the quotes are
    removed) or an unquoted token, which ends at the next space or comma.

    :param str header_value: The header value after being parsed by get_next_challenge.
    :param str separator: The challenge parameter key-value pair separator, default is '='.
    :return: The next available challenge parameter as a tuple (param_key, param_value, remaining header_value),
        or None when the header holds no further parameter for the current challenge.
    :rtype: tuple[str, str, str] or None
    """
    # Drop the delimiters left over from the previously parsed parameter.
    header_value = header_value.lstrip(" ,")

    next_space = header_value.find(" ")
    next_separator = header_value.find(separator)

    if next_separator < 0:
        return None
    if 0 <= next_space < next_separator:
        # A space ahead of the separator means the next token starts another challenge scheme.
        return None

    param_key = header_value[:next_separator].strip()
    # Skip the complete separator, which may be longer than a single character.
    remainder = header_value[next_separator + len(separator) :]

    # The value is quoted only when it starts with a quote (optional spaces allowed before it).
    # A quote further along the header belongs to a later parameter.
    quoted = remainder.lstrip(" ")
    if quoted.startswith('"'):
        quoted = quoted[1:]
        closing_quote = quoted.find('"')
        if closing_quote < 0:
            # Unterminated quoted string: keep everything after the opening quote.
            return param_key, quoted, ""
        return param_key, quoted[:closing_quote], quoted[closing_quote + 1 :]

    # Unquoted value: it ends at the first space or comma, or at the end of the header.
    delimiter_indexes = [index for index in (remainder.find(" "), remainder.find(",")) if index >= 0]
    end_of_value = min(delimiter_indexes) if delimiter_indexes else len(remainder)
    # When the value is the last thing in the header, the remaining header is empty.
    return param_key, remainder[:end_of_value], remainder[end_of_value + 1 :]

207 

208 

def sanitize_url(url: str, allowed_query_params: Set[str], redacted_placeholder: str = "REDACTED") -> str:
    """Replace the value of every query parameter that is not allowlisted with a placeholder.

    Parameters without an "=" carry no value and are left untouched. A URL without a query
    string is returned unchanged.

    :param str url: The URL to sanitize.
    :param set[str] allowed_query_params: Set of query parameter names whose values should not be redacted.
        If a :class:`~azure.core.utils._utils.CaseInsensitiveSet` is provided, it is used as-is;
        any other set is wrapped in one before the lookups.
    :param str redacted_placeholder: The placeholder to use for redacted values.
    :return: The sanitized URL with redacted query parameter values.
    :rtype: str
    """
    components = urllib.parse.urlparse(url)
    raw_query = components.query
    if not raw_query:
        return url

    allowlist = (
        allowed_query_params
        if isinstance(allowed_query_params, CaseInsensitiveSet)
        else CaseInsensitiveSet(allowed_query_params)
    )

    # Work on the raw query text so the original percent-encoding of kept parameters survives.
    sanitized_segments = []
    for segment in raw_query.split("&"):
        name, equals_sign, _ = segment.partition("=")
        if equals_sign and name not in allowlist:
            segment = f"{name}={redacted_placeholder}"
        sanitized_segments.append(segment)

    return urllib.parse.urlunparse(components._replace(query="&".join(sanitized_segments)))