Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/azure/core/serialization.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

135 statements  

1# coding=utf-8 

2# -------------------------------------------------------------------------- 

3# Copyright (c) Microsoft Corporation. All rights reserved. 

4# Licensed under the MIT License. See License.txt in the project root for 

5# license information. 

6# -------------------------------------------------------------------------- 

7import base64 

8from json import JSONEncoder 

9from typing import Dict, List, Optional, Union, cast, Any 

10from datetime import datetime, date, time, timedelta 

11from datetime import timezone 

12 

13 

# Public names exported by a star import of this module.
__all__ = [
    "NULL",
    "AzureJSONEncoder",
    "is_generated_model",
    "as_attribute_dict",
    "attribute_list",
]

# UTC tzinfo shared by the datetime serialization helpers in this module.
TZ_UTC = timezone.utc

16 

17 

class _Null:
    """Sentinel type whose instances always evaluate as false in a boolean context."""

    def __bool__(self) -> bool:
        # Unconditionally falsy, so truthiness checks treat the sentinel as "no value".
        return False

23 

24 

# Module-level instance of the falsy sentinel type; the string literal that
# follows is its attribute docstring.
NULL = _Null()
"""
A falsy sentinel object which is supposed to be used to specify attributes
with no data. This gets serialized to `null` on the wire.
"""

30 

31 

def _timedelta_as_isostr(td: timedelta) -> str:
    """Converts a datetime.timedelta object into an ISO 8601 formatted string, e.g. 'P4DT12H30M05S'

    Function adapted from the Tin Can Python project: https://github.com/RusticiSoftware/TinCanPython

    The components are read from the timedelta's integer ``days``, ``seconds`` and
    ``microseconds`` fields instead of the float returned by ``total_seconds()``,
    so microsecond precision is preserved even for very large durations.

    Negative durations follow timedelta's own normalization (negative days with a
    non-negative time part), e.g. ``timedelta(seconds=-1)`` gives 'P-1DT23H59M59S'.

    :param td: The timedelta object to convert
    :type td: datetime.timedelta
    :return: An ISO 8601 formatted string representing the timedelta object
    :rtype: str
    """
    # Split the (always 0 <= seconds < 86400) seconds field into larger units.
    days = td.days
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    microseconds = td.microseconds

    # Build date: the day designator is only emitted for a non-zero day count.
    date_str = f"{days}D" if days else ""

    # Build time: once a larger unit has been written, every smaller unit is
    # written too (zero padded) so the string has no gaps.
    time_str = "T"

    bigger_exists = bool(date_str) or bool(hours)
    if bigger_exists:
        time_str += f"{hours:02}H"

    bigger_exists = bigger_exists or bool(minutes)
    if bigger_exists:
        time_str += f"{minutes:02}M"

    # Seconds are always written; a fractional part appears only when there are
    # microseconds, with trailing zeros removed.
    if microseconds:
        seconds_string = f"{seconds:02}.{microseconds:06}".rstrip("0")
    else:
        seconds_string = f"{seconds:02}"

    return f"P{date_str}{time_str}{seconds_string}S"

85 

86 

def _datetime_as_isostr(dt: Union[datetime, date, time, timedelta]) -> str:
    """Render a datetime, date, time or timedelta value as an ISO 8601 string.

    Values exposing both ``year`` and ``hour`` are treated as datetimes and are
    expressed in UTC with a "Z" suffix; a datetime without tzinfo is stamped as
    UTC without shifting its clock fields. Other values are formatted with their
    own ``isoformat()``; if they have none, they are handled as timedeltas.

    :param dt: The datetime object to convert
    :type dt: datetime.datetime or datetime.date or datetime.time or datetime.timedelta
    :return: An ISO 8601 formatted string representing the datetime object
    :rtype: str
    """
    looks_like_datetime = hasattr(dt, "year") and hasattr(dt, "hour")
    if not looks_like_datetime:
        try:
            # datetime.date and datetime.time both provide isoformat()
            return cast(Union[date, time], dt).isoformat()
        except AttributeError:
            # No isoformat(): the only remaining supported type is timedelta
            return _timedelta_as_isostr(cast(timedelta, dt))

    moment = cast(datetime, dt)
    if moment.tzinfo:
        utc_moment = moment.astimezone(TZ_UTC)
    else:
        # Naive value: attach UTC rather than converting
        utc_moment = moment.replace(tzinfo=TZ_UTC)
    # RFC 3339 (https://www.ietf.org/rfc/rfc3339.txt) spells the zero offset as "Z"
    return utc_moment.isoformat().replace("+00:00", "Z")

113 

114 

class AzureJSONEncoder(JSONEncoder):
    """JSON encoder extended to cope with bytes-like values and date/time objects."""

    def default(self, o: Any) -> Any:
        """Produce a JSON-serializable stand-in for objects the base encoder rejects.

        Bytes and bytearray values become base64 text. Anything else is first
        tried as a date/time value and, failing that, handed to the base class.

        :param o: The object to serialize.
        :type o: any
        :return: A JSON-serializable representation of the object.
        :rtype: any
        """
        if not isinstance(o, (bytes, bytearray)):
            try:
                return _datetime_as_isostr(o)
            except AttributeError:
                # Not a date/time-like object; fall through to the base encoder.
                pass
            return super().default(o)
        return base64.b64encode(o).decode()

132 

133 

def is_generated_model(obj: Any) -> bool:
    """Tell whether an object looks like a generated SDK model.

    An object qualifies when it has a truthy ``_is_model`` attribute or when it
    has an ``_attribute_map`` attribute at all.

    :param obj: The object to check.
    :type obj: any
    :return: True if the object is a generated SDK model, False otherwise.
    :rtype: bool
    """
    if getattr(obj, "_is_model", False):
        return True
    return hasattr(obj, "_attribute_map")

143 

144 

def _is_readonly(p: Any) -> bool:
    """Report whether a property is marked as read-only.

    A property counts as read-only when its ``_visibility`` equals ``["read"]``;
    objects without a ``_visibility`` attribute are not read-only.

    :param any p: The property to check.
    :return: True if the property is readonly, False otherwise.
    :rtype: bool
    """
    try:
        readonly = p._visibility == ["read"]  # pylint: disable=protected-access
    except AttributeError:
        readonly = False
    return readonly

156 

157 

def _as_attribute_dict_value(v: Any, *, exclude_readonly: bool = False) -> Any:
    """Recursively convert a single value for inclusion in an attribute dictionary.

    ``None`` and the null sentinel become ``None``. Lists, tuples and sets are
    rebuilt as the same container type with converted members, dict values are
    converted in place of the originals, generated models are expanded through
    ``as_attribute_dict`` and every other value is returned unchanged.

    :param any v: The value to convert.
    :keyword bool exclude_readonly: Forwarded to nested conversions.
    :return: The converted value.
    :rtype: any
    """
    if v is None or isinstance(v, _Null):
        return None

    def convert(item: Any) -> Any:
        # Same conversion, one level deeper, keeping the caller's readonly policy.
        return _as_attribute_dict_value(item, exclude_readonly=exclude_readonly)

    if isinstance(v, (list, tuple, set)):
        return type(v)(convert(item) for item in v)
    if isinstance(v, dict):
        return {key: convert(value) for key, value in v.items()}
    if is_generated_model(v):
        return as_attribute_dict(v, exclude_readonly=exclude_readonly)
    return v

166 

167 

def _get_flattened_attribute(obj: Any) -> Optional[str]:
    """Find which attribute of a generated TypeSpec model is flattened, if any.

    The model is searched for an attribute whose name contains
    ``__flattened_items``. The flattened attribute is the first rest field whose
    class type declares at least one of those item names.

    :param any obj: The object to check.
    :return: The name of the flattened attribute if it exists, otherwise None.
    :rtype: Optional[str]
    """
    holder_name = next((name for name in dir(obj) if "__flattened_items" in name), None)
    if holder_name is None:
        return None

    flattened_items = getattr(obj, holder_name, None)
    if flattened_items is None:
        return None

    for attr_name, rest_field in obj._attr_to_rest_field.items():  # pylint: disable=protected-access
        try:
            nested_names = set(rest_field._class_type._attr_to_rest_field.keys())  # pylint: disable=protected-access
            if nested_names & set(flattened_items):
                return attr_name
        except AttributeError:
            # fields without a _class_type do not wrap a typespec generated model
            continue
    return None

194 

195 

def attribute_list(obj: Any) -> List[str]:
    """Return the attribute names of a generated SDK model.

    For msrest models these are the keys of ``_attribute_map``. For other
    generated models they are the keys of ``_attr_to_rest_field``, except that
    a flattened attribute is replaced by the attribute names of its class type.

    :param obj: The object to get attributes from.
    :type obj: any
    :return: A list of attribute names.
    :rtype: List[str]
    :raises TypeError: If the object is not a generated SDK model.
    """
    if not is_generated_model(obj):
        raise TypeError("Object is not a generated SDK model.")

    if hasattr(obj, "_attribute_map"):
        # msrest model
        return list(obj._attribute_map.keys())  # pylint: disable=protected-access

    flattened_attribute = _get_flattened_attribute(obj)
    names: List[str] = []
    for attr_name, rest_field in obj._attr_to_rest_field.items():  # pylint: disable=protected-access
        if attr_name != flattened_attribute:
            names.append(attr_name)
            continue
        # Expand the flattened attribute into the names declared by its model type.
        names.extend(attribute_list(rest_field._class_type))  # pylint: disable=protected-access
    return names

217 

218 

def as_attribute_dict(obj: Any, *, exclude_readonly: bool = False) -> Dict[str, Any]:
    """Convert an object to a dictionary of its attributes.

    Made solely for backcompatibility with the legacy `.as_dict()` on msrest models.

    .. deprecated:: 1.35.0
        This function is added for backcompat purposes only.

    :param any obj: The object to convert to a dictionary
    :keyword bool exclude_readonly: Whether to exclude readonly properties
    :return: A dictionary containing the object's attributes
    :rtype: dict[str, any]
    :raises TypeError: If the object is not a generated model instance
    """
    if not is_generated_model(obj):
        raise TypeError("Object must be a generated model instance.")
    if hasattr(obj, "_attribute_map"):
        # msrest generated model
        return obj.as_dict(keep_readonly=not exclude_readonly)
    try:
        # now we're a typespec generated model
        result: Dict[str, Any] = {}
        readonly_props = set()

        # reverse mapping from rest field name to attribute name
        rest_to_attr: Dict[str, str] = {}
        # rest field name -> rest field, built once so the per-key lookup below
        # does not rescan every field; the first field with a given rest name wins
        rest_fields_by_name: Dict[str, Any] = {}
        flattened_attribute = _get_flattened_attribute(obj)
        for attr_name, rest_field in obj._attr_to_rest_field.items():  # pylint: disable=protected-access
            rest_name = rest_field._rest_name  # pylint: disable=protected-access
            rest_fields_by_name.setdefault(rest_name, rest_field)

            if exclude_readonly and _is_readonly(rest_field):
                # if we're excluding readonly properties, we need to track them
                readonly_props.add(rest_name)
            if flattened_attribute == attr_name:
                # the flattened model's own fields are surfaced at the top level
                for fk, fv in rest_field._class_type._attr_to_rest_field.items():  # pylint: disable=protected-access
                    rest_to_attr[fv._rest_name] = fk  # pylint: disable=protected-access
            else:
                rest_to_attr[rest_name] = attr_name

        for k, v in obj.items():
            if exclude_readonly and k in readonly_props:  # pyright: ignore
                continue
            # NOTE(review): k is a key of obj.items() while flattened_attribute is an
            # attribute name; this relies on the two matching for the flattened field.
            if k == flattened_attribute:
                for fk, fv in v.items():
                    result[rest_to_attr.get(fk, fk)] = _as_attribute_dict_value(fv, exclude_readonly=exclude_readonly)
                continue

            rest_field = rest_fields_by_name.get(k)
            is_multipart_file_input = (
                rest_field is not None and rest_field._is_multipart_file_input  # pylint: disable=protected-access
            )
            # multipart file inputs are passed through untouched
            result[rest_to_attr.get(k, k)] = (
                v if is_multipart_file_input else _as_attribute_dict_value(v, exclude_readonly=exclude_readonly)
            )
        return result
    except AttributeError as exc:
        # not a typespec generated model
        raise TypeError("Object must be a generated model instance.") from exc

278 # not a typespec generated model 

279 raise TypeError("Object must be a generated model instance.") from exc