1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for
5# license information.
6# --------------------------------------------------------------------------
7import base64
8from json import JSONEncoder
9from typing import Dict, List, Optional, Union, cast, Any
10from datetime import datetime, date, time, timedelta
11from datetime import timezone
12
13
14__all__ = ["NULL", "AzureJSONEncoder", "is_generated_model", "as_attribute_dict", "attribute_list"]
15TZ_UTC = timezone.utc
16
17
18class _Null:
19 """To create a Falsy object"""
20
21 def __bool__(self) -> bool:
22 return False
23
24
25NULL = _Null()
26"""
27A falsy sentinel object which is supposed to be used to specify attributes
28with no data. This gets serialized to `null` on the wire.
29"""
30
31
32def _timedelta_as_isostr(td: timedelta) -> str:
33 """Converts a datetime.timedelta object into an ISO 8601 formatted string, e.g. 'P4DT12H30M05S'
34
35 Function adapted from the Tin Can Python project: https://github.com/RusticiSoftware/TinCanPython
36
37 :param td: The timedelta object to convert
38 :type td: datetime.timedelta
39 :return: An ISO 8601 formatted string representing the timedelta object
40 :rtype: str
41 """
42
43 # Split seconds to larger units
44 seconds = td.total_seconds()
45 minutes, seconds = divmod(seconds, 60)
46 hours, minutes = divmod(minutes, 60)
47 days, hours = divmod(hours, 24)
48
49 days, hours, minutes = list(map(int, (days, hours, minutes)))
50 seconds = round(seconds, 6)
51
52 # Build date
53 date_str = ""
54 if days:
55 date_str = "%sD" % days
56
57 # Build time
58 time_str = "T"
59
60 # Hours
61 bigger_exists = date_str or hours
62 if bigger_exists:
63 time_str += "{:02}H".format(hours)
64
65 # Minutes
66 bigger_exists = bigger_exists or minutes
67 if bigger_exists:
68 time_str += "{:02}M".format(minutes)
69
70 # Seconds
71 try:
72 if seconds.is_integer():
73 seconds_string = "{:02}".format(int(seconds))
74 else:
75 # 9 chars long w/ leading 0, 6 digits after decimal
76 seconds_string = "%09.6f" % seconds
77 # Remove trailing zeros
78 seconds_string = seconds_string.rstrip("0")
79 except AttributeError: # int.is_integer() raises
80 seconds_string = "{:02}".format(seconds)
81
82 time_str += "{}S".format(seconds_string)
83
84 return "P" + date_str + time_str
85
86
87def _datetime_as_isostr(dt: Union[datetime, date, time, timedelta]) -> str:
88 """Converts a datetime.(datetime|date|time|timedelta) object into an ISO 8601 formatted string.
89
90 :param dt: The datetime object to convert
91 :type dt: datetime.datetime or datetime.date or datetime.time or datetime.timedelta
92 :return: An ISO 8601 formatted string representing the datetime object
93 :rtype: str
94 """
95 # First try datetime.datetime
96 if hasattr(dt, "year") and hasattr(dt, "hour"):
97 dt = cast(datetime, dt)
98 # astimezone() fails for naive times in Python 2.7, so make make sure dt is aware (tzinfo is set)
99 if not dt.tzinfo:
100 iso_formatted = dt.replace(tzinfo=TZ_UTC).isoformat()
101 else:
102 iso_formatted = dt.astimezone(TZ_UTC).isoformat()
103 # Replace the trailing "+00:00" UTC offset with "Z" (RFC 3339: https://www.ietf.org/rfc/rfc3339.txt)
104 return iso_formatted.replace("+00:00", "Z")
105 # Next try datetime.date or datetime.time
106 try:
107 dt = cast(Union[date, time], dt)
108 return dt.isoformat()
109 # Last, try datetime.timedelta
110 except AttributeError:
111 dt = cast(timedelta, dt)
112 return _timedelta_as_isostr(dt)
113
114
115class AzureJSONEncoder(JSONEncoder):
116 """A JSON encoder that's capable of serializing datetime objects and bytes."""
117
118 def default(self, o: Any) -> Any:
119 """Override the default method to handle datetime and bytes serialization.
120 :param o: The object to serialize.
121 :type o: any
122 :return: A JSON-serializable representation of the object.
123 :rtype: any
124 """
125 if isinstance(o, (bytes, bytearray)):
126 return base64.b64encode(o).decode()
127 try:
128 return _datetime_as_isostr(o)
129 except AttributeError:
130 pass
131 return super(AzureJSONEncoder, self).default(o)
132
133
134def is_generated_model(obj: Any) -> bool:
135 """Check if the object is a generated SDK model.
136
137 :param obj: The object to check.
138 :type obj: any
139 :return: True if the object is a generated SDK model, False otherwise.
140 :rtype: bool
141 """
142 return bool(getattr(obj, "_is_model", False) or hasattr(obj, "_attribute_map"))
143
144
145def _is_readonly(p: Any) -> bool:
146 """Check if an attribute is readonly.
147
148 :param any p: The property to check.
149 :return: True if the property is readonly, False otherwise.
150 :rtype: bool
151 """
152 try:
153 return p._visibility == ["read"] # pylint: disable=protected-access
154 except AttributeError:
155 return False
156
157
158def _as_attribute_dict_value(v: Any, *, exclude_readonly: bool = False) -> Any:
159 if v is None or isinstance(v, _Null):
160 return None
161 if isinstance(v, (list, tuple, set)):
162 return type(v)(_as_attribute_dict_value(x, exclude_readonly=exclude_readonly) for x in v)
163 if isinstance(v, dict):
164 return {dk: _as_attribute_dict_value(dv, exclude_readonly=exclude_readonly) for dk, dv in v.items()}
165 return as_attribute_dict(v, exclude_readonly=exclude_readonly) if is_generated_model(v) else v
166
167
168def _get_flattened_attribute(obj: Any) -> Optional[str]:
169 """Get the name of the flattened attribute in a generated TypeSpec model if one exists.
170
171 :param any obj: The object to check.
172 :return: The name of the flattened attribute if it exists, otherwise None.
173 :rtype: Optional[str]
174 """
175 flattened_items = None
176 try:
177 flattened_items = getattr(obj, next(a for a in dir(obj) if "__flattened_items" in a), None)
178 except StopIteration:
179 return None
180
181 if flattened_items is None:
182 return None
183
184 for k, v in obj._attr_to_rest_field.items(): # pylint: disable=protected-access
185 try:
186 if set(v._class_type._attr_to_rest_field.keys()).intersection( # pylint: disable=protected-access
187 set(flattened_items)
188 ):
189 return k
190 except AttributeError:
191 # if the attribute does not have _class_type, it is not a typespec generated model
192 continue
193 return None
194
195
196def attribute_list(obj: Any) -> List[str]:
197 """Get a list of attribute names for a generated SDK model.
198
199 :param obj: The object to get attributes from.
200 :type obj: any
201 :return: A list of attribute names.
202 :rtype: List[str]
203 """
204 if not is_generated_model(obj):
205 raise TypeError("Object is not a generated SDK model.")
206 if hasattr(obj, "_attribute_map"):
207 # msrest model
208 return list(obj._attribute_map.keys()) # pylint: disable=protected-access
209 flattened_attribute = _get_flattened_attribute(obj)
210 retval: List[str] = []
211 for attr_name, rest_field in obj._attr_to_rest_field.items(): # pylint: disable=protected-access
212 if flattened_attribute == attr_name:
213 retval.extend(attribute_list(rest_field._class_type)) # pylint: disable=protected-access
214 else:
215 retval.append(attr_name)
216 return retval
217
218
219def as_attribute_dict(obj: Any, *, exclude_readonly: bool = False) -> Dict[str, Any]:
220 """Convert an object to a dictionary of its attributes.
221
222 Made solely for backcompatibility with the legacy `.as_dict()` on msrest models.
223
224 .. deprecated::1.35.0
225 This function is added for backcompat purposes only.
226
227 :param any obj: The object to convert to a dictionary
228 :keyword bool exclude_readonly: Whether to exclude readonly properties
229 :return: A dictionary containing the object's attributes
230 :rtype: dict[str, any]
231 :raises TypeError: If the object is not a generated model instance
232 """
233 if not is_generated_model(obj):
234 raise TypeError("Object must be a generated model instance.")
235 if hasattr(obj, "_attribute_map"):
236 # msrest generated model
237 return obj.as_dict(keep_readonly=not exclude_readonly)
238 try:
239 # now we're a typespec generated model
240 result = {}
241 readonly_props = set()
242
243 # create a reverse mapping from rest field name to attribute name
244 rest_to_attr = {}
245 flattened_attribute = _get_flattened_attribute(obj)
246 for attr_name, rest_field in obj._attr_to_rest_field.items(): # pylint: disable=protected-access
247
248 if exclude_readonly and _is_readonly(rest_field):
249 # if we're excluding readonly properties, we need to track them
250 readonly_props.add(rest_field._rest_name) # pylint: disable=protected-access
251 if flattened_attribute == attr_name:
252 for fk, fv in rest_field._class_type._attr_to_rest_field.items(): # pylint: disable=protected-access
253 rest_to_attr[fv._rest_name] = fk # pylint: disable=protected-access
254 else:
255 rest_to_attr[rest_field._rest_name] = attr_name # pylint: disable=protected-access
256 for k, v in obj.items():
257 if exclude_readonly and k in readonly_props: # pyright: ignore
258 continue
259 if k == flattened_attribute:
260 for fk, fv in v.items():
261 result[rest_to_attr.get(fk, fk)] = _as_attribute_dict_value(fv, exclude_readonly=exclude_readonly)
262 else:
263 is_multipart_file_input = False
264 try:
265 is_multipart_file_input = next( # pylint: disable=protected-access
266 rf
267 for rf in obj._attr_to_rest_field.values() # pylint: disable=protected-access
268 if rf._rest_name == k # pylint: disable=protected-access
269 )._is_multipart_file_input
270 except StopIteration:
271 pass
272
273 result[rest_to_attr.get(k, k)] = (
274 v if is_multipart_file_input else _as_attribute_dict_value(v, exclude_readonly=exclude_readonly)
275 )
276 return result
277 except AttributeError as exc:
278 # not a typespec generated model
279 raise TypeError("Object must be a generated model instance.") from exc