Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/azure/core/pipeline/policies/_universal.py: 23%

331 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-07 06:33 +0000

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26""" 

27This module is the requests implementation of Pipeline ABC 

28""" 

29import json 

30import inspect 

31import logging 

32import os 

33import platform 

34import xml.etree.ElementTree as ET 

35import types 

36import re 

37import uuid 

38from typing import IO, cast, Union, Optional, AnyStr, Dict, Any, Set, MutableMapping 

39import urllib.parse 

40 

41from azure.core import __version__ as azcore_version 

42from azure.core.exceptions import DecodeError 

43 

44from azure.core.pipeline import PipelineRequest, PipelineResponse 

45from ._base import SansIOHTTPPolicy 

46 

47from ..transport import HttpRequest as LegacyHttpRequest 

48from ..transport._base import _HttpResponseBase as LegacySansIOHttpResponse 

49from ...rest import HttpRequest 

50from ...rest._rest_py3 import _HttpResponseBase as SansIOHttpResponse 

51 

52_LOGGER = logging.getLogger(__name__) 

53 

54HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] 

55HTTPResponseType = Union[LegacySansIOHttpResponse, SansIOHttpResponse] 

56PipelineResponseType = PipelineResponse[HTTPRequestType, HTTPResponseType] 

57 

58 

59class HeadersPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

60 """A simple policy that sends the given headers with the request. 

61 

62 This will overwrite any headers already defined in the request. Headers can be 

63 configured up front, where any custom headers will be applied to all outgoing 

64 operations, and additional headers can also be added dynamically per operation. 

65 

66 :param dict base_headers: Headers to send with the request. 

67 

68 .. admonition:: Example: 

69 

70 .. literalinclude:: ../samples/test_example_sansio.py 

71 :start-after: [START headers_policy] 

72 :end-before: [END headers_policy] 

73 :language: python 

74 :dedent: 4 

75 :caption: Configuring a headers policy. 

76 """ 

77 

78 def __init__( 

79 self, base_headers: Optional[Dict[str, str]] = None, **kwargs: Any 

80 ) -> None: # pylint: disable=super-init-not-called 

81 self._headers: Dict[str, str] = base_headers or {} 

82 self._headers.update(kwargs.pop("headers", {})) 

83 

84 @property 

85 def headers(self) -> Dict[str, str]: 

86 """The current headers collection. 

87 

88 :rtype: dict[str, str] 

89 :return: The current headers collection. 

90 """ 

91 return self._headers 

92 

93 def add_header(self, key: str, value: str) -> None: 

94 """Add a header to the configuration to be applied to all requests. 

95 

96 :param str key: The header. 

97 :param str value: The header's value. 

98 """ 

99 self._headers[key] = value 

100 

101 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

102 """Updates with the given headers before sending the request to the next policy. 

103 

104 :param request: The PipelineRequest object 

105 :type request: ~azure.core.pipeline.PipelineRequest 

106 """ 

107 request.http_request.headers.update(self.headers) 

108 additional_headers = request.context.options.pop("headers", {}) 

109 if additional_headers: 

110 request.http_request.headers.update(additional_headers) 

111 

112 

113class _Unset: 

114 pass 

115 

116 

117class RequestIdPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

118 """A simple policy that sets the given request id in the header. 

119 

120 This will overwrite request id that is already defined in the request. Request id can be 

121 configured up front, where the request id will be applied to all outgoing 

122 operations, and additional request id can also be set dynamically per operation. 

123 

124 :keyword str request_id: The request id to be added into header. 

125 :keyword bool auto_request_id: Auto generates a unique request ID per call if true which is by default. 

126 :keyword str request_id_header_name: Header name to use. Default is "x-ms-client-request-id". 

127 

128 .. admonition:: Example: 

129 

130 .. literalinclude:: ../samples/test_example_sansio.py 

131 :start-after: [START request_id_policy] 

132 :end-before: [END request_id_policy] 

133 :language: python 

134 :dedent: 4 

135 :caption: Configuring a request id policy. 

136 """ 

137 

138 def __init__( 

139 self, # pylint: disable=unused-argument 

140 *, 

141 request_id: Union[str, Any] = _Unset, 

142 auto_request_id: bool = True, 

143 request_id_header_name: str = "x-ms-client-request-id", 

144 **kwargs: Any 

145 ) -> None: 

146 super() 

147 self._request_id = request_id 

148 self._auto_request_id = auto_request_id 

149 self._request_id_header_name = request_id_header_name 

150 

151 def set_request_id(self, value: str) -> None: 

152 """Add the request id to the configuration to be applied to all requests. 

153 

154 :param str value: The request id value. 

155 """ 

156 self._request_id = value 

157 

158 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

159 """Updates with the given request id before sending the request to the next policy. 

160 

161 :param request: The PipelineRequest object 

162 :type request: ~azure.core.pipeline.PipelineRequest 

163 """ 

164 request_id = unset = object() 

165 if "request_id" in request.context.options: 

166 request_id = request.context.options.pop("request_id") 

167 if request_id is None: 

168 return 

169 elif self._request_id is None: 

170 return 

171 elif self._request_id is not _Unset: 

172 if self._request_id_header_name in request.http_request.headers: 

173 return 

174 request_id = self._request_id 

175 elif self._auto_request_id: 

176 if self._request_id_header_name in request.http_request.headers: 

177 return 

178 request_id = str(uuid.uuid1()) 

179 if request_id is not unset: 

180 header = {self._request_id_header_name: cast(str, request_id)} 

181 request.http_request.headers.update(header) 

182 

183 

184class UserAgentPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

185 """User-Agent Policy. Allows custom values to be added to the User-Agent header. 

186 

187 :param str base_user_agent: Sets the base user agent value. 

188 

189 :keyword bool user_agent_overwrite: Overwrites User-Agent when True. Defaults to False. 

190 :keyword bool user_agent_use_env: Gets user-agent from environment. Defaults to True. 

191 :keyword str user_agent: If specified, this will be added in front of the user agent string. 

192 :keyword str sdk_moniker: If specified, the user agent string will be 

193 azsdk-python-[sdk_moniker] Python/[python_version] ([platform_version]) 

194 

195 .. admonition:: Example: 

196 

197 .. literalinclude:: ../samples/test_example_sansio.py 

198 :start-after: [START user_agent_policy] 

199 :end-before: [END user_agent_policy] 

200 :language: python 

201 :dedent: 4 

202 :caption: Configuring a user agent policy. 

203 """ 

204 

205 _USERAGENT = "User-Agent" 

206 _ENV_ADDITIONAL_USER_AGENT = "AZURE_HTTP_USER_AGENT" 

207 

208 def __init__( 

209 self, base_user_agent: Optional[str] = None, **kwargs: Any 

210 ) -> None: # pylint: disable=super-init-not-called 

211 self.overwrite: bool = kwargs.pop("user_agent_overwrite", False) 

212 self.use_env: bool = kwargs.pop("user_agent_use_env", True) 

213 application_id: Optional[str] = kwargs.pop("user_agent", None) 

214 sdk_moniker: str = kwargs.pop("sdk_moniker", "core/{}".format(azcore_version)) 

215 

216 if base_user_agent: 

217 self._user_agent = base_user_agent 

218 else: 

219 self._user_agent = "azsdk-python-{} Python/{} ({})".format( 

220 sdk_moniker, platform.python_version(), platform.platform() 

221 ) 

222 

223 if application_id: 

224 self._user_agent = "{} {}".format(application_id, self._user_agent) 

225 

226 @property 

227 def user_agent(self) -> str: 

228 """The current user agent value. 

229 

230 :return: The current user agent value. 

231 :rtype: str 

232 """ 

233 if self.use_env: 

234 add_user_agent_header = os.environ.get(self._ENV_ADDITIONAL_USER_AGENT, None) 

235 if add_user_agent_header is not None: 

236 return "{} {}".format(self._user_agent, add_user_agent_header) 

237 return self._user_agent 

238 

239 def add_user_agent(self, value: str) -> None: 

240 """Add value to current user agent with a space. 

241 :param str value: value to add to user agent. 

242 """ 

243 self._user_agent = "{} {}".format(self._user_agent, value) 

244 

245 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

246 """Modifies the User-Agent header before the request is sent. 

247 

248 :param request: The PipelineRequest object 

249 :type request: ~azure.core.pipeline.PipelineRequest 

250 """ 

251 http_request = request.http_request 

252 options_dict = request.context.options 

253 if "user_agent" in options_dict: 

254 user_agent = options_dict.pop("user_agent") 

255 if options_dict.pop("user_agent_overwrite", self.overwrite): 

256 http_request.headers[self._USERAGENT] = user_agent 

257 else: 

258 user_agent = "{} {}".format(user_agent, self.user_agent) 

259 http_request.headers[self._USERAGENT] = user_agent 

260 

261 elif self.overwrite or self._USERAGENT not in http_request.headers: 

262 http_request.headers[self._USERAGENT] = self.user_agent 

263 

264 

265class NetworkTraceLoggingPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

266 

267 """The logging policy in the pipeline is used to output HTTP network trace to the configured logger. 

268 

269 This accepts both global configuration, and per-request level with "enable_http_logger" 

270 

271 :param bool logging_enable: Use to enable per operation. Defaults to False. 

272 

273 .. admonition:: Example: 

274 

275 .. literalinclude:: ../samples/test_example_sansio.py 

276 :start-after: [START network_trace_logging_policy] 

277 :end-before: [END network_trace_logging_policy] 

278 :language: python 

279 :dedent: 4 

280 :caption: Configuring a network trace logging policy. 

281 """ 

282 

283 def __init__(self, logging_enable: bool = False, **kwargs: Any): # pylint: disable=unused-argument 

284 self.enable_http_logger = logging_enable 

285 

286 def on_request( 

287 self, request: PipelineRequest[HTTPRequestType] 

288 ) -> None: # pylint: disable=too-many-return-statements 

289 """Logs HTTP request to the DEBUG logger. 

290 

291 :param request: The PipelineRequest object. 

292 :type request: ~azure.core.pipeline.PipelineRequest 

293 """ 

294 http_request = request.http_request 

295 options = request.context.options 

296 logging_enable = options.pop("logging_enable", self.enable_http_logger) 

297 request.context["logging_enable"] = logging_enable 

298 if logging_enable: 

299 if not _LOGGER.isEnabledFor(logging.DEBUG): 

300 return 

301 

302 try: 

303 log_string = "Request URL: '{}'".format(http_request.url) 

304 log_string += "\nRequest method: '{}'".format(http_request.method) 

305 log_string += "\nRequest headers:" 

306 for header, value in http_request.headers.items(): 

307 log_string += "\n '{}': '{}'".format(header, value) 

308 log_string += "\nRequest body:" 

309 

310 # We don't want to log the binary data of a file upload. 

311 if isinstance(http_request.body, types.GeneratorType): 

312 log_string += "\nFile upload" 

313 _LOGGER.debug(log_string) 

314 return 

315 try: 

316 if isinstance(http_request.body, types.AsyncGeneratorType): 

317 log_string += "\nFile upload" 

318 _LOGGER.debug(log_string) 

319 return 

320 except AttributeError: 

321 pass 

322 if http_request.body: 

323 log_string += "\n{}".format(str(http_request.body)) 

324 _LOGGER.debug(log_string) 

325 return 

326 log_string += "\nThis request has no body" 

327 _LOGGER.debug(log_string) 

328 except Exception as err: # pylint: disable=broad-except 

329 _LOGGER.debug("Failed to log request: %r", err) 

330 

331 def on_response( 

332 self, 

333 request: PipelineRequest[HTTPRequestType], 

334 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

335 ) -> None: 

336 """Logs HTTP response to the DEBUG logger. 

337 

338 :param request: The PipelineRequest object. 

339 :type request: ~azure.core.pipeline.PipelineRequest 

340 :param response: The PipelineResponse object. 

341 :type response: ~azure.core.pipeline.PipelineResponse 

342 """ 

343 http_response = response.http_response 

344 try: 

345 logging_enable = response.context["logging_enable"] 

346 if logging_enable: 

347 if not _LOGGER.isEnabledFor(logging.DEBUG): 

348 return 

349 

350 log_string = "Response status: '{}'".format(http_response.status_code) 

351 log_string += "\nResponse headers:" 

352 for res_header, value in http_response.headers.items(): 

353 log_string += "\n '{}': '{}'".format(res_header, value) 

354 

355 # We don't want to log binary data if the response is a file. 

356 log_string += "\nResponse content:" 

357 pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE) 

358 header = http_response.headers.get("content-disposition") 

359 

360 if header and pattern.match(header): 

361 filename = header.partition("=")[2] 

362 log_string += "\nFile attachments: {}".format(filename) 

363 elif http_response.headers.get("content-type", "").endswith("octet-stream"): 

364 log_string += "\nBody contains binary data." 

365 elif http_response.headers.get("content-type", "").startswith("image"): 

366 log_string += "\nBody contains image data." 

367 else: 

368 if response.context.options.get("stream", False): 

369 log_string += "\nBody is streamable." 

370 else: 

371 log_string += "\n{}".format(http_response.text()) 

372 _LOGGER.debug(log_string) 

373 except Exception as err: # pylint: disable=broad-except 

374 _LOGGER.debug("Failed to log response: %s", repr(err)) 

375 

376 

377class _HiddenClassProperties(type): 

378 # Backward compatible for DEFAULT_HEADERS_WHITELIST 

379 # https://github.com/Azure/azure-sdk-for-python/issues/26331 

380 

381 @property 

382 def DEFAULT_HEADERS_WHITELIST(cls) -> Set[str]: 

383 return cls.DEFAULT_HEADERS_ALLOWLIST 

384 

385 @DEFAULT_HEADERS_WHITELIST.setter 

386 def DEFAULT_HEADERS_WHITELIST(cls, value: Set[str]) -> None: 

387 cls.DEFAULT_HEADERS_ALLOWLIST = value 

388 

389 

390class HttpLoggingPolicy( 

391 SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType], 

392 metaclass=_HiddenClassProperties, 

393): 

394 """The Pipeline policy that handles logging of HTTP requests and responses. 

395 

396 :param logger: The logger to use for logging. Default to azure.core.pipeline.policies.http_logging_policy. 

397 :type logger: logging.Logger 

398 """ 

399 

400 DEFAULT_HEADERS_ALLOWLIST: Set[str] = set( 

401 [ 

402 "x-ms-request-id", 

403 "x-ms-client-request-id", 

404 "x-ms-return-client-request-id", 

405 "x-ms-error-code", 

406 "traceparent", 

407 "Accept", 

408 "Cache-Control", 

409 "Connection", 

410 "Content-Length", 

411 "Content-Type", 

412 "Date", 

413 "ETag", 

414 "Expires", 

415 "If-Match", 

416 "If-Modified-Since", 

417 "If-None-Match", 

418 "If-Unmodified-Since", 

419 "Last-Modified", 

420 "Pragma", 

421 "Request-Id", 

422 "Retry-After", 

423 "Server", 

424 "Transfer-Encoding", 

425 "User-Agent", 

426 "WWW-Authenticate", # OAuth Challenge header. 

427 ] 

428 ) 

429 REDACTED_PLACEHOLDER: str = "REDACTED" 

430 MULTI_RECORD_LOG: str = "AZURE_SDK_LOGGING_MULTIRECORD" 

431 

432 def __init__(self, logger: Optional[logging.Logger] = None, **kwargs: Any): # pylint: disable=unused-argument 

433 self.logger: logging.Logger = logger or logging.getLogger("azure.core.pipeline.policies.http_logging_policy") 

434 self.allowed_query_params: Set[str] = set() 

435 self.allowed_header_names: Set[str] = set(self.__class__.DEFAULT_HEADERS_ALLOWLIST) 

436 

437 def _redact_query_param(self, key: str, value: str) -> str: 

438 lower_case_allowed_query_params = [param.lower() for param in self.allowed_query_params] 

439 return value if key.lower() in lower_case_allowed_query_params else HttpLoggingPolicy.REDACTED_PLACEHOLDER 

440 

441 def _redact_header(self, key: str, value: str) -> str: 

442 lower_case_allowed_header_names = [header.lower() for header in self.allowed_header_names] 

443 return value if key.lower() in lower_case_allowed_header_names else HttpLoggingPolicy.REDACTED_PLACEHOLDER 

444 

445 def on_request( # pylint: disable=too-many-return-statements 

446 self, request: PipelineRequest[HTTPRequestType] 

447 ) -> None: 

448 """Logs HTTP method, url and headers. 

449 :param request: The PipelineRequest object. 

450 :type request: ~azure.core.pipeline.PipelineRequest 

451 """ 

452 http_request = request.http_request 

453 options = request.context.options 

454 # Get logger in my context first (request has been retried) 

455 # then read from kwargs (pop if that's the case) 

456 # then use my instance logger 

457 logger = request.context.setdefault("logger", options.pop("logger", self.logger)) 

458 

459 if not logger.isEnabledFor(logging.INFO): 

460 return 

461 

462 try: 

463 parsed_url = list(urllib.parse.urlparse(http_request.url)) 

464 parsed_qp = urllib.parse.parse_qsl(parsed_url[4], keep_blank_values=True) 

465 filtered_qp = [(key, self._redact_query_param(key, value)) for key, value in parsed_qp] 

466 # 4 is query 

467 parsed_url[4] = "&".join(["=".join(part) for part in filtered_qp]) 

468 redacted_url = urllib.parse.urlunparse(parsed_url) 

469 

470 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) 

471 if multi_record: 

472 logger.info("Request URL: %r", redacted_url) 

473 logger.info("Request method: %r", http_request.method) 

474 logger.info("Request headers:") 

475 for header, value in http_request.headers.items(): 

476 value = self._redact_header(header, value) 

477 logger.info(" %r: %r", header, value) 

478 if isinstance(http_request.body, types.GeneratorType): 

479 logger.info("File upload") 

480 return 

481 try: 

482 if isinstance(http_request.body, types.AsyncGeneratorType): 

483 logger.info("File upload") 

484 return 

485 except AttributeError: 

486 pass 

487 if http_request.body: 

488 logger.info("A body is sent with the request") 

489 return 

490 logger.info("No body was attached to the request") 

491 return 

492 log_string = "Request URL: '{}'".format(redacted_url) 

493 log_string += "\nRequest method: '{}'".format(http_request.method) 

494 log_string += "\nRequest headers:" 

495 for header, value in http_request.headers.items(): 

496 value = self._redact_header(header, value) 

497 log_string += "\n '{}': '{}'".format(header, value) 

498 if isinstance(http_request.body, types.GeneratorType): 

499 log_string += "\nFile upload" 

500 logger.info(log_string) 

501 return 

502 try: 

503 if isinstance(http_request.body, types.AsyncGeneratorType): 

504 log_string += "\nFile upload" 

505 logger.info(log_string) 

506 return 

507 except AttributeError: 

508 pass 

509 if http_request.body: 

510 log_string += "\nA body is sent with the request" 

511 logger.info(log_string) 

512 return 

513 log_string += "\nNo body was attached to the request" 

514 logger.info(log_string) 

515 

516 except Exception as err: # pylint: disable=broad-except 

517 logger.warning("Failed to log request: %s", repr(err)) 

518 

519 def on_response( 

520 self, 

521 request: PipelineRequest[HTTPRequestType], 

522 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

523 ) -> None: 

524 http_response = response.http_response 

525 

526 # Get logger in my context first (request has been retried) 

527 # then read from kwargs (pop if that's the case) 

528 # then use my instance logger 

529 # If on_request was called, should always read from context 

530 options = request.context.options 

531 logger = request.context.setdefault("logger", options.pop("logger", self.logger)) 

532 

533 try: 

534 if not logger.isEnabledFor(logging.INFO): 

535 return 

536 

537 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) 

538 if multi_record: 

539 logger.info("Response status: %r", http_response.status_code) 

540 logger.info("Response headers:") 

541 for res_header, value in http_response.headers.items(): 

542 value = self._redact_header(res_header, value) 

543 logger.info(" %r: %r", res_header, value) 

544 return 

545 log_string = "Response status: {}".format(http_response.status_code) 

546 log_string += "\nResponse headers:" 

547 for res_header, value in http_response.headers.items(): 

548 value = self._redact_header(res_header, value) 

549 log_string += "\n '{}': '{}'".format(res_header, value) 

550 logger.info(log_string) 

551 except Exception as err: # pylint: disable=broad-except 

552 logger.warning("Failed to log response: %s", repr(err)) 

553 

554 

555class ContentDecodePolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

556 """Policy for decoding unstreamed response content. 

557 

558 :param response_encoding: The encoding to use if known for this service (will disable auto-detection) 

559 :type response_encoding: str 

560 """ 

561 

562 # Accept "text" because we're open minded people... 

563 JSON_REGEXP = re.compile(r"^(application|text)/([0-9a-z+.-]+\+)?json$") 

564 

565 # Name used in context 

566 CONTEXT_NAME = "deserialized_data" 

567 

568 def __init__( 

569 self, response_encoding: Optional[str] = None, **kwargs: Any # pylint: disable=unused-argument 

570 ) -> None: 

571 self._response_encoding = response_encoding 

572 

573 @classmethod 

574 def deserialize_from_text( 

575 cls, 

576 data: Optional[Union[AnyStr, IO[AnyStr]]], 

577 mime_type: Optional[str] = None, 

578 response: Optional[HTTPResponseType] = None, 

579 ) -> Any: 

580 """Decode response data according to content-type. 

581 

582 Accept a stream of data as well, but will be load at once in memory for now. 

583 If no content-type, will return the string version (not bytes, not stream) 

584 

585 :param data: The data to deserialize. 

586 :type data: str or bytes or file-like object 

587 :param response: The HTTP response. 

588 :type response: ~azure.core.pipeline.transport.HttpResponse 

589 :param str mime_type: The mime type. As mime type, charset is not expected. 

590 :param response: If passed, exception will be annotated with that response 

591 :type response: any 

592 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

593 :returns: A dict (JSON), XML tree or str, depending of the mime_type 

594 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str 

595 """ 

596 if not data: 

597 return None 

598 

599 if hasattr(data, "read"): 

600 # Assume a stream 

601 data = cast(IO, data).read() 

602 

603 if isinstance(data, bytes): 

604 data_as_str = data.decode(encoding="utf-8-sig") 

605 else: 

606 # Explain to mypy the correct type. 

607 data_as_str = cast(str, data) 

608 

609 if mime_type is None: 

610 return data_as_str 

611 

612 if cls.JSON_REGEXP.match(mime_type): 

613 try: 

614 return json.loads(data_as_str) 

615 except ValueError as err: 

616 raise DecodeError( 

617 message="JSON is invalid: {}".format(err), 

618 response=response, 

619 error=err, 

620 ) from err 

621 elif "xml" in (mime_type or []): 

622 try: 

623 return ET.fromstring(data_as_str) # nosec 

624 except ET.ParseError as err: 

625 # It might be because the server has an issue, and returned JSON with 

626 # content-type XML.... 

627 # So let's try a JSON load, and if it's still broken 

628 # let's flow the initial exception 

629 def _json_attemp(data): 

630 try: 

631 return True, json.loads(data) 

632 except ValueError: 

633 return False, None # Don't care about this one 

634 

635 success, json_result = _json_attemp(data) 

636 if success: 

637 return json_result 

638 # If i'm here, it's not JSON, it's not XML, let's scream 

639 # and raise the last context in this block (the XML exception) 

640 # The function hack is because Py2.7 messes up with exception 

641 # context otherwise. 

642 _LOGGER.critical("Wasn't XML not JSON, failing") 

643 raise DecodeError("XML is invalid", response=response) from err 

644 elif mime_type.startswith("text/"): 

645 return data_as_str 

646 raise DecodeError("Cannot deserialize content-type: {}".format(mime_type)) 

647 

648 @classmethod 

649 def deserialize_from_http_generics( 

650 cls, 

651 response: HTTPResponseType, 

652 encoding: Optional[str] = None, 

653 ) -> Any: 

654 """Deserialize from HTTP response. 

655 

656 Headers will tested for "content-type" 

657 

658 :param response: The HTTP response 

659 :type response: any 

660 :param str encoding: The encoding to use if known for this service (will disable auto-detection) 

661 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

662 :returns: A dict (JSON), XML tree or str, depending of the mime_type 

663 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str 

664 """ 

665 # Try to use content-type from headers if available 

666 if response.content_type: 

667 mime_type = response.content_type.split(";")[0].strip().lower() 

668 # Ouch, this server did not declare what it sent... 

669 # Let's guess it's JSON... 

670 # Also, since Autorest was considering that an empty body was a valid JSON, 

671 # need that test as well.... 

672 else: 

673 mime_type = "application/json" 

674 

675 # Rely on transport implementation to give me "text()" decoded correctly 

676 if hasattr(response, "read"): 

677 # since users can call deserialize_from_http_generics by themselves 

678 # we want to make sure our new responses are read before we try to 

679 # deserialize. Only read sync responses since we're in a sync function 

680 # 

681 # Technically HttpResponse do not contain a "read()", but we don't know what 

682 # people have been able to pass here, so keep this code for safety, 

683 # even if it's likely dead code 

684 if not inspect.iscoroutinefunction(response.read): # type: ignore 

685 response.read() # type: ignore 

686 return cls.deserialize_from_text(response.text(encoding), mime_type, response=response) 

687 

688 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

689 options = request.context.options 

690 response_encoding = options.pop("response_encoding", self._response_encoding) 

691 if response_encoding: 

692 request.context["response_encoding"] = response_encoding 

693 

694 def on_response( 

695 self, 

696 request: PipelineRequest[HTTPRequestType], 

697 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

698 ) -> None: 

699 """Extract data from the body of a REST response object. 

700 This will load the entire payload in memory. 

701 Will follow Content-Type to parse. 

702 We assume everything is UTF8 (BOM acceptable). 

703 

704 :param request: The PipelineRequest object. 

705 :type request: ~azure.core.pipeline.PipelineRequest 

706 :param response: The PipelineResponse object. 

707 :type response: ~azure.core.pipeline.PipelineResponse 

708 :raises JSONDecodeError: If JSON is requested and parsing is impossible. 

709 :raises UnicodeDecodeError: If bytes is not UTF8 

710 :raises xml.etree.ElementTree.ParseError: If bytes is not valid XML 

711 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

712 """ 

713 # If response was asked as stream, do NOT read anything and quit now 

714 if response.context.options.get("stream", True): 

715 return 

716 

717 response_encoding = request.context.get("response_encoding") 

718 

719 response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics( 

720 response.http_response, response_encoding 

721 ) 

722 

723 

724class ProxyPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

725 """A proxy policy. 

726 

727 Dictionary mapping protocol or protocol and host to the URL of the proxy 

728 to be used on each Request. 

729 

730 :param MutableMapping proxies: Maps protocol or protocol and hostname to the URL 

731 of the proxy. 

732 

733 .. admonition:: Example: 

734 

735 .. literalinclude:: ../samples/test_example_sansio.py 

736 :start-after: [START proxy_policy] 

737 :end-before: [END proxy_policy] 

738 :language: python 

739 :dedent: 4 

740 :caption: Configuring a proxy policy. 

741 """ 

742 

743 def __init__( 

744 self, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any 

745 ): # pylint: disable=unused-argument,super-init-not-called 

746 self.proxies = proxies 

747 

748 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

749 ctxt = request.context.options 

750 if self.proxies and "proxies" not in ctxt: 

751 ctxt["proxies"] = self.proxies