Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/pipeline/policies/_utils.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the "Software"), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26import datetime 

27import email.utils 

28from typing import Optional, cast, Union, Tuple 

29from urllib.parse import urlparse 

30 

31from azure.core.pipeline.transport import ( 

32 HttpResponse as LegacyHttpResponse, 

33 AsyncHttpResponse as LegacyAsyncHttpResponse, 

34 HttpRequest as LegacyHttpRequest, 

35) 

36from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest 

37 

38 

39from ...utils._utils import _FixedOffset, case_insensitive_dict 

40from .. import PipelineResponse 

41 

42AllHttpResponseType = Union[HttpResponse, LegacyHttpResponse, AsyncHttpResponse, LegacyAsyncHttpResponse] 

43HTTPRequestType = Union[HttpRequest, LegacyHttpRequest] 

44 

45 

46def _parse_http_date(text: str) -> datetime.datetime: 

47 """Parse a HTTP date format into datetime. 

48 

49 :param str text: Text containing a date in HTTP format 

50 :rtype: datetime.datetime 

51 :return: The parsed datetime 

52 """ 

53 parsed_date = email.utils.parsedate_tz(text) 

54 if not parsed_date: 

55 raise ValueError("Invalid HTTP date") 

56 tz_offset = cast(int, parsed_date[9]) # Look at the code, tz_offset is always an int, at worst 0 

57 return datetime.datetime(*parsed_date[:6], tzinfo=_FixedOffset(tz_offset / 60)) 

58 

59 

def parse_retry_after(retry_after: str) -> float:
    """Helper to parse Retry-After and get value in seconds.

    The header value is first read as a number of seconds. If that fails it is
    read as an HTTP date, and the delay is the time remaining until that date.

    :param str retry_after: Retry-After header
    :rtype: float
    :return: Value of Retry-After in seconds, never negative.
    :raises ValueError: If the value is neither a number nor a parsable HTTP date.
    """
    delay: float  # Using the Mypy recommendation to use float for "int or float"
    try:
        delay = float(retry_after)
    except ValueError:
        # Not a number? Try HTTP date
        retry_date = _parse_http_date(retry_after)
        # Compare against "now" in the date's own timezone so both operands are aware datetimes.
        delay = (retry_date - datetime.datetime.now(retry_date.tzinfo)).total_seconds()
    # Clamp with a float literal so the result is a float even when the delay is negative
    # or the date is already in the past (max(0, delay) would hand back the int 0).
    return max(0.0, delay)

75 

76 

def get_retry_after(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[float]:
    """Read the retry delay, in seconds, from the response headers.

    ``retry-after`` is checked first and returned as parsed. Otherwise
    ``retry-after-ms`` and then ``x-ms-retry-after-ms`` are checked, and the
    parsed value is divided by 1000.

    :param response: The PipelineResponse object
    :type response: ~azure.core.pipeline.PipelineResponse
    :return: Value of Retry-After in seconds.
    :rtype: float or None
    """
    headers = case_insensitive_dict(response.http_response.headers)

    seconds_value = headers.get("retry-after")
    if seconds_value:
        return parse_retry_after(seconds_value)

    for header_name in ["retry-after-ms", "x-ms-retry-after-ms"]:
        millis_value = headers.get(header_name)
        if not millis_value:
            continue
        # These headers carry milliseconds, so scale the parsed value down to seconds.
        return parse_retry_after(millis_value) / 1000.0

    return None

95 

96 

def get_domain(url: str) -> str:
    """Return the lower-cased network location (netloc) of a url.

    :param str url: The url.
    :rtype: str
    :return: The domain of the url.
    """
    network_location = urlparse(url).netloc
    return str(network_location).lower()

105 

106 

def get_challenge_parameter(headers, challenge_scheme: str, challenge_parameter: str) -> Optional[str]:
    """
    Look up one parameter of one challenge scheme in the ``WWW-Authenticate`` header.

    Scheme and parameter names are compared case-insensitively.

    :param dict[str, str] headers: The response headers to parse.
    :param str challenge_scheme: The challenge scheme containing the challenge parameter, e.g., "Bearer".
    :param str challenge_parameter: The parameter key name to search for.
    :return: The value of the parameter name if found.
    :rtype: str or None
    """
    remaining = headers.get("WWW-Authenticate")
    if not remaining:
        return None

    # Walk the header one challenge token at a time; each parser call returns the unread remainder.
    next_challenge = get_next_challenge(remaining)
    while next_challenge:
        found_scheme, remaining = next_challenge
        if found_scheme.lower() == challenge_scheme.lower():
            # Inside the requested scheme: read key-value pairs until the wanted key turns up
            # or the parser reports that no further parameter follows.
            next_parameter = get_next_parameter(remaining)
            while next_parameter:
                name, value, remaining = next_parameter
                if name.lower() == challenge_parameter.lower():
                    return value
                next_parameter = get_next_parameter(remaining)
        next_challenge = get_next_challenge(remaining)

    return None

143 

144 

def get_next_challenge(header_value: str) -> Optional[Tuple[str, str]]:
    """
    Split the leading challenge scheme off a challenge header value.

    Leading spaces are skipped; the scheme is the text up to the next space.

    :param str header_value: The header value which will be sliced to remove the first parsed challenge key.
    :return: The parsed challenge scheme and the remaining header value, or None when no space follows a scheme.
    :rtype: tuple[str, str] or None
    """
    scheme, space, rest = header_value.lstrip(" ").partition(" ")
    if not space:
        # No space left: there is no "scheme <parameters>" pair to split.
        return None
    return scheme, rest

163 

164 

def get_next_parameter(header_value: str, separator: str = "=") -> Optional[Tuple[str, str, str]]:
    """
    Iterates through a challenge header value to extract key-value parameters.

    Leading spaces and commas are skipped. The value may be a quoted string
    (returned without its quotes) or an unquoted token that ends at the next
    space or comma.

    :param str header_value: The header value after being parsed by get_next_challenge.
    :param str separator: The challenge parameter key-value pair separator, default is '='.
    :return: The next available challenge parameter as a tuple (param_key, param_value, remaining header_value),
        or None when no parameter starts at the beginning of header_value.
    :rtype: tuple[str, str, str] or None
    """
    header_value = header_value.lstrip(" ,")

    next_space = header_value.find(" ")
    next_separator = header_value.find(separator)

    if next_separator < 0:
        # No key-value separator left at all.
        return None
    if 0 <= next_space < next_separator:
        # A space shows up before the separator, so the leading token is not a "key=value" pair.
        return None

    param_key = header_value[:next_separator].strip()
    # Skip the whole separator (it may be longer than one character) and any padding before the value.
    value_part = header_value[next_separator + len(separator) :].lstrip(" ")

    if value_part.startswith('"'):
        # Quoted value: only a quote that opens the value counts, so a quoted parameter
        # further along the header cannot be mistaken for this parameter's value.
        value_part = value_part[1:]
        closing_quote = value_part.find('"')
        if closing_quote < 0:
            # Unterminated quote: keep everything that is left instead of dropping a character.
            return param_key, value_part, ""
        return param_key, value_part[:closing_quote], value_part[closing_quote + 1 :]

    # Unquoted value: it ends at the first space or comma, whichever comes first.
    delimiter_positions = [index for index in (value_part.find(" "), value_part.find(",")) if index >= 0]
    if not delimiter_positions:
        # The value runs to the end of the header; nothing remains to be parsed.
        return param_key, value_part, ""
    value_end = min(delimiter_positions)
    return param_key, value_part[:value_end], value_part[value_end + 1 :]