Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc8785/_impl.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

132 statements  

1""" 

2Internal implementation module for `rfc8785`. 

3 

4This module is NOT a public API, and is not considered stable. 

5""" 

6 

7from __future__ import annotations 

8 

9import math 

10import re 

11import typing 

12from io import BytesIO 

13 

14_Scalar = typing.Union[bool, int, str, float, None] 

15 

16_Value = typing.Union[ 

17 _Scalar, 

18 typing.Sequence["_Value"], 

19 typing.Tuple["_Value"], 

20 typing.Mapping[str, "_Value"], 

21] 

22 

23_INT_MAX = 2**53 - 1 

24_INT_MIN = -(2**53) + 1 

25 

26# These are adapted from Andrew Rundgren's reference implementation, 

27# which is licensed under the Apache License, version 2.0. 

28# See: <https://github.com/cyberphone/json-canonicalization/blob/ba74d44ecf5/python3/src/org/webpki/json/Canonicalize.py> 

29# See: <https://github.com/cyberphone/json-canonicalization/blob/ba74d44ecf5/python3/src/org/webpki/json/LICENSE> 

30_ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]') 

31_ESCAPE_DCT = { 

32 "\\": "\\\\", 

33 '"': '\\"', 

34 "\b": "\\b", 

35 "\f": "\\f", 

36 "\n": "\\n", 

37 "\r": "\\r", 

38 "\t": "\\t", 

39} 

40for i in range(0x20): 

41 _ESCAPE_DCT.setdefault(chr(i), f"\\u{i:04x}") 

42 

43 

class CanonicalizationError(ValueError):
    """
    The base error for all errors during canonicalization.
    """

50 

51 

class IntegerDomainError(CanonicalizationError):
    """
    Raised for an integer that exceeds the true integer precision of an
    IEEE 754 double-precision float, which is what JSON uses.
    """

    def __init__(self, n: int) -> None:
        """
        Build the error for the offending integer `n`, which is
        included in the error message.
        """
        message = f"{n} exceeds safe integer domain for JSON floats"
        super().__init__(message)

63 

64 

class FloatDomainError(CanonicalizationError):
    """
    The given float cannot be represented in JCS, typically because it's
    infinite, NaN, or an invalid representation.
    """

    def __init__(self, f: float) -> None:
        """
        Initialize a `FloatDomainError`.

        `f` is the offending float; it is included in the error message.
        """
        super().__init__(f"{f} is not representable in JCS")

77 

78 

def _serialize_str(s: str, sink: typing.IO[bytes]) -> None:
    """
    Serialize a string as a JSON string, per RFC 8785 3.2.2.2.

    The double-quoted, escaped and UTF-8 encoded form of `s` is written
    to `sink`.

    Raises `CanonicalizationError` if the escaped string cannot be encoded
    as UTF-8. Nothing is written to `sink` in that case.
    """

    def _replace(match: re.Match) -> str:
        # Every character matched by `_ESCAPE` has an entry in `_ESCAPE_DCT`.
        return _ESCAPE_DCT[match.group(0)]

    # Escape and encode *before* writing anything, so that a failure doesn't
    # leave a dangling opening quote behind in `sink`.
    try:
        # Encoding to UTF-8 means that we'll reject surrogates and other
        # non-UTF-8-isms.
        encoded = _ESCAPE.sub(_replace, s).encode("utf-8")
    except UnicodeEncodeError as e:
        raise CanonicalizationError("input contains non-UTF-8 codepoints") from e

    sink.write(b'"')
    sink.write(encoded)
    sink.write(b'"')

95 

96 

97def _serialize_float(f: float, sink: typing.IO[bytes]) -> None: 

98 """ 

99 Serialize a floating point number to a stable string format, as 

100 defined in ECMA 262 7.1.12.1 and amended by RFC 8785 3.2.2.3. 

101 """ 

102 

103 # NaN and infinite forms are prohibited. 

104 if math.isnan(f) or math.isinf(f): 

105 raise FloatDomainError(f) 

106 

107 # Python does not distinguish between +0 and -0. 

108 if f == 0: 

109 sink.write(b"0") 

110 return 

111 

112 # Negatives get serialized by prepending the sign marker and serializing 

113 # the positive form. 

114 if f < 0: 

115 sink.write(b"-") 

116 _serialize_float(-f, sink) 

117 return 

118 

119 # The remainder of this implementation is adapted from 

120 # Andrew Rundgren's reference implementation. 

121 

122 # Now we should only have valid non-zero values 

123 stringified = str(f) 

124 

125 exponent_str = "" 

126 exponent_value = 0 

127 q = stringified.find("e") 

128 if q > 0: 

129 # Grab the exponent and remove it from the number 

130 exponent_str = stringified[q:] 

131 if exponent_str[2:3] == "0": 

132 # Suppress leading zero on exponents 

133 exponent_str = exponent_str[:2] + exponent_str[3:] 

134 stringified = stringified[0:q] 

135 exponent_value = int(exponent_str[1:]) 

136 

137 # Split number in first + dot + last 

138 first = stringified 

139 dot = "" 

140 last = "" 

141 q = stringified.find(".") 

142 if q > 0: 

143 dot = "." 

144 first = stringified[:q] 

145 last = stringified[q + 1 :] 

146 

147 # Now the string is split into: first + dot + last + exponent_str 

148 if last == "0": 

149 # Always remove trailing .0 

150 dot = "" 

151 last = "" 

152 

153 if exponent_value > 0 and exponent_value < 21: 

154 # Integers are shown as is with up to 21 digits 

155 first += last 

156 last = "" 

157 dot = "" 

158 exponent_str = "" 

159 q = exponent_value - len(first) 

160 while q >= 0: 

161 q -= 1 

162 first += "0" 

163 elif exponent_value < 0 and exponent_value > -7: 

164 # Small numbers are shown as 0.etc with e-6 as lower limit 

165 last = first + last 

166 first = "0" 

167 dot = "." 

168 exponent_str = "" 

169 q = exponent_value 

170 while q < -1: 

171 q += 1 

172 last = "0" + last 

173 

174 sink.write(f"{first}{dot}{last}{exponent_str}".encode()) 

175 

176 

def dumps(obj: _Value) -> bytes:
    """
    Canonicalize `obj` according to JCS and hand back the resulting
    serialization as a `bytes` object.
    """
    # TODO: Optimize this?
    buffer = BytesIO()
    dump(obj, buffer)
    canonical = buffer.getvalue()
    return canonical

186 

187 

def dump(obj: _Value, sink: typing.IO[bytes]) -> None:
    """
    Perform JCS serialization of `obj` into `sink`.

    Supported inputs are `None`, `bool`, `int`, `str`, `float`, `list`,
    `tuple` and `dict` instances; containers are serialized recursively.

    Raises `IntegerDomainError` for integers outside of
    `[_INT_MIN, _INT_MAX]`, `FloatDomainError` for NaN or infinite floats,
    and `CanonicalizationError` for unsupported types, for object keys that
    aren't strings, and for strings (including object keys) that cannot be
    encoded.
    """

    if obj is None:
        sink.write(b"null")
    elif isinstance(obj, bool):
        # NOTE: `bool` must be tested before `int`, since `bool` is a
        # subclass of `int`.
        sink.write(b"true" if obj else b"false")
    elif isinstance(obj, int):
        # Coerce `int` subtypes to a plain `int`, so that `str(...)` below
        # produces plain decimal digits.
        obj = int(obj)
        if obj < _INT_MIN or obj > _INT_MAX:
            raise IntegerDomainError(obj)
        sink.write(str(obj).encode("utf-8"))
    elif isinstance(obj, str):
        # NOTE: We don't coerce with `str(...)`` here, since that will do
        # the wrong thing for `(str, Enum)` subtypes where `__str__` is
        # `Enum.__str__`.
        _serialize_str(obj, sink)
    elif isinstance(obj, float):
        _serialize_float(float(obj), sink)
    elif isinstance(obj, (list, tuple)):
        sink.write(b"[")
        for idx, elem in enumerate(obj):
            if idx > 0:
                sink.write(b",")
            dump(elem, sink)
        sink.write(b"]")
    elif isinstance(obj, dict):
        # RFC 8785 3.2.3: Objects are sorted by key; keys are ordered
        # by their UTF-16 encoding. The spec isn't clear about which endianness,
        # but the examples imply that the big endian encoding is used.
        entries = []
        for key, value in obj.items():
            if not isinstance(key, str):
                raise CanonicalizationError("object keys must be strings")
            try:
                sort_key = key.encode("utf-16be")
            except UnicodeEncodeError as e:
                # Keys with e.g. lone surrogates can't be encoded. Report
                # them like any other unencodable string, instead of leaking
                # the codec's error.
                raise CanonicalizationError(
                    "input contains non-UTF-8 codepoints"
                ) from e
            entries.append((sort_key, key, value))
        entries.sort(key=lambda entry: entry[0])

        sink.write(b"{")
        for idx, (_, key, value) in enumerate(entries):
            if idx > 0:
                sink.write(b",")

            _serialize_str(key, sink)
            sink.write(b":")
            dump(value, sink)

        sink.write(b"}")
    else:
        raise CanonicalizationError(f"unsupported type: {type(obj)}")