Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/pipeline/policies/_universal.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

332 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26""" 

27This module is the requests implementation of Pipeline ABC 

28""" 

29import json 

30import inspect 

31import logging 

32import os 

33import platform 

34import xml.etree.ElementTree as ET 

35import types 

36import re 

37import uuid 

38from typing import IO, cast, Union, Optional, AnyStr, Dict, Any, Set, MutableMapping 

39import urllib.parse 

40 

41from azure.core import __version__ as azcore_version 

42from azure.core.exceptions import DecodeError 

43 

44from azure.core.pipeline import PipelineRequest, PipelineResponse 

45from ._base import SansIOHTTPPolicy 

46 

47from ..transport import HttpRequest as LegacyHttpRequest 

48from ..transport._base import _HttpResponseBase as LegacySansIOHttpResponse 

49from ...rest import HttpRequest 

50from ...rest._rest_py3 import _HttpResponseBase as SansIOHttpResponse 

51 

52_LOGGER = logging.getLogger(__name__) 

53 

54HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] 

55HTTPResponseType = Union[LegacySansIOHttpResponse, SansIOHttpResponse] 

56PipelineResponseType = PipelineResponse[HTTPRequestType, HTTPResponseType] 

57 

58 

59class HeadersPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

60 """A simple policy that sends the given headers with the request. 

61 

62 This will overwrite any headers already defined in the request. Headers can be 

63 configured up front, where any custom headers will be applied to all outgoing 

64 operations, and additional headers can also be added dynamically per operation. 

65 

66 :param dict base_headers: Headers to send with the request. 

67 

68 .. admonition:: Example: 

69 

70 .. literalinclude:: ../samples/test_example_sansio.py 

71 :start-after: [START headers_policy] 

72 :end-before: [END headers_policy] 

73 :language: python 

74 :dedent: 4 

75 :caption: Configuring a headers policy. 

76 """ 

77 

78 def __init__(self, base_headers: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: 

79 self._headers: Dict[str, str] = base_headers or {} 

80 self._headers.update(kwargs.pop("headers", {})) 

81 

82 @property 

83 def headers(self) -> Dict[str, str]: 

84 """The current headers collection. 

85 

86 :rtype: dict[str, str] 

87 :return: The current headers collection. 

88 """ 

89 return self._headers 

90 

91 def add_header(self, key: str, value: str) -> None: 

92 """Add a header to the configuration to be applied to all requests. 

93 

94 :param str key: The header. 

95 :param str value: The header's value. 

96 """ 

97 self._headers[key] = value 

98 

99 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

100 """Updates with the given headers before sending the request to the next policy. 

101 

102 :param request: The PipelineRequest object 

103 :type request: ~azure.core.pipeline.PipelineRequest 

104 """ 

105 request.http_request.headers.update(self.headers) 

106 additional_headers = request.context.options.pop("headers", {}) 

107 if additional_headers: 

108 request.http_request.headers.update(additional_headers) 

109 

110 

111class _Unset: 

112 pass 

113 

114 

115class RequestIdPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

116 """A simple policy that sets the given request id in the header. 

117 

118 This will overwrite request id that is already defined in the request. Request id can be 

119 configured up front, where the request id will be applied to all outgoing 

120 operations, and additional request id can also be set dynamically per operation. 

121 

122 :keyword str request_id: The request id to be added into header. 

123 :keyword bool auto_request_id: Auto generates a unique request ID per call if true which is by default. 

124 :keyword str request_id_header_name: Header name to use. Default is "x-ms-client-request-id". 

125 

126 .. admonition:: Example: 

127 

128 .. literalinclude:: ../samples/test_example_sansio.py 

129 :start-after: [START request_id_policy] 

130 :end-before: [END request_id_policy] 

131 :language: python 

132 :dedent: 4 

133 :caption: Configuring a request id policy. 

134 """ 

135 

136 def __init__( 

137 self, # pylint: disable=unused-argument 

138 *, 

139 request_id: Union[str, Any] = _Unset, 

140 auto_request_id: bool = True, 

141 request_id_header_name: str = "x-ms-client-request-id", 

142 **kwargs: Any 

143 ) -> None: 

144 super() 

145 self._request_id = request_id 

146 self._auto_request_id = auto_request_id 

147 self._request_id_header_name = request_id_header_name 

148 

149 def set_request_id(self, value: str) -> None: 

150 """Add the request id to the configuration to be applied to all requests. 

151 

152 :param str value: The request id value. 

153 """ 

154 self._request_id = value 

155 

156 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

157 """Updates with the given request id before sending the request to the next policy. 

158 

159 :param request: The PipelineRequest object 

160 :type request: ~azure.core.pipeline.PipelineRequest 

161 """ 

162 request_id = unset = object() 

163 if "request_id" in request.context.options: 

164 request_id = request.context.options.pop("request_id") 

165 if request_id is None: 

166 return 

167 elif self._request_id is None: 

168 return 

169 elif self._request_id is not _Unset: 

170 if self._request_id_header_name in request.http_request.headers: 

171 return 

172 request_id = self._request_id 

173 elif self._auto_request_id: 

174 if self._request_id_header_name in request.http_request.headers: 

175 return 

176 request_id = str(uuid.uuid1()) 

177 if request_id is not unset: 

178 header = {self._request_id_header_name: cast(str, request_id)} 

179 request.http_request.headers.update(header) 

180 

181 

182class UserAgentPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

183 """User-Agent Policy. Allows custom values to be added to the User-Agent header. 

184 

185 :param str base_user_agent: Sets the base user agent value. 

186 

187 :keyword bool user_agent_overwrite: Overwrites User-Agent when True. Defaults to False. 

188 :keyword bool user_agent_use_env: Gets user-agent from environment. Defaults to True. 

189 :keyword str user_agent: If specified, this will be added in front of the user agent string. 

190 :keyword str sdk_moniker: If specified, the user agent string will be 

191 azsdk-python-[sdk_moniker] Python/[python_version] ([platform_version]) 

192 

193 .. admonition:: Example: 

194 

195 .. literalinclude:: ../samples/test_example_sansio.py 

196 :start-after: [START user_agent_policy] 

197 :end-before: [END user_agent_policy] 

198 :language: python 

199 :dedent: 4 

200 :caption: Configuring a user agent policy. 

201 """ 

202 

203 _USERAGENT = "User-Agent" 

204 _ENV_ADDITIONAL_USER_AGENT = "AZURE_HTTP_USER_AGENT" 

205 

206 def __init__(self, base_user_agent: Optional[str] = None, **kwargs: Any) -> None: 

207 self.overwrite: bool = kwargs.pop("user_agent_overwrite", False) 

208 self.use_env: bool = kwargs.pop("user_agent_use_env", True) 

209 application_id: Optional[str] = kwargs.pop("user_agent", None) 

210 sdk_moniker: str = kwargs.pop("sdk_moniker", "core/{}".format(azcore_version)) 

211 

212 if base_user_agent: 

213 self._user_agent = base_user_agent 

214 else: 

215 self._user_agent = "azsdk-python-{} Python/{} ({})".format( 

216 sdk_moniker, platform.python_version(), platform.platform() 

217 ) 

218 

219 if application_id: 

220 self._user_agent = "{} {}".format(application_id, self._user_agent) 

221 

222 @property 

223 def user_agent(self) -> str: 

224 """The current user agent value. 

225 

226 :return: The current user agent value. 

227 :rtype: str 

228 """ 

229 if self.use_env: 

230 add_user_agent_header = os.environ.get(self._ENV_ADDITIONAL_USER_AGENT, None) 

231 if add_user_agent_header is not None: 

232 return "{} {}".format(self._user_agent, add_user_agent_header) 

233 return self._user_agent 

234 

235 def add_user_agent(self, value: str) -> None: 

236 """Add value to current user agent with a space. 

237 

238 :param str value: value to add to user agent. 

239 """ 

240 self._user_agent = "{} {}".format(self._user_agent, value) 

241 

242 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

243 """Modifies the User-Agent header before the request is sent. 

244 

245 :param request: The PipelineRequest object 

246 :type request: ~azure.core.pipeline.PipelineRequest 

247 """ 

248 http_request = request.http_request 

249 options_dict = request.context.options 

250 if "user_agent" in options_dict: 

251 user_agent = options_dict.pop("user_agent") 

252 if options_dict.pop("user_agent_overwrite", self.overwrite): 

253 http_request.headers[self._USERAGENT] = user_agent 

254 else: 

255 user_agent = "{} {}".format(user_agent, self.user_agent) 

256 http_request.headers[self._USERAGENT] = user_agent 

257 

258 elif self.overwrite or self._USERAGENT not in http_request.headers: 

259 http_request.headers[self._USERAGENT] = self.user_agent 

260 

261 

262class NetworkTraceLoggingPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

263 """The logging policy in the pipeline is used to output HTTP network trace to the configured logger. 

264 

265 This accepts both global configuration, and per-request level with "enable_http_logger" 

266 

267 :param bool logging_enable: Use to enable per operation. Defaults to False. 

268 

269 .. admonition:: Example: 

270 

271 .. literalinclude:: ../samples/test_example_sansio.py 

272 :start-after: [START network_trace_logging_policy] 

273 :end-before: [END network_trace_logging_policy] 

274 :language: python 

275 :dedent: 4 

276 :caption: Configuring a network trace logging policy. 

277 """ 

278 

279 def __init__(self, logging_enable: bool = False, **kwargs: Any): # pylint: disable=unused-argument 

280 self.enable_http_logger = logging_enable 

281 

282 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

283 """Logs HTTP request to the DEBUG logger. 

284 

285 :param request: The PipelineRequest object. 

286 :type request: ~azure.core.pipeline.PipelineRequest 

287 """ 

288 http_request = request.http_request 

289 options = request.context.options 

290 logging_enable = options.pop("logging_enable", self.enable_http_logger) 

291 request.context["logging_enable"] = logging_enable 

292 if logging_enable: 

293 if not _LOGGER.isEnabledFor(logging.DEBUG): 

294 return 

295 

296 try: 

297 log_string = "Request URL: '{}'".format(http_request.url) 

298 log_string += "\nRequest method: '{}'".format(http_request.method) 

299 log_string += "\nRequest headers:" 

300 for header, value in http_request.headers.items(): 

301 log_string += "\n '{}': '{}'".format(header, value) 

302 log_string += "\nRequest body:" 

303 

304 # We don't want to log the binary data of a file upload. 

305 if isinstance(http_request.body, types.GeneratorType): 

306 log_string += "\nFile upload" 

307 _LOGGER.debug(log_string) 

308 return 

309 try: 

310 if isinstance(http_request.body, types.AsyncGeneratorType): 

311 log_string += "\nFile upload" 

312 _LOGGER.debug(log_string) 

313 return 

314 except AttributeError: 

315 pass 

316 if http_request.body: 

317 log_string += "\n{}".format(str(http_request.body)) 

318 _LOGGER.debug(log_string) 

319 return 

320 log_string += "\nThis request has no body" 

321 _LOGGER.debug(log_string) 

322 except Exception as err: # pylint: disable=broad-except 

323 _LOGGER.debug("Failed to log request: %r", err) 

324 

325 def on_response( 

326 self, 

327 request: PipelineRequest[HTTPRequestType], 

328 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

329 ) -> None: 

330 """Logs HTTP response to the DEBUG logger. 

331 

332 :param request: The PipelineRequest object. 

333 :type request: ~azure.core.pipeline.PipelineRequest 

334 :param response: The PipelineResponse object. 

335 :type response: ~azure.core.pipeline.PipelineResponse 

336 """ 

337 http_response = response.http_response 

338 try: 

339 logging_enable = response.context["logging_enable"] 

340 if logging_enable: 

341 if not _LOGGER.isEnabledFor(logging.DEBUG): 

342 return 

343 

344 log_string = "Response status: '{}'".format(http_response.status_code) 

345 log_string += "\nResponse headers:" 

346 for res_header, value in http_response.headers.items(): 

347 log_string += "\n '{}': '{}'".format(res_header, value) 

348 

349 # We don't want to log binary data if the response is a file. 

350 log_string += "\nResponse content:" 

351 pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE) 

352 header = http_response.headers.get("content-disposition") 

353 

354 if header and pattern.match(header): 

355 filename = header.partition("=")[2] 

356 log_string += "\nFile attachments: {}".format(filename) 

357 elif http_response.headers.get("content-type", "").endswith("octet-stream"): 

358 log_string += "\nBody contains binary data." 

359 elif http_response.headers.get("content-type", "").startswith("image"): 

360 log_string += "\nBody contains image data." 

361 else: 

362 if response.context.options.get("stream", False): 

363 log_string += "\nBody is streamable." 

364 else: 

365 log_string += "\n{}".format(http_response.text()) 

366 _LOGGER.debug(log_string) 

367 except Exception as err: # pylint: disable=broad-except 

368 _LOGGER.debug("Failed to log response: %s", repr(err)) 

369 

370 

371class _HiddenClassProperties(type): 

372 # Backward compatible for DEFAULT_HEADERS_WHITELIST 

373 # https://github.com/Azure/azure-sdk-for-python/issues/26331 

374 

375 @property 

376 def DEFAULT_HEADERS_WHITELIST(cls) -> Set[str]: 

377 return cls.DEFAULT_HEADERS_ALLOWLIST 

378 

379 @DEFAULT_HEADERS_WHITELIST.setter 

380 def DEFAULT_HEADERS_WHITELIST(cls, value: Set[str]) -> None: 

381 cls.DEFAULT_HEADERS_ALLOWLIST = value 

382 

383 

384class HttpLoggingPolicy( 

385 SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType], 

386 metaclass=_HiddenClassProperties, 

387): 

388 """The Pipeline policy that handles logging of HTTP requests and responses. 

389 

390 :param logger: The logger to use for logging. Default to azure.core.pipeline.policies.http_logging_policy. 

391 :type logger: logging.Logger 

392 """ 

393 

394 DEFAULT_HEADERS_ALLOWLIST: Set[str] = set( 

395 [ 

396 "x-ms-request-id", 

397 "x-ms-client-request-id", 

398 "x-ms-return-client-request-id", 

399 "x-ms-error-code", 

400 "traceparent", 

401 "Accept", 

402 "Cache-Control", 

403 "Connection", 

404 "Content-Length", 

405 "Content-Type", 

406 "Date", 

407 "ETag", 

408 "Expires", 

409 "If-Match", 

410 "If-Modified-Since", 

411 "If-None-Match", 

412 "If-Unmodified-Since", 

413 "Last-Modified", 

414 "Pragma", 

415 "Request-Id", 

416 "Retry-After", 

417 "Server", 

418 "Transfer-Encoding", 

419 "User-Agent", 

420 "WWW-Authenticate", # OAuth Challenge header. 

421 "x-vss-e2eid", # Needed by Azure DevOps pipelines. 

422 "x-msedge-ref", # Needed by Azure DevOps pipelines. 

423 ] 

424 ) 

425 REDACTED_PLACEHOLDER: str = "REDACTED" 

426 MULTI_RECORD_LOG: str = "AZURE_SDK_LOGGING_MULTIRECORD" 

427 

428 def __init__(self, logger: Optional[logging.Logger] = None, **kwargs: Any): # pylint: disable=unused-argument 

429 self.logger: logging.Logger = logger or logging.getLogger("azure.core.pipeline.policies.http_logging_policy") 

430 self.allowed_query_params: Set[str] = set() 

431 self.allowed_header_names: Set[str] = set(self.__class__.DEFAULT_HEADERS_ALLOWLIST) 

432 

433 def _redact_query_param(self, key: str, value: str) -> str: 

434 lower_case_allowed_query_params = [param.lower() for param in self.allowed_query_params] 

435 return value if key.lower() in lower_case_allowed_query_params else HttpLoggingPolicy.REDACTED_PLACEHOLDER 

436 

437 def _redact_header(self, key: str, value: str) -> str: 

438 lower_case_allowed_header_names = [header.lower() for header in self.allowed_header_names] 

439 return value if key.lower() in lower_case_allowed_header_names else HttpLoggingPolicy.REDACTED_PLACEHOLDER 

440 

441 def on_request( # pylint: disable=too-many-return-statements 

442 self, request: PipelineRequest[HTTPRequestType] 

443 ) -> None: 

444 """Logs HTTP method, url and headers. 

445 

446 :param request: The PipelineRequest object. 

447 :type request: ~azure.core.pipeline.PipelineRequest 

448 """ 

449 http_request = request.http_request 

450 options = request.context.options 

451 # Get logger in my context first (request has been retried) 

452 # then read from kwargs (pop if that's the case) 

453 # then use my instance logger 

454 logger = request.context.setdefault("logger", options.pop("logger", self.logger)) 

455 

456 if not logger.isEnabledFor(logging.INFO): 

457 return 

458 

459 try: 

460 parsed_url = list(urllib.parse.urlparse(http_request.url)) 

461 parsed_qp = urllib.parse.parse_qsl(parsed_url[4], keep_blank_values=True) 

462 filtered_qp = [(key, self._redact_query_param(key, value)) for key, value in parsed_qp] 

463 # 4 is query 

464 parsed_url[4] = "&".join(["=".join(part) for part in filtered_qp]) 

465 redacted_url = urllib.parse.urlunparse(parsed_url) 

466 

467 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) 

468 if multi_record: 

469 logger.info("Request URL: %r", redacted_url) 

470 logger.info("Request method: %r", http_request.method) 

471 logger.info("Request headers:") 

472 for header, value in http_request.headers.items(): 

473 value = self._redact_header(header, value) 

474 logger.info(" %r: %r", header, value) 

475 if isinstance(http_request.body, types.GeneratorType): 

476 logger.info("File upload") 

477 return 

478 try: 

479 if isinstance(http_request.body, types.AsyncGeneratorType): 

480 logger.info("File upload") 

481 return 

482 except AttributeError: 

483 pass 

484 if http_request.body: 

485 logger.info("A body is sent with the request") 

486 return 

487 logger.info("No body was attached to the request") 

488 return 

489 log_string = "Request URL: '{}'".format(redacted_url) 

490 log_string += "\nRequest method: '{}'".format(http_request.method) 

491 log_string += "\nRequest headers:" 

492 for header, value in http_request.headers.items(): 

493 value = self._redact_header(header, value) 

494 log_string += "\n '{}': '{}'".format(header, value) 

495 if isinstance(http_request.body, types.GeneratorType): 

496 log_string += "\nFile upload" 

497 logger.info(log_string) 

498 return 

499 try: 

500 if isinstance(http_request.body, types.AsyncGeneratorType): 

501 log_string += "\nFile upload" 

502 logger.info(log_string) 

503 return 

504 except AttributeError: 

505 pass 

506 if http_request.body: 

507 log_string += "\nA body is sent with the request" 

508 logger.info(log_string) 

509 return 

510 log_string += "\nNo body was attached to the request" 

511 logger.info(log_string) 

512 

513 except Exception: # pylint: disable=broad-except 

514 logger.warning("Failed to log request.") 

515 

516 def on_response( 

517 self, 

518 request: PipelineRequest[HTTPRequestType], 

519 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

520 ) -> None: 

521 """Logs HTTP response status and headers. 

522 

523 :param request: The PipelineRequest object. 

524 :type request: ~azure.core.pipeline.PipelineRequest 

525 :param response: The PipelineResponse object. 

526 :type response: ~azure.core.pipeline.PipelineResponse 

527 """ 

528 http_response = response.http_response 

529 

530 # Get logger in my context first (request has been retried) 

531 # then read from kwargs (pop if that's the case) 

532 # then use my instance logger 

533 # If on_request was called, should always read from context 

534 options = request.context.options 

535 logger = request.context.setdefault("logger", options.pop("logger", self.logger)) 

536 

537 try: 

538 if not logger.isEnabledFor(logging.INFO): 

539 return 

540 

541 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) 

542 if multi_record: 

543 logger.info("Response status: %r", http_response.status_code) 

544 logger.info("Response headers:") 

545 for res_header, value in http_response.headers.items(): 

546 value = self._redact_header(res_header, value) 

547 logger.info(" %r: %r", res_header, value) 

548 return 

549 log_string = "Response status: {}".format(http_response.status_code) 

550 log_string += "\nResponse headers:" 

551 for res_header, value in http_response.headers.items(): 

552 value = self._redact_header(res_header, value) 

553 log_string += "\n '{}': '{}'".format(res_header, value) 

554 logger.info(log_string) 

555 except Exception: # pylint: disable=broad-except 

556 logger.warning("Failed to log response.") 

557 

558 

559class ContentDecodePolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

560 """Policy for decoding unstreamed response content. 

561 

562 :param response_encoding: The encoding to use if known for this service (will disable auto-detection) 

563 :type response_encoding: str 

564 """ 

565 

566 # Accept "text" because we're open minded people... 

567 JSON_REGEXP = re.compile(r"^(application|text)/([0-9a-z+.-]+\+)?json$") 

568 

569 # Name used in context 

570 CONTEXT_NAME = "deserialized_data" 

571 

572 def __init__( 

573 self, response_encoding: Optional[str] = None, **kwargs: Any # pylint: disable=unused-argument 

574 ) -> None: 

575 self._response_encoding = response_encoding 

576 

577 @classmethod 

578 def deserialize_from_text( 

579 cls, 

580 data: Optional[Union[AnyStr, IO[AnyStr]]], 

581 mime_type: Optional[str] = None, 

582 response: Optional[HTTPResponseType] = None, 

583 ) -> Any: 

584 """Decode response data according to content-type. 

585 

586 Accept a stream of data as well, but will be load at once in memory for now. 

587 If no content-type, will return the string version (not bytes, not stream) 

588 

589 :param data: The data to deserialize. 

590 :type data: str or bytes or file-like object 

591 :param response: The HTTP response. 

592 :type response: ~azure.core.pipeline.transport.HttpResponse 

593 :param str mime_type: The mime type. As mime type, charset is not expected. 

594 :param response: If passed, exception will be annotated with that response 

595 :type response: any 

596 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

597 :returns: A dict (JSON), XML tree or str, depending of the mime_type 

598 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str 

599 """ 

600 if not data: 

601 return None 

602 

603 if hasattr(data, "read"): 

604 # Assume a stream 

605 data = cast(IO, data).read() 

606 

607 if isinstance(data, bytes): 

608 data_as_str = data.decode(encoding="utf-8-sig") 

609 else: 

610 # Explain to mypy the correct type. 

611 data_as_str = cast(str, data) 

612 

613 if mime_type is None: 

614 return data_as_str 

615 

616 if cls.JSON_REGEXP.match(mime_type): 

617 try: 

618 return json.loads(data_as_str) 

619 except ValueError as err: 

620 raise DecodeError( 

621 message="JSON is invalid: {}".format(err), 

622 response=response, 

623 error=err, 

624 ) from err 

625 elif "xml" in (mime_type or []): 

626 try: 

627 return ET.fromstring(data_as_str) # nosec 

628 except ET.ParseError as err: 

629 # It might be because the server has an issue, and returned JSON with 

630 # content-type XML.... 

631 # So let's try a JSON load, and if it's still broken 

632 # let's flow the initial exception 

633 def _json_attemp(data): 

634 try: 

635 return True, json.loads(data) 

636 except ValueError: 

637 return False, None # Don't care about this one 

638 

639 success, json_result = _json_attemp(data) 

640 if success: 

641 return json_result 

642 # If i'm here, it's not JSON, it's not XML, let's scream 

643 # and raise the last context in this block (the XML exception) 

644 # The function hack is because Py2.7 messes up with exception 

645 # context otherwise. 

646 _LOGGER.critical("Wasn't XML not JSON, failing") 

647 raise DecodeError("XML is invalid", response=response) from err 

648 elif mime_type.startswith("text/"): 

649 return data_as_str 

650 raise DecodeError("Cannot deserialize content-type: {}".format(mime_type)) 

651 

652 @classmethod 

653 def deserialize_from_http_generics( 

654 cls, 

655 response: HTTPResponseType, 

656 encoding: Optional[str] = None, 

657 ) -> Any: 

658 """Deserialize from HTTP response. 

659 

660 Headers will tested for "content-type" 

661 

662 :param response: The HTTP response 

663 :type response: any 

664 :param str encoding: The encoding to use if known for this service (will disable auto-detection) 

665 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

666 :returns: A dict (JSON), XML tree or str, depending of the mime_type 

667 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str 

668 """ 

669 # Try to use content-type from headers if available 

670 if response.content_type: 

671 mime_type = response.content_type.split(";")[0].strip().lower() 

672 # Ouch, this server did not declare what it sent... 

673 # Let's guess it's JSON... 

674 # Also, since Autorest was considering that an empty body was a valid JSON, 

675 # need that test as well.... 

676 else: 

677 mime_type = "application/json" 

678 

679 # Rely on transport implementation to give me "text()" decoded correctly 

680 if hasattr(response, "read"): 

681 # since users can call deserialize_from_http_generics by themselves 

682 # we want to make sure our new responses are read before we try to 

683 # deserialize. Only read sync responses since we're in a sync function 

684 # 

685 # Technically HttpResponse do not contain a "read()", but we don't know what 

686 # people have been able to pass here, so keep this code for safety, 

687 # even if it's likely dead code 

688 if not inspect.iscoroutinefunction(response.read): # type: ignore 

689 response.read() # type: ignore 

690 return cls.deserialize_from_text(response.text(encoding), mime_type, response=response) 

691 

692 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

693 """Set the response encoding in the request context. 

694 

695 :param request: The PipelineRequest object. 

696 :type request: ~azure.core.pipeline.PipelineRequest 

697 """ 

698 options = request.context.options 

699 response_encoding = options.pop("response_encoding", self._response_encoding) 

700 if response_encoding: 

701 request.context["response_encoding"] = response_encoding 

702 

703 def on_response( 

704 self, 

705 request: PipelineRequest[HTTPRequestType], 

706 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

707 ) -> None: 

708 """Extract data from the body of a REST response object. 

709 This will load the entire payload in memory. 

710 Will follow Content-Type to parse. 

711 We assume everything is UTF8 (BOM acceptable). 

712 

713 :param request: The PipelineRequest object. 

714 :type request: ~azure.core.pipeline.PipelineRequest 

715 :param response: The PipelineResponse object. 

716 :type response: ~azure.core.pipeline.PipelineResponse 

717 :raises JSONDecodeError: If JSON is requested and parsing is impossible. 

718 :raises UnicodeDecodeError: If bytes is not UTF8 

719 :raises xml.etree.ElementTree.ParseError: If bytes is not valid XML 

720 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

721 """ 

722 # If response was asked as stream, do NOT read anything and quit now 

723 if response.context.options.get("stream", True): 

724 return 

725 

726 response_encoding = request.context.get("response_encoding") 

727 

728 response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics( 

729 response.http_response, response_encoding 

730 ) 

731 

732 

733class ProxyPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

734 """A proxy policy. 

735 

736 Dictionary mapping protocol or protocol and host to the URL of the proxy 

737 to be used on each Request. 

738 

739 :param MutableMapping proxies: Maps protocol or protocol and hostname to the URL 

740 of the proxy. 

741 

742 .. admonition:: Example: 

743 

744 .. literalinclude:: ../samples/test_example_sansio.py 

745 :start-after: [START proxy_policy] 

746 :end-before: [END proxy_policy] 

747 :language: python 

748 :dedent: 4 

749 :caption: Configuring a proxy policy. 

750 """ 

751 

752 def __init__( 

753 self, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any 

754 ): # pylint: disable=unused-argument 

755 self.proxies = proxies 

756 

757 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

758 """Adds the proxy information to the request context. 

759 

760 :param request: The PipelineRequest object 

761 :type request: ~azure.core.pipeline.PipelineRequest 

762 """ 

763 ctxt = request.context.options 

764 if self.proxies and "proxies" not in ctxt: 

765 ctxt["proxies"] = self.proxies