Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/pipeline/policies/_universal.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

334 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26""" 

27This module is the requests implementation of Pipeline ABC 

28""" 

29import json 

30import inspect 

31import logging 

32import os 

33import platform 

34import xml.etree.ElementTree as ET 

35import types 

36import re 

37import uuid 

38from typing import IO, cast, Union, Optional, AnyStr, Dict, Any, Set, MutableMapping, Iterable 

39 

40from azure.core import __version__ as azcore_version 

41from azure.core.exceptions import DecodeError 

42 

43from azure.core.pipeline import PipelineRequest, PipelineResponse 

44from ._base import SansIOHTTPPolicy 

45from ._utils import sanitize_url 

46from ...utils._utils import CaseInsensitiveSet 

47 

48from ..transport import HttpRequest as LegacyHttpRequest 

49from ..transport._base import _HttpResponseBase as LegacySansIOHttpResponse 

50from ...rest import HttpRequest 

51from ...rest._rest_py3 import _HttpResponseBase as SansIOHttpResponse 

52 

53_LOGGER = logging.getLogger(__name__) 

54 

55HTTPRequestType = Union[LegacyHttpRequest, HttpRequest] 

56HTTPResponseType = Union[LegacySansIOHttpResponse, SansIOHttpResponse] 

57PipelineResponseType = PipelineResponse[HTTPRequestType, HTTPResponseType] 

58 

59 

60class HeadersPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

61 """A simple policy that sends the given headers with the request. 

62 

63 This will overwrite any headers already defined in the request. Headers can be 

64 configured up front, where any custom headers will be applied to all outgoing 

65 operations, and additional headers can also be added dynamically per operation. 

66 

67 :param dict base_headers: Headers to send with the request. 

68 

69 .. admonition:: Example: 

70 

71 .. literalinclude:: ../samples/test_example_sansio.py 

72 :start-after: [START headers_policy] 

73 :end-before: [END headers_policy] 

74 :language: python 

75 :dedent: 4 

76 :caption: Configuring a headers policy. 

77 """ 

78 

79 def __init__(self, base_headers: Optional[Dict[str, str]] = None, **kwargs: Any) -> None: 

80 self._headers: Dict[str, str] = base_headers or {} 

81 self._headers.update(kwargs.pop("headers", {})) 

82 

83 @property 

84 def headers(self) -> Dict[str, str]: 

85 """The current headers collection. 

86 

87 :rtype: dict[str, str] 

88 :return: The current headers collection. 

89 """ 

90 return self._headers 

91 

92 def add_header(self, key: str, value: str) -> None: 

93 """Add a header to the configuration to be applied to all requests. 

94 

95 :param str key: The header. 

96 :param str value: The header's value. 

97 """ 

98 self._headers[key] = value 

99 

100 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

101 """Updates with the given headers before sending the request to the next policy. 

102 

103 :param request: The PipelineRequest object 

104 :type request: ~azure.core.pipeline.PipelineRequest 

105 """ 

106 request.http_request.headers.update(self.headers) 

107 additional_headers = request.context.options.pop("headers", {}) 

108 if additional_headers: 

109 request.http_request.headers.update(additional_headers) 

110 

111 

112class _Unset: 

113 pass 

114 

115 

116class RequestIdPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

117 """A simple policy that sets the given request id in the header. 

118 

119 This will overwrite request id that is already defined in the request. Request id can be 

120 configured up front, where the request id will be applied to all outgoing 

121 operations, and additional request id can also be set dynamically per operation. 

122 

123 :keyword str request_id: The request id to be added into header. 

124 :keyword bool auto_request_id: Auto generates a unique request ID per call if true which is by default. 

125 :keyword str request_id_header_name: Header name to use. Default is "x-ms-client-request-id". 

126 

127 .. admonition:: Example: 

128 

129 .. literalinclude:: ../samples/test_example_sansio.py 

130 :start-after: [START request_id_policy] 

131 :end-before: [END request_id_policy] 

132 :language: python 

133 :dedent: 4 

134 :caption: Configuring a request id policy. 

135 """ 

136 

137 def __init__( 

138 self, # pylint: disable=unused-argument 

139 *, 

140 request_id: Union[str, Any] = _Unset, 

141 auto_request_id: bool = True, 

142 request_id_header_name: str = "x-ms-client-request-id", 

143 **kwargs: Any 

144 ) -> None: 

145 super() 

146 self._request_id = request_id 

147 self._auto_request_id = auto_request_id 

148 self._request_id_header_name = request_id_header_name 

149 

150 def set_request_id(self, value: str) -> None: 

151 """Add the request id to the configuration to be applied to all requests. 

152 

153 :param str value: The request id value. 

154 """ 

155 self._request_id = value 

156 

157 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

158 """Updates with the given request id before sending the request to the next policy. 

159 

160 :param request: The PipelineRequest object 

161 :type request: ~azure.core.pipeline.PipelineRequest 

162 """ 

163 request_id = unset = object() 

164 if "request_id" in request.context.options: 

165 request_id = request.context.options.pop("request_id") 

166 if request_id is None: 

167 return 

168 elif self._request_id is None: 

169 return 

170 elif self._request_id is not _Unset: 

171 if self._request_id_header_name in request.http_request.headers: 

172 return 

173 request_id = self._request_id 

174 elif self._auto_request_id: 

175 if self._request_id_header_name in request.http_request.headers: 

176 return 

177 request_id = str(uuid.uuid1()) 

178 if request_id is not unset: 

179 header = {self._request_id_header_name: cast(str, request_id)} 

180 request.http_request.headers.update(header) 

181 

182 

183class UserAgentPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

184 """User-Agent Policy. Allows custom values to be added to the User-Agent header. 

185 

186 :param str base_user_agent: Sets the base user agent value. 

187 

188 :keyword bool user_agent_overwrite: Overwrites User-Agent when True. Defaults to False. 

189 :keyword bool user_agent_use_env: Gets user-agent from environment. Defaults to True. 

190 :keyword str user_agent: If specified, this will be added in front of the user agent string. 

191 :keyword str sdk_moniker: If specified, the user agent string will be 

192 azsdk-python-[sdk_moniker] Python/[python_version] ([platform_version]) 

193 

194 Environment variables: 

195 

196 * ``AZURE_HTTP_USER_AGENT`` - If set and ``user_agent_use_env`` is True (the default), 

197 the value is appended to the User-Agent header string sent with each request. 

198 

199 .. admonition:: Example: 

200 

201 .. literalinclude:: ../samples/test_example_sansio.py 

202 :start-after: [START user_agent_policy] 

203 :end-before: [END user_agent_policy] 

204 :language: python 

205 :dedent: 4 

206 :caption: Configuring a user agent policy. 

207 """ 

208 

209 _USERAGENT = "User-Agent" 

210 _ENV_ADDITIONAL_USER_AGENT = "AZURE_HTTP_USER_AGENT" 

211 

212 def __init__(self, base_user_agent: Optional[str] = None, **kwargs: Any) -> None: 

213 self.overwrite: bool = kwargs.pop("user_agent_overwrite", False) 

214 self.use_env: bool = kwargs.pop("user_agent_use_env", True) 

215 application_id: Optional[str] = kwargs.pop("user_agent", None) 

216 sdk_moniker: str = kwargs.pop("sdk_moniker", "core/{}".format(azcore_version)) 

217 

218 if base_user_agent: 

219 self._user_agent = base_user_agent 

220 else: 

221 self._user_agent = "azsdk-python-{} Python/{} ({})".format( 

222 sdk_moniker, platform.python_version(), platform.platform() 

223 ) 

224 

225 if application_id: 

226 self._user_agent = "{} {}".format(application_id, self._user_agent) 

227 

228 @property 

229 def user_agent(self) -> str: 

230 """The current user agent value. 

231 

232 :return: The current user agent value. 

233 :rtype: str 

234 """ 

235 if self.use_env: 

236 add_user_agent_header = os.environ.get(self._ENV_ADDITIONAL_USER_AGENT, None) 

237 if add_user_agent_header is not None: 

238 return "{} {}".format(self._user_agent, add_user_agent_header) 

239 return self._user_agent 

240 

241 def add_user_agent(self, value: str) -> None: 

242 """Add value to current user agent with a space. 

243 

244 :param str value: value to add to user agent. 

245 """ 

246 self._user_agent = "{} {}".format(self._user_agent, value) 

247 

248 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

249 """Modifies the User-Agent header before the request is sent. 

250 

251 :param request: The PipelineRequest object 

252 :type request: ~azure.core.pipeline.PipelineRequest 

253 """ 

254 http_request = request.http_request 

255 options_dict = request.context.options 

256 if "user_agent" in options_dict: 

257 user_agent = options_dict.pop("user_agent") 

258 if options_dict.pop("user_agent_overwrite", self.overwrite): 

259 http_request.headers[self._USERAGENT] = user_agent 

260 else: 

261 user_agent = "{} {}".format(user_agent, self.user_agent) 

262 http_request.headers[self._USERAGENT] = user_agent 

263 

264 elif self.overwrite or self._USERAGENT not in http_request.headers: 

265 http_request.headers[self._USERAGENT] = self.user_agent 

266 

267 

268class NetworkTraceLoggingPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

269 """The logging policy in the pipeline is used to output HTTP network trace to the configured logger. 

270 

271 This accepts both global configuration, and per-request level with "enable_http_logger" 

272 

273 :param bool logging_enable: Use to enable per operation. Defaults to False. 

274 

275 .. admonition:: Example: 

276 

277 .. literalinclude:: ../samples/test_example_sansio.py 

278 :start-after: [START network_trace_logging_policy] 

279 :end-before: [END network_trace_logging_policy] 

280 :language: python 

281 :dedent: 4 

282 :caption: Configuring a network trace logging policy. 

283 """ 

284 

285 def __init__(self, logging_enable: bool = False, **kwargs: Any): # pylint: disable=unused-argument 

286 self.enable_http_logger = logging_enable 

287 

288 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

289 """Logs HTTP request to the DEBUG logger. 

290 

291 :param request: The PipelineRequest object. 

292 :type request: ~azure.core.pipeline.PipelineRequest 

293 """ 

294 http_request = request.http_request 

295 options = request.context.options 

296 logging_enable = options.pop("logging_enable", self.enable_http_logger) 

297 request.context["logging_enable"] = logging_enable 

298 if logging_enable: 

299 if not _LOGGER.isEnabledFor(logging.DEBUG): 

300 return 

301 

302 try: 

303 log_string = "Request URL: '{}'".format(http_request.url) 

304 log_string += "\nRequest method: '{}'".format(http_request.method) 

305 log_string += "\nRequest headers:" 

306 for header, value in http_request.headers.items(): 

307 log_string += "\n '{}': '{}'".format(header, value) 

308 log_string += "\nRequest body:" 

309 

310 # We don't want to log the binary data of a file upload. 

311 if isinstance(http_request.body, types.GeneratorType): 

312 log_string += "\nFile upload" 

313 _LOGGER.debug(log_string) 

314 return 

315 try: 

316 if isinstance(http_request.body, types.AsyncGeneratorType): 

317 log_string += "\nFile upload" 

318 _LOGGER.debug(log_string) 

319 return 

320 except AttributeError: 

321 pass 

322 if http_request.body: 

323 log_string += "\n{}".format(str(http_request.body)) 

324 _LOGGER.debug(log_string) 

325 return 

326 log_string += "\nThis request has no body" 

327 _LOGGER.debug(log_string) 

328 except Exception as err: # pylint: disable=broad-except 

329 _LOGGER.debug("Failed to log request: %r", err) 

330 

331 def on_response( 

332 self, 

333 request: PipelineRequest[HTTPRequestType], 

334 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

335 ) -> None: 

336 """Logs HTTP response to the DEBUG logger. 

337 

338 :param request: The PipelineRequest object. 

339 :type request: ~azure.core.pipeline.PipelineRequest 

340 :param response: The PipelineResponse object. 

341 :type response: ~azure.core.pipeline.PipelineResponse 

342 """ 

343 http_response = response.http_response 

344 try: 

345 logging_enable = response.context["logging_enable"] 

346 if logging_enable: 

347 if not _LOGGER.isEnabledFor(logging.DEBUG): 

348 return 

349 

350 log_string = "Response status: '{}'".format(http_response.status_code) 

351 log_string += "\nResponse headers:" 

352 for res_header, value in http_response.headers.items(): 

353 log_string += "\n '{}': '{}'".format(res_header, value) 

354 

355 # We don't want to log binary data if the response is a file. 

356 log_string += "\nResponse content:" 

357 pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE) 

358 header = http_response.headers.get("content-disposition") 

359 

360 if header and pattern.match(header): 

361 filename = header.partition("=")[2] 

362 log_string += "\nFile attachments: {}".format(filename) 

363 elif http_response.headers.get("content-type", "").endswith("octet-stream"): 

364 log_string += "\nBody contains binary data." 

365 elif http_response.headers.get("content-type", "").startswith("image"): 

366 log_string += "\nBody contains image data." 

367 else: 

368 if response.context.options.get("stream", False): 

369 log_string += "\nBody is streamable." 

370 else: 

371 log_string += "\n{}".format(http_response.text()) 

372 _LOGGER.debug(log_string) 

373 except Exception as err: # pylint: disable=broad-except 

374 _LOGGER.debug("Failed to log response: %s", repr(err)) 

375 

376 

377class _HiddenClassProperties(type): 

378 # Backward compatible for DEFAULT_HEADERS_WHITELIST 

379 # https://github.com/Azure/azure-sdk-for-python/issues/26331 

380 

381 @property 

382 def DEFAULT_HEADERS_WHITELIST(cls) -> Set[str]: 

383 return cls.DEFAULT_HEADERS_ALLOWLIST 

384 

385 @DEFAULT_HEADERS_WHITELIST.setter 

386 def DEFAULT_HEADERS_WHITELIST(cls, value: Set[str]) -> None: 

387 cls.DEFAULT_HEADERS_ALLOWLIST = value 

388 

389 

390class HttpLoggingPolicy( 

391 SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType], 

392 metaclass=_HiddenClassProperties, 

393): 

394 """The Pipeline policy that handles logging of HTTP requests and responses. 

395 

396 :param logger: The logger to use for logging. Default to azure.core.pipeline.policies.http_logging_policy. 

397 :type logger: logging.Logger 

398 :keyword int http_logging_level: The logging level to use for HTTP request and response logs. 

399 Defaults to logging.INFO. 

400 :type http_logging_level: int 

401 :keyword additional_allowed_query_params: Query parameter names whose values are allowed in recorded URLs. 

402 These are added to the default set which includes "api-version". 

403 :type additional_allowed_query_params: Iterable[str] 

404 

405 Environment variables: 

406 

407 * ``AZURE_SDK_LOGGING_MULTIRECORD`` - If set to any truthy value, HTTP request and response 

408 details are logged as separate log records instead of a single combined record. 

409 """ 

410 

411 DEFAULT_QUERY_PARAMS_ALLOWLIST: Set[str] = set(["api-version"]) 

412 DEFAULT_HEADERS_ALLOWLIST: Set[str] = set( 

413 [ 

414 "x-ms-request-id", 

415 "x-ms-client-request-id", 

416 "x-ms-return-client-request-id", 

417 "x-ms-error-code", 

418 "traceparent", 

419 "Accept", 

420 "Cache-Control", 

421 "Connection", 

422 "Content-Length", 

423 "Content-Type", 

424 "Date", 

425 "ETag", 

426 "Expires", 

427 "If-Match", 

428 "If-Modified-Since", 

429 "If-None-Match", 

430 "If-Unmodified-Since", 

431 "Last-Modified", 

432 "Pragma", 

433 "Request-Id", 

434 "Retry-After", 

435 "Server", 

436 "Transfer-Encoding", 

437 "User-Agent", 

438 "WWW-Authenticate", # OAuth Challenge header. 

439 "x-vss-e2eid", # Needed by Azure DevOps pipelines. 

440 "x-msedge-ref", # Needed by Azure DevOps pipelines. 

441 ] 

442 ) 

443 REDACTED_PLACEHOLDER: str = "REDACTED" 

444 MULTI_RECORD_LOG: str = "AZURE_SDK_LOGGING_MULTIRECORD" 

445 

446 def __init__( 

447 self, 

448 logger: Optional[logging.Logger] = None, 

449 *, 

450 http_logging_level: int = logging.INFO, 

451 additional_allowed_query_params: Optional[Iterable[str]] = None, 

452 **kwargs: Any 

453 ): # pylint: disable=unused-argument 

454 self.logger: logging.Logger = logger or logging.getLogger("azure.core.pipeline.policies.http_logging_policy") 

455 self.http_logging_level: int = http_logging_level 

456 self.allowed_query_params: Set[str] = CaseInsensitiveSet(self.__class__.DEFAULT_QUERY_PARAMS_ALLOWLIST) 

457 if additional_allowed_query_params: 

458 self.allowed_query_params.update(additional_allowed_query_params) 

459 self.allowed_header_names: Set[str] = CaseInsensitiveSet(self.__class__.DEFAULT_HEADERS_ALLOWLIST) 

460 

461 def _redact_header(self, key: str, value: str) -> str: 

462 if isinstance(self.allowed_header_names, CaseInsensitiveSet): 

463 return value if key in self.allowed_header_names else self.REDACTED_PLACEHOLDER 

464 lower_case_allowed_header_names = [header.lower() for header in self.allowed_header_names] 

465 return value if key.lower() in lower_case_allowed_header_names else self.REDACTED_PLACEHOLDER 

466 

467 def on_request( # pylint: disable=too-many-return-statements 

468 self, request: PipelineRequest[HTTPRequestType] 

469 ) -> None: 

470 """Logs HTTP method, url and headers. 

471 

472 :param request: The PipelineRequest object. 

473 :type request: ~azure.core.pipeline.PipelineRequest 

474 """ 

475 http_request = request.http_request 

476 options = request.context.options 

477 # Get logger in my context first (request has been retried) 

478 # then read from kwargs (pop if that's the case) 

479 # then use my instance logger 

480 logger = request.context.setdefault("logger", options.pop("logger", self.logger)) 

481 log_level = request.context.setdefault( 

482 "http_logging_level", options.pop("http_logging_level", self.http_logging_level) 

483 ) 

484 

485 if not logger.isEnabledFor(log_level): 

486 return 

487 

488 try: 

489 redacted_url = sanitize_url(http_request.url, self.allowed_query_params, self.REDACTED_PLACEHOLDER) 

490 

491 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) 

492 if multi_record: 

493 logger.log(log_level, "Request URL: %r", redacted_url) 

494 logger.log(log_level, "Request method: %r", http_request.method) 

495 logger.log(log_level, "Request headers:") 

496 for header, value in http_request.headers.items(): 

497 value = self._redact_header(header, value) 

498 logger.log(log_level, " %r: %r", header, value) 

499 if isinstance(http_request.body, types.GeneratorType): 

500 logger.log(log_level, "File upload") 

501 return 

502 try: 

503 if isinstance(http_request.body, types.AsyncGeneratorType): 

504 logger.log(log_level, "File upload") 

505 return 

506 except AttributeError: 

507 pass 

508 if http_request.body: 

509 logger.log(log_level, "A body is sent with the request") 

510 return 

511 logger.log(log_level, "No body was attached to the request") 

512 return 

513 log_string = "Request URL: '{}'".format(redacted_url) 

514 log_string += "\nRequest method: '{}'".format(http_request.method) 

515 log_string += "\nRequest headers:" 

516 for header, value in http_request.headers.items(): 

517 value = self._redact_header(header, value) 

518 log_string += "\n '{}': '{}'".format(header, value) 

519 if isinstance(http_request.body, types.GeneratorType): 

520 log_string += "\nFile upload" 

521 logger.log(log_level, log_string) 

522 return 

523 try: 

524 if isinstance(http_request.body, types.AsyncGeneratorType): 

525 log_string += "\nFile upload" 

526 logger.log(log_level, log_string) 

527 return 

528 except AttributeError: 

529 pass 

530 if http_request.body: 

531 log_string += "\nA body is sent with the request" 

532 logger.log(log_level, log_string) 

533 return 

534 log_string += "\nNo body was attached to the request" 

535 logger.log(log_level, log_string) 

536 

537 except Exception: # pylint: disable=broad-except 

538 logger.warning("Failed to log request.") 

539 

540 def on_response( 

541 self, 

542 request: PipelineRequest[HTTPRequestType], 

543 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

544 ) -> None: 

545 """Logs HTTP response status and headers. 

546 

547 :param request: The PipelineRequest object. 

548 :type request: ~azure.core.pipeline.PipelineRequest 

549 :param response: The PipelineResponse object. 

550 :type response: ~azure.core.pipeline.PipelineResponse 

551 """ 

552 http_response = response.http_response 

553 

554 # Get logger in my context first (request has been retried) 

555 # then read from kwargs (pop if that's the case) 

556 # then use my instance logger 

557 # If on_request was called, should always read from context 

558 options = request.context.options 

559 logger = request.context.setdefault("logger", options.pop("logger", self.logger)) 

560 log_level = request.context.setdefault( 

561 "http_logging_level", options.pop("http_logging_level", self.http_logging_level) 

562 ) 

563 

564 try: 

565 if not logger.isEnabledFor(log_level): 

566 return 

567 

568 multi_record = os.environ.get(HttpLoggingPolicy.MULTI_RECORD_LOG, False) 

569 if multi_record: 

570 logger.log(log_level, "Response status: %r", http_response.status_code) 

571 logger.log(log_level, "Response headers:") 

572 for res_header, value in http_response.headers.items(): 

573 value = self._redact_header(res_header, value) 

574 logger.log(log_level, " %r: %r", res_header, value) 

575 return 

576 log_string = "Response status: {}".format(http_response.status_code) 

577 log_string += "\nResponse headers:" 

578 for res_header, value in http_response.headers.items(): 

579 value = self._redact_header(res_header, value) 

580 log_string += "\n '{}': '{}'".format(res_header, value) 

581 logger.log(log_level, log_string) 

582 except Exception: # pylint: disable=broad-except 

583 logger.warning("Failed to log response.") 

584 

585 

586class ContentDecodePolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

587 """Policy for decoding unstreamed response content. 

588 

589 :param response_encoding: The encoding to use if known for this service (will disable auto-detection) 

590 :type response_encoding: str 

591 """ 

592 

593 # Accept "text" because we're open minded people... 

594 JSON_REGEXP = re.compile(r"^(application|text)/([0-9a-z+.-]+\+)?json$") 

595 

596 # Name used in context 

597 CONTEXT_NAME = "deserialized_data" 

598 

599 def __init__( 

600 self, response_encoding: Optional[str] = None, **kwargs: Any # pylint: disable=unused-argument 

601 ) -> None: 

602 self._response_encoding = response_encoding 

603 

604 @classmethod 

605 def deserialize_from_text( 

606 cls, 

607 data: Optional[Union[AnyStr, IO[AnyStr]]], 

608 mime_type: Optional[str] = None, 

609 response: Optional[HTTPResponseType] = None, 

610 ) -> Any: 

611 """Decode response data according to content-type. 

612 

613 Accept a stream of data as well, but will be load at once in memory for now. 

614 If no content-type, will return the string version (not bytes, not stream) 

615 

616 :param data: The data to deserialize. 

617 :type data: str or bytes or file-like object 

618 :param response: The HTTP response. 

619 :type response: ~azure.core.pipeline.transport.HttpResponse 

620 :param str mime_type: The mime type. As mime type, charset is not expected. 

621 :param response: If passed, exception will be annotated with that response 

622 :type response: any 

623 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

624 :returns: A dict (JSON), XML tree or str, depending of the mime_type 

625 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str 

626 """ 

627 if not data: 

628 return None 

629 

630 if hasattr(data, "read"): 

631 # Assume a stream 

632 data = cast(IO, data).read() 

633 

634 if isinstance(data, bytes): 

635 data_as_str = data.decode(encoding="utf-8-sig") 

636 else: 

637 # Explain to mypy the correct type. 

638 data_as_str = cast(str, data) 

639 

640 if mime_type is None: 

641 return data_as_str 

642 

643 if cls.JSON_REGEXP.match(mime_type): 

644 try: 

645 return json.loads(data_as_str) 

646 except ValueError as err: 

647 raise DecodeError( 

648 message="JSON is invalid: {}".format(err), 

649 response=response, 

650 error=err, 

651 ) from err 

652 elif "xml" in (mime_type or []): 

653 try: 

654 return ET.fromstring(data_as_str) # nosec 

655 except ET.ParseError as err: 

656 # It might be because the server has an issue, and returned JSON with 

657 # content-type XML.... 

658 # So let's try a JSON load, and if it's still broken 

659 # let's flow the initial exception 

660 def _json_attemp(data): 

661 try: 

662 return True, json.loads(data) 

663 except ValueError: 

664 return False, None # Don't care about this one 

665 

666 success, json_result = _json_attemp(data) 

667 if success: 

668 return json_result 

669 # If i'm here, it's not JSON, it's not XML, let's scream 

670 # and raise the last context in this block (the XML exception) 

671 # The function hack is because Py2.7 messes up with exception 

672 # context otherwise. 

673 _LOGGER.critical("Wasn't XML not JSON, failing") 

674 raise DecodeError("XML is invalid", response=response) from err 

675 elif mime_type.startswith("text/"): 

676 return data_as_str 

677 raise DecodeError("Cannot deserialize content-type: {}".format(mime_type)) 

678 

679 @classmethod 

680 def deserialize_from_http_generics( 

681 cls, 

682 response: HTTPResponseType, 

683 encoding: Optional[str] = None, 

684 ) -> Any: 

685 """Deserialize from HTTP response. 

686 

687 Headers will tested for "content-type" 

688 

689 :param response: The HTTP response 

690 :type response: any 

691 :param str encoding: The encoding to use if known for this service (will disable auto-detection) 

692 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

693 :returns: A dict (JSON), XML tree or str, depending of the mime_type 

694 :rtype: dict[str, Any] or xml.etree.ElementTree.Element or str 

695 """ 

696 # Try to use content-type from headers if available 

697 if response.content_type: 

698 mime_type = response.content_type.split(";")[0].strip().lower() 

699 # Ouch, this server did not declare what it sent... 

700 # Let's guess it's JSON... 

701 # Also, since Autorest was considering that an empty body was a valid JSON, 

702 # need that test as well.... 

703 else: 

704 mime_type = "application/json" 

705 

706 # Rely on transport implementation to give me "text()" decoded correctly 

707 if hasattr(response, "read"): 

708 # since users can call deserialize_from_http_generics by themselves 

709 # we want to make sure our new responses are read before we try to 

710 # deserialize. Only read sync responses since we're in a sync function 

711 # 

712 # Technically HttpResponse do not contain a "read()", but we don't know what 

713 # people have been able to pass here, so keep this code for safety, 

714 # even if it's likely dead code 

715 if not inspect.iscoroutinefunction(response.read): # type: ignore 

716 response.read() # type: ignore 

717 return cls.deserialize_from_text(response.text(encoding), mime_type, response=response) 

718 

719 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

720 """Set the response encoding in the request context. 

721 

722 :param request: The PipelineRequest object. 

723 :type request: ~azure.core.pipeline.PipelineRequest 

724 """ 

725 options = request.context.options 

726 response_encoding = options.pop("response_encoding", self._response_encoding) 

727 if response_encoding: 

728 request.context["response_encoding"] = response_encoding 

729 

730 def on_response( 

731 self, 

732 request: PipelineRequest[HTTPRequestType], 

733 response: PipelineResponse[HTTPRequestType, HTTPResponseType], 

734 ) -> None: 

735 """Extract data from the body of a REST response object. 

736 This will load the entire payload in memory. 

737 Will follow Content-Type to parse. 

738 We assume everything is UTF8 (BOM acceptable). 

739 

740 :param request: The PipelineRequest object. 

741 :type request: ~azure.core.pipeline.PipelineRequest 

742 :param response: The PipelineResponse object. 

743 :type response: ~azure.core.pipeline.PipelineResponse 

744 :raises JSONDecodeError: If JSON is requested and parsing is impossible. 

745 :raises UnicodeDecodeError: If bytes is not UTF8 

746 :raises xml.etree.ElementTree.ParseError: If bytes is not valid XML 

747 :raises ~azure.core.exceptions.DecodeError: If deserialization fails 

748 """ 

749 # If response was asked as stream, do NOT read anything and quit now 

750 if response.context.options.get("stream", True): 

751 return 

752 

753 response_encoding = request.context.get("response_encoding") 

754 

755 response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics( 

756 response.http_response, response_encoding 

757 ) 

758 

759 

760class ProxyPolicy(SansIOHTTPPolicy[HTTPRequestType, HTTPResponseType]): 

761 """A proxy policy. 

762 

763 Dictionary mapping protocol or protocol and host to the URL of the proxy 

764 to be used on each Request. 

765 

766 :param MutableMapping proxies: Maps protocol or protocol and hostname to the URL 

767 of the proxy. 

768 

769 .. admonition:: Example: 

770 

771 .. literalinclude:: ../samples/test_example_sansio.py 

772 :start-after: [START proxy_policy] 

773 :end-before: [END proxy_policy] 

774 :language: python 

775 :dedent: 4 

776 :caption: Configuring a proxy policy. 

777 """ 

778 

779 def __init__( 

780 self, proxies: Optional[MutableMapping[str, str]] = None, **kwargs: Any 

781 ): # pylint: disable=unused-argument 

782 self.proxies = proxies 

783 

784 def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: 

785 """Adds the proxy information to the request context. 

786 

787 :param request: The PipelineRequest object 

788 :type request: ~azure.core.pipeline.PipelineRequest 

789 """ 

790 ctxt = request.context.options 

791 if self.proxies and "proxies" not in ctxt: 

792 ctxt["proxies"] = self.proxies