1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26from __future__ import annotations
27import copy
28import codecs
29import email.message
30from json import dumps
31from typing import (
32 Optional,
33 Union,
34 Mapping,
35 Sequence,
36 Tuple,
37 IO,
38 Any,
39 Iterable,
40 MutableMapping,
41 AsyncIterable,
42 cast,
43 Dict,
44 TYPE_CHECKING,
45)
46import xml.etree.ElementTree as ET
47from urllib.parse import urlparse
48from azure.core.serialization import AzureJSONEncoder
49from ..utils._pipeline_transport_rest_shared import (
50 _format_parameters_helper,
51 _pad_attr_name,
52 _prepare_multipart_body_helper,
53 _serialize_request,
54 _format_data_helper,
55 get_file_items,
56)
57
58if TYPE_CHECKING:
59 # This avoid a circular import
60 from ._rest_py3 import HttpRequest
61
62################################### TYPES SECTION #########################
63
64binary_type = str
65PrimitiveData = Optional[Union[str, int, float, bool]]
66
67ParamsType = Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]]
68
69FileContent = Union[str, bytes, IO[str], IO[bytes]]
70
71FileType = Union[
72 # file (or bytes)
73 FileContent,
74 # (filename, file (or bytes))
75 Tuple[Optional[str], FileContent],
76 # (filename, file (or bytes), content_type)
77 Tuple[Optional[str], FileContent, Optional[str]],
78]
79
80FilesType = Union[Mapping[str, FileType], Sequence[Tuple[str, FileType]]]
81
82ContentTypeBase = Union[str, bytes, Iterable[bytes]]
83ContentType = Union[str, bytes, Iterable[bytes], AsyncIterable[bytes]]
84
85DataType = Optional[Union[bytes, Dict[str, Union[str, int]]]]
86
87########################### HELPER SECTION #################################
88
89
90def _verify_data_object(name, value):
91 if not isinstance(name, str):
92 raise TypeError("Invalid type for data name. Expected str, got {}: {}".format(type(name), name))
93 if value is not None and not isinstance(value, (str, bytes, int, float)):
94 raise TypeError("Invalid type for data value. Expected primitive type, got {}: {}".format(type(name), name))
95
96
97def set_urlencoded_body(data, has_files):
98 body = {}
99 default_headers = {}
100 for f, d in data.items():
101 if not d:
102 continue
103 if isinstance(d, list):
104 for item in d:
105 _verify_data_object(f, item)
106 else:
107 _verify_data_object(f, d)
108 body[f] = d
109 if not has_files:
110 # little hacky, but for files we don't send a content type with
111 # boundary so requests / aiohttp etc deal with it
112 default_headers["Content-Type"] = "application/x-www-form-urlencoded"
113 return default_headers, body
114
115
116def set_multipart_body(files: FilesType):
117 formatted_files = [(f, _format_data_helper(d)) for f, d in get_file_items(files) if d is not None]
118 return {}, dict(formatted_files) if isinstance(files, Mapping) else formatted_files
119
120
121def set_xml_body(content):
122 headers = {}
123 bytes_content = ET.tostring(content, encoding="utf8")
124 body = bytes_content.replace(b"encoding='utf8'", b"encoding='utf-8'")
125 if body:
126 headers["Content-Length"] = str(len(body))
127 return headers, body
128
129
130def set_content_body(
131 content: Any,
132) -> Tuple[MutableMapping[str, str], Optional[ContentTypeBase]]:
133 headers: MutableMapping[str, str] = {}
134
135 if isinstance(content, ET.Element):
136 # XML body
137 return set_xml_body(content)
138 if isinstance(content, (str, bytes)):
139 headers = {}
140 body = content
141 if isinstance(content, str):
142 headers["Content-Type"] = "text/plain"
143 if body:
144 headers["Content-Length"] = str(len(body))
145 return headers, body
146 if any(hasattr(content, attr) for attr in ["read", "__iter__", "__aiter__"]):
147 return headers, content
148 raise TypeError(
149 "Unexpected type for 'content': '{}'. ".format(type(content))
150 + "We expect 'content' to either be str, bytes, a open file-like object or an iterable/asynciterable."
151 )
152
153
154def set_json_body(json: Any) -> Tuple[Dict[str, str], Any]:
155 headers = {"Content-Type": "application/json"}
156 if hasattr(json, "read"):
157 content_headers, body = set_content_body(json)
158 headers.update(content_headers)
159 else:
160 body = dumps(json, cls=AzureJSONEncoder)
161 headers.update({"Content-Length": str(len(body))})
162 return headers, body
163
164
165def lookup_encoding(encoding: str) -> bool:
166 # including check for whether encoding is known taken from httpx
167 try:
168 codecs.lookup(encoding)
169 return True
170 except LookupError:
171 return False
172
173
174def get_charset_encoding(response) -> Optional[str]:
175 content_type = response.headers.get("Content-Type")
176
177 if not content_type:
178 return None
179 # https://peps.python.org/pep-0594/#cgi
180 m = email.message.Message()
181 m["content-type"] = content_type
182 encoding = cast(str, m.get_param("charset")) # -> utf-8
183 if encoding is None or not lookup_encoding(encoding):
184 return None
185 return encoding
186
187
188def decode_to_text(encoding: Optional[str], content: bytes) -> str:
189 if not content:
190 return ""
191 if encoding == "utf-8":
192 encoding = "utf-8-sig"
193 if encoding:
194 return content.decode(encoding)
195 return codecs.getincrementaldecoder("utf-8-sig")(errors="replace").decode(content)
196
197
198class HttpRequestBackcompatMixin:
199 def __getattr__(self, attr: str) -> Any:
200 backcompat_attrs = [
201 "files",
202 "data",
203 "multipart_mixed_info",
204 "query",
205 "body",
206 "format_parameters",
207 "set_streamed_data_body",
208 "set_text_body",
209 "set_xml_body",
210 "set_json_body",
211 "set_formdata_body",
212 "set_bytes_body",
213 "set_multipart_mixed",
214 "prepare_multipart_body",
215 "serialize",
216 ]
217 attr = _pad_attr_name(attr, backcompat_attrs)
218 return self.__getattribute__(attr)
219
220 def __setattr__(self, attr: str, value: Any) -> None:
221 backcompat_attrs = [
222 "multipart_mixed_info",
223 "files",
224 "data",
225 "body",
226 ]
227 attr = _pad_attr_name(attr, backcompat_attrs)
228 super(HttpRequestBackcompatMixin, self).__setattr__(attr, value)
229
230 @property
231 def _multipart_mixed_info(self) -> Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]:
232 """DEPRECATED: Information used to make multipart mixed requests.
233 This is deprecated and will be removed in a later release.
234
235 :rtype: tuple
236 :return: (requests, policies, boundary, kwargs)
237 """
238 try:
239 return self._multipart_mixed_info_val
240 except AttributeError:
241 return None
242
243 @_multipart_mixed_info.setter
244 def _multipart_mixed_info(self, val: Optional[Tuple[Sequence[Any], Sequence[Any], str, Dict[str, Any]]]):
245 """DEPRECATED: Set information to make multipart mixed requests.
246 This is deprecated and will be removed in a later release.
247
248 :param tuple val: (requests, policies, boundary, kwargs)
249 """
250 self._multipart_mixed_info_val = val
251
252 @property
253 def _query(self) -> Dict[str, Any]:
254 """DEPRECATED: Query parameters passed in by user
255 This is deprecated and will be removed in a later release.
256
257 :rtype: dict
258 :return: Query parameters
259 """
260 query = urlparse(self.url).query
261 if query:
262 return {p[0]: p[-1] for p in [p.partition("=") for p in query.split("&")]}
263 return {}
264
265 @property
266 def _body(self) -> DataType:
267 """DEPRECATED: Body of the request. You should use the `content` property instead
268 This is deprecated and will be removed in a later release.
269
270 :rtype: bytes
271 :return: Body of the request
272 """
273 return self._data
274
275 @_body.setter
276 def _body(self, val: DataType) -> None:
277 """DEPRECATED: Set the body of the request
278 This is deprecated and will be removed in a later release.
279
280 :param bytes val: Body of the request
281 """
282 self._data = val
283
284 def _format_parameters(self, params: MutableMapping[str, str]) -> None:
285 """DEPRECATED: Format the query parameters
286 This is deprecated and will be removed in a later release.
287 You should pass the query parameters through the kwarg `params`
288 instead.
289
290 :param dict params: Query parameters
291 """
292 _format_parameters_helper(self, params)
293
294 def _set_streamed_data_body(self, data):
295 """DEPRECATED: Set the streamed request body.
296 This is deprecated and will be removed in a later release.
297 You should pass your stream content through the `content` kwarg instead
298
299 :param data: Streamed data
300 :type data: bytes or iterable
301 """
302 if not isinstance(data, binary_type) and not any(
303 hasattr(data, attr) for attr in ["read", "__iter__", "__aiter__"]
304 ):
305 raise TypeError("A streamable data source must be an open file-like object or iterable.")
306 headers = self._set_body(content=data)
307 self._files = None
308 self.headers.update(headers)
309
310 def _set_text_body(self, data):
311 """DEPRECATED: Set the text body
312 This is deprecated and will be removed in a later release.
313 You should pass your text content through the `content` kwarg instead
314
315 :param str data: Text data
316 """
317 headers = self._set_body(content=data)
318 self.headers.update(headers)
319 self._files = None
320
321 def _set_xml_body(self, data):
322 """DEPRECATED: Set the xml body.
323 This is deprecated and will be removed in a later release.
324 You should pass your xml content through the `content` kwarg instead
325
326 :param data: XML data
327 :type data: xml.etree.ElementTree.Element
328 """
329 headers = self._set_body(content=data)
330 self.headers.update(headers)
331 self._files = None
332
333 def _set_json_body(self, data):
334 """DEPRECATED: Set the json request body.
335 This is deprecated and will be removed in a later release.
336 You should pass your json content through the `json` kwarg instead
337
338 :param data: JSON data
339 :type data: dict
340 """
341 headers = self._set_body(json=data)
342 self.headers.update(headers)
343 self._files = None
344
345 def _set_formdata_body(self, data=None):
346 """DEPRECATED: Set the formrequest body.
347 This is deprecated and will be removed in a later release.
348 You should pass your stream content through the `files` kwarg instead
349
350 :param data: Form data
351 :type data: dict
352 """
353 if data is None:
354 data = {}
355 content_type = self.headers.pop("Content-Type", None) if self.headers else None
356
357 if content_type and content_type.lower() == "application/x-www-form-urlencoded":
358 headers = self._set_body(data=data)
359 self._files = None
360 else: # Assume "multipart/form-data"
361 headers = self._set_body(files=data)
362 self._data = None
363 self.headers.update(headers)
364
365 def _set_bytes_body(self, data):
366 """DEPRECATED: Set the bytes request body.
367 This is deprecated and will be removed in a later release.
368 You should pass your bytes content through the `content` kwarg instead
369
370 :param bytes data: Bytes data
371 """
372 headers = self._set_body(content=data)
373 # we don't want default Content-Type
374 # in 2.7, byte strings are still strings, so they get set with text/plain content type
375
376 headers.pop("Content-Type", None)
377 self.headers.update(headers)
378 self._files = None
379
380 def _set_multipart_mixed(self, *requests: HttpRequest, **kwargs: Any) -> None:
381 """DEPRECATED: Set the multipart mixed info.
382 This is deprecated and will be removed in a later release.
383
384 :param requests: Requests to be sent in the multipart request
385 :type requests: list[HttpRequest]
386 """
387 self.multipart_mixed_info: Tuple[Sequence[HttpRequest], Sequence[Any], str, Dict[str, Any]] = (
388 requests,
389 kwargs.pop("policies", []),
390 kwargs.pop("boundary", None),
391 kwargs,
392 )
393
394 def _prepare_multipart_body(self, content_index=0):
395 """DEPRECATED: Prepare your request body for multipart requests.
396 This is deprecated and will be removed in a later release.
397
398 :param int content_index: The index of the request to be sent in the multipart request
399 :returns: The updated index after all parts in this request have been added.
400 :rtype: int
401 """
402 return _prepare_multipart_body_helper(self, content_index)
403
404 def _serialize(self):
405 """DEPRECATED: Serialize this request using application/http spec.
406 This is deprecated and will be removed in a later release.
407
408 :rtype: bytes
409 :return: The serialized request
410 """
411 return _serialize_request(self)
412
413 def _add_backcompat_properties(self, request, memo):
414 """While deepcopying, we also need to add the private backcompat attrs.
415
416 :param HttpRequest request: The request to copy from
417 :param dict memo: The memo dict used by deepcopy
418 """
419 request._multipart_mixed_info = copy.deepcopy( # pylint: disable=protected-access
420 self._multipart_mixed_info, memo
421 )