1# --------------------------------------------------------------------------
2#
3# Copyright (c) Microsoft Corporation. All rights reserved.
4#
5# The MIT License (MIT)
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the ""Software""), to
9# deal in the Software without restriction, including without limitation the
10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11# sell copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24#
25# --------------------------------------------------------------------------
26
27# pylint: skip-file
28# pyright: reportUnnecessaryTypeIgnoreComment=false
29
30from base64 import b64decode, b64encode
31import calendar
32import datetime
33import decimal
34import email
35from enum import Enum
36import json
37import logging
38import re
39import sys
40import codecs
41from typing import (
42 Dict,
43 Any,
44 cast,
45 Optional,
46 Union,
47 AnyStr,
48 IO,
49 Mapping,
50 Callable,
51 TypeVar,
52 MutableMapping,
53 Type,
54 List,
55 Mapping,
56)
57
58try:
59 from urllib import quote # type: ignore
60except ImportError:
61 from urllib.parse import quote
62import xml.etree.ElementTree as ET
63
64import isodate # type: ignore
65
66from azure.core.exceptions import DeserializationError, SerializationError, raise_with_traceback
67from azure.core.serialization import NULL as AzureCoreNull
68
69_BOM = codecs.BOM_UTF8.decode(encoding="utf-8")
70
71ModelType = TypeVar("ModelType", bound="Model")
72JSON = MutableMapping[str, Any]
73
74
75class RawDeserializer:
76
77 # Accept "text" because we're open minded people...
78 JSON_REGEXP = re.compile(r"^(application|text)/([a-z+.]+\+)?json$")
79
80 # Name used in context
81 CONTEXT_NAME = "deserialized_data"
82
83 @classmethod
84 def deserialize_from_text(cls, data: Optional[Union[AnyStr, IO]], content_type: Optional[str] = None) -> Any:
85 """Decode data according to content-type.
86
87 Accept a stream of data as well, but will be load at once in memory for now.
88
89 If no content-type, will return the string version (not bytes, not stream)
90
91 :param data: Input, could be bytes or stream (will be decoded with UTF8) or text
92 :type data: str or bytes or IO
93 :param str content_type: The content type.
94 """
95 if hasattr(data, "read"):
96 # Assume a stream
97 data = cast(IO, data).read()
98
99 if isinstance(data, bytes):
100 data_as_str = data.decode(encoding="utf-8-sig")
101 else:
102 # Explain to mypy the correct type.
103 data_as_str = cast(str, data)
104
105 # Remove Byte Order Mark if present in string
106 data_as_str = data_as_str.lstrip(_BOM)
107
108 if content_type is None:
109 return data
110
111 if cls.JSON_REGEXP.match(content_type):
112 try:
113 return json.loads(data_as_str)
114 except ValueError as err:
115 raise DeserializationError("JSON is invalid: {}".format(err), err)
116 elif "xml" in (content_type or []):
117 try:
118
119 try:
120 if isinstance(data, unicode): # type: ignore
121 # If I'm Python 2.7 and unicode XML will scream if I try a "fromstring" on unicode string
122 data_as_str = data_as_str.encode(encoding="utf-8") # type: ignore
123 except NameError:
124 pass
125
126 return ET.fromstring(data_as_str) # nosec
127 except ET.ParseError:
128 # It might be because the server has an issue, and returned JSON with
129 # content-type XML....
130 # So let's try a JSON load, and if it's still broken
131 # let's flow the initial exception
132 def _json_attemp(data):
133 try:
134 return True, json.loads(data)
135 except ValueError:
136 return False, None # Don't care about this one
137
138 success, json_result = _json_attemp(data)
139 if success:
140 return json_result
141 # If i'm here, it's not JSON, it's not XML, let's scream
142 # and raise the last context in this block (the XML exception)
143 # The function hack is because Py2.7 messes up with exception
144 # context otherwise.
145 _LOGGER.critical("Wasn't XML not JSON, failing")
146 raise_with_traceback(DeserializationError, "XML is invalid")
147 raise DeserializationError("Cannot deserialize content-type: {}".format(content_type))
148
149 @classmethod
150 def deserialize_from_http_generics(cls, body_bytes: Optional[Union[AnyStr, IO]], headers: Mapping) -> Any:
151 """Deserialize from HTTP response.
152
153 Use bytes and headers to NOT use any requests/aiohttp or whatever
154 specific implementation.
155 Headers will tested for "content-type"
156 """
157 # Try to use content-type from headers if available
158 content_type = None
159 if "content-type" in headers:
160 content_type = headers["content-type"].split(";")[0].strip().lower()
161 # Ouch, this server did not declare what it sent...
162 # Let's guess it's JSON...
163 # Also, since Autorest was considering that an empty body was a valid JSON,
164 # need that test as well....
165 else:
166 content_type = "application/json"
167
168 if body_bytes:
169 return cls.deserialize_from_text(body_bytes, content_type)
170 return None
171
172
173try:
174 basestring # type: ignore
175 unicode_str = unicode # type: ignore
176except NameError:
177 basestring = str
178 unicode_str = str
179
180_LOGGER = logging.getLogger(__name__)
181
182try:
183 _long_type = long # type: ignore
184except NameError:
185 _long_type = int
186
187
188class UTC(datetime.tzinfo):
189 """Time Zone info for handling UTC"""
190
191 def utcoffset(self, dt):
192 """UTF offset for UTC is 0."""
193 return datetime.timedelta(0)
194
195 def tzname(self, dt):
196 """Timestamp representation."""
197 return "Z"
198
199 def dst(self, dt):
200 """No daylight saving for UTC."""
201 return datetime.timedelta(hours=1)
202
203
204try:
205 from datetime import timezone as _FixedOffset # type: ignore
206except ImportError: # Python 2.7
207
208 class _FixedOffset(datetime.tzinfo): # type: ignore
209 """Fixed offset in minutes east from UTC.
210 Copy/pasted from Python doc
211 :param datetime.timedelta offset: offset in timedelta format
212 """
213
214 def __init__(self, offset):
215 self.__offset = offset
216
217 def utcoffset(self, dt):
218 return self.__offset
219
220 def tzname(self, dt):
221 return str(self.__offset.total_seconds() / 3600)
222
223 def __repr__(self):
224 return "<FixedOffset {}>".format(self.tzname(None))
225
226 def dst(self, dt):
227 return datetime.timedelta(0)
228
229 def __getinitargs__(self):
230 return (self.__offset,)
231
232
233try:
234 from datetime import timezone
235
236 TZ_UTC = timezone.utc
237except ImportError:
238 TZ_UTC = UTC() # type: ignore
239
240_FLATTEN = re.compile(r"(?<!\\)\.")
241
242
243def attribute_transformer(key, attr_desc, value):
244 """A key transformer that returns the Python attribute.
245
246 :param str key: The attribute name
247 :param dict attr_desc: The attribute metadata
248 :param object value: The value
249 :returns: A key using attribute name
250 """
251 return (key, value)
252
253
254def full_restapi_key_transformer(key, attr_desc, value):
255 """A key transformer that returns the full RestAPI key path.
256
257 :param str _: The attribute name
258 :param dict attr_desc: The attribute metadata
259 :param object value: The value
260 :returns: A list of keys using RestAPI syntax.
261 """
262 keys = _FLATTEN.split(attr_desc["key"])
263 return ([_decode_attribute_map_key(k) for k in keys], value)
264
265
266def last_restapi_key_transformer(key, attr_desc, value):
267 """A key transformer that returns the last RestAPI key.
268
269 :param str key: The attribute name
270 :param dict attr_desc: The attribute metadata
271 :param object value: The value
272 :returns: The last RestAPI key.
273 """
274 key, value = full_restapi_key_transformer(key, attr_desc, value)
275 return (key[-1], value)
276
277
278def _create_xml_node(tag, prefix=None, ns=None):
279 """Create a XML node."""
280 if prefix and ns:
281 ET.register_namespace(prefix, ns)
282 if ns:
283 return ET.Element("{" + ns + "}" + tag)
284 else:
285 return ET.Element(tag)
286
287
288class Model(object):
289 """Mixin for all client request body/response body models to support
290 serialization and deserialization.
291 """
292
293 _subtype_map: Dict[str, Dict[str, Any]] = {}
294 _attribute_map: Dict[str, Dict[str, Any]] = {}
295 _validation: Dict[str, Dict[str, Any]] = {}
296
297 def __init__(self, **kwargs: Any) -> None:
298 self.additional_properties: Dict[str, Any] = {}
299 for k in kwargs:
300 if k not in self._attribute_map:
301 _LOGGER.warning("%s is not a known attribute of class %s and will be ignored", k, self.__class__)
302 elif k in self._validation and self._validation[k].get("readonly", False):
303 _LOGGER.warning("Readonly attribute %s will be ignored in class %s", k, self.__class__)
304 else:
305 setattr(self, k, kwargs[k])
306
307 def __eq__(self, other: Any) -> bool:
308 """Compare objects by comparing all attributes."""
309 if isinstance(other, self.__class__):
310 return self.__dict__ == other.__dict__
311 return False
312
313 def __ne__(self, other: Any) -> bool:
314 """Compare objects by comparing all attributes."""
315 return not self.__eq__(other)
316
317 def __str__(self) -> str:
318 return str(self.__dict__)
319
320 @classmethod
321 def enable_additional_properties_sending(cls) -> None:
322 cls._attribute_map["additional_properties"] = {"key": "", "type": "{object}"}
323
324 @classmethod
325 def is_xml_model(cls) -> bool:
326 try:
327 cls._xml_map # type: ignore
328 except AttributeError:
329 return False
330 return True
331
332 @classmethod
333 def _create_xml_node(cls):
334 """Create XML node."""
335 try:
336 xml_map = cls._xml_map # type: ignore
337 except AttributeError:
338 xml_map = {}
339
340 return _create_xml_node(xml_map.get("name", cls.__name__), xml_map.get("prefix", None), xml_map.get("ns", None))
341
342 def serialize(self, keep_readonly: bool = False, **kwargs: Any) -> JSON:
343 """Return the JSON that would be sent to azure from this model.
344
345 This is an alias to `as_dict(full_restapi_key_transformer, keep_readonly=False)`.
346
347 If you want XML serialization, you can pass the kwargs is_xml=True.
348
349 :param bool keep_readonly: If you want to serialize the readonly attributes
350 :returns: A dict JSON compatible object
351 :rtype: dict
352 """
353 serializer = Serializer(self._infer_class_models())
354 return serializer._serialize(self, keep_readonly=keep_readonly, **kwargs)
355
356 def as_dict(
357 self,
358 keep_readonly: bool = True,
359 key_transformer: Callable[[str, Dict[str, Any], Any], Any] = attribute_transformer,
360 **kwargs: Any
361 ) -> JSON:
362 """Return a dict that can be serialized using json.dump.
363
364 Advanced usage might optionally use a callback as parameter:
365
366 .. code::python
367
368 def my_key_transformer(key, attr_desc, value):
369 return key
370
371 Key is the attribute name used in Python. Attr_desc
372 is a dict of metadata. Currently contains 'type' with the
373 msrest type and 'key' with the RestAPI encoded key.
374 Value is the current value in this object.
375
376 The string returned will be used to serialize the key.
377 If the return type is a list, this is considered hierarchical
378 result dict.
379
380 See the three examples in this file:
381
382 - attribute_transformer
383 - full_restapi_key_transformer
384 - last_restapi_key_transformer
385
386 If you want XML serialization, you can pass the kwargs is_xml=True.
387
388 :param function key_transformer: A key transformer function.
389 :returns: A dict JSON compatible object
390 :rtype: dict
391 """
392 serializer = Serializer(self._infer_class_models())
393 return serializer._serialize(self, key_transformer=key_transformer, keep_readonly=keep_readonly, **kwargs)
394
395 @classmethod
396 def _infer_class_models(cls):
397 try:
398 str_models = cls.__module__.rsplit(".", 1)[0]
399 models = sys.modules[str_models]
400 client_models = {k: v for k, v in models.__dict__.items() if isinstance(v, type)}
401 if cls.__name__ not in client_models:
402 raise ValueError("Not Autorest generated code")
403 except Exception:
404 # Assume it's not Autorest generated (tests?). Add ourselves as dependencies.
405 client_models = {cls.__name__: cls}
406 return client_models
407
408 @classmethod
409 def deserialize(cls: Type[ModelType], data: Any, content_type: Optional[str] = None) -> ModelType:
410 """Parse a str using the RestAPI syntax and return a model.
411
412 :param str data: A str using RestAPI structure. JSON by default.
413 :param str content_type: JSON by default, set application/xml if XML.
414 :returns: An instance of this model
415 :raises: DeserializationError if something went wrong
416 """
417 deserializer = Deserializer(cls._infer_class_models())
418 return deserializer(cls.__name__, data, content_type=content_type)
419
420 @classmethod
421 def from_dict(
422 cls: Type[ModelType],
423 data: Any,
424 key_extractors: Optional[Callable[[str, Dict[str, Any], Any], Any]] = None,
425 content_type: Optional[str] = None,
426 ) -> ModelType:
427 """Parse a dict using given key extractor return a model.
428
429 By default consider key
430 extractors (rest_key_case_insensitive_extractor, attribute_key_case_insensitive_extractor
431 and last_rest_key_case_insensitive_extractor)
432
433 :param dict data: A dict using RestAPI structure
434 :param str content_type: JSON by default, set application/xml if XML.
435 :returns: An instance of this model
436 :raises: DeserializationError if something went wrong
437 """
438 deserializer = Deserializer(cls._infer_class_models())
439 deserializer.key_extractors = ( # type: ignore
440 [ # type: ignore
441 attribute_key_case_insensitive_extractor,
442 rest_key_case_insensitive_extractor,
443 last_rest_key_case_insensitive_extractor,
444 ]
445 if key_extractors is None
446 else key_extractors
447 )
448 return deserializer(cls.__name__, data, content_type=content_type)
449
450 @classmethod
451 def _flatten_subtype(cls, key, objects):
452 if "_subtype_map" not in cls.__dict__:
453 return {}
454 result = dict(cls._subtype_map[key])
455 for valuetype in cls._subtype_map[key].values():
456 result.update(objects[valuetype]._flatten_subtype(key, objects))
457 return result
458
459 @classmethod
460 def _classify(cls, response, objects):
461 """Check the class _subtype_map for any child classes.
462 We want to ignore any inherited _subtype_maps.
463 Remove the polymorphic key from the initial data.
464 """
465 for subtype_key in cls.__dict__.get("_subtype_map", {}).keys():
466 subtype_value = None
467
468 if not isinstance(response, ET.Element):
469 rest_api_response_key = cls._get_rest_key_parts(subtype_key)[-1]
470 subtype_value = response.pop(rest_api_response_key, None) or response.pop(subtype_key, None)
471 else:
472 subtype_value = xml_key_extractor(subtype_key, cls._attribute_map[subtype_key], response)
473 if subtype_value:
474 # Try to match base class. Can be class name only
475 # (bug to fix in Autorest to support x-ms-discriminator-name)
476 if cls.__name__ == subtype_value:
477 return cls
478 flatten_mapping_type = cls._flatten_subtype(subtype_key, objects)
479 try:
480 return objects[flatten_mapping_type[subtype_value]] # type: ignore
481 except KeyError:
482 _LOGGER.warning(
483 "Subtype value %s has no mapping, use base class %s.",
484 subtype_value,
485 cls.__name__,
486 )
487 break
488 else:
489 _LOGGER.warning("Discriminator %s is absent or null, use base class %s.", subtype_key, cls.__name__)
490 break
491 return cls
492
493 @classmethod
494 def _get_rest_key_parts(cls, attr_key):
495 """Get the RestAPI key of this attr, split it and decode part
496 :param str attr_key: Attribute key must be in attribute_map.
497 :returns: A list of RestAPI part
498 :rtype: list
499 """
500 rest_split_key = _FLATTEN.split(cls._attribute_map[attr_key]["key"])
501 return [_decode_attribute_map_key(key_part) for key_part in rest_split_key]
502
503
504def _decode_attribute_map_key(key):
505 """This decode a key in an _attribute_map to the actual key we want to look at
506 inside the received data.
507
508 :param str key: A key string from the generated code
509 """
510 return key.replace("\\.", ".")
511
512
513class Serializer(object):
514 """Request object model serializer."""
515
516 basic_types = {str: "str", int: "int", bool: "bool", float: "float"}
517
518 _xml_basic_types_serializers = {"bool": lambda x: str(x).lower()}
519 days = {0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu", 4: "Fri", 5: "Sat", 6: "Sun"}
520 months = {
521 1: "Jan",
522 2: "Feb",
523 3: "Mar",
524 4: "Apr",
525 5: "May",
526 6: "Jun",
527 7: "Jul",
528 8: "Aug",
529 9: "Sep",
530 10: "Oct",
531 11: "Nov",
532 12: "Dec",
533 }
534 validation = {
535 "min_length": lambda x, y: len(x) < y,
536 "max_length": lambda x, y: len(x) > y,
537 "minimum": lambda x, y: x < y,
538 "maximum": lambda x, y: x > y,
539 "minimum_ex": lambda x, y: x <= y,
540 "maximum_ex": lambda x, y: x >= y,
541 "min_items": lambda x, y: len(x) < y,
542 "max_items": lambda x, y: len(x) > y,
543 "pattern": lambda x, y: not re.match(y, x, re.UNICODE),
544 "unique": lambda x, y: len(x) != len(set(x)),
545 "multiple": lambda x, y: x % y != 0,
546 }
547
548 def __init__(self, classes: Optional[Mapping[str, Type[ModelType]]] = None):
549 self.serialize_type = {
550 "iso-8601": Serializer.serialize_iso,
551 "rfc-1123": Serializer.serialize_rfc,
552 "unix-time": Serializer.serialize_unix,
553 "duration": Serializer.serialize_duration,
554 "date": Serializer.serialize_date,
555 "time": Serializer.serialize_time,
556 "decimal": Serializer.serialize_decimal,
557 "long": Serializer.serialize_long,
558 "bytearray": Serializer.serialize_bytearray,
559 "base64": Serializer.serialize_base64,
560 "object": self.serialize_object,
561 "[]": self.serialize_iter,
562 "{}": self.serialize_dict,
563 }
564 self.dependencies: Dict[str, Type[ModelType]] = dict(classes) if classes else {}
565 self.key_transformer = full_restapi_key_transformer
566 self.client_side_validation = True
567
568 def _serialize(self, target_obj, data_type=None, **kwargs):
569 """Serialize data into a string according to type.
570
571 :param target_obj: The data to be serialized.
572 :param str data_type: The type to be serialized from.
573 :rtype: str, dict
574 :raises: SerializationError if serialization fails.
575 """
576 key_transformer = kwargs.get("key_transformer", self.key_transformer)
577 keep_readonly = kwargs.get("keep_readonly", False)
578 if target_obj is None:
579 return None
580
581 attr_name = None
582 class_name = target_obj.__class__.__name__
583
584 if data_type:
585 return self.serialize_data(target_obj, data_type, **kwargs)
586
587 if not hasattr(target_obj, "_attribute_map"):
588 data_type = type(target_obj).__name__
589 if data_type in self.basic_types.values():
590 return self.serialize_data(target_obj, data_type, **kwargs)
591
592 # Force "is_xml" kwargs if we detect a XML model
593 try:
594 is_xml_model_serialization = kwargs["is_xml"]
595 except KeyError:
596 is_xml_model_serialization = kwargs.setdefault("is_xml", target_obj.is_xml_model())
597
598 serialized = {}
599 if is_xml_model_serialization:
600 serialized = target_obj._create_xml_node()
601 try:
602 attributes = target_obj._attribute_map
603 for attr, attr_desc in attributes.items():
604 attr_name = attr
605 if not keep_readonly and target_obj._validation.get(attr_name, {}).get("readonly", False):
606 continue
607
608 if attr_name == "additional_properties" and attr_desc["key"] == "":
609 if target_obj.additional_properties is not None:
610 serialized.update(target_obj.additional_properties)
611 continue
612 try:
613
614 orig_attr = getattr(target_obj, attr)
615 if is_xml_model_serialization:
616 pass # Don't provide "transformer" for XML for now. Keep "orig_attr"
617 else: # JSON
618 keys, orig_attr = key_transformer(attr, attr_desc.copy(), orig_attr)
619 keys = keys if isinstance(keys, list) else [keys]
620
621 kwargs["serialization_ctxt"] = attr_desc
622 new_attr = self.serialize_data(orig_attr, attr_desc["type"], **kwargs)
623
624 if is_xml_model_serialization:
625 xml_desc = attr_desc.get("xml", {})
626 xml_name = xml_desc.get("name", attr_desc["key"])
627 xml_prefix = xml_desc.get("prefix", None)
628 xml_ns = xml_desc.get("ns", None)
629 if xml_desc.get("attr", False):
630 if xml_ns:
631 ET.register_namespace(xml_prefix, xml_ns)
632 xml_name = "{{{}}}{}".format(xml_ns, xml_name)
633 serialized.set(xml_name, new_attr) # type: ignore
634 continue
635 if xml_desc.get("text", False):
636 serialized.text = new_attr # type: ignore
637 continue
638 if isinstance(new_attr, list):
639 serialized.extend(new_attr) # type: ignore
640 elif isinstance(new_attr, ET.Element):
641 # If the down XML has no XML/Name, we MUST replace the tag with the local tag. But keeping the namespaces.
642 if "name" not in getattr(orig_attr, "_xml_map", {}):
643 splitted_tag = new_attr.tag.split("}")
644 if len(splitted_tag) == 2: # Namespace
645 new_attr.tag = "}".join([splitted_tag[0], xml_name])
646 else:
647 new_attr.tag = xml_name
648 serialized.append(new_attr) # type: ignore
649 else: # That's a basic type
650 # Integrate namespace if necessary
651 local_node = _create_xml_node(xml_name, xml_prefix, xml_ns)
652 local_node.text = unicode_str(new_attr)
653 serialized.append(local_node) # type: ignore
654 else: # JSON
655 for k in reversed(keys): # type: ignore
656 new_attr = {k: new_attr}
657
658 _new_attr = new_attr
659 _serialized = serialized
660 for k in keys: # type: ignore
661 if k not in _serialized:
662 _serialized.update(_new_attr) # type: ignore
663 _new_attr = _new_attr[k] # type: ignore
664 _serialized = _serialized[k]
665 except ValueError as err:
666 if isinstance(err, SerializationError):
667 raise
668
669 except (AttributeError, KeyError, TypeError) as err:
670 msg = "Attribute {} in object {} cannot be serialized.\n{}".format(attr_name, class_name, str(target_obj))
671 raise_with_traceback(SerializationError, msg, err)
672 else:
673 return serialized
674
675 def body(self, data, data_type, **kwargs):
676 """Serialize data intended for a request body.
677
678 :param data: The data to be serialized.
679 :param str data_type: The type to be serialized from.
680 :rtype: dict
681 :raises: SerializationError if serialization fails.
682 :raises: ValueError if data is None
683 """
684
685 # Just in case this is a dict
686 internal_data_type_str = data_type.strip("[]{}")
687 internal_data_type = self.dependencies.get(internal_data_type_str, None)
688 try:
689 is_xml_model_serialization = kwargs["is_xml"]
690 except KeyError:
691 if internal_data_type and issubclass(internal_data_type, Model):
692 is_xml_model_serialization = kwargs.setdefault("is_xml", internal_data_type.is_xml_model())
693 else:
694 is_xml_model_serialization = False
695 if internal_data_type and not isinstance(internal_data_type, Enum):
696 try:
697 deserializer = Deserializer(self.dependencies)
698 # Since it's on serialization, it's almost sure that format is not JSON REST
699 # We're not able to deal with additional properties for now.
700 deserializer.additional_properties_detection = False
701 if is_xml_model_serialization:
702 deserializer.key_extractors = [ # type: ignore
703 attribute_key_case_insensitive_extractor,
704 ]
705 else:
706 deserializer.key_extractors = [
707 rest_key_case_insensitive_extractor,
708 attribute_key_case_insensitive_extractor,
709 last_rest_key_case_insensitive_extractor,
710 ]
711 data = deserializer._deserialize(data_type, data)
712 except DeserializationError as err:
713 raise_with_traceback(SerializationError, "Unable to build a model: " + str(err), err)
714
715 return self._serialize(data, data_type, **kwargs)
716
717 def url(self, name, data, data_type, **kwargs):
718 """Serialize data intended for a URL path.
719
720 :param data: The data to be serialized.
721 :param str data_type: The type to be serialized from.
722 :rtype: str
723 :raises: TypeError if serialization fails.
724 :raises: ValueError if data is None
725 """
726 try:
727 output = self.serialize_data(data, data_type, **kwargs)
728 if data_type == "bool":
729 output = json.dumps(output)
730
731 if kwargs.get("skip_quote") is True:
732 output = str(output)
733 else:
734 output = quote(str(output), safe="")
735 except SerializationError:
736 raise TypeError("{} must be type {}.".format(name, data_type))
737 else:
738 return output
739
740 def query(self, name, data, data_type, **kwargs):
741 """Serialize data intended for a URL query.
742
743 :param data: The data to be serialized.
744 :param str data_type: The type to be serialized from.
745 :keyword bool skip_quote: Whether to skip quote the serialized result.
746 Defaults to False.
747 :rtype: str
748 :raises: TypeError if serialization fails.
749 :raises: ValueError if data is None
750 """
751 try:
752 # Treat the list aside, since we don't want to encode the div separator
753 if data_type.startswith("["):
754 internal_data_type = data_type[1:-1]
755 do_quote = not kwargs.get("skip_quote", False)
756 return str(self.serialize_iter(data, internal_data_type, do_quote=do_quote, **kwargs))
757
758 # Not a list, regular serialization
759 output = self.serialize_data(data, data_type, **kwargs)
760 if data_type == "bool":
761 output = json.dumps(output)
762 if kwargs.get("skip_quote") is True:
763 output = str(output)
764 else:
765 output = quote(str(output), safe="")
766 except SerializationError:
767 raise TypeError("{} must be type {}.".format(name, data_type))
768 else:
769 return str(output)
770
771 def header(self, name, data, data_type, **kwargs):
772 """Serialize data intended for a request header.
773
774 :param data: The data to be serialized.
775 :param str data_type: The type to be serialized from.
776 :rtype: str
777 :raises: TypeError if serialization fails.
778 :raises: ValueError if data is None
779 """
780 try:
781 if data_type in ["[str]"]:
782 data = ["" if d is None else d for d in data]
783
784 output = self.serialize_data(data, data_type, **kwargs)
785 if data_type == "bool":
786 output = json.dumps(output)
787 except SerializationError:
788 raise TypeError("{} must be type {}.".format(name, data_type))
789 else:
790 return str(output)
791
792 def serialize_data(self, data, data_type, **kwargs):
793 """Serialize generic data according to supplied data type.
794
795 :param data: The data to be serialized.
796 :param str data_type: The type to be serialized from.
797 :param bool required: Whether it's essential that the data not be
798 empty or None
799 :raises: AttributeError if required data is None.
800 :raises: ValueError if data is None
801 :raises: SerializationError if serialization fails.
802 """
803 if data is None:
804 raise ValueError("No value for given attribute")
805
806 try:
807 if data is AzureCoreNull:
808 return None
809 if data_type in self.basic_types.values():
810 return self.serialize_basic(data, data_type, **kwargs)
811
812 elif data_type in self.serialize_type:
813 return self.serialize_type[data_type](data, **kwargs)
814
815 # If dependencies is empty, try with current data class
816 # It has to be a subclass of Enum anyway
817 enum_type = self.dependencies.get(data_type, data.__class__)
818 if issubclass(enum_type, Enum):
819 return Serializer.serialize_enum(data, enum_obj=enum_type)
820
821 iter_type = data_type[0] + data_type[-1]
822 if iter_type in self.serialize_type:
823 return self.serialize_type[iter_type](data, data_type[1:-1], **kwargs)
824
825 except (ValueError, TypeError) as err:
826 msg = "Unable to serialize value: {!r} as type: {!r}."
827 raise_with_traceback(SerializationError, msg.format(data, data_type), err)
828 else:
829 return self._serialize(data, **kwargs)
830
831 @classmethod
832 def _get_custom_serializers(cls, data_type, **kwargs):
833 custom_serializer = kwargs.get("basic_types_serializers", {}).get(data_type)
834 if custom_serializer:
835 return custom_serializer
836 if kwargs.get("is_xml", False):
837 return cls._xml_basic_types_serializers.get(data_type)
838
839 @classmethod
840 def serialize_basic(cls, data, data_type, **kwargs):
841 """Serialize basic builting data type.
842 Serializes objects to str, int, float or bool.
843
844 Possible kwargs:
845 - basic_types_serializers dict[str, callable] : If set, use the callable as serializer
846 - is_xml bool : If set, use xml_basic_types_serializers
847
848 :param data: Object to be serialized.
849 :param str data_type: Type of object in the iterable.
850 """
851 custom_serializer = cls._get_custom_serializers(data_type, **kwargs)
852 if custom_serializer:
853 return custom_serializer(data)
854 if data_type == "str":
855 return cls.serialize_unicode(data)
856 return eval(data_type)(data) # nosec
857
858 @classmethod
859 def serialize_unicode(cls, data):
860 """Special handling for serializing unicode strings in Py2.
861 Encode to UTF-8 if unicode, otherwise handle as a str.
862
863 :param data: Object to be serialized.
864 :rtype: str
865 """
866 try: # If I received an enum, return its value
867 return data.value
868 except AttributeError:
869 pass
870
871 try:
872 if isinstance(data, unicode): # type: ignore
873 # Don't change it, JSON and XML ElementTree are totally able
874 # to serialize correctly u'' strings
875 return data
876 except NameError:
877 return str(data)
878 else:
879 return str(data)
880
881 def serialize_iter(self, data, iter_type, div=None, **kwargs):
882 """Serialize iterable.
883
884 Supported kwargs:
885 - serialization_ctxt dict : The current entry of _attribute_map, or same format.
886 serialization_ctxt['type'] should be same as data_type.
887 - is_xml bool : If set, serialize as XML
888
889 :param list attr: Object to be serialized.
890 :param str iter_type: Type of object in the iterable.
891 :param bool required: Whether the objects in the iterable must
892 not be None or empty.
893 :param str div: If set, this str will be used to combine the elements
894 in the iterable into a combined string. Default is 'None'.
895 :keyword bool do_quote: Whether to quote the serialized result of each iterable element.
896 Defaults to False.
897 :rtype: list, str
898 """
899 if isinstance(data, str):
900 raise SerializationError("Refuse str type as a valid iter type.")
901
902 serialization_ctxt = kwargs.get("serialization_ctxt", {})
903 is_xml = kwargs.get("is_xml", False)
904
905 serialized = []
906 for d in data:
907 try:
908 serialized.append(self.serialize_data(d, iter_type, **kwargs))
909 except ValueError as err:
910 if isinstance(err, SerializationError):
911 raise
912 serialized.append(None)
913
914 if kwargs.get("do_quote", False):
915 serialized = ["" if s is None else quote(str(s), safe="") for s in serialized]
916
917 if div:
918 serialized = ["" if s is None else str(s) for s in serialized]
919 serialized = div.join(serialized)
920
921 if "xml" in serialization_ctxt or is_xml:
922 # XML serialization is more complicated
923 xml_desc = serialization_ctxt.get("xml", {})
924 xml_name = xml_desc.get("name")
925 if not xml_name:
926 xml_name = serialization_ctxt["key"]
927
928 # Create a wrap node if necessary (use the fact that Element and list have "append")
929 is_wrapped = xml_desc.get("wrapped", False)
930 node_name = xml_desc.get("itemsName", xml_name)
931 if is_wrapped:
932 final_result = _create_xml_node(xml_name, xml_desc.get("prefix", None), xml_desc.get("ns", None))
933 else:
934 final_result = []
935 # All list elements to "local_node"
936 for el in serialized:
937 if isinstance(el, ET.Element):
938 el_node = el
939 else:
940 el_node = _create_xml_node(node_name, xml_desc.get("prefix", None), xml_desc.get("ns", None))
941 if el is not None: # Otherwise it writes "None" :-p
942 el_node.text = str(el)
943 final_result.append(el_node)
944 return final_result
945 return serialized
946
947 def serialize_dict(self, attr, dict_type, **kwargs):
948 """Serialize a dictionary of objects.
949
950 :param dict attr: Object to be serialized.
951 :param str dict_type: Type of object in the dictionary.
952 :param bool required: Whether the objects in the dictionary must
953 not be None or empty.
954 :rtype: dict
955 """
956 serialization_ctxt = kwargs.get("serialization_ctxt", {})
957 serialized = {}
958 for key, value in attr.items():
959 try:
960 serialized[self.serialize_unicode(key)] = self.serialize_data(value, dict_type, **kwargs)
961 except ValueError as err:
962 if isinstance(err, SerializationError):
963 raise
964 serialized[self.serialize_unicode(key)] = None
965
966 if "xml" in serialization_ctxt:
967 # XML serialization is more complicated
968 xml_desc = serialization_ctxt["xml"]
969 xml_name = xml_desc["name"]
970
971 final_result = _create_xml_node(xml_name, xml_desc.get("prefix", None), xml_desc.get("ns", None))
972 for key, value in serialized.items():
973 ET.SubElement(final_result, key).text = value
974 return final_result
975
976 return serialized
977
978 def serialize_object(self, attr, **kwargs):
979 """Serialize a generic object.
980 This will be handled as a dictionary. If object passed in is not
981 a basic type (str, int, float, dict, list) it will simply be
982 cast to str.
983
984 :param dict attr: Object to be serialized.
985 :rtype: dict or str
986 """
987 if attr is None:
988 return None
989 if isinstance(attr, ET.Element):
990 return attr
991 obj_type = type(attr)
992 if obj_type in self.basic_types:
993 return self.serialize_basic(attr, self.basic_types[obj_type], **kwargs)
994 if obj_type is _long_type:
995 return self.serialize_long(attr)
996 if obj_type is unicode_str:
997 return self.serialize_unicode(attr)
998 if obj_type is datetime.datetime:
999 return self.serialize_iso(attr)
1000 if obj_type is datetime.date:
1001 return self.serialize_date(attr)
1002 if obj_type is datetime.time:
1003 return self.serialize_time(attr)
1004 if obj_type is datetime.timedelta:
1005 return self.serialize_duration(attr)
1006 if obj_type is decimal.Decimal:
1007 return self.serialize_decimal(attr)
1008
1009 # If it's a model or I know this dependency, serialize as a Model
1010 elif obj_type in self.dependencies.values() or isinstance(attr, Model):
1011 return self._serialize(attr)
1012
1013 if obj_type == dict:
1014 serialized = {}
1015 for key, value in attr.items():
1016 try:
1017 serialized[self.serialize_unicode(key)] = self.serialize_object(value, **kwargs)
1018 except ValueError:
1019 serialized[self.serialize_unicode(key)] = None
1020 return serialized
1021
1022 if obj_type == list:
1023 serialized = []
1024 for obj in attr:
1025 try:
1026 serialized.append(self.serialize_object(obj, **kwargs))
1027 except ValueError:
1028 pass
1029 return serialized
1030 return str(attr)
1031
1032 @staticmethod
1033 def serialize_enum(attr, enum_obj=None):
1034 try:
1035 result = attr.value
1036 except AttributeError:
1037 result = attr
1038 try:
1039 enum_obj(result) # type: ignore
1040 return result
1041 except ValueError:
1042 for enum_value in enum_obj: # type: ignore
1043 if enum_value.value.lower() == str(attr).lower():
1044 return enum_value.value
1045 error = "{!r} is not valid value for enum {!r}"
1046 raise SerializationError(error.format(attr, enum_obj))
1047
1048 @staticmethod
1049 def serialize_bytearray(attr, **kwargs):
1050 """Serialize bytearray into base-64 string.
1051
1052 :param attr: Object to be serialized.
1053 :rtype: str
1054 """
1055 return b64encode(attr).decode()
1056
1057 @staticmethod
1058 def serialize_base64(attr, **kwargs):
1059 """Serialize str into base-64 string.
1060
1061 :param attr: Object to be serialized.
1062 :rtype: str
1063 """
1064 encoded = b64encode(attr).decode("ascii")
1065 return encoded.strip("=").replace("+", "-").replace("/", "_")
1066
1067 @staticmethod
1068 def serialize_decimal(attr, **kwargs):
1069 """Serialize Decimal object to float.
1070
1071 :param attr: Object to be serialized.
1072 :rtype: float
1073 """
1074 return float(attr)
1075
1076 @staticmethod
1077 def serialize_long(attr, **kwargs):
1078 """Serialize long (Py2) or int (Py3).
1079
1080 :param attr: Object to be serialized.
1081 :rtype: int/long
1082 """
1083 return _long_type(attr)
1084
1085 @staticmethod
1086 def serialize_date(attr, **kwargs):
1087 """Serialize Date object into ISO-8601 formatted string.
1088
1089 :param Date attr: Object to be serialized.
1090 :rtype: str
1091 """
1092 if isinstance(attr, str):
1093 attr = isodate.parse_date(attr)
1094 t = "{:04}-{:02}-{:02}".format(attr.year, attr.month, attr.day)
1095 return t
1096
1097 @staticmethod
1098 def serialize_time(attr, **kwargs):
1099 """Serialize Time object into ISO-8601 formatted string.
1100
1101 :param datetime.time attr: Object to be serialized.
1102 :rtype: str
1103 """
1104 if isinstance(attr, str):
1105 attr = isodate.parse_time(attr)
1106 t = "{:02}:{:02}:{:02}".format(attr.hour, attr.minute, attr.second)
1107 if attr.microsecond:
1108 t += ".{:02}".format(attr.microsecond)
1109 return t
1110
1111 @staticmethod
1112 def serialize_duration(attr, **kwargs):
1113 """Serialize TimeDelta object into ISO-8601 formatted string.
1114
1115 :param TimeDelta attr: Object to be serialized.
1116 :rtype: str
1117 """
1118 if isinstance(attr, str):
1119 attr = isodate.parse_duration(attr)
1120 return isodate.duration_isoformat(attr)
1121
1122 @staticmethod
1123 def serialize_rfc(attr, **kwargs):
1124 """Serialize Datetime object into RFC-1123 formatted string.
1125
1126 :param Datetime attr: Object to be serialized.
1127 :rtype: str
1128 :raises: TypeError if format invalid.
1129 """
1130 try:
1131 if not attr.tzinfo:
1132 _LOGGER.warning("Datetime with no tzinfo will be considered UTC.")
1133 utc = attr.utctimetuple()
1134 except AttributeError:
1135 raise TypeError("RFC1123 object must be valid Datetime object.")
1136
1137 return "{}, {:02} {} {:04} {:02}:{:02}:{:02} GMT".format(
1138 Serializer.days[utc.tm_wday],
1139 utc.tm_mday,
1140 Serializer.months[utc.tm_mon],
1141 utc.tm_year,
1142 utc.tm_hour,
1143 utc.tm_min,
1144 utc.tm_sec,
1145 )
1146
1147 @staticmethod
1148 def serialize_iso(attr, **kwargs):
1149 """Serialize Datetime object into ISO-8601 formatted string.
1150
1151 :param Datetime attr: Object to be serialized.
1152 :rtype: str
1153 :raises: SerializationError if format invalid.
1154 """
1155 if isinstance(attr, str):
1156 attr = isodate.parse_datetime(attr)
1157 try:
1158 if not attr.tzinfo:
1159 _LOGGER.warning("Datetime with no tzinfo will be considered UTC.")
1160 utc = attr.utctimetuple()
1161 if utc.tm_year > 9999 or utc.tm_year < 1:
1162 raise OverflowError("Hit max or min date")
1163
1164 microseconds = str(attr.microsecond).rjust(6, "0").rstrip("0").ljust(3, "0")
1165 if microseconds:
1166 microseconds = "." + microseconds
1167 date = "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}".format(
1168 utc.tm_year, utc.tm_mon, utc.tm_mday, utc.tm_hour, utc.tm_min, utc.tm_sec
1169 )
1170 return date + microseconds + "Z"
1171 except (ValueError, OverflowError) as err:
1172 msg = "Unable to serialize datetime object."
1173 raise_with_traceback(SerializationError, msg, err)
1174 except AttributeError as err:
1175 msg = "ISO-8601 object must be valid Datetime object."
1176 raise_with_traceback(TypeError, msg, err)
1177
1178 @staticmethod
1179 def serialize_unix(attr, **kwargs):
1180 """Serialize Datetime object into IntTime format.
1181 This is represented as seconds.
1182
1183 :param Datetime attr: Object to be serialized.
1184 :rtype: int
1185 :raises: SerializationError if format invalid
1186 """
1187 if isinstance(attr, int):
1188 return attr
1189 try:
1190 if not attr.tzinfo:
1191 _LOGGER.warning("Datetime with no tzinfo will be considered UTC.")
1192 return int(calendar.timegm(attr.utctimetuple()))
1193 except AttributeError:
1194 raise TypeError("Unix time object must be valid Datetime object.")
1195
1196
1197def rest_key_extractor(attr, attr_desc, data):
1198 key = attr_desc["key"]
1199 working_data = data
1200
1201 while "." in key:
1202 # Need the cast, as for some reasons "split" is typed as list[str | Any]
1203 dict_keys = cast(List[str], _FLATTEN.split(key))
1204 if len(dict_keys) == 1:
1205 key = _decode_attribute_map_key(dict_keys[0])
1206 break
1207 working_key = _decode_attribute_map_key(dict_keys[0])
1208 working_data = working_data.get(working_key, data)
1209 if working_data is None:
1210 # If at any point while following flatten JSON path see None, it means
1211 # that all properties under are None as well
1212 # https://github.com/Azure/msrest-for-python/issues/197
1213 return None
1214 key = ".".join(dict_keys[1:])
1215
1216 return working_data.get(key)
1217
1218
1219def rest_key_case_insensitive_extractor(attr, attr_desc, data):
1220 key = attr_desc["key"]
1221 working_data = data
1222
1223 while "." in key:
1224 dict_keys = _FLATTEN.split(key)
1225 if len(dict_keys) == 1:
1226 key = _decode_attribute_map_key(dict_keys[0])
1227 break
1228 working_key = _decode_attribute_map_key(dict_keys[0])
1229 working_data = attribute_key_case_insensitive_extractor(working_key, None, working_data)
1230 if working_data is None:
1231 # If at any point while following flatten JSON path see None, it means
1232 # that all properties under are None as well
1233 # https://github.com/Azure/msrest-for-python/issues/197
1234 return None
1235 key = ".".join(dict_keys[1:])
1236
1237 if working_data:
1238 return attribute_key_case_insensitive_extractor(key, None, working_data)
1239
1240
1241def last_rest_key_extractor(attr, attr_desc, data):
1242 """Extract the attribute in "data" based on the last part of the JSON path key."""
1243 key = attr_desc["key"]
1244 dict_keys = _FLATTEN.split(key)
1245 return attribute_key_extractor(dict_keys[-1], None, data)
1246
1247
1248def last_rest_key_case_insensitive_extractor(attr, attr_desc, data):
1249 """Extract the attribute in "data" based on the last part of the JSON path key.
1250
1251 This is the case insensitive version of "last_rest_key_extractor"
1252 """
1253 key = attr_desc["key"]
1254 dict_keys = _FLATTEN.split(key)
1255 return attribute_key_case_insensitive_extractor(dict_keys[-1], None, data)
1256
1257
1258def attribute_key_extractor(attr, _, data):
1259 return data.get(attr)
1260
1261
1262def attribute_key_case_insensitive_extractor(attr, _, data):
1263 found_key = None
1264 lower_attr = attr.lower()
1265 for key in data:
1266 if lower_attr == key.lower():
1267 found_key = key
1268 break
1269
1270 return data.get(found_key)
1271
1272
1273def _extract_name_from_internal_type(internal_type):
1274 """Given an internal type XML description, extract correct XML name with namespace.
1275
1276 :param dict internal_type: An model type
1277 :rtype: tuple
1278 :returns: A tuple XML name + namespace dict
1279 """
1280 internal_type_xml_map = getattr(internal_type, "_xml_map", {})
1281 xml_name = internal_type_xml_map.get("name", internal_type.__name__)
1282 xml_ns = internal_type_xml_map.get("ns", None)
1283 if xml_ns:
1284 xml_name = "{{{}}}{}".format(xml_ns, xml_name)
1285 return xml_name
1286
1287
1288def xml_key_extractor(attr, attr_desc, data):
1289 if isinstance(data, dict):
1290 return None
1291
1292 # Test if this model is XML ready first
1293 if not isinstance(data, ET.Element):
1294 return None
1295
1296 xml_desc = attr_desc.get("xml", {})
1297 xml_name = xml_desc.get("name", attr_desc["key"])
1298
1299 # Look for a children
1300 is_iter_type = attr_desc["type"].startswith("[")
1301 is_wrapped = xml_desc.get("wrapped", False)
1302 internal_type = attr_desc.get("internalType", None)
1303 internal_type_xml_map = getattr(internal_type, "_xml_map", {})
1304
1305 # Integrate namespace if necessary
1306 xml_ns = xml_desc.get("ns", internal_type_xml_map.get("ns", None))
1307 if xml_ns:
1308 xml_name = "{{{}}}{}".format(xml_ns, xml_name)
1309
1310 # If it's an attribute, that's simple
1311 if xml_desc.get("attr", False):
1312 return data.get(xml_name)
1313
1314 # If it's x-ms-text, that's simple too
1315 if xml_desc.get("text", False):
1316 return data.text
1317
1318 # Scenario where I take the local name:
1319 # - Wrapped node
1320 # - Internal type is an enum (considered basic types)
1321 # - Internal type has no XML/Name node
1322 if is_wrapped or (internal_type and (issubclass(internal_type, Enum) or "name" not in internal_type_xml_map)):
1323 children = data.findall(xml_name)
1324 # If internal type has a local name and it's not a list, I use that name
1325 elif not is_iter_type and internal_type and "name" in internal_type_xml_map:
1326 xml_name = _extract_name_from_internal_type(internal_type)
1327 children = data.findall(xml_name)
1328 # That's an array
1329 else:
1330 if internal_type: # Complex type, ignore itemsName and use the complex type name
1331 items_name = _extract_name_from_internal_type(internal_type)
1332 else:
1333 items_name = xml_desc.get("itemsName", xml_name)
1334 children = data.findall(items_name)
1335
1336 if len(children) == 0:
1337 if is_iter_type:
1338 if is_wrapped:
1339 return None # is_wrapped no node, we want None
1340 else:
1341 return [] # not wrapped, assume empty list
1342 return None # Assume it's not there, maybe an optional node.
1343
1344 # If is_iter_type and not wrapped, return all found children
1345 if is_iter_type:
1346 if not is_wrapped:
1347 return children
1348 else: # Iter and wrapped, should have found one node only (the wrap one)
1349 if len(children) != 1:
1350 raise DeserializationError(
1351 "Tried to deserialize an array not wrapped, and found several nodes '{}'. Maybe you should declare this array as wrapped?".format(
1352 xml_name
1353 )
1354 )
1355 return list(children[0]) # Might be empty list and that's ok.
1356
1357 # Here it's not a itertype, we should have found one element only or empty
1358 if len(children) > 1:
1359 raise DeserializationError("Find several XML '{}' where it was not expected".format(xml_name))
1360 return children[0]
1361
1362
1363class Deserializer(object):
1364 """Response object model deserializer.
1365
1366 :param dict classes: Class type dictionary for deserializing complex types.
1367 :ivar list key_extractors: Ordered list of extractors to be used by this deserializer.
1368 """
1369
1370 basic_types = {str: "str", int: "int", bool: "bool", float: "float"}
1371
1372 valid_date = re.compile(r"\d{4}[-]\d{2}[-]\d{2}T\d{2}:\d{2}:\d{2}" r"\.?\d*Z?[-+]?[\d{2}]?:?[\d{2}]?")
1373
1374 def __init__(self, classes: Optional[Mapping[str, Type[ModelType]]] = None):
1375 self.deserialize_type = {
1376 "iso-8601": Deserializer.deserialize_iso,
1377 "rfc-1123": Deserializer.deserialize_rfc,
1378 "unix-time": Deserializer.deserialize_unix,
1379 "duration": Deserializer.deserialize_duration,
1380 "date": Deserializer.deserialize_date,
1381 "time": Deserializer.deserialize_time,
1382 "decimal": Deserializer.deserialize_decimal,
1383 "long": Deserializer.deserialize_long,
1384 "bytearray": Deserializer.deserialize_bytearray,
1385 "base64": Deserializer.deserialize_base64,
1386 "object": self.deserialize_object,
1387 "[]": self.deserialize_iter,
1388 "{}": self.deserialize_dict,
1389 }
1390 self.deserialize_expected_types = {
1391 "duration": (isodate.Duration, datetime.timedelta),
1392 "iso-8601": (datetime.datetime),
1393 }
1394 self.dependencies: Dict[str, Type[ModelType]] = dict(classes) if classes else {}
1395 self.key_extractors = [rest_key_extractor, xml_key_extractor]
1396 # Additional properties only works if the "rest_key_extractor" is used to
1397 # extract the keys. Making it to work whatever the key extractor is too much
1398 # complicated, with no real scenario for now.
1399 # So adding a flag to disable additional properties detection. This flag should be
1400 # used if your expect the deserialization to NOT come from a JSON REST syntax.
1401 # Otherwise, result are unexpected
1402 self.additional_properties_detection = True
1403
1404 def __call__(self, target_obj, response_data, content_type=None):
1405 """Call the deserializer to process a REST response.
1406
1407 :param str target_obj: Target data type to deserialize to.
1408 :param requests.Response response_data: REST response object.
1409 :param str content_type: Swagger "produces" if available.
1410 :raises: DeserializationError if deserialization fails.
1411 :return: Deserialized object.
1412 """
1413 data = self._unpack_content(response_data, content_type)
1414 return self._deserialize(target_obj, data)
1415
1416 def _deserialize(self, target_obj, data):
1417 """Call the deserializer on a model.
1418
1419 Data needs to be already deserialized as JSON or XML ElementTree
1420
1421 :param str target_obj: Target data type to deserialize to.
1422 :param object data: Object to deserialize.
1423 :raises: DeserializationError if deserialization fails.
1424 :return: Deserialized object.
1425 """
1426 # This is already a model, go recursive just in case
1427 if hasattr(data, "_attribute_map"):
1428 constants = [name for name, config in getattr(data, "_validation", {}).items() if config.get("constant")]
1429 try:
1430 for attr, mapconfig in data._attribute_map.items():
1431 if attr in constants:
1432 continue
1433 value = getattr(data, attr)
1434 if value is None:
1435 continue
1436 local_type = mapconfig["type"]
1437 internal_data_type = local_type.strip("[]{}")
1438 if internal_data_type not in self.dependencies or isinstance(internal_data_type, Enum):
1439 continue
1440 setattr(data, attr, self._deserialize(local_type, value))
1441 return data
1442 except AttributeError:
1443 return
1444
1445 response, class_name = self._classify_target(target_obj, data)
1446
1447 if isinstance(response, basestring):
1448 return self.deserialize_data(data, response)
1449 elif isinstance(response, type) and issubclass(response, Enum):
1450 return self.deserialize_enum(data, response)
1451
1452 if data is None:
1453 return data
1454 try:
1455 attributes = response._attribute_map # type: ignore
1456 d_attrs = {}
1457 for attr, attr_desc in attributes.items():
1458 # Check empty string. If it's not empty, someone has a real "additionalProperties"...
1459 if attr == "additional_properties" and attr_desc["key"] == "":
1460 continue
1461 raw_value = None
1462 # Enhance attr_desc with some dynamic data
1463 attr_desc = attr_desc.copy() # Do a copy, do not change the real one
1464 internal_data_type = attr_desc["type"].strip("[]{}")
1465 if internal_data_type in self.dependencies:
1466 attr_desc["internalType"] = self.dependencies[internal_data_type]
1467
1468 for key_extractor in self.key_extractors:
1469 found_value = key_extractor(attr, attr_desc, data)
1470 if found_value is not None:
1471 if raw_value is not None and raw_value != found_value:
1472 msg = (
1473 "Ignoring extracted value '%s' from %s for key '%s'"
1474 " (duplicate extraction, follow extractors order)"
1475 )
1476 _LOGGER.warning(msg, found_value, key_extractor, attr)
1477 continue
1478 raw_value = found_value
1479
1480 value = self.deserialize_data(raw_value, attr_desc["type"])
1481 d_attrs[attr] = value
1482 except (AttributeError, TypeError, KeyError) as err:
1483 msg = "Unable to deserialize to object: " + class_name # type: ignore
1484 raise_with_traceback(DeserializationError, msg, err)
1485 else:
1486 additional_properties = self._build_additional_properties(attributes, data)
1487 return self._instantiate_model(response, d_attrs, additional_properties)
1488
1489 def _build_additional_properties(self, attribute_map, data):
1490 if not self.additional_properties_detection:
1491 return None
1492 if "additional_properties" in attribute_map and attribute_map.get("additional_properties", {}).get("key") != "":
1493 # Check empty string. If it's not empty, someone has a real "additionalProperties"
1494 return None
1495 if isinstance(data, ET.Element):
1496 data = {el.tag: el.text for el in data}
1497
1498 known_keys = {
1499 _decode_attribute_map_key(_FLATTEN.split(desc["key"])[0])
1500 for desc in attribute_map.values()
1501 if desc["key"] != ""
1502 }
1503 present_keys = set(data.keys())
1504 missing_keys = present_keys - known_keys
1505 return {key: data[key] for key in missing_keys}
1506
1507 def _classify_target(self, target, data):
1508 """Check to see whether the deserialization target object can
1509 be classified into a subclass.
1510 Once classification has been determined, initialize object.
1511
1512 :param str target: The target object type to deserialize to.
1513 :param str/dict data: The response data to deserialize.
1514 """
1515 if target is None:
1516 return None, None
1517
1518 if isinstance(target, basestring):
1519 try:
1520 target = self.dependencies[target]
1521 except KeyError:
1522 return target, target
1523
1524 try:
1525 target = target._classify(data, self.dependencies)
1526 except AttributeError:
1527 pass # Target is not a Model, no classify
1528 return target, target.__class__.__name__ # type: ignore
1529
1530 def failsafe_deserialize(self, target_obj, data, content_type=None):
1531 """Ignores any errors encountered in deserialization,
1532 and falls back to not deserializing the object. Recommended
1533 for use in error deserialization, as we want to return the
1534 HttpResponseError to users, and not have them deal with
1535 a deserialization error.
1536
1537 :param str target_obj: The target object type to deserialize to.
1538 :param str/dict data: The response data to deserialize.
1539 :param str content_type: Swagger "produces" if available.
1540 """
1541 try:
1542 return self(target_obj, data, content_type=content_type)
1543 except:
1544 _LOGGER.debug(
1545 "Ran into a deserialization error. Ignoring since this is failsafe deserialization", exc_info=True
1546 )
1547 return None
1548
1549 @staticmethod
1550 def _unpack_content(raw_data, content_type=None):
1551 """Extract the correct structure for deserialization.
1552
1553 If raw_data is a PipelineResponse, try to extract the result of RawDeserializer.
1554 if we can't, raise. Your Pipeline should have a RawDeserializer.
1555
1556 If not a pipeline response and raw_data is bytes or string, use content-type
1557 to decode it. If no content-type, try JSON.
1558
1559 If raw_data is something else, bypass all logic and return it directly.
1560
1561 :param raw_data: Data to be processed.
1562 :param content_type: How to parse if raw_data is a string/bytes.
1563 :raises JSONDecodeError: If JSON is requested and parsing is impossible.
1564 :raises UnicodeDecodeError: If bytes is not UTF8
1565 """
1566 # Assume this is enough to detect a Pipeline Response without importing it
1567 context = getattr(raw_data, "context", {})
1568 if context:
1569 if RawDeserializer.CONTEXT_NAME in context:
1570 return context[RawDeserializer.CONTEXT_NAME]
1571 raise ValueError("This pipeline didn't have the RawDeserializer policy; can't deserialize")
1572
1573 # Assume this is enough to recognize universal_http.ClientResponse without importing it
1574 if hasattr(raw_data, "body"):
1575 return RawDeserializer.deserialize_from_http_generics(raw_data.text(), raw_data.headers)
1576
1577 # Assume this enough to recognize requests.Response without importing it.
1578 if hasattr(raw_data, "_content_consumed"):
1579 return RawDeserializer.deserialize_from_http_generics(raw_data.text, raw_data.headers)
1580
1581 if isinstance(raw_data, (basestring, bytes)) or hasattr(raw_data, "read"):
1582 return RawDeserializer.deserialize_from_text(raw_data, content_type) # type: ignore
1583 return raw_data
1584
1585 def _instantiate_model(self, response, attrs, additional_properties=None):
1586 """Instantiate a response model passing in deserialized args.
1587
1588 :param response: The response model class.
1589 :param d_attrs: The deserialized response attributes.
1590 """
1591 if callable(response):
1592 subtype = getattr(response, "_subtype_map", {})
1593 try:
1594 readonly = [k for k, v in response._validation.items() if v.get("readonly")]
1595 const = [k for k, v in response._validation.items() if v.get("constant")]
1596 kwargs = {k: v for k, v in attrs.items() if k not in subtype and k not in readonly + const}
1597 response_obj = response(**kwargs)
1598 for attr in readonly:
1599 setattr(response_obj, attr, attrs.get(attr))
1600 if additional_properties:
1601 response_obj.additional_properties = additional_properties
1602 return response_obj
1603 except TypeError as err:
1604 msg = "Unable to deserialize {} into model {}. ".format(kwargs, response) # type: ignore
1605 raise DeserializationError(msg + str(err))
1606 else:
1607 try:
1608 for attr, value in attrs.items():
1609 setattr(response, attr, value)
1610 return response
1611 except Exception as exp:
1612 msg = "Unable to populate response model. "
1613 msg += "Type: {}, Error: {}".format(type(response), exp)
1614 raise DeserializationError(msg)
1615
1616 def deserialize_data(self, data, data_type):
1617 """Process data for deserialization according to data type.
1618
1619 :param str data: The response string to be deserialized.
1620 :param str data_type: The type to deserialize to.
1621 :raises: DeserializationError if deserialization fails.
1622 :return: Deserialized object.
1623 """
1624 if data is None:
1625 return data
1626
1627 try:
1628 if not data_type:
1629 return data
1630 if data_type in self.basic_types.values():
1631 return self.deserialize_basic(data, data_type)
1632 if data_type in self.deserialize_type:
1633 if isinstance(data, self.deserialize_expected_types.get(data_type, tuple())):
1634 return data
1635
1636 is_a_text_parsing_type = lambda x: x not in ["object", "[]", r"{}"]
1637 if isinstance(data, ET.Element) and is_a_text_parsing_type(data_type) and not data.text:
1638 return None
1639 data_val = self.deserialize_type[data_type](data)
1640 return data_val
1641
1642 iter_type = data_type[0] + data_type[-1]
1643 if iter_type in self.deserialize_type:
1644 return self.deserialize_type[iter_type](data, data_type[1:-1])
1645
1646 obj_type = self.dependencies[data_type]
1647 if issubclass(obj_type, Enum):
1648 if isinstance(data, ET.Element):
1649 data = data.text
1650 return self.deserialize_enum(data, obj_type)
1651
1652 except (ValueError, TypeError, AttributeError) as err:
1653 msg = "Unable to deserialize response data."
1654 msg += " Data: {}, {}".format(data, data_type)
1655 raise_with_traceback(DeserializationError, msg, err)
1656 else:
1657 return self._deserialize(obj_type, data)
1658
1659 def deserialize_iter(self, attr, iter_type):
1660 """Deserialize an iterable.
1661
1662 :param list attr: Iterable to be deserialized.
1663 :param str iter_type: The type of object in the iterable.
1664 :rtype: list
1665 """
1666 if attr is None:
1667 return None
1668 if isinstance(attr, ET.Element): # If I receive an element here, get the children
1669 attr = list(attr)
1670 if not isinstance(attr, (list, set)):
1671 raise DeserializationError("Cannot deserialize as [{}] an object of type {}".format(iter_type, type(attr)))
1672 return [self.deserialize_data(a, iter_type) for a in attr]
1673
1674 def deserialize_dict(self, attr, dict_type):
1675 """Deserialize a dictionary.
1676
1677 :param dict/list attr: Dictionary to be deserialized. Also accepts
1678 a list of key, value pairs.
1679 :param str dict_type: The object type of the items in the dictionary.
1680 :rtype: dict
1681 """
1682 if isinstance(attr, list):
1683 return {x["key"]: self.deserialize_data(x["value"], dict_type) for x in attr}
1684
1685 if isinstance(attr, ET.Element):
1686 # Transform <Key>value</Key> into {"Key": "value"}
1687 attr = {el.tag: el.text for el in attr}
1688 return {k: self.deserialize_data(v, dict_type) for k, v in attr.items()}
1689
1690 def deserialize_object(self, attr, **kwargs):
1691 """Deserialize a generic object.
1692 This will be handled as a dictionary.
1693
1694 :param dict attr: Dictionary to be deserialized.
1695 :rtype: dict
1696 :raises: TypeError if non-builtin datatype encountered.
1697 """
1698 if attr is None:
1699 return None
1700 if isinstance(attr, ET.Element):
1701 # Do no recurse on XML, just return the tree as-is
1702 return attr
1703 if isinstance(attr, basestring):
1704 return self.deserialize_basic(attr, "str")
1705 obj_type = type(attr)
1706 if obj_type in self.basic_types:
1707 return self.deserialize_basic(attr, self.basic_types[obj_type])
1708 if obj_type is _long_type:
1709 return self.deserialize_long(attr)
1710
1711 if obj_type == dict:
1712 deserialized = {}
1713 for key, value in attr.items():
1714 try:
1715 deserialized[key] = self.deserialize_object(value, **kwargs)
1716 except ValueError:
1717 deserialized[key] = None
1718 return deserialized
1719
1720 if obj_type == list:
1721 deserialized = []
1722 for obj in attr:
1723 try:
1724 deserialized.append(self.deserialize_object(obj, **kwargs))
1725 except ValueError:
1726 pass
1727 return deserialized
1728
1729 else:
1730 error = "Cannot deserialize generic object with type: "
1731 raise TypeError(error + str(obj_type))
1732
1733 def deserialize_basic(self, attr, data_type):
1734 """Deserialize basic builtin data type from string.
1735 Will attempt to convert to str, int, float and bool.
1736 This function will also accept '1', '0', 'true' and 'false' as
1737 valid bool values.
1738
1739 :param str attr: response string to be deserialized.
1740 :param str data_type: deserialization data type.
1741 :rtype: str, int, float or bool
1742 :raises: TypeError if string format is not valid.
1743 """
1744 # If we're here, data is supposed to be a basic type.
1745 # If it's still an XML node, take the text
1746 if isinstance(attr, ET.Element):
1747 attr = attr.text
1748 if not attr:
1749 if data_type == "str":
1750 # None or '', node <a/> is empty string.
1751 return ""
1752 else:
1753 # None or '', node <a/> with a strong type is None.
1754 # Don't try to model "empty bool" or "empty int"
1755 return None
1756
1757 if data_type == "bool":
1758 if attr in [True, False, 1, 0]:
1759 return bool(attr)
1760 elif isinstance(attr, basestring):
1761 if attr.lower() in ["true", "1"]:
1762 return True
1763 elif attr.lower() in ["false", "0"]:
1764 return False
1765 raise TypeError("Invalid boolean value: {}".format(attr))
1766
1767 if data_type == "str":
1768 return self.deserialize_unicode(attr)
1769 return eval(data_type)(attr) # nosec
1770
1771 @staticmethod
1772 def deserialize_unicode(data):
1773 """Preserve unicode objects in Python 2, otherwise return data
1774 as a string.
1775
1776 :param str data: response string to be deserialized.
1777 :rtype: str or unicode
1778 """
1779 # We might be here because we have an enum modeled as string,
1780 # and we try to deserialize a partial dict with enum inside
1781 if isinstance(data, Enum):
1782 return data
1783
1784 # Consider this is real string
1785 try:
1786 if isinstance(data, unicode): # type: ignore
1787 return data
1788 except NameError:
1789 return str(data)
1790 else:
1791 return str(data)
1792
1793 @staticmethod
1794 def deserialize_enum(data, enum_obj):
1795 """Deserialize string into enum object.
1796
1797 If the string is not a valid enum value it will be returned as-is
1798 and a warning will be logged.
1799
1800 :param str data: Response string to be deserialized. If this value is
1801 None or invalid it will be returned as-is.
1802 :param Enum enum_obj: Enum object to deserialize to.
1803 :rtype: Enum
1804 """
1805 if isinstance(data, enum_obj) or data is None:
1806 return data
1807 if isinstance(data, Enum):
1808 data = data.value
1809 if isinstance(data, int):
1810 # Workaround. We might consider remove it in the future.
1811 # https://github.com/Azure/azure-rest-api-specs/issues/141
1812 try:
1813 return list(enum_obj.__members__.values())[data]
1814 except IndexError:
1815 error = "{!r} is not a valid index for enum {!r}"
1816 raise DeserializationError(error.format(data, enum_obj))
1817 try:
1818 return enum_obj(str(data))
1819 except ValueError:
1820 for enum_value in enum_obj:
1821 if enum_value.value.lower() == str(data).lower():
1822 return enum_value
1823 # We don't fail anymore for unknown value, we deserialize as a string
1824 _LOGGER.warning("Deserializer is not able to find %s as valid enum in %s", data, enum_obj)
1825 return Deserializer.deserialize_unicode(data)
1826
1827 @staticmethod
1828 def deserialize_bytearray(attr):
1829 """Deserialize string into bytearray.
1830
1831 :param str attr: response string to be deserialized.
1832 :rtype: bytearray
1833 :raises: TypeError if string format invalid.
1834 """
1835 if isinstance(attr, ET.Element):
1836 attr = attr.text
1837 return bytearray(b64decode(attr)) # type: ignore
1838
1839 @staticmethod
1840 def deserialize_base64(attr):
1841 """Deserialize base64 encoded string into string.
1842
1843 :param str attr: response string to be deserialized.
1844 :rtype: bytearray
1845 :raises: TypeError if string format invalid.
1846 """
1847 if isinstance(attr, ET.Element):
1848 attr = attr.text
1849 padding = "=" * (3 - (len(attr) + 3) % 4) # type: ignore
1850 attr = attr + padding # type: ignore
1851 encoded = attr.replace("-", "+").replace("_", "/")
1852 return b64decode(encoded)
1853
1854 @staticmethod
1855 def deserialize_decimal(attr):
1856 """Deserialize string into Decimal object.
1857
1858 :param str attr: response string to be deserialized.
1859 :rtype: Decimal
1860 :raises: DeserializationError if string format invalid.
1861 """
1862 if isinstance(attr, ET.Element):
1863 attr = attr.text
1864 try:
1865 return decimal.Decimal(attr) # type: ignore
1866 except decimal.DecimalException as err:
1867 msg = "Invalid decimal {}".format(attr)
1868 raise_with_traceback(DeserializationError, msg, err)
1869
1870 @staticmethod
1871 def deserialize_long(attr):
1872 """Deserialize string into long (Py2) or int (Py3).
1873
1874 :param str attr: response string to be deserialized.
1875 :rtype: long or int
1876 :raises: ValueError if string format invalid.
1877 """
1878 if isinstance(attr, ET.Element):
1879 attr = attr.text
1880 return _long_type(attr) # type: ignore
1881
1882 @staticmethod
1883 def deserialize_duration(attr):
1884 """Deserialize ISO-8601 formatted string into TimeDelta object.
1885
1886 :param str attr: response string to be deserialized.
1887 :rtype: TimeDelta
1888 :raises: DeserializationError if string format invalid.
1889 """
1890 if isinstance(attr, ET.Element):
1891 attr = attr.text
1892 try:
1893 duration = isodate.parse_duration(attr)
1894 except (ValueError, OverflowError, AttributeError) as err:
1895 msg = "Cannot deserialize duration object."
1896 raise_with_traceback(DeserializationError, msg, err)
1897 else:
1898 return duration
1899
1900 @staticmethod
1901 def deserialize_date(attr):
1902 """Deserialize ISO-8601 formatted string into Date object.
1903
1904 :param str attr: response string to be deserialized.
1905 :rtype: Date
1906 :raises: DeserializationError if string format invalid.
1907 """
1908 if isinstance(attr, ET.Element):
1909 attr = attr.text
1910 if re.search(r"[^\W\d_]", attr, re.I + re.U): # type: ignore
1911 raise DeserializationError("Date must have only digits and -. Received: %s" % attr)
1912 # This must NOT use defaultmonth/defaultday. Using None ensure this raises an exception.
1913 return isodate.parse_date(attr, defaultmonth=None, defaultday=None)
1914
1915 @staticmethod
1916 def deserialize_time(attr):
1917 """Deserialize ISO-8601 formatted string into time object.
1918
1919 :param str attr: response string to be deserialized.
1920 :rtype: datetime.time
1921 :raises: DeserializationError if string format invalid.
1922 """
1923 if isinstance(attr, ET.Element):
1924 attr = attr.text
1925 if re.search(r"[^\W\d_]", attr, re.I + re.U): # type: ignore
1926 raise DeserializationError("Date must have only digits and -. Received: %s" % attr)
1927 return isodate.parse_time(attr)
1928
1929 @staticmethod
1930 def deserialize_rfc(attr):
1931 """Deserialize RFC-1123 formatted string into Datetime object.
1932
1933 :param str attr: response string to be deserialized.
1934 :rtype: Datetime
1935 :raises: DeserializationError if string format invalid.
1936 """
1937 if isinstance(attr, ET.Element):
1938 attr = attr.text
1939 try:
1940 parsed_date = email.utils.parsedate_tz(attr) # type: ignore
1941 date_obj = datetime.datetime(
1942 *parsed_date[:6], tzinfo=_FixedOffset(datetime.timedelta(minutes=(parsed_date[9] or 0) / 60))
1943 )
1944 if not date_obj.tzinfo:
1945 date_obj = date_obj.astimezone(tz=TZ_UTC)
1946 except ValueError as err:
1947 msg = "Cannot deserialize to rfc datetime object."
1948 raise_with_traceback(DeserializationError, msg, err)
1949 else:
1950 return date_obj
1951
1952 @staticmethod
1953 def deserialize_iso(attr):
1954 """Deserialize ISO-8601 formatted string into Datetime object.
1955
1956 :param str attr: response string to be deserialized.
1957 :rtype: Datetime
1958 :raises: DeserializationError if string format invalid.
1959 """
1960 if isinstance(attr, ET.Element):
1961 attr = attr.text
1962 try:
1963 attr = attr.upper() # type: ignore
1964 match = Deserializer.valid_date.match(attr)
1965 if not match:
1966 raise ValueError("Invalid datetime string: " + attr)
1967
1968 check_decimal = attr.split(".")
1969 if len(check_decimal) > 1:
1970 decimal_str = ""
1971 for digit in check_decimal[1]:
1972 if digit.isdigit():
1973 decimal_str += digit
1974 else:
1975 break
1976 if len(decimal_str) > 6:
1977 attr = attr.replace(decimal_str, decimal_str[0:6])
1978
1979 date_obj = isodate.parse_datetime(attr)
1980 test_utc = date_obj.utctimetuple()
1981 if test_utc.tm_year > 9999 or test_utc.tm_year < 1:
1982 raise OverflowError("Hit max or min date")
1983 except (ValueError, OverflowError, AttributeError) as err:
1984 msg = "Cannot deserialize datetime object."
1985 raise_with_traceback(DeserializationError, msg, err)
1986 else:
1987 return date_obj
1988
1989 @staticmethod
1990 def deserialize_unix(attr):
1991 """Serialize Datetime object into IntTime format.
1992 This is represented as seconds.
1993
1994 :param int attr: Object to be serialized.
1995 :rtype: Datetime
1996 :raises: DeserializationError if format invalid
1997 """
1998 if isinstance(attr, ET.Element):
1999 attr = int(attr.text) # type: ignore
2000 try:
2001 date_obj = datetime.datetime.fromtimestamp(attr, TZ_UTC)
2002 except ValueError as err:
2003 msg = "Cannot deserialize to unix datetime object."
2004 raise_with_traceback(DeserializationError, msg, err)
2005 else:
2006 return date_obj