1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import datetime
27import email.utils
28import urllib.parse
29from typing import Optional, Set, cast, Union, Tuple
30from urllib.parse import urlparse
31
32from azure.core.pipeline.transport import (
33 HttpResponse as LegacyHttpResponse,
34 AsyncHttpResponse as LegacyAsyncHttpResponse,
35 HttpRequest as LegacyHttpRequest,
36)
37from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest
38
39
40from ...utils._utils import case_insensitive_dict, CaseInsensitiveSet
41from .. import PipelineResponse
42
43
44AllHttpResponseType = Union[HttpResponse, LegacyHttpResponse, AsyncHttpResponse, LegacyAsyncHttpResponse]
45HTTPRequestType = Union[HttpRequest, LegacyHttpRequest]
46
47
48def _parse_http_date(text: str) -> datetime.datetime:
49 """Parse a HTTP date format into datetime.
50
51 :param str text: Text containing a date in HTTP format
52 :rtype: datetime.datetime
53 :return: The parsed datetime
54 """
55 parsed_date = email.utils.parsedate_tz(text)
56 if not parsed_date:
57 raise ValueError("Invalid HTTP date")
58 tz_offset = cast(int, parsed_date[9]) # Look at the code, tz_offset is always an int, at worst 0
59 return datetime.datetime(*parsed_date[:6], tzinfo=datetime.timezone(datetime.timedelta(seconds=tz_offset)))
60
61
62def parse_retry_after(retry_after: str) -> float:
63 """Helper to parse Retry-After and get value in seconds.
64
65 :param str retry_after: Retry-After header
66 :rtype: float
67 :return: Value of Retry-After in seconds.
68 """
69 delay: float # Using the Mypy recommendation to use float for "int or float"
70 try:
71 delay = float(retry_after)
72 except ValueError:
73 # Not an integer? Try HTTP date
74 retry_date = _parse_http_date(retry_after)
75 delay = (retry_date - datetime.datetime.now(retry_date.tzinfo)).total_seconds()
76 return max(0, delay)
77
78
79def get_retry_after(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[float]:
80 """Get the value of Retry-After in seconds.
81
82 :param response: The PipelineResponse object
83 :type response: ~azure.core.pipeline.PipelineResponse
84 :return: Value of Retry-After in seconds.
85 :rtype: float or None
86 """
87 headers = case_insensitive_dict(response.http_response.headers)
88 retry_after = headers.get("retry-after")
89 if retry_after:
90 return parse_retry_after(retry_after)
91 for ms_header in ["retry-after-ms", "x-ms-retry-after-ms"]:
92 retry_after = headers.get(ms_header)
93 if retry_after:
94 parsed_retry_after = parse_retry_after(retry_after)
95 return parsed_retry_after / 1000.0
96 return None
97
98
99def get_domain(url: str) -> str:
100 """Get the domain of an url.
101
102 :param str url: The url.
103 :rtype: str
104 :return: The domain of the url.
105 """
106 return str(urlparse(url).netloc).lower()
107
108
109def get_challenge_parameter(headers, challenge_scheme: str, challenge_parameter: str) -> Optional[str]:
110 """
111 Parses the specified parameter from a challenge header found in the response.
112
113 :param dict[str, str] headers: The response headers to parse.
114 :param str challenge_scheme: The challenge scheme containing the challenge parameter, e.g., "Bearer".
115 :param str challenge_parameter: The parameter key name to search for.
116 :return: The value of the parameter name if found.
117 :rtype: str or None
118 """
119 header_value = headers.get("WWW-Authenticate")
120 if not header_value:
121 return None
122
123 scheme = challenge_scheme
124 parameter = challenge_parameter
125 header_span = header_value
126
127 # Iterate through each challenge value.
128 while True:
129 challenge = get_next_challenge(header_span)
130 if not challenge:
131 break
132 challenge_key, header_span = challenge
133 if challenge_key.lower() != scheme.lower():
134 continue
135 # Enumerate each key-value parameter until we find the parameter key on the specified scheme challenge.
136 while True:
137 parameters = get_next_parameter(header_span)
138 if not parameters:
139 break
140 key, value, header_span = parameters
141 if key.lower() == parameter.lower():
142 return value
143
144 return None
145
146
147def get_next_challenge(header_value: str) -> Optional[Tuple[str, str]]:
148 """
149 Iterates through the challenge schemes present in a challenge header.
150
151 :param str header_value: The header value which will be sliced to remove the first parsed challenge key.
152 :return: The parsed challenge scheme and the remaining header value.
153 :rtype: tuple[str, str] or None
154 """
155 header_value = header_value.lstrip(" ")
156 end_of_challenge_key = header_value.find(" ")
157
158 if end_of_challenge_key < 0:
159 return None
160
161 challenge_key = header_value[:end_of_challenge_key]
162 header_value = header_value[end_of_challenge_key + 1 :]
163
164 return challenge_key, header_value
165
166
167def get_next_parameter(header_value: str, separator: str = "=") -> Optional[Tuple[str, str, str]]:
168 """
169 Iterates through a challenge header value to extract key-value parameters.
170
171 :param str header_value: The header value after being parsed by get_next_challenge.
172 :param str separator: The challenge parameter key-value pair separator, default is '='.
173 :return: The next available challenge parameter as a tuple (param_key, param_value, remaining header_value).
174 :rtype: tuple[str, str, str] or None
175 """
176 space_or_comma = " ,"
177 header_value = header_value.lstrip(space_or_comma)
178
179 next_space = header_value.find(" ")
180 next_separator = header_value.find(separator)
181
182 if next_space < next_separator and next_space != -1:
183 return None
184
185 if next_separator < 0:
186 return None
187
188 param_key = header_value[:next_separator].strip()
189 header_value = header_value[next_separator + 1 :]
190
191 quote_index = header_value.find('"')
192
193 if quote_index >= 0:
194 header_value = header_value[quote_index + 1 :]
195 param_value = header_value[: header_value.find('"')]
196 else:
197 trailing_delimiter_index = header_value.find(" ")
198 if trailing_delimiter_index >= 0:
199 param_value = header_value[:trailing_delimiter_index]
200 else:
201 param_value = header_value
202
203 if header_value != param_value:
204 header_value = header_value[len(param_value) + 1 :]
205
206 return param_key, param_value, header_value
207
208
209def sanitize_url(url: str, allowed_query_params: Set[str], redacted_placeholder: str = "REDACTED") -> str:
210 """Redact query parameter values not in the allowlist.
211
212 :param str url: The URL to sanitize.
213 :param set[str] allowed_query_params: Set of query parameter names whose values should not be redacted.
214 If a :class:`~azure.core.utils._utils.CaseInsensitiveSet` is provided, lookups are case-insensitive
215 without per-call normalization.
216 :param str redacted_placeholder: The placeholder to use for redacted values.
217 :return: The sanitized URL with redacted query parameter values.
218 :rtype: str
219 """
220 parsed_url = urllib.parse.urlparse(url)
221 if not parsed_url.query:
222 return url
223
224 # Operate on the raw query string to preserve original percent-encoding.
225 if not isinstance(allowed_query_params, CaseInsensitiveSet):
226 allowed_query_params = CaseInsensitiveSet(allowed_query_params)
227
228 parts = []
229 for param in parsed_url.query.split("&"):
230 eq_idx = param.find("=")
231 if eq_idx == -1:
232 # No value to redact, keep as-is.
233 parts.append(param)
234 else:
235 key = param[:eq_idx]
236 parts.append(param if key in allowed_query_params else f"{key}={redacted_placeholder}")
237
238 sanitized_query = "&".join(parts)
239 sanitized_url = parsed_url._replace(query=sanitized_query)
240 return urllib.parse.urlunparse(sanitized_url)