Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/mgmt/dynatrace/_serialization.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1048 statements  

1# -------------------------------------------------------------------------- 

2# 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# 

5# The MIT License (MIT) 

6# 

7# Permission is hereby granted, free of charge, to any person obtaining a copy 

8# of this software and associated documentation files (the ""Software""), to 

9# deal in the Software without restriction, including without limitation the 

10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

11# sell copies of the Software, and to permit persons to whom the Software is 

12# furnished to do so, subject to the following conditions: 

13# 

14# The above copyright notice and this permission notice shall be included in 

15# all copies or substantial portions of the Software. 

16# 

17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

23# IN THE SOFTWARE. 

24# 

25# -------------------------------------------------------------------------- 

26 

27# pylint: skip-file 

28# pyright: reportUnnecessaryTypeIgnoreComment=false 

29 

30from base64 import b64decode, b64encode 

31import calendar 

32import datetime 

33import decimal 

34import email 

35from enum import Enum 

36import json 

37import logging 

38import re 

39import sys 

40import codecs 

41from typing import ( 

42 Dict, 

43 Any, 

44 cast, 

45 Optional, 

46 Union, 

47 AnyStr, 

48 IO, 

49 Mapping, 

50 Callable, 

51 TypeVar, 

52 MutableMapping, 

53 Type, 

54 List, 

55 Mapping, 

56) 

57 

58try: 

59 from urllib import quote # type: ignore 

60except ImportError: 

61 from urllib.parse import quote 

62import xml.etree.ElementTree as ET 

63 

64import isodate # type: ignore 

65 

66from azure.core.exceptions import DeserializationError, SerializationError, raise_with_traceback 

67from azure.core.serialization import NULL as AzureCoreNull 

68 

69_BOM = codecs.BOM_UTF8.decode(encoding="utf-8") 

70 

71ModelType = TypeVar("ModelType", bound="Model") 

72JSON = MutableMapping[str, Any] 

73 

74 

75class RawDeserializer: 

76 

77 # Accept "text" because we're open minded people... 

78 JSON_REGEXP = re.compile(r"^(application|text)/([a-z+.]+\+)?json$") 

79 

80 # Name used in context 

81 CONTEXT_NAME = "deserialized_data" 

82 

83 @classmethod 

84 def deserialize_from_text(cls, data: Optional[Union[AnyStr, IO]], content_type: Optional[str] = None) -> Any: 

85 """Decode data according to content-type. 

86 

87 Accept a stream of data as well, but will be load at once in memory for now. 

88 

89 If no content-type, will return the string version (not bytes, not stream) 

90 

91 :param data: Input, could be bytes or stream (will be decoded with UTF8) or text 

92 :type data: str or bytes or IO 

93 :param str content_type: The content type. 

94 """ 

95 if hasattr(data, "read"): 

96 # Assume a stream 

97 data = cast(IO, data).read() 

98 

99 if isinstance(data, bytes): 

100 data_as_str = data.decode(encoding="utf-8-sig") 

101 else: 

102 # Explain to mypy the correct type. 

103 data_as_str = cast(str, data) 

104 

105 # Remove Byte Order Mark if present in string 

106 data_as_str = data_as_str.lstrip(_BOM) 

107 

108 if content_type is None: 

109 return data 

110 

111 if cls.JSON_REGEXP.match(content_type): 

112 try: 

113 return json.loads(data_as_str) 

114 except ValueError as err: 

115 raise DeserializationError("JSON is invalid: {}".format(err), err) 

116 elif "xml" in (content_type or []): 

117 try: 

118 

119 try: 

120 if isinstance(data, unicode): # type: ignore 

121 # If I'm Python 2.7 and unicode XML will scream if I try a "fromstring" on unicode string 

122 data_as_str = data_as_str.encode(encoding="utf-8") # type: ignore 

123 except NameError: 

124 pass 

125 

126 return ET.fromstring(data_as_str) # nosec 

127 except ET.ParseError: 

128 # It might be because the server has an issue, and returned JSON with 

129 # content-type XML.... 

130 # So let's try a JSON load, and if it's still broken 

131 # let's flow the initial exception 

132 def _json_attemp(data): 

133 try: 

134 return True, json.loads(data) 

135 except ValueError: 

136 return False, None # Don't care about this one 

137 

138 success, json_result = _json_attemp(data) 

139 if success: 

140 return json_result 

141 # If i'm here, it's not JSON, it's not XML, let's scream 

142 # and raise the last context in this block (the XML exception) 

143 # The function hack is because Py2.7 messes up with exception 

144 # context otherwise. 

145 _LOGGER.critical("Wasn't XML not JSON, failing") 

146 raise_with_traceback(DeserializationError, "XML is invalid") 

147 raise DeserializationError("Cannot deserialize content-type: {}".format(content_type)) 

148 

149 @classmethod 

150 def deserialize_from_http_generics(cls, body_bytes: Optional[Union[AnyStr, IO]], headers: Mapping) -> Any: 

151 """Deserialize from HTTP response. 

152 

153 Use bytes and headers to NOT use any requests/aiohttp or whatever 

154 specific implementation. 

155 Headers will tested for "content-type" 

156 """ 

157 # Try to use content-type from headers if available 

158 content_type = None 

159 if "content-type" in headers: 

160 content_type = headers["content-type"].split(";")[0].strip().lower() 

161 # Ouch, this server did not declare what it sent... 

162 # Let's guess it's JSON... 

163 # Also, since Autorest was considering that an empty body was a valid JSON, 

164 # need that test as well.... 

165 else: 

166 content_type = "application/json" 

167 

168 if body_bytes: 

169 return cls.deserialize_from_text(body_bytes, content_type) 

170 return None 

171 

172 

173try: 

174 basestring # type: ignore 

175 unicode_str = unicode # type: ignore 

176except NameError: 

177 basestring = str 

178 unicode_str = str 

179 

180_LOGGER = logging.getLogger(__name__) 

181 

182try: 

183 _long_type = long # type: ignore 

184except NameError: 

185 _long_type = int 

186 

187 

188class UTC(datetime.tzinfo): 

189 """Time Zone info for handling UTC""" 

190 

191 def utcoffset(self, dt): 

192 """UTF offset for UTC is 0.""" 

193 return datetime.timedelta(0) 

194 

195 def tzname(self, dt): 

196 """Timestamp representation.""" 

197 return "Z" 

198 

199 def dst(self, dt): 

200 """No daylight saving for UTC.""" 

201 return datetime.timedelta(hours=1) 

202 

203 

204try: 

205 from datetime import timezone as _FixedOffset # type: ignore 

206except ImportError: # Python 2.7 

207 

208 class _FixedOffset(datetime.tzinfo): # type: ignore 

209 """Fixed offset in minutes east from UTC. 

210 Copy/pasted from Python doc 

211 :param datetime.timedelta offset: offset in timedelta format 

212 """ 

213 

214 def __init__(self, offset): 

215 self.__offset = offset 

216 

217 def utcoffset(self, dt): 

218 return self.__offset 

219 

220 def tzname(self, dt): 

221 return str(self.__offset.total_seconds() / 3600) 

222 

223 def __repr__(self): 

224 return "<FixedOffset {}>".format(self.tzname(None)) 

225 

226 def dst(self, dt): 

227 return datetime.timedelta(0) 

228 

229 def __getinitargs__(self): 

230 return (self.__offset,) 

231 

232 

233try: 

234 from datetime import timezone 

235 

236 TZ_UTC = timezone.utc 

237except ImportError: 

238 TZ_UTC = UTC() # type: ignore 

239 

240_FLATTEN = re.compile(r"(?<!\\)\.") 

241 

242 

243def attribute_transformer(key, attr_desc, value): 

244 """A key transformer that returns the Python attribute. 

245 

246 :param str key: The attribute name 

247 :param dict attr_desc: The attribute metadata 

248 :param object value: The value 

249 :returns: A key using attribute name 

250 """ 

251 return (key, value) 

252 

253 

254def full_restapi_key_transformer(key, attr_desc, value): 

255 """A key transformer that returns the full RestAPI key path. 

256 

257 :param str _: The attribute name 

258 :param dict attr_desc: The attribute metadata 

259 :param object value: The value 

260 :returns: A list of keys using RestAPI syntax. 

261 """ 

262 keys = _FLATTEN.split(attr_desc["key"]) 

263 return ([_decode_attribute_map_key(k) for k in keys], value) 

264 

265 

266def last_restapi_key_transformer(key, attr_desc, value): 

267 """A key transformer that returns the last RestAPI key. 

268 

269 :param str key: The attribute name 

270 :param dict attr_desc: The attribute metadata 

271 :param object value: The value 

272 :returns: The last RestAPI key. 

273 """ 

274 key, value = full_restapi_key_transformer(key, attr_desc, value) 

275 return (key[-1], value) 

276 

277 

278def _create_xml_node(tag, prefix=None, ns=None): 

279 """Create a XML node.""" 

280 if prefix and ns: 

281 ET.register_namespace(prefix, ns) 

282 if ns: 

283 return ET.Element("{" + ns + "}" + tag) 

284 else: 

285 return ET.Element(tag) 

286 

287 

288class Model(object): 

289 """Mixin for all client request body/response body models to support 

290 serialization and deserialization. 

291 """ 

292 

293 _subtype_map: Dict[str, Dict[str, Any]] = {} 

294 _attribute_map: Dict[str, Dict[str, Any]] = {} 

295 _validation: Dict[str, Dict[str, Any]] = {} 

296 

297 def __init__(self, **kwargs: Any) -> None: 

298 self.additional_properties: Dict[str, Any] = {} 

299 for k in kwargs: 

300 if k not in self._attribute_map: 

301 _LOGGER.warning("%s is not a known attribute of class %s and will be ignored", k, self.__class__) 

302 elif k in self._validation and self._validation[k].get("readonly", False): 

303 _LOGGER.warning("Readonly attribute %s will be ignored in class %s", k, self.__class__) 

304 else: 

305 setattr(self, k, kwargs[k]) 

306 

307 def __eq__(self, other: Any) -> bool: 

308 """Compare objects by comparing all attributes.""" 

309 if isinstance(other, self.__class__): 

310 return self.__dict__ == other.__dict__ 

311 return False 

312 

313 def __ne__(self, other: Any) -> bool: 

314 """Compare objects by comparing all attributes.""" 

315 return not self.__eq__(other) 

316 

317 def __str__(self) -> str: 

318 return str(self.__dict__) 

319 

320 @classmethod 

321 def enable_additional_properties_sending(cls) -> None: 

322 cls._attribute_map["additional_properties"] = {"key": "", "type": "{object}"} 

323 

324 @classmethod 

325 def is_xml_model(cls) -> bool: 

326 try: 

327 cls._xml_map # type: ignore 

328 except AttributeError: 

329 return False 

330 return True 

331 

332 @classmethod 

333 def _create_xml_node(cls): 

334 """Create XML node.""" 

335 try: 

336 xml_map = cls._xml_map # type: ignore 

337 except AttributeError: 

338 xml_map = {} 

339 

340 return _create_xml_node(xml_map.get("name", cls.__name__), xml_map.get("prefix", None), xml_map.get("ns", None)) 

341 

342 def serialize(self, keep_readonly: bool = False, **kwargs: Any) -> JSON: 

343 """Return the JSON that would be sent to azure from this model. 

344 

345 This is an alias to `as_dict(full_restapi_key_transformer, keep_readonly=False)`. 

346 

347 If you want XML serialization, you can pass the kwargs is_xml=True. 

348 

349 :param bool keep_readonly: If you want to serialize the readonly attributes 

350 :returns: A dict JSON compatible object 

351 :rtype: dict 

352 """ 

353 serializer = Serializer(self._infer_class_models()) 

354 return serializer._serialize(self, keep_readonly=keep_readonly, **kwargs) 

355 

356 def as_dict( 

357 self, 

358 keep_readonly: bool = True, 

359 key_transformer: Callable[[str, Dict[str, Any], Any], Any] = attribute_transformer, 

360 **kwargs: Any 

361 ) -> JSON: 

362 """Return a dict that can be serialized using json.dump. 

363 

364 Advanced usage might optionally use a callback as parameter: 

365 

366 .. code::python 

367 

368 def my_key_transformer(key, attr_desc, value): 

369 return key 

370 

371 Key is the attribute name used in Python. Attr_desc 

372 is a dict of metadata. Currently contains 'type' with the 

373 msrest type and 'key' with the RestAPI encoded key. 

374 Value is the current value in this object. 

375 

376 The string returned will be used to serialize the key. 

377 If the return type is a list, this is considered hierarchical 

378 result dict. 

379 

380 See the three examples in this file: 

381 

382 - attribute_transformer 

383 - full_restapi_key_transformer 

384 - last_restapi_key_transformer 

385 

386 If you want XML serialization, you can pass the kwargs is_xml=True. 

387 

388 :param function key_transformer: A key transformer function. 

389 :returns: A dict JSON compatible object 

390 :rtype: dict 

391 """ 

392 serializer = Serializer(self._infer_class_models()) 

393 return serializer._serialize(self, key_transformer=key_transformer, keep_readonly=keep_readonly, **kwargs) 

394 

395 @classmethod 

396 def _infer_class_models(cls): 

397 try: 

398 str_models = cls.__module__.rsplit(".", 1)[0] 

399 models = sys.modules[str_models] 

400 client_models = {k: v for k, v in models.__dict__.items() if isinstance(v, type)} 

401 if cls.__name__ not in client_models: 

402 raise ValueError("Not Autorest generated code") 

403 except Exception: 

404 # Assume it's not Autorest generated (tests?). Add ourselves as dependencies. 

405 client_models = {cls.__name__: cls} 

406 return client_models 

407 

408 @classmethod 

409 def deserialize(cls: Type[ModelType], data: Any, content_type: Optional[str] = None) -> ModelType: 

410 """Parse a str using the RestAPI syntax and return a model. 

411 

412 :param str data: A str using RestAPI structure. JSON by default. 

413 :param str content_type: JSON by default, set application/xml if XML. 

414 :returns: An instance of this model 

415 :raises: DeserializationError if something went wrong 

416 """ 

417 deserializer = Deserializer(cls._infer_class_models()) 

418 return deserializer(cls.__name__, data, content_type=content_type) 

419 

420 @classmethod 

421 def from_dict( 

422 cls: Type[ModelType], 

423 data: Any, 

424 key_extractors: Optional[Callable[[str, Dict[str, Any], Any], Any]] = None, 

425 content_type: Optional[str] = None, 

426 ) -> ModelType: 

427 """Parse a dict using given key extractor return a model. 

428 

429 By default consider key 

430 extractors (rest_key_case_insensitive_extractor, attribute_key_case_insensitive_extractor 

431 and last_rest_key_case_insensitive_extractor) 

432 

433 :param dict data: A dict using RestAPI structure 

434 :param str content_type: JSON by default, set application/xml if XML. 

435 :returns: An instance of this model 

436 :raises: DeserializationError if something went wrong 

437 """ 

438 deserializer = Deserializer(cls._infer_class_models()) 

439 deserializer.key_extractors = ( # type: ignore 

440 [ # type: ignore 

441 attribute_key_case_insensitive_extractor, 

442 rest_key_case_insensitive_extractor, 

443 last_rest_key_case_insensitive_extractor, 

444 ] 

445 if key_extractors is None 

446 else key_extractors 

447 ) 

448 return deserializer(cls.__name__, data, content_type=content_type) 

449 

450 @classmethod 

451 def _flatten_subtype(cls, key, objects): 

452 if "_subtype_map" not in cls.__dict__: 

453 return {} 

454 result = dict(cls._subtype_map[key]) 

455 for valuetype in cls._subtype_map[key].values(): 

456 result.update(objects[valuetype]._flatten_subtype(key, objects)) 

457 return result 

458 

459 @classmethod 

460 def _classify(cls, response, objects): 

461 """Check the class _subtype_map for any child classes. 

462 We want to ignore any inherited _subtype_maps. 

463 Remove the polymorphic key from the initial data. 

464 """ 

465 for subtype_key in cls.__dict__.get("_subtype_map", {}).keys(): 

466 subtype_value = None 

467 

468 if not isinstance(response, ET.Element): 

469 rest_api_response_key = cls._get_rest_key_parts(subtype_key)[-1] 

470 subtype_value = response.pop(rest_api_response_key, None) or response.pop(subtype_key, None) 

471 else: 

472 subtype_value = xml_key_extractor(subtype_key, cls._attribute_map[subtype_key], response) 

473 if subtype_value: 

474 # Try to match base class. Can be class name only 

475 # (bug to fix in Autorest to support x-ms-discriminator-name) 

476 if cls.__name__ == subtype_value: 

477 return cls 

478 flatten_mapping_type = cls._flatten_subtype(subtype_key, objects) 

479 try: 

480 return objects[flatten_mapping_type[subtype_value]] # type: ignore 

481 except KeyError: 

482 _LOGGER.warning( 

483 "Subtype value %s has no mapping, use base class %s.", 

484 subtype_value, 

485 cls.__name__, 

486 ) 

487 break 

488 else: 

489 _LOGGER.warning("Discriminator %s is absent or null, use base class %s.", subtype_key, cls.__name__) 

490 break 

491 return cls 

492 

493 @classmethod 

494 def _get_rest_key_parts(cls, attr_key): 

495 """Get the RestAPI key of this attr, split it and decode part 

496 :param str attr_key: Attribute key must be in attribute_map. 

497 :returns: A list of RestAPI part 

498 :rtype: list 

499 """ 

500 rest_split_key = _FLATTEN.split(cls._attribute_map[attr_key]["key"]) 

501 return [_decode_attribute_map_key(key_part) for key_part in rest_split_key] 

502 

503 

504def _decode_attribute_map_key(key): 

505 """This decode a key in an _attribute_map to the actual key we want to look at 

506 inside the received data. 

507 

508 :param str key: A key string from the generated code 

509 """ 

510 return key.replace("\\.", ".") 

511 

512 

513class Serializer(object): 

514 """Request object model serializer.""" 

515 

516 basic_types = {str: "str", int: "int", bool: "bool", float: "float"} 

517 

518 _xml_basic_types_serializers = {"bool": lambda x: str(x).lower()} 

519 days = {0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu", 4: "Fri", 5: "Sat", 6: "Sun"} 

520 months = { 

521 1: "Jan", 

522 2: "Feb", 

523 3: "Mar", 

524 4: "Apr", 

525 5: "May", 

526 6: "Jun", 

527 7: "Jul", 

528 8: "Aug", 

529 9: "Sep", 

530 10: "Oct", 

531 11: "Nov", 

532 12: "Dec", 

533 } 

534 validation = { 

535 "min_length": lambda x, y: len(x) < y, 

536 "max_length": lambda x, y: len(x) > y, 

537 "minimum": lambda x, y: x < y, 

538 "maximum": lambda x, y: x > y, 

539 "minimum_ex": lambda x, y: x <= y, 

540 "maximum_ex": lambda x, y: x >= y, 

541 "min_items": lambda x, y: len(x) < y, 

542 "max_items": lambda x, y: len(x) > y, 

543 "pattern": lambda x, y: not re.match(y, x, re.UNICODE), 

544 "unique": lambda x, y: len(x) != len(set(x)), 

545 "multiple": lambda x, y: x % y != 0, 

546 } 

547 

548 def __init__(self, classes: Optional[Mapping[str, Type[ModelType]]] = None): 

549 self.serialize_type = { 

550 "iso-8601": Serializer.serialize_iso, 

551 "rfc-1123": Serializer.serialize_rfc, 

552 "unix-time": Serializer.serialize_unix, 

553 "duration": Serializer.serialize_duration, 

554 "date": Serializer.serialize_date, 

555 "time": Serializer.serialize_time, 

556 "decimal": Serializer.serialize_decimal, 

557 "long": Serializer.serialize_long, 

558 "bytearray": Serializer.serialize_bytearray, 

559 "base64": Serializer.serialize_base64, 

560 "object": self.serialize_object, 

561 "[]": self.serialize_iter, 

562 "{}": self.serialize_dict, 

563 } 

564 self.dependencies: Dict[str, Type[ModelType]] = dict(classes) if classes else {} 

565 self.key_transformer = full_restapi_key_transformer 

566 self.client_side_validation = True 

567 

568 def _serialize(self, target_obj, data_type=None, **kwargs): 

569 """Serialize data into a string according to type. 

570 

571 :param target_obj: The data to be serialized. 

572 :param str data_type: The type to be serialized from. 

573 :rtype: str, dict 

574 :raises: SerializationError if serialization fails. 

575 """ 

576 key_transformer = kwargs.get("key_transformer", self.key_transformer) 

577 keep_readonly = kwargs.get("keep_readonly", False) 

578 if target_obj is None: 

579 return None 

580 

581 attr_name = None 

582 class_name = target_obj.__class__.__name__ 

583 

584 if data_type: 

585 return self.serialize_data(target_obj, data_type, **kwargs) 

586 

587 if not hasattr(target_obj, "_attribute_map"): 

588 data_type = type(target_obj).__name__ 

589 if data_type in self.basic_types.values(): 

590 return self.serialize_data(target_obj, data_type, **kwargs) 

591 

592 # Force "is_xml" kwargs if we detect a XML model 

593 try: 

594 is_xml_model_serialization = kwargs["is_xml"] 

595 except KeyError: 

596 is_xml_model_serialization = kwargs.setdefault("is_xml", target_obj.is_xml_model()) 

597 

598 serialized = {} 

599 if is_xml_model_serialization: 

600 serialized = target_obj._create_xml_node() 

601 try: 

602 attributes = target_obj._attribute_map 

603 for attr, attr_desc in attributes.items(): 

604 attr_name = attr 

605 if not keep_readonly and target_obj._validation.get(attr_name, {}).get("readonly", False): 

606 continue 

607 

608 if attr_name == "additional_properties" and attr_desc["key"] == "": 

609 if target_obj.additional_properties is not None: 

610 serialized.update(target_obj.additional_properties) 

611 continue 

612 try: 

613 

614 orig_attr = getattr(target_obj, attr) 

615 if is_xml_model_serialization: 

616 pass # Don't provide "transformer" for XML for now. Keep "orig_attr" 

617 else: # JSON 

618 keys, orig_attr = key_transformer(attr, attr_desc.copy(), orig_attr) 

619 keys = keys if isinstance(keys, list) else [keys] 

620 

621 kwargs["serialization_ctxt"] = attr_desc 

622 new_attr = self.serialize_data(orig_attr, attr_desc["type"], **kwargs) 

623 

624 if is_xml_model_serialization: 

625 xml_desc = attr_desc.get("xml", {}) 

626 xml_name = xml_desc.get("name", attr_desc["key"]) 

627 xml_prefix = xml_desc.get("prefix", None) 

628 xml_ns = xml_desc.get("ns", None) 

629 if xml_desc.get("attr", False): 

630 if xml_ns: 

631 ET.register_namespace(xml_prefix, xml_ns) 

632 xml_name = "{{{}}}{}".format(xml_ns, xml_name) 

633 serialized.set(xml_name, new_attr) # type: ignore 

634 continue 

635 if xml_desc.get("text", False): 

636 serialized.text = new_attr # type: ignore 

637 continue 

638 if isinstance(new_attr, list): 

639 serialized.extend(new_attr) # type: ignore 

640 elif isinstance(new_attr, ET.Element): 

641 # If the down XML has no XML/Name, we MUST replace the tag with the local tag. But keeping the namespaces. 

642 if "name" not in getattr(orig_attr, "_xml_map", {}): 

643 splitted_tag = new_attr.tag.split("}") 

644 if len(splitted_tag) == 2: # Namespace 

645 new_attr.tag = "}".join([splitted_tag[0], xml_name]) 

646 else: 

647 new_attr.tag = xml_name 

648 serialized.append(new_attr) # type: ignore 

649 else: # That's a basic type 

650 # Integrate namespace if necessary 

651 local_node = _create_xml_node(xml_name, xml_prefix, xml_ns) 

652 local_node.text = unicode_str(new_attr) 

653 serialized.append(local_node) # type: ignore 

654 else: # JSON 

655 for k in reversed(keys): # type: ignore 

656 new_attr = {k: new_attr} 

657 

658 _new_attr = new_attr 

659 _serialized = serialized 

660 for k in keys: # type: ignore 

661 if k not in _serialized: 

662 _serialized.update(_new_attr) # type: ignore 

663 _new_attr = _new_attr[k] # type: ignore 

664 _serialized = _serialized[k] 

665 except ValueError as err: 

666 if isinstance(err, SerializationError): 

667 raise 

668 

669 except (AttributeError, KeyError, TypeError) as err: 

670 msg = "Attribute {} in object {} cannot be serialized.\n{}".format(attr_name, class_name, str(target_obj)) 

671 raise_with_traceback(SerializationError, msg, err) 

672 else: 

673 return serialized 

674 

675 def body(self, data, data_type, **kwargs): 

676 """Serialize data intended for a request body. 

677 

678 :param data: The data to be serialized. 

679 :param str data_type: The type to be serialized from. 

680 :rtype: dict 

681 :raises: SerializationError if serialization fails. 

682 :raises: ValueError if data is None 

683 """ 

684 

685 # Just in case this is a dict 

686 internal_data_type_str = data_type.strip("[]{}") 

687 internal_data_type = self.dependencies.get(internal_data_type_str, None) 

688 try: 

689 is_xml_model_serialization = kwargs["is_xml"] 

690 except KeyError: 

691 if internal_data_type and issubclass(internal_data_type, Model): 

692 is_xml_model_serialization = kwargs.setdefault("is_xml", internal_data_type.is_xml_model()) 

693 else: 

694 is_xml_model_serialization = False 

695 if internal_data_type and not isinstance(internal_data_type, Enum): 

696 try: 

697 deserializer = Deserializer(self.dependencies) 

698 # Since it's on serialization, it's almost sure that format is not JSON REST 

699 # We're not able to deal with additional properties for now. 

700 deserializer.additional_properties_detection = False 

701 if is_xml_model_serialization: 

702 deserializer.key_extractors = [ # type: ignore 

703 attribute_key_case_insensitive_extractor, 

704 ] 

705 else: 

706 deserializer.key_extractors = [ 

707 rest_key_case_insensitive_extractor, 

708 attribute_key_case_insensitive_extractor, 

709 last_rest_key_case_insensitive_extractor, 

710 ] 

711 data = deserializer._deserialize(data_type, data) 

712 except DeserializationError as err: 

713 raise_with_traceback(SerializationError, "Unable to build a model: " + str(err), err) 

714 

715 return self._serialize(data, data_type, **kwargs) 

716 

717 def url(self, name, data, data_type, **kwargs): 

718 """Serialize data intended for a URL path. 

719 

720 :param data: The data to be serialized. 

721 :param str data_type: The type to be serialized from. 

722 :rtype: str 

723 :raises: TypeError if serialization fails. 

724 :raises: ValueError if data is None 

725 """ 

726 try: 

727 output = self.serialize_data(data, data_type, **kwargs) 

728 if data_type == "bool": 

729 output = json.dumps(output) 

730 

731 if kwargs.get("skip_quote") is True: 

732 output = str(output) 

733 else: 

734 output = quote(str(output), safe="") 

735 except SerializationError: 

736 raise TypeError("{} must be type {}.".format(name, data_type)) 

737 else: 

738 return output 

739 

740 def query(self, name, data, data_type, **kwargs): 

741 """Serialize data intended for a URL query. 

742 

743 :param data: The data to be serialized. 

744 :param str data_type: The type to be serialized from. 

745 :keyword bool skip_quote: Whether to skip quote the serialized result. 

746 Defaults to False. 

747 :rtype: str 

748 :raises: TypeError if serialization fails. 

749 :raises: ValueError if data is None 

750 """ 

751 try: 

752 # Treat the list aside, since we don't want to encode the div separator 

753 if data_type.startswith("["): 

754 internal_data_type = data_type[1:-1] 

755 do_quote = not kwargs.get("skip_quote", False) 

756 return str(self.serialize_iter(data, internal_data_type, do_quote=do_quote, **kwargs)) 

757 

758 # Not a list, regular serialization 

759 output = self.serialize_data(data, data_type, **kwargs) 

760 if data_type == "bool": 

761 output = json.dumps(output) 

762 if kwargs.get("skip_quote") is True: 

763 output = str(output) 

764 else: 

765 output = quote(str(output), safe="") 

766 except SerializationError: 

767 raise TypeError("{} must be type {}.".format(name, data_type)) 

768 else: 

769 return str(output) 

770 

771 def header(self, name, data, data_type, **kwargs): 

772 """Serialize data intended for a request header. 

773 

774 :param data: The data to be serialized. 

775 :param str data_type: The type to be serialized from. 

776 :rtype: str 

777 :raises: TypeError if serialization fails. 

778 :raises: ValueError if data is None 

779 """ 

780 try: 

781 if data_type in ["[str]"]: 

782 data = ["" if d is None else d for d in data] 

783 

784 output = self.serialize_data(data, data_type, **kwargs) 

785 if data_type == "bool": 

786 output = json.dumps(output) 

787 except SerializationError: 

788 raise TypeError("{} must be type {}.".format(name, data_type)) 

789 else: 

790 return str(output) 

791 

792 def serialize_data(self, data, data_type, **kwargs): 

793 """Serialize generic data according to supplied data type. 

794 

795 :param data: The data to be serialized. 

796 :param str data_type: The type to be serialized from. 

797 :param bool required: Whether it's essential that the data not be 

798 empty or None 

799 :raises: AttributeError if required data is None. 

800 :raises: ValueError if data is None 

801 :raises: SerializationError if serialization fails. 

802 """ 

803 if data is None: 

804 raise ValueError("No value for given attribute") 

805 

806 try: 

807 if data is AzureCoreNull: 

808 return None 

809 if data_type in self.basic_types.values(): 

810 return self.serialize_basic(data, data_type, **kwargs) 

811 

812 elif data_type in self.serialize_type: 

813 return self.serialize_type[data_type](data, **kwargs) 

814 

815 # If dependencies is empty, try with current data class 

816 # It has to be a subclass of Enum anyway 

817 enum_type = self.dependencies.get(data_type, data.__class__) 

818 if issubclass(enum_type, Enum): 

819 return Serializer.serialize_enum(data, enum_obj=enum_type) 

820 

821 iter_type = data_type[0] + data_type[-1] 

822 if iter_type in self.serialize_type: 

823 return self.serialize_type[iter_type](data, data_type[1:-1], **kwargs) 

824 

825 except (ValueError, TypeError) as err: 

826 msg = "Unable to serialize value: {!r} as type: {!r}." 

827 raise_with_traceback(SerializationError, msg.format(data, data_type), err) 

828 else: 

829 return self._serialize(data, **kwargs) 

830 

831 @classmethod 

832 def _get_custom_serializers(cls, data_type, **kwargs): 

833 custom_serializer = kwargs.get("basic_types_serializers", {}).get(data_type) 

834 if custom_serializer: 

835 return custom_serializer 

836 if kwargs.get("is_xml", False): 

837 return cls._xml_basic_types_serializers.get(data_type) 

838 

839 @classmethod 

840 def serialize_basic(cls, data, data_type, **kwargs): 

841 """Serialize basic builting data type. 

842 Serializes objects to str, int, float or bool. 

843 

844 Possible kwargs: 

845 - basic_types_serializers dict[str, callable] : If set, use the callable as serializer 

846 - is_xml bool : If set, use xml_basic_types_serializers 

847 

848 :param data: Object to be serialized. 

849 :param str data_type: Type of object in the iterable. 

850 """ 

851 custom_serializer = cls._get_custom_serializers(data_type, **kwargs) 

852 if custom_serializer: 

853 return custom_serializer(data) 

854 if data_type == "str": 

855 return cls.serialize_unicode(data) 

856 return eval(data_type)(data) # nosec 

857 

858 @classmethod 

859 def serialize_unicode(cls, data): 

860 """Special handling for serializing unicode strings in Py2. 

861 Encode to UTF-8 if unicode, otherwise handle as a str. 

862 

863 :param data: Object to be serialized. 

864 :rtype: str 

865 """ 

866 try: # If I received an enum, return its value 

867 return data.value 

868 except AttributeError: 

869 pass 

870 

871 try: 

872 if isinstance(data, unicode): # type: ignore 

873 # Don't change it, JSON and XML ElementTree are totally able 

874 # to serialize correctly u'' strings 

875 return data 

876 except NameError: 

877 return str(data) 

878 else: 

879 return str(data) 

880 

881 def serialize_iter(self, data, iter_type, div=None, **kwargs): 

882 """Serialize iterable. 

883 

884 Supported kwargs: 

885 - serialization_ctxt dict : The current entry of _attribute_map, or same format. 

886 serialization_ctxt['type'] should be same as data_type. 

887 - is_xml bool : If set, serialize as XML 

888 

889 :param list attr: Object to be serialized. 

890 :param str iter_type: Type of object in the iterable. 

891 :param bool required: Whether the objects in the iterable must 

892 not be None or empty. 

893 :param str div: If set, this str will be used to combine the elements 

894 in the iterable into a combined string. Default is 'None'. 

895 :keyword bool do_quote: Whether to quote the serialized result of each iterable element. 

896 Defaults to False. 

897 :rtype: list, str 

898 """ 

899 if isinstance(data, str): 

900 raise SerializationError("Refuse str type as a valid iter type.") 

901 

902 serialization_ctxt = kwargs.get("serialization_ctxt", {}) 

903 is_xml = kwargs.get("is_xml", False) 

904 

905 serialized = [] 

906 for d in data: 

907 try: 

908 serialized.append(self.serialize_data(d, iter_type, **kwargs)) 

909 except ValueError as err: 

910 if isinstance(err, SerializationError): 

911 raise 

912 serialized.append(None) 

913 

914 if kwargs.get("do_quote", False): 

915 serialized = ["" if s is None else quote(str(s), safe="") for s in serialized] 

916 

917 if div: 

918 serialized = ["" if s is None else str(s) for s in serialized] 

919 serialized = div.join(serialized) 

920 

921 if "xml" in serialization_ctxt or is_xml: 

922 # XML serialization is more complicated 

923 xml_desc = serialization_ctxt.get("xml", {}) 

924 xml_name = xml_desc.get("name") 

925 if not xml_name: 

926 xml_name = serialization_ctxt["key"] 

927 

928 # Create a wrap node if necessary (use the fact that Element and list have "append") 

929 is_wrapped = xml_desc.get("wrapped", False) 

930 node_name = xml_desc.get("itemsName", xml_name) 

931 if is_wrapped: 

932 final_result = _create_xml_node(xml_name, xml_desc.get("prefix", None), xml_desc.get("ns", None)) 

933 else: 

934 final_result = [] 

935 # All list elements to "local_node" 

936 for el in serialized: 

937 if isinstance(el, ET.Element): 

938 el_node = el 

939 else: 

940 el_node = _create_xml_node(node_name, xml_desc.get("prefix", None), xml_desc.get("ns", None)) 

941 if el is not None: # Otherwise it writes "None" :-p 

942 el_node.text = str(el) 

943 final_result.append(el_node) 

944 return final_result 

945 return serialized 

946 

947 def serialize_dict(self, attr, dict_type, **kwargs): 

948 """Serialize a dictionary of objects. 

949 

950 :param dict attr: Object to be serialized. 

951 :param str dict_type: Type of object in the dictionary. 

952 :param bool required: Whether the objects in the dictionary must 

953 not be None or empty. 

954 :rtype: dict 

955 """ 

956 serialization_ctxt = kwargs.get("serialization_ctxt", {}) 

957 serialized = {} 

958 for key, value in attr.items(): 

959 try: 

960 serialized[self.serialize_unicode(key)] = self.serialize_data(value, dict_type, **kwargs) 

961 except ValueError as err: 

962 if isinstance(err, SerializationError): 

963 raise 

964 serialized[self.serialize_unicode(key)] = None 

965 

966 if "xml" in serialization_ctxt: 

967 # XML serialization is more complicated 

968 xml_desc = serialization_ctxt["xml"] 

969 xml_name = xml_desc["name"] 

970 

971 final_result = _create_xml_node(xml_name, xml_desc.get("prefix", None), xml_desc.get("ns", None)) 

972 for key, value in serialized.items(): 

973 ET.SubElement(final_result, key).text = value 

974 return final_result 

975 

976 return serialized 

977 

978 def serialize_object(self, attr, **kwargs): 

979 """Serialize a generic object. 

980 This will be handled as a dictionary. If object passed in is not 

981 a basic type (str, int, float, dict, list) it will simply be 

982 cast to str. 

983 

984 :param dict attr: Object to be serialized. 

985 :rtype: dict or str 

986 """ 

987 if attr is None: 

988 return None 

989 if isinstance(attr, ET.Element): 

990 return attr 

991 obj_type = type(attr) 

992 if obj_type in self.basic_types: 

993 return self.serialize_basic(attr, self.basic_types[obj_type], **kwargs) 

994 if obj_type is _long_type: 

995 return self.serialize_long(attr) 

996 if obj_type is unicode_str: 

997 return self.serialize_unicode(attr) 

998 if obj_type is datetime.datetime: 

999 return self.serialize_iso(attr) 

1000 if obj_type is datetime.date: 

1001 return self.serialize_date(attr) 

1002 if obj_type is datetime.time: 

1003 return self.serialize_time(attr) 

1004 if obj_type is datetime.timedelta: 

1005 return self.serialize_duration(attr) 

1006 if obj_type is decimal.Decimal: 

1007 return self.serialize_decimal(attr) 

1008 

1009 # If it's a model or I know this dependency, serialize as a Model 

1010 elif obj_type in self.dependencies.values() or isinstance(attr, Model): 

1011 return self._serialize(attr) 

1012 

1013 if obj_type == dict: 

1014 serialized = {} 

1015 for key, value in attr.items(): 

1016 try: 

1017 serialized[self.serialize_unicode(key)] = self.serialize_object(value, **kwargs) 

1018 except ValueError: 

1019 serialized[self.serialize_unicode(key)] = None 

1020 return serialized 

1021 

1022 if obj_type == list: 

1023 serialized = [] 

1024 for obj in attr: 

1025 try: 

1026 serialized.append(self.serialize_object(obj, **kwargs)) 

1027 except ValueError: 

1028 pass 

1029 return serialized 

1030 return str(attr) 

1031 

1032 @staticmethod 

1033 def serialize_enum(attr, enum_obj=None): 

1034 try: 

1035 result = attr.value 

1036 except AttributeError: 

1037 result = attr 

1038 try: 

1039 enum_obj(result) # type: ignore 

1040 return result 

1041 except ValueError: 

1042 for enum_value in enum_obj: # type: ignore 

1043 if enum_value.value.lower() == str(attr).lower(): 

1044 return enum_value.value 

1045 error = "{!r} is not valid value for enum {!r}" 

1046 raise SerializationError(error.format(attr, enum_obj)) 

1047 

1048 @staticmethod 

1049 def serialize_bytearray(attr, **kwargs): 

1050 """Serialize bytearray into base-64 string. 

1051 

1052 :param attr: Object to be serialized. 

1053 :rtype: str 

1054 """ 

1055 return b64encode(attr).decode() 

1056 

1057 @staticmethod 

1058 def serialize_base64(attr, **kwargs): 

1059 """Serialize str into base-64 string. 

1060 

1061 :param attr: Object to be serialized. 

1062 :rtype: str 

1063 """ 

1064 encoded = b64encode(attr).decode("ascii") 

1065 return encoded.strip("=").replace("+", "-").replace("/", "_") 

1066 

1067 @staticmethod 

1068 def serialize_decimal(attr, **kwargs): 

1069 """Serialize Decimal object to float. 

1070 

1071 :param attr: Object to be serialized. 

1072 :rtype: float 

1073 """ 

1074 return float(attr) 

1075 

1076 @staticmethod 

1077 def serialize_long(attr, **kwargs): 

1078 """Serialize long (Py2) or int (Py3). 

1079 

1080 :param attr: Object to be serialized. 

1081 :rtype: int/long 

1082 """ 

1083 return _long_type(attr) 

1084 

1085 @staticmethod 

1086 def serialize_date(attr, **kwargs): 

1087 """Serialize Date object into ISO-8601 formatted string. 

1088 

1089 :param Date attr: Object to be serialized. 

1090 :rtype: str 

1091 """ 

1092 if isinstance(attr, str): 

1093 attr = isodate.parse_date(attr) 

1094 t = "{:04}-{:02}-{:02}".format(attr.year, attr.month, attr.day) 

1095 return t 

1096 

1097 @staticmethod 

1098 def serialize_time(attr, **kwargs): 

1099 """Serialize Time object into ISO-8601 formatted string. 

1100 

1101 :param datetime.time attr: Object to be serialized. 

1102 :rtype: str 

1103 """ 

1104 if isinstance(attr, str): 

1105 attr = isodate.parse_time(attr) 

1106 t = "{:02}:{:02}:{:02}".format(attr.hour, attr.minute, attr.second) 

1107 if attr.microsecond: 

1108 t += ".{:02}".format(attr.microsecond) 

1109 return t 

1110 

1111 @staticmethod 

1112 def serialize_duration(attr, **kwargs): 

1113 """Serialize TimeDelta object into ISO-8601 formatted string. 

1114 

1115 :param TimeDelta attr: Object to be serialized. 

1116 :rtype: str 

1117 """ 

1118 if isinstance(attr, str): 

1119 attr = isodate.parse_duration(attr) 

1120 return isodate.duration_isoformat(attr) 

1121 

1122 @staticmethod 

1123 def serialize_rfc(attr, **kwargs): 

1124 """Serialize Datetime object into RFC-1123 formatted string. 

1125 

1126 :param Datetime attr: Object to be serialized. 

1127 :rtype: str 

1128 :raises: TypeError if format invalid. 

1129 """ 

1130 try: 

1131 if not attr.tzinfo: 

1132 _LOGGER.warning("Datetime with no tzinfo will be considered UTC.") 

1133 utc = attr.utctimetuple() 

1134 except AttributeError: 

1135 raise TypeError("RFC1123 object must be valid Datetime object.") 

1136 

1137 return "{}, {:02} {} {:04} {:02}:{:02}:{:02} GMT".format( 

1138 Serializer.days[utc.tm_wday], 

1139 utc.tm_mday, 

1140 Serializer.months[utc.tm_mon], 

1141 utc.tm_year, 

1142 utc.tm_hour, 

1143 utc.tm_min, 

1144 utc.tm_sec, 

1145 ) 

1146 

1147 @staticmethod 

1148 def serialize_iso(attr, **kwargs): 

1149 """Serialize Datetime object into ISO-8601 formatted string. 

1150 

1151 :param Datetime attr: Object to be serialized. 

1152 :rtype: str 

1153 :raises: SerializationError if format invalid. 

1154 """ 

1155 if isinstance(attr, str): 

1156 attr = isodate.parse_datetime(attr) 

1157 try: 

1158 if not attr.tzinfo: 

1159 _LOGGER.warning("Datetime with no tzinfo will be considered UTC.") 

1160 utc = attr.utctimetuple() 

1161 if utc.tm_year > 9999 or utc.tm_year < 1: 

1162 raise OverflowError("Hit max or min date") 

1163 

1164 microseconds = str(attr.microsecond).rjust(6, "0").rstrip("0").ljust(3, "0") 

1165 if microseconds: 

1166 microseconds = "." + microseconds 

1167 date = "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}".format( 

1168 utc.tm_year, utc.tm_mon, utc.tm_mday, utc.tm_hour, utc.tm_min, utc.tm_sec 

1169 ) 

1170 return date + microseconds + "Z" 

1171 except (ValueError, OverflowError) as err: 

1172 msg = "Unable to serialize datetime object." 

1173 raise_with_traceback(SerializationError, msg, err) 

1174 except AttributeError as err: 

1175 msg = "ISO-8601 object must be valid Datetime object." 

1176 raise_with_traceback(TypeError, msg, err) 

1177 

1178 @staticmethod 

1179 def serialize_unix(attr, **kwargs): 

1180 """Serialize Datetime object into IntTime format. 

1181 This is represented as seconds. 

1182 

1183 :param Datetime attr: Object to be serialized. 

1184 :rtype: int 

1185 :raises: SerializationError if format invalid 

1186 """ 

1187 if isinstance(attr, int): 

1188 return attr 

1189 try: 

1190 if not attr.tzinfo: 

1191 _LOGGER.warning("Datetime with no tzinfo will be considered UTC.") 

1192 return int(calendar.timegm(attr.utctimetuple())) 

1193 except AttributeError: 

1194 raise TypeError("Unix time object must be valid Datetime object.") 

1195 

1196 

1197def rest_key_extractor(attr, attr_desc, data): 

1198 key = attr_desc["key"] 

1199 working_data = data 

1200 

1201 while "." in key: 

1202 # Need the cast, as for some reasons "split" is typed as list[str | Any] 

1203 dict_keys = cast(List[str], _FLATTEN.split(key)) 

1204 if len(dict_keys) == 1: 

1205 key = _decode_attribute_map_key(dict_keys[0]) 

1206 break 

1207 working_key = _decode_attribute_map_key(dict_keys[0]) 

1208 working_data = working_data.get(working_key, data) 

1209 if working_data is None: 

1210 # If at any point while following flatten JSON path see None, it means 

1211 # that all properties under are None as well 

1212 # https://github.com/Azure/msrest-for-python/issues/197 

1213 return None 

1214 key = ".".join(dict_keys[1:]) 

1215 

1216 return working_data.get(key) 

1217 

1218 

1219def rest_key_case_insensitive_extractor(attr, attr_desc, data): 

1220 key = attr_desc["key"] 

1221 working_data = data 

1222 

1223 while "." in key: 

1224 dict_keys = _FLATTEN.split(key) 

1225 if len(dict_keys) == 1: 

1226 key = _decode_attribute_map_key(dict_keys[0]) 

1227 break 

1228 working_key = _decode_attribute_map_key(dict_keys[0]) 

1229 working_data = attribute_key_case_insensitive_extractor(working_key, None, working_data) 

1230 if working_data is None: 

1231 # If at any point while following flatten JSON path see None, it means 

1232 # that all properties under are None as well 

1233 # https://github.com/Azure/msrest-for-python/issues/197 

1234 return None 

1235 key = ".".join(dict_keys[1:]) 

1236 

1237 if working_data: 

1238 return attribute_key_case_insensitive_extractor(key, None, working_data) 

1239 

1240 

1241def last_rest_key_extractor(attr, attr_desc, data): 

1242 """Extract the attribute in "data" based on the last part of the JSON path key.""" 

1243 key = attr_desc["key"] 

1244 dict_keys = _FLATTEN.split(key) 

1245 return attribute_key_extractor(dict_keys[-1], None, data) 

1246 

1247 

1248def last_rest_key_case_insensitive_extractor(attr, attr_desc, data): 

1249 """Extract the attribute in "data" based on the last part of the JSON path key. 

1250 

1251 This is the case insensitive version of "last_rest_key_extractor" 

1252 """ 

1253 key = attr_desc["key"] 

1254 dict_keys = _FLATTEN.split(key) 

1255 return attribute_key_case_insensitive_extractor(dict_keys[-1], None, data) 

1256 

1257 

1258def attribute_key_extractor(attr, _, data): 

1259 return data.get(attr) 

1260 

1261 

1262def attribute_key_case_insensitive_extractor(attr, _, data): 

1263 found_key = None 

1264 lower_attr = attr.lower() 

1265 for key in data: 

1266 if lower_attr == key.lower(): 

1267 found_key = key 

1268 break 

1269 

1270 return data.get(found_key) 

1271 

1272 

1273def _extract_name_from_internal_type(internal_type): 

1274 """Given an internal type XML description, extract correct XML name with namespace. 

1275 

1276 :param dict internal_type: An model type 

1277 :rtype: tuple 

1278 :returns: A tuple XML name + namespace dict 

1279 """ 

1280 internal_type_xml_map = getattr(internal_type, "_xml_map", {}) 

1281 xml_name = internal_type_xml_map.get("name", internal_type.__name__) 

1282 xml_ns = internal_type_xml_map.get("ns", None) 

1283 if xml_ns: 

1284 xml_name = "{{{}}}{}".format(xml_ns, xml_name) 

1285 return xml_name 

1286 

1287 

1288def xml_key_extractor(attr, attr_desc, data): 

1289 if isinstance(data, dict): 

1290 return None 

1291 

1292 # Test if this model is XML ready first 

1293 if not isinstance(data, ET.Element): 

1294 return None 

1295 

1296 xml_desc = attr_desc.get("xml", {}) 

1297 xml_name = xml_desc.get("name", attr_desc["key"]) 

1298 

1299 # Look for a children 

1300 is_iter_type = attr_desc["type"].startswith("[") 

1301 is_wrapped = xml_desc.get("wrapped", False) 

1302 internal_type = attr_desc.get("internalType", None) 

1303 internal_type_xml_map = getattr(internal_type, "_xml_map", {}) 

1304 

1305 # Integrate namespace if necessary 

1306 xml_ns = xml_desc.get("ns", internal_type_xml_map.get("ns", None)) 

1307 if xml_ns: 

1308 xml_name = "{{{}}}{}".format(xml_ns, xml_name) 

1309 

1310 # If it's an attribute, that's simple 

1311 if xml_desc.get("attr", False): 

1312 return data.get(xml_name) 

1313 

1314 # If it's x-ms-text, that's simple too 

1315 if xml_desc.get("text", False): 

1316 return data.text 

1317 

1318 # Scenario where I take the local name: 

1319 # - Wrapped node 

1320 # - Internal type is an enum (considered basic types) 

1321 # - Internal type has no XML/Name node 

1322 if is_wrapped or (internal_type and (issubclass(internal_type, Enum) or "name" not in internal_type_xml_map)): 

1323 children = data.findall(xml_name) 

1324 # If internal type has a local name and it's not a list, I use that name 

1325 elif not is_iter_type and internal_type and "name" in internal_type_xml_map: 

1326 xml_name = _extract_name_from_internal_type(internal_type) 

1327 children = data.findall(xml_name) 

1328 # That's an array 

1329 else: 

1330 if internal_type: # Complex type, ignore itemsName and use the complex type name 

1331 items_name = _extract_name_from_internal_type(internal_type) 

1332 else: 

1333 items_name = xml_desc.get("itemsName", xml_name) 

1334 children = data.findall(items_name) 

1335 

1336 if len(children) == 0: 

1337 if is_iter_type: 

1338 if is_wrapped: 

1339 return None # is_wrapped no node, we want None 

1340 else: 

1341 return [] # not wrapped, assume empty list 

1342 return None # Assume it's not there, maybe an optional node. 

1343 

1344 # If is_iter_type and not wrapped, return all found children 

1345 if is_iter_type: 

1346 if not is_wrapped: 

1347 return children 

1348 else: # Iter and wrapped, should have found one node only (the wrap one) 

1349 if len(children) != 1: 

1350 raise DeserializationError( 

1351 "Tried to deserialize an array not wrapped, and found several nodes '{}'. Maybe you should declare this array as wrapped?".format( 

1352 xml_name 

1353 ) 

1354 ) 

1355 return list(children[0]) # Might be empty list and that's ok. 

1356 

1357 # Here it's not a itertype, we should have found one element only or empty 

1358 if len(children) > 1: 

1359 raise DeserializationError("Find several XML '{}' where it was not expected".format(xml_name)) 

1360 return children[0] 

1361 

1362 

1363class Deserializer(object): 

1364 """Response object model deserializer. 

1365 

1366 :param dict classes: Class type dictionary for deserializing complex types. 

1367 :ivar list key_extractors: Ordered list of extractors to be used by this deserializer. 

1368 """ 

1369 

1370 basic_types = {str: "str", int: "int", bool: "bool", float: "float"} 

1371 

1372 valid_date = re.compile(r"\d{4}[-]\d{2}[-]\d{2}T\d{2}:\d{2}:\d{2}" r"\.?\d*Z?[-+]?[\d{2}]?:?[\d{2}]?") 

1373 

1374 def __init__(self, classes: Optional[Mapping[str, Type[ModelType]]] = None): 

1375 self.deserialize_type = { 

1376 "iso-8601": Deserializer.deserialize_iso, 

1377 "rfc-1123": Deserializer.deserialize_rfc, 

1378 "unix-time": Deserializer.deserialize_unix, 

1379 "duration": Deserializer.deserialize_duration, 

1380 "date": Deserializer.deserialize_date, 

1381 "time": Deserializer.deserialize_time, 

1382 "decimal": Deserializer.deserialize_decimal, 

1383 "long": Deserializer.deserialize_long, 

1384 "bytearray": Deserializer.deserialize_bytearray, 

1385 "base64": Deserializer.deserialize_base64, 

1386 "object": self.deserialize_object, 

1387 "[]": self.deserialize_iter, 

1388 "{}": self.deserialize_dict, 

1389 } 

1390 self.deserialize_expected_types = { 

1391 "duration": (isodate.Duration, datetime.timedelta), 

1392 "iso-8601": (datetime.datetime), 

1393 } 

1394 self.dependencies: Dict[str, Type[ModelType]] = dict(classes) if classes else {} 

1395 self.key_extractors = [rest_key_extractor, xml_key_extractor] 

1396 # Additional properties only works if the "rest_key_extractor" is used to 

1397 # extract the keys. Making it to work whatever the key extractor is too much 

1398 # complicated, with no real scenario for now. 

1399 # So adding a flag to disable additional properties detection. This flag should be 

1400 # used if your expect the deserialization to NOT come from a JSON REST syntax. 

1401 # Otherwise, result are unexpected 

1402 self.additional_properties_detection = True 

1403 

1404 def __call__(self, target_obj, response_data, content_type=None): 

1405 """Call the deserializer to process a REST response. 

1406 

1407 :param str target_obj: Target data type to deserialize to. 

1408 :param requests.Response response_data: REST response object. 

1409 :param str content_type: Swagger "produces" if available. 

1410 :raises: DeserializationError if deserialization fails. 

1411 :return: Deserialized object. 

1412 """ 

1413 data = self._unpack_content(response_data, content_type) 

1414 return self._deserialize(target_obj, data) 

1415 

1416 def _deserialize(self, target_obj, data): 

1417 """Call the deserializer on a model. 

1418 

1419 Data needs to be already deserialized as JSON or XML ElementTree 

1420 

1421 :param str target_obj: Target data type to deserialize to. 

1422 :param object data: Object to deserialize. 

1423 :raises: DeserializationError if deserialization fails. 

1424 :return: Deserialized object. 

1425 """ 

1426 # This is already a model, go recursive just in case 

1427 if hasattr(data, "_attribute_map"): 

1428 constants = [name for name, config in getattr(data, "_validation", {}).items() if config.get("constant")] 

1429 try: 

1430 for attr, mapconfig in data._attribute_map.items(): 

1431 if attr in constants: 

1432 continue 

1433 value = getattr(data, attr) 

1434 if value is None: 

1435 continue 

1436 local_type = mapconfig["type"] 

1437 internal_data_type = local_type.strip("[]{}") 

1438 if internal_data_type not in self.dependencies or isinstance(internal_data_type, Enum): 

1439 continue 

1440 setattr(data, attr, self._deserialize(local_type, value)) 

1441 return data 

1442 except AttributeError: 

1443 return 

1444 

1445 response, class_name = self._classify_target(target_obj, data) 

1446 

1447 if isinstance(response, basestring): 

1448 return self.deserialize_data(data, response) 

1449 elif isinstance(response, type) and issubclass(response, Enum): 

1450 return self.deserialize_enum(data, response) 

1451 

1452 if data is None: 

1453 return data 

1454 try: 

1455 attributes = response._attribute_map # type: ignore 

1456 d_attrs = {} 

1457 for attr, attr_desc in attributes.items(): 

1458 # Check empty string. If it's not empty, someone has a real "additionalProperties"... 

1459 if attr == "additional_properties" and attr_desc["key"] == "": 

1460 continue 

1461 raw_value = None 

1462 # Enhance attr_desc with some dynamic data 

1463 attr_desc = attr_desc.copy() # Do a copy, do not change the real one 

1464 internal_data_type = attr_desc["type"].strip("[]{}") 

1465 if internal_data_type in self.dependencies: 

1466 attr_desc["internalType"] = self.dependencies[internal_data_type] 

1467 

1468 for key_extractor in self.key_extractors: 

1469 found_value = key_extractor(attr, attr_desc, data) 

1470 if found_value is not None: 

1471 if raw_value is not None and raw_value != found_value: 

1472 msg = ( 

1473 "Ignoring extracted value '%s' from %s for key '%s'" 

1474 " (duplicate extraction, follow extractors order)" 

1475 ) 

1476 _LOGGER.warning(msg, found_value, key_extractor, attr) 

1477 continue 

1478 raw_value = found_value 

1479 

1480 value = self.deserialize_data(raw_value, attr_desc["type"]) 

1481 d_attrs[attr] = value 

1482 except (AttributeError, TypeError, KeyError) as err: 

1483 msg = "Unable to deserialize to object: " + class_name # type: ignore 

1484 raise_with_traceback(DeserializationError, msg, err) 

1485 else: 

1486 additional_properties = self._build_additional_properties(attributes, data) 

1487 return self._instantiate_model(response, d_attrs, additional_properties) 

1488 

1489 def _build_additional_properties(self, attribute_map, data): 

1490 if not self.additional_properties_detection: 

1491 return None 

1492 if "additional_properties" in attribute_map and attribute_map.get("additional_properties", {}).get("key") != "": 

1493 # Check empty string. If it's not empty, someone has a real "additionalProperties" 

1494 return None 

1495 if isinstance(data, ET.Element): 

1496 data = {el.tag: el.text for el in data} 

1497 

1498 known_keys = { 

1499 _decode_attribute_map_key(_FLATTEN.split(desc["key"])[0]) 

1500 for desc in attribute_map.values() 

1501 if desc["key"] != "" 

1502 } 

1503 present_keys = set(data.keys()) 

1504 missing_keys = present_keys - known_keys 

1505 return {key: data[key] for key in missing_keys} 

1506 

1507 def _classify_target(self, target, data): 

1508 """Check to see whether the deserialization target object can 

1509 be classified into a subclass. 

1510 Once classification has been determined, initialize object. 

1511 

1512 :param str target: The target object type to deserialize to. 

1513 :param str/dict data: The response data to deserialize. 

1514 """ 

1515 if target is None: 

1516 return None, None 

1517 

1518 if isinstance(target, basestring): 

1519 try: 

1520 target = self.dependencies[target] 

1521 except KeyError: 

1522 return target, target 

1523 

1524 try: 

1525 target = target._classify(data, self.dependencies) 

1526 except AttributeError: 

1527 pass # Target is not a Model, no classify 

1528 return target, target.__class__.__name__ # type: ignore 

1529 

1530 def failsafe_deserialize(self, target_obj, data, content_type=None): 

1531 """Ignores any errors encountered in deserialization, 

1532 and falls back to not deserializing the object. Recommended 

1533 for use in error deserialization, as we want to return the 

1534 HttpResponseError to users, and not have them deal with 

1535 a deserialization error. 

1536 

1537 :param str target_obj: The target object type to deserialize to. 

1538 :param str/dict data: The response data to deserialize. 

1539 :param str content_type: Swagger "produces" if available. 

1540 """ 

1541 try: 

1542 return self(target_obj, data, content_type=content_type) 

1543 except: 

1544 _LOGGER.debug( 

1545 "Ran into a deserialization error. Ignoring since this is failsafe deserialization", exc_info=True 

1546 ) 

1547 return None 

1548 

1549 @staticmethod 

1550 def _unpack_content(raw_data, content_type=None): 

1551 """Extract the correct structure for deserialization. 

1552 

1553 If raw_data is a PipelineResponse, try to extract the result of RawDeserializer. 

1554 if we can't, raise. Your Pipeline should have a RawDeserializer. 

1555 

1556 If not a pipeline response and raw_data is bytes or string, use content-type 

1557 to decode it. If no content-type, try JSON. 

1558 

1559 If raw_data is something else, bypass all logic and return it directly. 

1560 

1561 :param raw_data: Data to be processed. 

1562 :param content_type: How to parse if raw_data is a string/bytes. 

1563 :raises JSONDecodeError: If JSON is requested and parsing is impossible. 

1564 :raises UnicodeDecodeError: If bytes is not UTF8 

1565 """ 

1566 # Assume this is enough to detect a Pipeline Response without importing it 

1567 context = getattr(raw_data, "context", {}) 

1568 if context: 

1569 if RawDeserializer.CONTEXT_NAME in context: 

1570 return context[RawDeserializer.CONTEXT_NAME] 

1571 raise ValueError("This pipeline didn't have the RawDeserializer policy; can't deserialize") 

1572 

1573 # Assume this is enough to recognize universal_http.ClientResponse without importing it 

1574 if hasattr(raw_data, "body"): 

1575 return RawDeserializer.deserialize_from_http_generics(raw_data.text(), raw_data.headers) 

1576 

1577 # Assume this enough to recognize requests.Response without importing it. 

1578 if hasattr(raw_data, "_content_consumed"): 

1579 return RawDeserializer.deserialize_from_http_generics(raw_data.text, raw_data.headers) 

1580 

1581 if isinstance(raw_data, (basestring, bytes)) or hasattr(raw_data, "read"): 

1582 return RawDeserializer.deserialize_from_text(raw_data, content_type) # type: ignore 

1583 return raw_data 

1584 

1585 def _instantiate_model(self, response, attrs, additional_properties=None): 

1586 """Instantiate a response model passing in deserialized args. 

1587 

1588 :param response: The response model class. 

1589 :param d_attrs: The deserialized response attributes. 

1590 """ 

1591 if callable(response): 

1592 subtype = getattr(response, "_subtype_map", {}) 

1593 try: 

1594 readonly = [k for k, v in response._validation.items() if v.get("readonly")] 

1595 const = [k for k, v in response._validation.items() if v.get("constant")] 

1596 kwargs = {k: v for k, v in attrs.items() if k not in subtype and k not in readonly + const} 

1597 response_obj = response(**kwargs) 

1598 for attr in readonly: 

1599 setattr(response_obj, attr, attrs.get(attr)) 

1600 if additional_properties: 

1601 response_obj.additional_properties = additional_properties 

1602 return response_obj 

1603 except TypeError as err: 

1604 msg = "Unable to deserialize {} into model {}. ".format(kwargs, response) # type: ignore 

1605 raise DeserializationError(msg + str(err)) 

1606 else: 

1607 try: 

1608 for attr, value in attrs.items(): 

1609 setattr(response, attr, value) 

1610 return response 

1611 except Exception as exp: 

1612 msg = "Unable to populate response model. " 

1613 msg += "Type: {}, Error: {}".format(type(response), exp) 

1614 raise DeserializationError(msg) 

1615 

1616 def deserialize_data(self, data, data_type): 

1617 """Process data for deserialization according to data type. 

1618 

1619 :param str data: The response string to be deserialized. 

1620 :param str data_type: The type to deserialize to. 

1621 :raises: DeserializationError if deserialization fails. 

1622 :return: Deserialized object. 

1623 """ 

1624 if data is None: 

1625 return data 

1626 

1627 try: 

1628 if not data_type: 

1629 return data 

1630 if data_type in self.basic_types.values(): 

1631 return self.deserialize_basic(data, data_type) 

1632 if data_type in self.deserialize_type: 

1633 if isinstance(data, self.deserialize_expected_types.get(data_type, tuple())): 

1634 return data 

1635 

1636 is_a_text_parsing_type = lambda x: x not in ["object", "[]", r"{}"] 

1637 if isinstance(data, ET.Element) and is_a_text_parsing_type(data_type) and not data.text: 

1638 return None 

1639 data_val = self.deserialize_type[data_type](data) 

1640 return data_val 

1641 

1642 iter_type = data_type[0] + data_type[-1] 

1643 if iter_type in self.deserialize_type: 

1644 return self.deserialize_type[iter_type](data, data_type[1:-1]) 

1645 

1646 obj_type = self.dependencies[data_type] 

1647 if issubclass(obj_type, Enum): 

1648 if isinstance(data, ET.Element): 

1649 data = data.text 

1650 return self.deserialize_enum(data, obj_type) 

1651 

1652 except (ValueError, TypeError, AttributeError) as err: 

1653 msg = "Unable to deserialize response data." 

1654 msg += " Data: {}, {}".format(data, data_type) 

1655 raise_with_traceback(DeserializationError, msg, err) 

1656 else: 

1657 return self._deserialize(obj_type, data) 

1658 

1659 def deserialize_iter(self, attr, iter_type): 

1660 """Deserialize an iterable. 

1661 

1662 :param list attr: Iterable to be deserialized. 

1663 :param str iter_type: The type of object in the iterable. 

1664 :rtype: list 

1665 """ 

1666 if attr is None: 

1667 return None 

1668 if isinstance(attr, ET.Element): # If I receive an element here, get the children 

1669 attr = list(attr) 

1670 if not isinstance(attr, (list, set)): 

1671 raise DeserializationError("Cannot deserialize as [{}] an object of type {}".format(iter_type, type(attr))) 

1672 return [self.deserialize_data(a, iter_type) for a in attr] 

1673 

1674 def deserialize_dict(self, attr, dict_type): 

1675 """Deserialize a dictionary. 

1676 

1677 :param dict/list attr: Dictionary to be deserialized. Also accepts 

1678 a list of key, value pairs. 

1679 :param str dict_type: The object type of the items in the dictionary. 

1680 :rtype: dict 

1681 """ 

1682 if isinstance(attr, list): 

1683 return {x["key"]: self.deserialize_data(x["value"], dict_type) for x in attr} 

1684 

1685 if isinstance(attr, ET.Element): 

1686 # Transform <Key>value</Key> into {"Key": "value"} 

1687 attr = {el.tag: el.text for el in attr} 

1688 return {k: self.deserialize_data(v, dict_type) for k, v in attr.items()} 

1689 

1690 def deserialize_object(self, attr, **kwargs): 

1691 """Deserialize a generic object. 

1692 This will be handled as a dictionary. 

1693 

1694 :param dict attr: Dictionary to be deserialized. 

1695 :rtype: dict 

1696 :raises: TypeError if non-builtin datatype encountered. 

1697 """ 

1698 if attr is None: 

1699 return None 

1700 if isinstance(attr, ET.Element): 

1701 # Do no recurse on XML, just return the tree as-is 

1702 return attr 

1703 if isinstance(attr, basestring): 

1704 return self.deserialize_basic(attr, "str") 

1705 obj_type = type(attr) 

1706 if obj_type in self.basic_types: 

1707 return self.deserialize_basic(attr, self.basic_types[obj_type]) 

1708 if obj_type is _long_type: 

1709 return self.deserialize_long(attr) 

1710 

1711 if obj_type == dict: 

1712 deserialized = {} 

1713 for key, value in attr.items(): 

1714 try: 

1715 deserialized[key] = self.deserialize_object(value, **kwargs) 

1716 except ValueError: 

1717 deserialized[key] = None 

1718 return deserialized 

1719 

1720 if obj_type == list: 

1721 deserialized = [] 

1722 for obj in attr: 

1723 try: 

1724 deserialized.append(self.deserialize_object(obj, **kwargs)) 

1725 except ValueError: 

1726 pass 

1727 return deserialized 

1728 

1729 else: 

1730 error = "Cannot deserialize generic object with type: " 

1731 raise TypeError(error + str(obj_type)) 

1732 

1733 def deserialize_basic(self, attr, data_type): 

1734 """Deserialize basic builtin data type from string. 

1735 Will attempt to convert to str, int, float and bool. 

1736 This function will also accept '1', '0', 'true' and 'false' as 

1737 valid bool values. 

1738 

1739 :param str attr: response string to be deserialized. 

1740 :param str data_type: deserialization data type. 

1741 :rtype: str, int, float or bool 

1742 :raises: TypeError if string format is not valid. 

1743 """ 

1744 # If we're here, data is supposed to be a basic type. 

1745 # If it's still an XML node, take the text 

1746 if isinstance(attr, ET.Element): 

1747 attr = attr.text 

1748 if not attr: 

1749 if data_type == "str": 

1750 # None or '', node <a/> is empty string. 

1751 return "" 

1752 else: 

1753 # None or '', node <a/> with a strong type is None. 

1754 # Don't try to model "empty bool" or "empty int" 

1755 return None 

1756 

1757 if data_type == "bool": 

1758 if attr in [True, False, 1, 0]: 

1759 return bool(attr) 

1760 elif isinstance(attr, basestring): 

1761 if attr.lower() in ["true", "1"]: 

1762 return True 

1763 elif attr.lower() in ["false", "0"]: 

1764 return False 

1765 raise TypeError("Invalid boolean value: {}".format(attr)) 

1766 

1767 if data_type == "str": 

1768 return self.deserialize_unicode(attr) 

1769 return eval(data_type)(attr) # nosec 

1770 

1771 @staticmethod 

1772 def deserialize_unicode(data): 

1773 """Preserve unicode objects in Python 2, otherwise return data 

1774 as a string. 

1775 

1776 :param str data: response string to be deserialized. 

1777 :rtype: str or unicode 

1778 """ 

1779 # We might be here because we have an enum modeled as string, 

1780 # and we try to deserialize a partial dict with enum inside 

1781 if isinstance(data, Enum): 

1782 return data 

1783 

1784 # Consider this is real string 

1785 try: 

1786 if isinstance(data, unicode): # type: ignore 

1787 return data 

1788 except NameError: 

1789 return str(data) 

1790 else: 

1791 return str(data) 

1792 

1793 @staticmethod 

1794 def deserialize_enum(data, enum_obj): 

1795 """Deserialize string into enum object. 

1796 

1797 If the string is not a valid enum value it will be returned as-is 

1798 and a warning will be logged. 

1799 

1800 :param str data: Response string to be deserialized. If this value is 

1801 None or invalid it will be returned as-is. 

1802 :param Enum enum_obj: Enum object to deserialize to. 

1803 :rtype: Enum 

1804 """ 

1805 if isinstance(data, enum_obj) or data is None: 

1806 return data 

1807 if isinstance(data, Enum): 

1808 data = data.value 

1809 if isinstance(data, int): 

1810 # Workaround. We might consider remove it in the future. 

1811 # https://github.com/Azure/azure-rest-api-specs/issues/141 

1812 try: 

1813 return list(enum_obj.__members__.values())[data] 

1814 except IndexError: 

1815 error = "{!r} is not a valid index for enum {!r}" 

1816 raise DeserializationError(error.format(data, enum_obj)) 

1817 try: 

1818 return enum_obj(str(data)) 

1819 except ValueError: 

1820 for enum_value in enum_obj: 

1821 if enum_value.value.lower() == str(data).lower(): 

1822 return enum_value 

1823 # We don't fail anymore for unknown value, we deserialize as a string 

1824 _LOGGER.warning("Deserializer is not able to find %s as valid enum in %s", data, enum_obj) 

1825 return Deserializer.deserialize_unicode(data) 

1826 

1827 @staticmethod 

1828 def deserialize_bytearray(attr): 

1829 """Deserialize string into bytearray. 

1830 

1831 :param str attr: response string to be deserialized. 

1832 :rtype: bytearray 

1833 :raises: TypeError if string format invalid. 

1834 """ 

1835 if isinstance(attr, ET.Element): 

1836 attr = attr.text 

1837 return bytearray(b64decode(attr)) # type: ignore 

1838 

1839 @staticmethod 

1840 def deserialize_base64(attr): 

1841 """Deserialize base64 encoded string into string. 

1842 

1843 :param str attr: response string to be deserialized. 

1844 :rtype: bytearray 

1845 :raises: TypeError if string format invalid. 

1846 """ 

1847 if isinstance(attr, ET.Element): 

1848 attr = attr.text 

1849 padding = "=" * (3 - (len(attr) + 3) % 4) # type: ignore 

1850 attr = attr + padding # type: ignore 

1851 encoded = attr.replace("-", "+").replace("_", "/") 

1852 return b64decode(encoded) 

1853 

1854 @staticmethod 

1855 def deserialize_decimal(attr): 

1856 """Deserialize string into Decimal object. 

1857 

1858 :param str attr: response string to be deserialized. 

1859 :rtype: Decimal 

1860 :raises: DeserializationError if string format invalid. 

1861 """ 

1862 if isinstance(attr, ET.Element): 

1863 attr = attr.text 

1864 try: 

1865 return decimal.Decimal(attr) # type: ignore 

1866 except decimal.DecimalException as err: 

1867 msg = "Invalid decimal {}".format(attr) 

1868 raise_with_traceback(DeserializationError, msg, err) 

1869 

1870 @staticmethod 

1871 def deserialize_long(attr): 

1872 """Deserialize string into long (Py2) or int (Py3). 

1873 

1874 :param str attr: response string to be deserialized. 

1875 :rtype: long or int 

1876 :raises: ValueError if string format invalid. 

1877 """ 

1878 if isinstance(attr, ET.Element): 

1879 attr = attr.text 

1880 return _long_type(attr) # type: ignore 

1881 

1882 @staticmethod 

1883 def deserialize_duration(attr): 

1884 """Deserialize ISO-8601 formatted string into TimeDelta object. 

1885 

1886 :param str attr: response string to be deserialized. 

1887 :rtype: TimeDelta 

1888 :raises: DeserializationError if string format invalid. 

1889 """ 

1890 if isinstance(attr, ET.Element): 

1891 attr = attr.text 

1892 try: 

1893 duration = isodate.parse_duration(attr) 

1894 except (ValueError, OverflowError, AttributeError) as err: 

1895 msg = "Cannot deserialize duration object." 

1896 raise_with_traceback(DeserializationError, msg, err) 

1897 else: 

1898 return duration 

1899 

1900 @staticmethod 

1901 def deserialize_date(attr): 

1902 """Deserialize ISO-8601 formatted string into Date object. 

1903 

1904 :param str attr: response string to be deserialized. 

1905 :rtype: Date 

1906 :raises: DeserializationError if string format invalid. 

1907 """ 

1908 if isinstance(attr, ET.Element): 

1909 attr = attr.text 

1910 if re.search(r"[^\W\d_]", attr, re.I + re.U): # type: ignore 

1911 raise DeserializationError("Date must have only digits and -. Received: %s" % attr) 

1912 # This must NOT use defaultmonth/defaultday. Using None ensure this raises an exception. 

1913 return isodate.parse_date(attr, defaultmonth=None, defaultday=None) 

1914 

1915 @staticmethod 

1916 def deserialize_time(attr): 

1917 """Deserialize ISO-8601 formatted string into time object. 

1918 

1919 :param str attr: response string to be deserialized. 

1920 :rtype: datetime.time 

1921 :raises: DeserializationError if string format invalid. 

1922 """ 

1923 if isinstance(attr, ET.Element): 

1924 attr = attr.text 

1925 if re.search(r"[^\W\d_]", attr, re.I + re.U): # type: ignore 

1926 raise DeserializationError("Date must have only digits and -. Received: %s" % attr) 

1927 return isodate.parse_time(attr) 

1928 

1929 @staticmethod 

1930 def deserialize_rfc(attr): 

1931 """Deserialize RFC-1123 formatted string into Datetime object. 

1932 

1933 :param str attr: response string to be deserialized. 

1934 :rtype: Datetime 

1935 :raises: DeserializationError if string format invalid. 

1936 """ 

1937 if isinstance(attr, ET.Element): 

1938 attr = attr.text 

1939 try: 

1940 parsed_date = email.utils.parsedate_tz(attr) # type: ignore 

1941 date_obj = datetime.datetime( 

1942 *parsed_date[:6], tzinfo=_FixedOffset(datetime.timedelta(minutes=(parsed_date[9] or 0) / 60)) 

1943 ) 

1944 if not date_obj.tzinfo: 

1945 date_obj = date_obj.astimezone(tz=TZ_UTC) 

1946 except ValueError as err: 

1947 msg = "Cannot deserialize to rfc datetime object." 

1948 raise_with_traceback(DeserializationError, msg, err) 

1949 else: 

1950 return date_obj 

1951 

1952 @staticmethod 

1953 def deserialize_iso(attr): 

1954 """Deserialize ISO-8601 formatted string into Datetime object. 

1955 

1956 :param str attr: response string to be deserialized. 

1957 :rtype: Datetime 

1958 :raises: DeserializationError if string format invalid. 

1959 """ 

1960 if isinstance(attr, ET.Element): 

1961 attr = attr.text 

1962 try: 

1963 attr = attr.upper() # type: ignore 

1964 match = Deserializer.valid_date.match(attr) 

1965 if not match: 

1966 raise ValueError("Invalid datetime string: " + attr) 

1967 

1968 check_decimal = attr.split(".") 

1969 if len(check_decimal) > 1: 

1970 decimal_str = "" 

1971 for digit in check_decimal[1]: 

1972 if digit.isdigit(): 

1973 decimal_str += digit 

1974 else: 

1975 break 

1976 if len(decimal_str) > 6: 

1977 attr = attr.replace(decimal_str, decimal_str[0:6]) 

1978 

1979 date_obj = isodate.parse_datetime(attr) 

1980 test_utc = date_obj.utctimetuple() 

1981 if test_utc.tm_year > 9999 or test_utc.tm_year < 1: 

1982 raise OverflowError("Hit max or min date") 

1983 except (ValueError, OverflowError, AttributeError) as err: 

1984 msg = "Cannot deserialize datetime object." 

1985 raise_with_traceback(DeserializationError, msg, err) 

1986 else: 

1987 return date_obj 

1988 

1989 @staticmethod 

1990 def deserialize_unix(attr): 

1991 """Serialize Datetime object into IntTime format. 

1992 This is represented as seconds. 

1993 

1994 :param int attr: Object to be serialized. 

1995 :rtype: Datetime 

1996 :raises: DeserializationError if format invalid 

1997 """ 

1998 if isinstance(attr, ET.Element): 

1999 attr = int(attr.text) # type: ignore 

2000 try: 

2001 date_obj = datetime.datetime.fromtimestamp(attr, TZ_UTC) 

2002 except ValueError as err: 

2003 msg = "Cannot deserialize to unix datetime object." 

2004 raise_with_traceback(DeserializationError, msg, err) 

2005 else: 

2006 return date_obj