1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26"""
27This module is the requests implementation of Pipeline ABC
28"""
29import json
30import inspect
31import logging
32import os
33import platform
34import xml.etree.ElementTree as ET
35import types
36import re
37import uuid
38from typing import IO, cast, Union, Optional, AnyStr, Dict, Any, Set, MutableMapping
39import urllib.parse
40
41from azure.core import __version__ as azcore_version
42from azure.core.exceptions import DecodeError
43
44from azure.core.pipeline import PipelineRequest, PipelineResponse
45from ._base import SansIOHTTPPolicy
46
47from ..transport import HttpRequest as LegacyHttpRequest
48from ..transport._base import _HttpResponseBase as LegacySansIOHttpResponse
49from ...rest import HttpRequest
50from ...rest._rest_py3 import _HttpResponseBase as SansIOHttpResponse
51
52_LOGGER = logging.getLogger(__name__)
53
54HTTPRequestType = Union[LegacyHttpRequest, HttpRequest]
55HTTPResponseType = Union[LegacySansIOHttpResponse, SansIOHttpResponse]
56PipelineResponseType = PipelineResponse[HTTPRequestType, HTTPResponseType]
57
58
59class HeadersPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
60 """A simple policy that sends the given headers with the request.
61
62 This will overwrite any headers already defined in the request. Headers can be
63 configured up front, where any custom headers will be applied to all outgoing
64 operations, and additional headers can also be added dynamically per operation.
65
66 :param dict base_headers: Headers to send with the request.
67
68 .. admonition:: Example:
69
70 .. literalinclude:: ../samples/test_example_sansio.py
71 :start-after: [START headers_policy]
72 :end-before: [END headers_policy]
73 :language: python
74 :dedent: 4
75 :caption: Configuring a headers policy.
76 """
77
78 def __init__(
79 self, base_headers: Optional[Dict[str, str]] = None, **kwargs: Any
80 ) -> None: # pylint: disable=super-init-not-called
81 self._headers: Dict[str, str] = base_headers or {}
82 self._headers.update(kwargs.pop("headers", {}))
83
84 @property
85 def headers(self) -> Dict[str, str]:
86 """The current headers collection.
87
88 :rtype: dict[str, str]
89 :return: The current headers collection.
90 """
91 return self._headers
92
93 def add_header(self, key: str, value: str) -> None:
94 """Add a header to the configuration to be applied to all requests.
95
96 :param str key: The header.
97 :param str value: The header's value.
98 """
99 self._headers[key] = value
100
101 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
102 """Updates with the given headers before sending the request to the next policy.
103
104 :param request: The PipelineRequest object
105 :type request: ~azure.core.pipeline.PipelineRequest
106 """
107 request.http_request.headers.update(self.headers)
108 additional_headers = request.context.options.pop("headers", {})
109 if additional_headers:
110 request.http_request.headers.update(additional_headers)
111
112
113class _Unset:
114 pass
115
116
117class RequestIdPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
118 """A simple policy that sets the given request id in the header.
119
120 This will overwrite request id that is already defined in the request. Request id can be
121 configured up front, where the request id will be applied to all outgoing
122 operations, and additional request id can also be set dynamically per operation.
123
124 :keyword str request_id: The request id to be added into header.
125 :keyword bool auto_request_id: Auto generates a unique request ID per call if true which is by default.
126 :keyword str request_id_header_name: Header name to use. Default is "x-ms-client-request-id".
127
128 .. admonition:: Example:
129
130 .. literalinclude:: ../samples/test_example_sansio.py
131 :start-after: [START request_id_policy]
132 :end-before: [END request_id_policy]
133 :language: python
134 :dedent: 4
135 :caption: Configuring a request id policy.
136 """
137
138 def __init__(
139 self, # pylint: disable=unused-argument
140 *,
141 request_id: Union[str, Any] = _Unset,
142 auto_request_id: bool = True,
143 request_id_header_name: str = "x-ms-client-request-id",
144 **kwargs: Any
145 ) -> None:
146 super()
147 self._request_id = request_id
148 self._auto_request_id = auto_request_id
149 self._request_id_header_name = request_id_header_name
150
151 def set_request_id(self, value: str) -> None:
152 """Add the request id to the configuration to be applied to all requests.
153
154 :param str value: The request id value.
155 """
156 self._request_id = value
157
158 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
159 """Updates with the given request id before sending the request to the next policy.
160
161 :param request: The PipelineRequest object
162 :type request: ~azure.core.pipeline.PipelineRequest
163 """
164 request_id = unset = object()
165 if "request_id" in request.context.options:
166 request_id = request.context.options.pop("request_id")
167 if request_id is None:
168 return
169 elif self._request_id is None:
170 return
171 elif self._request_id is not _Unset:
172 if self._request_id_header_name in request.http_request.headers:
173 return
174 request_id = self._request_id
175 elif self._auto_request_id:
176 if self._request_id_header_name in request.http_request.headers:
177 return
178 request_id = str(uuid.uuid1())
179 if request_id is not unset:
180 header = {self._request_id_header_name: cast(str, request_id)}
181 request.http_request.headers.update(header)
182
183
184class UserAgentPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
185 """User-Agent Policy. Allows custom values to be added to the User-Agent header.
186
187 :param str base_user_agent: Sets the base user agent value.
188
189 :keyword bool user_agent_overwrite: Overwrites User-Agent when True. Defaults to False.
190 :keyword bool user_agent_use_env: Gets user-agent from environment. Defaults to True.
191 :keyword str user_agent: If specified, this will be added in front of the user agent string.
192 :keyword str sdk_moniker: If specified, the user agent string will be
193 azsdk-python-[sdk_moniker] Python/[python_version] ([platform_version])
194
195 .. admonition:: Example:
196
197 .. literalinclude:: ../samples/test_example_sansio.py
198 :start-after: [START user_agent_policy]
199 :end-before: [END user_agent_policy]
200 :language: python
201 :dedent: 4
202 :caption: Configuring a user agent policy.
203 """
204
205 _USERAGENT = "User-Agent"
206 _ENV_ADDITIONAL_USER_AGENT = "AZURE_HTTP_USER_AGENT"
207
208 def __init__(
209 self, base_user_agent: Optional[str] = None, **kwargs: Any
210 ) -> None: # pylint: disable=super-init-not-called
211 self.overwrite: bool = kwargs.pop("user_agent_overwrite", False)
212 self.use_env: bool = kwargs.pop("user_agent_use_env", True)
213 application_id: Optional[str] = kwargs.pop("user_agent", None)
214 sdk_moniker: str = kwargs.pop("sdk_moniker", "core/{}".format(azcore_version))
215
216 if base_user_agent:
217 self._user_agent = base_user_agent
218 else:
219 self._user_agent = "azsdk-python-{} Python/{} ({})".format(
220 sdk_moniker, platform.python_version(), platform.platform()
221 )
222
223 if application_id:
224 self._user_agent = "{} {}".format(application_id, self._user_agent)
225
226 @property
227 def user_agent(self) -> str:
228 """The current user agent value.
229
230 :return: The current user agent value.
231 :rtype: str
232 """
233 if self.use_env:
234 add_user_agent_header = os.environ.get(self._ENV_ADDITIONAL_USER_AGENT, None)
235 if add_user_agent_header is not None:
236 return "{} {}".format(self._user_agent, add_user_agent_header)
237 return self._user_agent
238
239 def add_user_agent(self, value: str) -> None:
240 """Add value to current user agent with a space.
241 :param str value: value to add to user agent.
242 """
243 self._user_agent = "{} {}".format(self._user_agent, value)
244
245 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
246 """Modifies the User-Agent header before the request is sent.
247
248 :param request: The PipelineRequest object
249 :type request: ~azure.core.pipeline.PipelineRequest
250 """
251 http_request = request.http_request
252 options_dict = request.context.options
253 if "user_agent" in options_dict:
254 user_agent = options_dict.pop("user_agent")
255 if options_dict.pop("user_agent_overwrite", self.overwrite):
256 http_request.headers[self._USERAGENT] = user_agent
257 else:
258 user_agent = "{} {}".format(user_agent, self.user_agent)
259 http_request.headers[self._USERAGENT] = user_agent
260
261 elif self.overwrite or self._USERAGENT not in http_request.headers:
262 http_request.headers[self._USERAGENT] = self.user_agent
263
264
265class NetworkTraceLoggingPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
266 """The logging policy in the pipeline is used to output HTTP network trace to the configured logger.
267
268 This accepts both global configuration, and per-request level with "enable_http_logger"
269
270 :param bool logging_enable: Use to enable per operation. Defaults to False.
271
272 .. admonition:: Example:
273
274 .. literalinclude:: ../samples/test_example_sansio.py
275 :start-after: [START network_trace_logging_policy]
276 :end-before: [END network_trace_logging_policy]
277 :language: python
278 :dedent: 4
279 :caption: Configuring a network trace logging policy.
280 """
281
282 def __init__(self, logging_enable: bool = False, **kwargs: Any): # pylint: disable=unused-argument
283 self.enable_http_logger = logging_enable
284
285 def on_request(
286 self, request: PipelineRequest[HTTPRequestType]
287 ) -> None: # pylint: disable=too-many-return-statements
288 """Logs HTTP request to the DEBUG logger.
289
290 :param request: The PipelineRequest object.
291 :type request: ~azure.core.pipeline.PipelineRequest
292 """
293 http_request = request.http_request
294 options = request.context.options
295 logging_enable = options.pop("logging_enable", self.enable_http_logger)
296 request.context["logging_enable"] = logging_enable
297 if logging_enable:
298 if not _LOGGER.isEnabledFor(logging.DEBUG):
299 return
300
301 try:
302 log_string = "Request URL: '{}'".format(http_request.url)
303 log_string += "\nRequest method: '{}'".format(http_request.method)
304 log_string += "\nRequest headers:"
305 for header, value in http_request.headers.items():
306 log_string += "\n '{}': '{}'".format(header, value)
307 log_string += "\nRequest body:"
308
309 # We don't want to log the binary data of a file upload.
310 if isinstance(http_request.body, types.GeneratorType):
311 log_string += "\nFile upload"
312 _LOGGER.debug(log_string)
313 return
314 try:
315 if isinstance(http_request.body, types.AsyncGeneratorType):
316 log_string += "\nFile upload"
317 _LOGGER.debug(log_string)
318 return
319 except AttributeError:
320 pass
321 if http_request.body:
322 log_string += "\n{}".format(str(http_request.body))
323 _LOGGER.debug(log_string)
324 return
325 log_string += "\nThis request has no body"
326 _LOGGER.debug(log_string)
327 except Exception as err: # pylint: disable=broad-except
328 _LOGGER.debug("Failed to log request: %r", err)
329
330 def on_response(
331 self,
332 request: PipelineRequest[HTTPRequestType],
333 response: PipelineResponse[HTTPRequestType, HTTPResponseType],
334 ) -> None:
335 """Logs HTTP response to the DEBUG logger.
336
337 :param request: The PipelineRequest object.
338 :type request: ~azure.core.pipeline.PipelineRequest
339 :param response: The PipelineResponse object.
340 :type response: ~azure.core.pipeline.PipelineResponse
341 """
342 http_response = response.http_response
343 try:
344 logging_enable = response.context["logging_enable"]
345 if logging_enable:
346 if not _LOGGER.isEnabledFor(logging.DEBUG):
347 return
348
349 log_string = "Response status: '{}'".format(http_response.status_code)
350 log_string += "\nResponse headers:"
351 for res_header, value in http_response.headers.items():
352 log_string += "\n '{}': '{}'".format(res_header, value)
353
354 # We don't want to log binary data if the response is a file.
355 log_string += "\nResponse content:"
356 pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE)
357 header = http_response.headers.get("content-disposition")
358
359 if header and pattern.match(header):
360 filename = header.partition("=")[2]
361 log_string += "\nFile attachments: {}".format(filename)
362 elif http_response.headers.get("content-type", "").endswith("octet-stream"):
363 log_string += "\nBody contains binary data."
364 elif http_response.headers.get("content-type", "").startswith("image"):
365 log_string += "\nBody contains image data."
366 else:
367 if response.context.options.get("stream", False):
368 log_string += "\nBody is streamable."
369 else:
370 log_string += "\n{}".format(http_response.text())
371 _LOGGER.debug(log_string)
372 except Exception as err: # pylint: disable=broad-except
373 _LOGGER.debug("Failed to log response: %s", repr(err))
374
375
376class _HiddenClassProperties(type):
377 # Backward compatible for DEFAULT_HEADERS_WHITELIST
378 # https://github.com/Azure/azure-sdk-for-python/issues/26331
379
380 @property
381 def DEFAULT_HEADERS_WHITELIST(cls) -> Set[str]:
382 return cls.DEFAULT_HEADERS_ALLOWLIST
383
384 @DEFAULT_HEADERS_WHITELIST.setter
385 def DEFAULT_HEADERS_WHITELIST(cls, value: Set[str]) -> None:
386 cls.DEFAULT_HEADERS_ALLOWLIST = value
387
388
389class HttpLoggingPolicy(
390 SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType],
391 metaclass=_HiddenClassProperties,
392):
393 """The Pipeline policy that handles logging of HTTP requests and responses.
394
395 :param logger: The logger to use for logging. Default to azure.core.pipeline.policies.http_logging_policy.
396 :type logger: logging.Logger
397 """
398
399 DEFAULT_HEADERS_ALLOWLIST: Set[str] = set(
400 [
401 "x-ms-request-id",
402 "x-ms-client-request-id",
403 "x-ms-return-client-request-id",
404 "x-ms-error-code",
405 "traceparent",
406 "Accept",
407 "Cache-Control",
408 "Connection",
409 "Content-Length",
410 "Content-Type",
411 "Date",
412 "ETag",
413 "Expires",
414 "If-Match",
415 "If-Modified-Since",
416 "If-None-Match",
417 "If-Unmodified-Since",
418 "Last-Modified",
419 "Pragma",
420 "Request-Id",
421 "Retry-After",
422 "Server",
423 "Transfer-Encoding",
424 "User-Agent",
425 "WWW-Authenticate", # OAuth Challenge header.
426 ]
427 )
428 REDACTED_PLACEHOLDER: str = "REDACTED"
429 MULTI_RECORD_LOG: str = "AZURE_SDK_LOGGING_MULTIRECORD"
430
431 def __init__(self, logger: Optional[logging.Logger] = None, **kwargs: Any): # pylint: disable=unused-argument
432 self.logger: logging.Logger = logger or logging.getLogger("azure.core.pipeline.policies.http_logging_policy")
433 self.allowed_query_params: Set[str] = set()
434 self.allowed_header_names: Set[str] = set(self.__class__.DEFAULT_HEADERS_ALLOWLIST)
435
436 def _redact_query_param(self, key: str, value: str) -> str:
437 lower_case_allowed_query_params = [param.lower() for param in self.allowed_query_params]
438 return value if key.lower() in lower_case_allowed_query_params else HttpLoggingPolicy.REDACTED_PLACEHOLDER
439
440 def _redact_header(self, key: str, value: str) -> str:
441 lower_case_allowed_header_names = [header.lower() for header in self.allowed_header_names]
442 return value if key.lower() in lower_case_allowed_header_names else HttpLoggingPolicy.REDACTED_PLACEHOLDER
443
444 def on_request( # pylint: disable=too-many-return-statements
445 self, request: PipelineRequest[HTTPRequestType]
446 ) -> None:
447 """Logs HTTP method, url and headers.
448 :param request: The PipelineRequest object.
449 :type request: ~azure.core.pipeline.PipelineRequest
450 """
451 http_request = request.http_request
452 options = request.context.options
453 # Get logger in my context first (request has been retried)
454 # then read from kwargs (pop if that's the case)
455 # then use my instance logger
456 logger = request.context.setdefault("logger", options.pop("logger", self.logger))
457
458 if not logger.isEnabledFor(logging.INFO):
459 return
460
461 try:
462 parsed_url = list(urllib.parse.urlparse(http_request.url))
463 parsed_qp = urllib.parse.parse_qsl(parsed_url[4], keep_blank_values=True)
464 filtered_qp = [(key, self._redact_query_param(key, value)) for key, value in parsed_qp]
465 # 4 is query
466 parsed_url[4] = "&".join(["=".join(part) for part in filtered_qp])
467 redacted_url = urllib.parse.urlunparse(parsed_url)
468
469 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False)
470 if multi_record:
471 logger.info("Request URL: %r", redacted_url)
472 logger.info("Request method: %r", http_request.method)
473 logger.info("Request headers:")
474 for header, value in http_request.headers.items():
475 value = self._redact_header(header, value)
476 logger.info(" %r: %r", header, value)
477 if isinstance(http_request.body, types.GeneratorType):
478 logger.info("File upload")
479 return
480 try:
481 if isinstance(http_request.body, types.AsyncGeneratorType):
482 logger.info("File upload")
483 return
484 except AttributeError:
485 pass
486 if http_request.body:
487 logger.info("A body is sent with the request")
488 return
489 logger.info("No body was attached to the request")
490 return
491 log_string = "Request URL: '{}'".format(redacted_url)
492 log_string += "\nRequest method: '{}'".format(http_request.method)
493 log_string += "\nRequest headers:"
494 for header, value in http_request.headers.items():
495 value = self._redact_header(header, value)
496 log_string += "\n '{}': '{}'".format(header, value)
497 if isinstance(http_request.body, types.GeneratorType):
498 log_string += "\nFile upload"
499 logger.info(log_string)
500 return
501 try:
502 if isinstance(http_request.body, types.AsyncGeneratorType):
503 log_string += "\nFile upload"
504 logger.info(log_string)
505 return
506 except AttributeError:
507 pass
508 if http_request.body:
509 log_string += "\nA body is sent with the request"
510 logger.info(log_string)
511 return
512 log_string += "\nNo body was attached to the request"
513 logger.info(log_string)
514
515 except Exception as err: # pylint: disable=broad-except
516 logger.warning("Failed to log request: %s", repr(err))
517
518 def on_response(
519 self,
520 request: PipelineRequest[HTTPRequestType],
521 response: PipelineResponse[HTTPRequestType, HTTPResponseType],
522 ) -> None:
523 http_response = response.http_response
524
525 # Get logger in my context first (request has been retried)
526 # then read from kwargs (pop if that's the case)
527 # then use my instance logger
528 # If on_request was called, should always read from context
529 options = request.context.options
530 logger = request.context.setdefault("logger", options.pop("logger", self.logger))
531
532 try:
533 if not logger.isEnabledFor(logging.INFO):
534 return
535
536 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False)
537 if multi_record:
538 logger.info("Response status: %r", http_response.status_code)
539 logger.info("Response headers:")
540 for res_header, value in http_response.headers.items():
541 value = self._redact_header(res_header, value)
542 logger.info(" %r: %r", res_header, value)
543 return
544 log_string = "Response status: {}".format(http_response.status_code)
545 log_string += "\nResponse headers:"
546 for res_header, value in http_response.headers.items():
547 value = self._redact_header(res_header, value)
548 log_string += "\n '{}': '{}'".format(res_header, value)
549 logger.info(log_string)
550 except Exception as err: # pylint: disable=broad-except
551 logger.warning("Failed to log response: %s", repr(err))
552
553
554class ContentDecodePolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
555 """Policy for decoding unstreamed response content.
556
557 :param response_encoding: The encoding to use if known for this service (will disable auto-detection)
558 :type response_encoding: str
559 """
560
561 # Accept "text" because we're open minded people...
562 JSON_REGEXP = re.compile(r"^(application|text)/([0-9a-z+.-]+\+)?json$")
563
564 # Name used in context
565 CONTEXT_NAME = "deserialized_data"
566
567 def __init__(
568 self, response_encoding: Optional[str] = None, **kwargs: Any # pylint: disable=unused-argument
569 ) -> None:
570 self._response_encoding = response_encoding
571
572 @classmethod
573 def deserialize_from_text(
574 cls,
575 data: Optional[Union[AnyStr, IO[AnyStr]]],
576 mime_type: Optional[str] = None,
577 response: Optional[HTTPResponseType] = None,
578 ) -> Any:
579 """Decode response data according to content-type.
580
581 Accept a stream of data as well, but will be load at once in memory for now.
582 If no content-type, will return the string version (not bytes, not stream)
583
584 :param data: The data to deserialize.
585 :type data: str or bytes or file-like object
586 :param response: The HTTP response.
587 :type response: ~azure.core.pipeline.transport.HttpResponse
588 :param str mime_type: The mime type. As mime type, charset is not expected.
589 :param response: If passed, exception will be annotated with that response
590 :type response: any
591 :raises ~azure.core.exceptions.DecodeError: If deserialization fails
592 :returns: A dict (JSON), XML tree or str, depending of the mime_type
593 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str
594 """
595 if not data:
596 return None
597
598 if hasattr(data, "read"):
599 # Assume a stream
600 data = cast(IO, data).read()
601
602 if isinstance(data, bytes):
603 data_as_str = data.decode(encoding="utf-8-sig")
604 else:
605 # Explain to mypy the correct type.
606 data_as_str = cast(str, data)
607
608 if mime_type is None:
609 return data_as_str
610
611 if cls.JSON_REGEXP.match(mime_type):
612 try:
613 return json.loads(data_as_str)
614 except ValueError as err:
615 raise DecodeError(
616 message="JSON is invalid: {}".format(err),
617 response=response,
618 error=err,
619 ) from err
620 elif "xml" in (mime_type or []):
621 try:
622 return ET.fromstring(data_as_str) # nosec
623 except ET.ParseError as err:
624 # It might be because the server has an issue, and returned JSON with
625 # content-type XML....
626 # So let's try a JSON load, and if it's still broken
627 # let's flow the initial exception
628 def _json_attemp(data):
629 try:
630 return True, json.loads(data)
631 except ValueError:
632 return False, None # Don't care about this one
633
634 success, json_result = _json_attemp(data)
635 if success:
636 return json_result
637 # If i'm here, it's not JSON, it's not XML, let's scream
638 # and raise the last context in this block (the XML exception)
639 # The function hack is because Py2.7 messes up with exception
640 # context otherwise.
641 _LOGGER.critical("Wasn't XML not JSON, failing")
642 raise DecodeError("XML is invalid", response=response) from err
643 elif mime_type.startswith("text/"):
644 return data_as_str
645 raise DecodeError("Cannot deserialize content-type: {}".format(mime_type))
646
647 @classmethod
648 def deserialize_from_http_generics(
649 cls,
650 response: HTTPResponseType,
651 encoding: Optional[str] = None,
652 ) -> Any:
653 """Deserialize from HTTP response.
654
655 Headers will tested for "content-type"
656
657 :param response: The HTTP response
658 :type response: any
659 :param str encoding: The encoding to use if known for this service (will disable auto-detection)
660 :raises ~azure.core.exceptions.DecodeError: If deserialization fails
661 :returns: A dict (JSON), XML tree or str, depending of the mime_type
662 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str
663 """
664 # Try to use content-type from headers if available
665 if response.content_type:
666 mime_type = response.content_type.split(";")[0].strip().lower()
667 # Ouch, this server did not declare what it sent...
668 # Let's guess it's JSON...
669 # Also, since Autorest was considering that an empty body was a valid JSON,
670 # need that test as well....
671 else:
672 mime_type = "application/json"
673
674 # Rely on transport implementation to give me "text()" decoded correctly
675 if hasattr(response, "read"):
676 # since users can call deserialize_from_http_generics by themselves
677 # we want to make sure our new responses are read before we try to
678 # deserialize. Only read sync responses since we're in a sync function
679 #
680 # Technically HttpResponse do not contain a "read()", but we don't know what
681 # people have been able to pass here, so keep this code for safety,
682 # even if it's likely dead code
683 if not inspect.iscoroutinefunction(response.read): # type: ignore
684 response.read() # type: ignore
685 return cls.deserialize_from_text(response.text(encoding), mime_type, response=response)
686
687 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
688 options = request.context.options
689 response_encoding = options.pop("response_encoding", self._response_encoding)
690 if response_encoding:
691 request.context["response_encoding"] = response_encoding
692
693 def on_response(
694 self,
695 request: PipelineRequest[HTTPRequestType],
696 response: PipelineResponse[HTTPRequestType, HTTPResponseType],
697 ) -> None:
698 """Extract data from the body of a REST response object.
699 This will load the entire payload in memory.
700 Will follow Content-Type to parse.
701 We assume everything is UTF8 (BOM acceptable).
702
703 :param request: The PipelineRequest object.
704 :type request: ~azure.core.pipeline.PipelineRequest
705 :param response: The PipelineResponse object.
706 :type response: ~azure.core.pipeline.PipelineResponse
707 :raises JSONDecodeError: If JSON is requested and parsing is impossible.
708 :raises UnicodeDecodeError: If bytes is not UTF8
709 :raises xml.etree.ElementTree.ParseError: If bytes is not valid XML
710 :raises ~azure.core.exceptions.DecodeError: If deserialization fails
711 """
712 # If response was asked as stream, do NOT read anything and quit now
713 if response.context.options.get("stream", True):
714 return
715
716 response_encoding = request.context.get("response_encoding")
717
718 response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics(
719 response.http_response, response_encoding
720 )
721
722
723class ProxyPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]):
724 """A proxy policy.
725
726 Dictionary mapping protocol or protocol and host to the URL of the proxy
727 to be used on each Request.
728
729 :param MutableMapping proxies: Maps protocol or protocol and hostname to the URL
730 of the proxy.
731
732 .. admonition:: Example:
733
734 .. literalinclude:: ../samples/test_example_sansio.py
735 :start-after: [START proxy_policy]
736 :end-before: [END proxy_policy]
737 :language: python
738 :dedent: 4
739 :caption: Configuring a proxy policy.
740 """
741
742 def __init__(
743 self, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any
744 ): # pylint: disable=unused-argument,super-init-not-called
745 self.proxies = proxies
746
747 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
748 ctxt = request.context.options
749 if self.proxies and "proxies" not in ctxt:
750 ctxt["proxies"] = self.proxies