1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26import datetime
27import email.utils
28from typing import Optional, cast, Union, Tuple
29from urllib.parse import urlparse
30
31from azure.core.pipeline.transport import (
32 HttpResponse as LegacyHttpResponse,
33 AsyncHttpResponse as LegacyAsyncHttpResponse,
34 HttpRequest as LegacyHttpRequest,
35)
36from azure.core.rest import HttpResponse, AsyncHttpResponse, HttpRequest
37
38
39from ...utils._utils import _FixedOffset, case_insensitive_dict
40from .. import PipelineResponse
41
# Any HTTP response flavour a pipeline may carry: the sync and async responses
# from azure.core.rest and from the legacy azure.core.pipeline.transport module.
AllHttpResponseType = Union[
    HttpResponse,
    LegacyHttpResponse,
    AsyncHttpResponse,
    LegacyAsyncHttpResponse,
]
# Any HTTP request flavour: the azure.core.rest request or the legacy transport request.
HTTPRequestType = Union[
    HttpRequest,
    LegacyHttpRequest,
]
44
45
def _parse_http_date(text: str) -> datetime.datetime:
    """Convert an HTTP-formatted date string into a timezone-aware datetime.

    :param str text: Text containing a date in HTTP format
    :rtype: datetime.datetime
    :return: The parsed datetime
    :raises ValueError: If ``email.utils.parsedate_tz`` cannot parse the text.
    """
    fields = email.utils.parsedate_tz(text)
    if not fields:
        raise ValueError("Invalid HTTP date")

    year, month, day, hour, minute, second = fields[:6]
    # parsedate_tz reports the UTC offset in seconds as the last tuple item; the cast only
    # narrows the Optional[int] stub type. _FixedOffset is handed that offset in minutes.
    offset_seconds = cast(int, fields[9])
    timezone = _FixedOffset(offset_seconds / 60)
    return datetime.datetime(year, month, day, hour, minute, second, tzinfo=timezone)
58
59
def parse_retry_after(retry_after: str) -> float:
    """Convert a Retry-After header value into a delay in seconds.

    The value is first read as a number of seconds; if that fails it is read as an
    HTTP date and the delay is the time remaining until that date. Negative delays
    are clamped to zero.

    :param str retry_after: Retry-After header
    :rtype: float
    :return: Value of Retry-After in seconds.
    """
    try:
        # float is used for "int or float", following the Mypy recommendation.
        seconds: float = float(retry_after)
    except ValueError:
        # Not a number: fall back to the HTTP date form and measure from "now"
        # in the same timezone as the parsed date.
        target = _parse_http_date(retry_after)
        current = datetime.datetime.now(target.tzinfo)
        seconds = (target - current).total_seconds()
    return max(0, seconds)
75
76
def get_retry_after(response: PipelineResponse[HTTPRequestType, AllHttpResponseType]) -> Optional[float]:
    """Read the retry delay, in seconds, from the headers of a pipeline response.

    The standard ``retry-after`` header takes precedence. Otherwise the
    ``retry-after-ms`` and ``x-ms-retry-after-ms`` headers are checked in that
    order, and their parsed value is divided by 1000.

    :param response: The PipelineResponse object
    :type response: ~azure.core.pipeline.PipelineResponse
    :return: Value of Retry-After in seconds.
    :rtype: float or None
    """
    response_headers = case_insensitive_dict(response.http_response.headers)

    seconds_value = response_headers.get("retry-after")
    if seconds_value:
        return parse_retry_after(seconds_value)

    for header_name in ("retry-after-ms", "x-ms-retry-after-ms"):
        millis_value = response_headers.get(header_name)
        if not millis_value:
            continue
        # These headers carry milliseconds; convert to seconds.
        return parse_retry_after(millis_value) / 1000.0

    return None
95
96
def get_domain(url: str) -> str:
    """Return the lower-cased network location of a URL.

    The result is the ``netloc`` component from ``urlparse``, so any user info
    or port present in the URL is included.

    :param str url: The url.
    :rtype: str
    :return: The domain of the url.
    """
    network_location = urlparse(url).netloc
    return str(network_location).lower()
105
106
def get_challenge_parameter(headers, challenge_scheme: str, challenge_parameter: str) -> Optional[str]:
    """
    Look up a parameter of a given challenge scheme in the WWW-Authenticate response header.

    Scheme and parameter names are compared case-insensitively.

    :param dict[str, str] headers: The response headers to parse.
    :param str challenge_scheme: The challenge scheme containing the challenge parameter, e.g., "Bearer".
    :param str challenge_parameter: The parameter key name to search for.
    :return: The value of the parameter name if found.
    :rtype: str or None
    """
    remaining = headers.get("WWW-Authenticate")
    if not remaining:
        return None

    # Walk the header one challenge token at a time; each parser call returns the
    # unconsumed tail, which becomes the input of the next call.
    next_challenge = get_next_challenge(remaining)
    while next_challenge:
        found_scheme, remaining = next_challenge
        if found_scheme.lower() == challenge_scheme.lower():
            # On the requested scheme: scan its key-value parameters for the requested key.
            next_parameter = get_next_parameter(remaining)
            while next_parameter:
                name, value, remaining = next_parameter
                if name.lower() == challenge_parameter.lower():
                    return value
                next_parameter = get_next_parameter(remaining)
        next_challenge = get_next_challenge(remaining)

    return None
143
144
def get_next_challenge(header_value: str) -> Optional[Tuple[str, str]]:
    """
    Split the leading challenge scheme off a challenge header value.

    Leading spaces are ignored and the scheme ends at the first following space.
    If no space follows (including an empty value), there is nothing to split and
    None is returned.

    :param str header_value: The header value which will be sliced to remove the first parsed challenge key.
    :return: The parsed challenge scheme and the remaining header value.
    :rtype: tuple[str, str] or None
    """
    scheme, space, remainder = header_value.lstrip(" ").partition(" ")
    if not space:
        return None
    return scheme, remainder
163
164
def get_next_parameter(header_value: str, separator: str = "=") -> Optional[Tuple[str, str, str]]:
    """
    Extract the next key-value parameter from a challenge header value.

    Leading spaces and commas are skipped. A value is treated as quoted only when the first
    non-space character after the separator is a double quote; it then runs to the closing
    quote, or to the end of the header if the quote is never closed. An unquoted value runs
    to the next space or comma, or to the end of the header.

    :param str header_value: The header value after being parsed by get_next_challenge.
    :param str separator: The challenge parameter key-value pair separator, default is '='.
    :return: The next available challenge parameter as a tuple (param_key, param_value, remaining header_value),
        or None when the next token is not a parameter (no separator, or a space precedes the separator).
    :rtype: tuple[str, str, str] or None
    """
    header_value = header_value.lstrip(" ,")

    next_separator = header_value.find(separator)
    if next_separator < 0:
        return None

    # A space before the separator means the next token is a new challenge scheme, not a parameter key.
    next_space = header_value.find(" ")
    if 0 <= next_space < next_separator:
        return None

    param_key = header_value[:next_separator].strip()
    remainder = header_value[next_separator + len(separator) :].lstrip(" ")

    if remainder.startswith('"'):
        # Only a quote directly after the separator opens a quoted value; a quote further along
        # belongs to a later parameter and must not be consumed here.
        quoted = remainder[1:]
        closing_quote = quoted.find('"')
        if closing_quote < 0:
            # Unterminated quote: keep the whole tail and do not truncate the last character.
            return param_key, quoted, ""
        return param_key, quoted[:closing_quote], quoted[closing_quote + 1 :]

    # Unquoted value: ends at the first space or comma, so a trailing comma is never part of the value.
    delimiter_index = next((index for index, char in enumerate(remainder) if char in " ,"), -1)
    if delimiter_index < 0:
        # The value is the last token; nothing is left to parse.
        return param_key, remainder, ""
    return param_key, remainder[:delimiter_index], remainder[delimiter_index + 1 :]