1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from __future__ import annotations
27import copy
28import codecs
29import email.message
30from json import dumps
31from typing import (
32 Optional,
33 Union,
34 Mapping,
35 Sequence,
36 Tuple,
37 IO,
38 Any,
39 Iterable,
40 MutableMapping,
41 AsyncIterable,
42 cast,
43 Dict,
44 TYPE_CHECKING,
45)
46import xml.etree.ElementTree as ET
47from urllib.parse import urlparse
48from azure.core.serialization import AzureJSONEncoder
49from ..utils._pipeline_transport_rest_shared import (
50 _format_parameters_helper,
51 _pad_attr_name,
52 _prepare_multipart_body_helper,
53 _serialize_request,
54 _format_data_helper,
55 get_file_items,
56)
57
58if TYPE_CHECKING:
    # This avoids a circular import
60 from ._rest_py3 import HttpRequest
61
62################################### TYPES SECTION #########################
63
# Alias consulted by the type check on deprecated streamed bodies.
binary_type = str

# A single scalar value; ``None`` is permitted.
PrimitiveData = Union[str, int, float, bool, None]

# Mapping of parameter name to one scalar or a sequence of scalars.
ParamsType = Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]]

# Raw payload of one uploaded file: text, bytes, or an open text/binary stream.
FileContent = Union[str, bytes, IO[str], IO[bytes]]

# One file entry, in any of the three accepted shapes.
FileType = Union[
    # the file content on its own
    FileContent,
    # (filename, content)
    Tuple[Optional[str], FileContent],
    # (filename, content, content_type)
    Tuple[Optional[str], FileContent, Optional[str]],
]

# Files given either as a mapping or as a sequence of (name, file) pairs.
FilesType = Union[Mapping[str, FileType], Sequence[Tuple[str, FileType]]]

# Body content accepted synchronously, and the superset that also allows async iterables.
ContentTypeBase = Union[str, bytes, Iterable[bytes]]
ContentType = Union[ContentTypeBase, AsyncIterable[bytes]]

# Value carried by the deprecated ``body`` / ``data`` members.
DataType = Union[bytes, Dict[str, Union[str, int]], None]
86
87########################### HELPER SECTION #################################
88
89
def _verify_data_object(name, value):
    """Validate one form field before it is placed in a urlencoded body.

    :param name: Field name; must be a ``str``.
    :param value: Field value; must be ``None`` or an instance of ``str``,
     ``bytes``, ``int`` or ``float`` (``bool`` passes as a subclass of ``int``).
    :raises TypeError: If ``name`` is not a ``str``, or ``value`` is not one
     of the accepted primitive types.
    """
    if not isinstance(name, str):
        raise TypeError("Invalid type for data name. Expected str, got {}: {}".format(type(name), name))
    if value is not None and not isinstance(value, (str, bytes, int, float)):
        # Report the offending value itself; the field name is already known to be a valid str here.
        raise TypeError("Invalid type for data value. Expected primitive type, got {}: {}".format(type(value), value))
95
96
def set_urlencoded_body(data, has_files):
    """Build the body and default headers for form data.

    Fields whose value is falsy (``None``, ``""``, ``0``, an empty list, ...)
    are left out of the body. Every remaining value, or every item of a
    ``list`` value, is checked with ``_verify_data_object``.

    :param data: Mapping of field name to a value or a list of values.
    :param has_files: When truthy, no ``Content-Type`` default is produced.
    :return: A ``(default_headers, body)`` pair.
    :rtype: tuple
    :raises TypeError: Propagated from ``_verify_data_object``.
    """
    form_fields = {}
    for field_name, field_value in data.items():
        if not field_value:
            continue
        candidates = field_value if isinstance(field_value, list) else [field_value]
        for candidate in candidates:
            _verify_data_object(field_name, candidate)
        form_fields[field_name] = field_value

    # With files present the content type (and its boundary) is left to the
    # transport library (requests / aiohttp etc.), so no default is set here.
    default_headers = {} if has_files else {"Content-Type": "application/x-www-form-urlencoded"}
    return default_headers, form_fields
114
115
def set_multipart_body(files: FilesType):
    """Format the file entries of a multipart request.

    Each ``(name, value)`` pair produced by ``get_file_items(files)`` whose
    value is not ``None`` is passed through ``_format_data_helper``.

    :param files: The files, as a mapping or a sequence of pairs.
    :return: ``(headers, files)`` where ``headers`` is always empty and
     ``files`` is a ``dict`` when the input was a ``Mapping``, otherwise a
     list of ``(name, formatted_value)`` tuples.
    :rtype: tuple
    """
    formatted_files = []
    for field_name, file_value in get_file_items(files):
        if file_value is not None:
            formatted_files.append((field_name, _format_data_helper(file_value)))

    if isinstance(files, Mapping):
        return {}, dict(formatted_files)
    return {}, formatted_files
119
120
def set_xml_body(content):
    """Serialize an XML element into a request body.

    :param content: The element to serialize.
    :type content: xml.etree.ElementTree.Element
    :return: ``(headers, body)`` where ``body`` is the serialized bytes and
     ``headers`` carries the matching ``Content-Length``.
    :rtype: tuple
    """
    headers = {}
    bytes_content = ET.tostring(content, encoding="utf8")
    # ElementTree echoes the encoding name it was given into the XML
    # declaration; rewrite it to the canonical "utf-8" spelling. The
    # declaration is the first thing emitted, so only the first occurrence
    # is replaced and element text containing the same bytes is untouched.
    body = bytes_content.replace(b"encoding='utf8'", b"encoding='utf-8'", 1)
    if body:
        headers["Content-Length"] = str(len(body))
    return headers, body
128
129
def set_content_body(
    content: Any,
) -> Tuple[MutableMapping[str, str], Optional[ContentTypeBase]]:
    """Derive default headers and the body for raw ``content``.

    * An ``ET.Element`` is delegated to ``set_xml_body``.
    * ``str`` gets ``Content-Type: text/plain``; non-empty ``str`` / ``bytes``
      also get a ``Content-Length`` equal to ``len(content)``.
    * Anything exposing ``read``, ``__iter__`` or ``__aiter__`` is returned
      unchanged with no default headers.

    :param content: The content to send.
    :return: A ``(headers, body)`` pair.
    :rtype: tuple
    :raises TypeError: If ``content`` matches none of the cases above.
    """
    if isinstance(content, ET.Element):
        # XML body
        return set_xml_body(content)

    if isinstance(content, (str, bytes)):
        body_headers: MutableMapping[str, str] = {"Content-Type": "text/plain"} if isinstance(content, str) else {}
        if content:
            # NOTE(review): for str this is the character count, not an encoded byte count.
            body_headers["Content-Length"] = str(len(content))
        return body_headers, content

    streamable = hasattr(content, "read") or hasattr(content, "__iter__") or hasattr(content, "__aiter__")
    if streamable:
        return {}, content

    raise TypeError(
        f"Unexpected type for 'content': '{type(content)}'. "
        "We expect 'content' to either be str, bytes, a open file-like object or an iterable/asynciterable."
    )
152
153
def set_json_body(json: Any) -> Tuple[Dict[str, str], Any]:
    """Derive default headers and the body for a JSON payload.

    An object exposing ``read`` is handed to ``set_content_body`` and its
    headers are layered over the JSON content type. Any other object is
    serialized with ``json.dumps`` using ``AzureJSONEncoder``, and a
    ``Content-Length`` equal to the length of the serialized text is added.

    :param json: The JSON-serializable object, or a readable stream.
    :return: A ``(headers, body)`` pair.
    :rtype: tuple
    """
    if hasattr(json, "read"):
        stream_headers, body = set_content_body(json)
        return {"Content-Type": "application/json", **stream_headers}, body

    body = dumps(json, cls=AzureJSONEncoder)
    return {"Content-Type": "application/json", "Content-Length": str(len(body))}, body
163
164
def lookup_encoding(encoding: str) -> bool:
    """Tell whether ``encoding`` names a codec known to Python.

    The check (borrowed from httpx) simply asks the codec registry.

    :param str encoding: The encoding name to look up.
    :return: ``True`` if ``codecs.lookup`` succeeds, ``False`` if it raises ``LookupError``.
    :rtype: bool
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        return False
    return True
172
173
def get_charset_encoding(response) -> Optional[str]:
    """Extract a usable charset from the response's ``Content-Type`` header.

    :param response: Object whose ``headers`` supports ``.get("Content-Type")``.
    :return: The ``charset`` parameter if present and recognised by
     ``lookup_encoding``; otherwise ``None``.
    :rtype: str or None
    """
    raw_content_type = response.headers.get("Content-Type")
    if not raw_content_type:
        return None

    # Parse the header parameters with email.message, per
    # https://peps.python.org/pep-0594/#cgi
    parser = email.message.Message()
    parser["content-type"] = raw_content_type
    charset = cast(str, parser.get_param("charset"))  # e.g. "utf-8"

    if charset is not None and lookup_encoding(charset):
        return charset
    return None
186
187
def decode_to_text(encoding: Optional[str], content: bytes) -> str:
    """Decode response bytes into text.

    :param encoding: Encoding to use. ``"utf-8"`` is promoted to
     ``"utf-8-sig"`` so a leading BOM is stripped. When falsy, the content is
     decoded as ``utf-8-sig`` with undecodable bytes replaced.
    :param bytes content: The raw bytes.
    :return: The decoded text, or ``""`` for empty content.
    :rtype: str
    """
    if not content:
        return ""

    effective = "utf-8-sig" if encoding == "utf-8" else encoding
    if not effective:
        # No declared encoding: lenient decoding through an incremental decoder.
        lenient_decoder = codecs.getincrementaldecoder("utf-8-sig")(errors="replace")
        return lenient_decoder.decode(content)
    return content.decode(effective)
196
197
class HttpRequestBackcompatMixin:
    """Back-compat layer giving a request its deprecated, older-style members.

    Reads and writes of a fixed set of legacy attribute names are passed
    through ``_pad_attr_name`` before being resolved (presumably mapping them
    to the underscore-prefixed members defined below -- confirm against the
    helper). The host class is expected to supply ``url``, ``headers``,
    ``_data``, ``_files`` and ``_set_body``, all of which are used here.
    """

    def __getattr__(self, attr: str) -> Any:
        """Resolve an attribute that normal lookup did not find.

        :param str attr: The requested attribute name.
        :return: Whatever ``__getattribute__`` yields for the adjusted name.
        :rtype: any
        """
        legacy_readable = [
            "files",
            "data",
            "multipart_mixed_info",
            "query",
            "body",
            "format_parameters",
            "set_streamed_data_body",
            "set_text_body",
            "set_xml_body",
            "set_json_body",
            "set_formdata_body",
            "set_bytes_body",
            "set_multipart_mixed",
            "prepare_multipart_body",
            "serialize",
        ]
        resolved_name = _pad_attr_name(attr, legacy_readable)
        return self.__getattribute__(resolved_name)

    def __setattr__(self, attr: str, value: Any) -> None:
        """Set an attribute, adjusting legacy names through ``_pad_attr_name`` first.

        :param str attr: The attribute name being assigned.
        :param value: The value to store.
        """
        legacy_writable = [
            "multipart_mixed_info",
            "files",
            "data",
            "body",
        ]
        resolved_name = _pad_attr_name(attr, legacy_writable)
        super().__setattr__(resolved_name, value)

    @property
    def _multipart_mixed_info(
        self,
    ) -> Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]:
        """DEPRECATED: The stored multipart/mixed information, if any.
        This is deprecated and will be removed in a later release.

        :rtype: tuple
        :return: (requests, policies, boundary, kwargs), or None when never set
        """
        return getattr(self, "_multipart_mixed_info_val", None)

    @_multipart_mixed_info.setter
    def _multipart_mixed_info(self, val: Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]):
        """DEPRECATED: Store the multipart/mixed information.
        This is deprecated and will be removed in a later release.

        :param tuple val: (requests, policies, boundary, kwargs)
        """
        self._multipart_mixed_info_val = val

    @property
    def _query(self) -> Dict[str, Any]:
        """DEPRECATED: Query parameters parsed out of the request URL.
        This is deprecated and will be removed in a later release.

        :rtype: dict
        :return: Query parameters; a repeated key keeps its last value
        """
        raw_query = urlparse(self.url).query
        if not raw_query:
            return {}
        parsed: Dict[str, Any] = {}
        for pair in raw_query.split("&"):
            # A pair without "=" yields an empty string as its value.
            key, _, value = pair.partition("=")
            parsed[key] = value
        return parsed

    @property
    def _body(self) -> DataType:
        """DEPRECATED: Body of the request; prefer the `content` property.
        This is deprecated and will be removed in a later release.

        :rtype: bytes
        :return: Body of the request
        """
        return self._data

    @_body.setter
    def _body(self, val: DataType) -> None:
        """DEPRECATED: Replace the body of the request.
        This is deprecated and will be removed in a later release.

        :param bytes val: Body of the request
        """
        self._data = val

    def _format_parameters(self, params: MutableMapping[str, str]) -> None:
        """DEPRECATED: Format the query parameters.
        This is deprecated and will be removed in a later release.
        Pass the query parameters through the kwarg `params` instead.

        :param dict params: Query parameters
        """
        _format_parameters_helper(self, params)

    def _set_streamed_data_body(self, data):
        """DEPRECATED: Set a streamed request body.
        This is deprecated and will be removed in a later release.
        Pass your stream content through the `content` kwarg instead.

        :param data: Streamed data
        :type data: bytes or iterable
        :raises TypeError: If ``data`` is neither a ``binary_type`` instance nor
         an object with ``read``, ``__iter__`` or ``__aiter__``
        """
        is_streamable = isinstance(data, binary_type) or any(
            hasattr(data, marker) for marker in ["read", "__iter__", "__aiter__"]
        )
        if not is_streamable:
            raise TypeError("A streamable data source must be an open file-like object or iterable.")
        body_headers = self._set_body(content=data)
        self._files = None
        self.headers.update(body_headers)

    def _set_text_body(self, data):
        """DEPRECATED: Set a text body.
        This is deprecated and will be removed in a later release.
        Pass your text content through the `content` kwarg instead.

        :param str data: Text data
        """
        body_headers = self._set_body(content=data)
        self.headers.update(body_headers)
        self._files = None

    def _set_xml_body(self, data):
        """DEPRECATED: Set an XML body.
        This is deprecated and will be removed in a later release.
        Pass your xml content through the `content` kwarg instead.

        :param data: XML data
        :type data: xml.etree.ElementTree.Element
        """
        body_headers = self._set_body(content=data)
        self.headers.update(body_headers)
        self._files = None

    def _set_json_body(self, data):
        """DEPRECATED: Set a JSON request body.
        This is deprecated and will be removed in a later release.
        Pass your json content through the `json` kwarg instead.

        :param data: JSON data
        :type data: dict
        """
        body_headers = self._set_body(json=data)
        self.headers.update(body_headers)
        self._files = None

    def _set_formdata_body(self, data=None):
        """DEPRECATED: Set a form request body.
        This is deprecated and will be removed in a later release.
        Pass your stream content through the `files` kwarg instead.

        The current ``Content-Type`` header is removed and inspected: when it is
        ``application/x-www-form-urlencoded`` the data is set as urlencoded
        form data, otherwise it is treated as multipart files.

        :param data: Form data
        :type data: dict
        """
        form_data = {} if data is None else data
        content_type = self.headers.pop("Content-Type", None) if self.headers else None
        wants_urlencoded = bool(content_type) and content_type.lower() == "application/x-www-form-urlencoded"

        if wants_urlencoded:
            body_headers = self._set_body(data=form_data)
            self._files = None
        else:  # Assume "multipart/form-data"
            body_headers = self._set_body(files=form_data)
            self._data = None
        self.headers.update(body_headers)

    def _set_bytes_body(self, data):
        """DEPRECATED: Set a bytes request body.
        This is deprecated and will be removed in a later release.
        Pass your bytes content through the `content` kwarg instead.

        :param bytes data: Bytes data
        """
        body_headers = self._set_body(content=data)
        # A default Content-Type is not wanted for a bytes body, so drop it if
        # one was produced (this guard dates back to Python 2.7, where byte
        # strings were treated as text/plain strings).
        body_headers.pop("Content-Type", None)
        self.headers.update(body_headers)
        self._files = None

    def _set_multipart_mixed(self, *requests: HttpRequest, **kwargs: Any) -> None:
        """DEPRECATED: Set the multipart mixed info.
        This is deprecated and will be removed in a later release.

        ``policies`` and ``boundary`` are taken out of ``kwargs``; whatever
        remains is stored as the last element of the info tuple.

        :param requests: Requests to be sent in the multipart request
        :type requests: list[HttpRequest]
        """
        policies = kwargs.pop("policies", [])
        boundary = kwargs.pop("boundary", None)
        mixed_info: Tuple[Sequence[HttpRequest], Sequence[Any], str, Dict[str, Any]] = (
            requests,
            policies,
            boundary,
            kwargs,
        )
        self.multipart_mixed_info = mixed_info

    def _prepare_multipart_body(self, content_index=0):
        """DEPRECATED: Prepare the request body for multipart requests.
        This is deprecated and will be removed in a later release.

        :param int content_index: The index of the request to be sent in the multipart request
        :returns: The updated index after all parts in this request have been added.
        :rtype: int
        """
        return _prepare_multipart_body_helper(self, content_index)

    def _serialize(self):
        """DEPRECATED: Serialize this request using the application/http spec.
        This is deprecated and will be removed in a later release.

        :rtype: bytes
        :return: The serialized request
        """
        return _serialize_request(self)

    def _add_backcompat_properties(self, request, memo):
        """Copy the private backcompat attrs onto ``request`` during a deepcopy.

        :param HttpRequest request: The request receiving the copied attributes
        :param dict memo: The memo dict used by deepcopy
        """
        copied_info = copy.deepcopy(self._multipart_mixed_info, memo)
        request._multipart_mixed_info = copied_info  # pylint: disable=protected-access