Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/serialization/__init__.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

78 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18import struct as _struct 

19from enum import Enum 

20 

21from confluent_kafka.error import KafkaException 

22 

23__all__ = ['Deserializer', 

24 'IntegerDeserializer', 

25 'IntegerSerializer', 

26 'DoubleDeserializer', 

27 'DoubleSerializer', 

28 'StringDeserializer', 

29 'StringSerializer', 

30 'MessageField', 

31 'SerializationContext', 

32 'SerializationError', 

33 'Serializer'] 

34 

35 

class MessageField(str, Enum):
    """
    String-valued enumeration naming the part of a Message being handled.

    Because the enum mixes in ``str``, each member compares equal to its
    plain string value.

    Attributes:
        NONE (str): No message field ('none')

        KEY (str): Message key ('key')

        VALUE (str): Message value ('value')
    """

    NONE = 'none'
    KEY = 'key'
    VALUE = 'value'

    def __str__(self) -> str:
        # Render the bare value (e.g. 'key') rather than 'MessageField.KEY'.
        return f"{self.value}"

52 

53 

class SerializationContext:
    """
    Carries extra information about the data handed to a
    serializer/deserializer.

    The constructor stores its arguments unchanged as attributes of the
    same name.

    Args:
        topic (str): Topic the data is being produced to or consumed from.

        field (MessageField): Identifies which part of the message is
            being serialized.

        headers (list): List of message header tuples. Defaults to None.
    """

    def __init__(self, topic, field, headers=None):
        self.topic, self.field, self.headers = topic, field, headers

72 

73 

class SerializationError(KafkaException):
    """Generic error raised by the serialization package."""

77 

78 

class Serializer:
    """
    Base class that every Serializer implementation extends.
    A Serializer tells Kafka clients how to turn a Python object into
    bytes.

    The built-in implementations listed below show how to extend this
    class.

    Note:
        Calling an instance of this base class raises
        ``NotImplementedError``; use one of the derived classes instead.

    This module ships the following implementations.

    Note:
        Unless stated otherwise, numeric types are signed and the
        serialized form is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleSerializer
          - float
          - IEEE 754 binary64
        * - IntegerSerializer
          - int
          - int32
        * - StringSerializer
          - unicode
          - unicode(encoding)
    """

    __slots__ = []

    def __call__(self, obj, ctx=None):
        """
        Convert obj to bytes. Derived classes must override this method.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during serialization
                (contract for derived classes).

            NotImplementedError: always, when invoked on this base class.

        Returns:
            bytes if obj is not None, otherwise None
        """
        raise NotImplementedError()

135 

136 

class Deserializer:
    """
    Base class that every Deserializer implementation extends.
    A Deserializer tells Kafka clients how to turn bytes into a Python
    object.

    The built-in implementations listed below show how to extend this
    class.

    Note:
        Calling an instance of this base class raises
        ``NotImplementedError``; use one of the derived classes instead.

    This module ships the following implementations.

    Note:
        Unless stated otherwise, numeric types are signed and the
        serialized form is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleDeserializer
          - float
          - IEEE 754 binary64
        * - IntegerDeserializer
          - int
          - int32
        * - StringDeserializer
          - unicode
          - unicode(encoding)
    """

    __slots__ = []

    def __call__(self, value, ctx=None):
        """
        Convert bytes to an object. Derived classes must override this
        method.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during deserialization
                (contract for derived classes).

            NotImplementedError: always, when invoked on this base class.

        Returns:
            object if data is not None, otherwise None
        """
        raise NotImplementedError()

192 

193 

class DoubleSerializer(Serializer):
    """
    Serializes float to big-endian IEEE 754 binary64.

    See Also:
        `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Pack obj as a big-endian double (struct format ``'>d'``).

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if ``struct`` reports a packing error.

        Returns:
            IEEE 754 binary64 bytes if obj is not None, otherwise None
        """
        if obj is not None:
            try:
                packed = _struct.pack('>d', obj)
            except _struct.error as err:
                raise SerializationError(str(err))
            return packed

        # None passes through untouched so it is sent as a Kafka Null.
        return None

228 

229 

class DoubleDeserializer(Deserializer):
    """
    Deserializes float from big-endian IEEE 754 binary64.

    See Also:
        `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Unpack a float from big-endian IEEE 754 binary64 bytes
        (struct format ``'>d'``).

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if ``struct`` reports an unpacking error.

        Returns:
            float if data is not None, otherwise None
        """
        if value is not None:
            try:
                # '>d' always yields a 1-tuple; unpack its only element.
                (number,) = _struct.unpack('>d', value)
            except _struct.error as err:
                raise SerializationError(str(err))
            return number

        # A Kafka Null stays None.
        return None

262 

263 

class IntegerSerializer(Serializer):
    """
    Serializes int to big-endian int32 bytes.

    See Also:
        `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_
    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Pack obj as a big-endian signed 32-bit integer (struct format
        ``'>i'``).

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if ``struct`` reports a packing error.

        Returns:
            int32 bytes if obj is not None, else None
        """
        if obj is not None:
            try:
                packed = _struct.pack('>i', obj)
            except _struct.error as err:
                raise SerializationError(str(err))
            return packed

        # None passes through untouched so it is sent as a Kafka Null.
        return None

299 

300 

class IntegerDeserializer(Deserializer):
    """
    Deserializes int from big-endian int32 bytes.

    See Also:
        `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Unpack an int from big-endian signed 32-bit bytes (struct format
        ``'>i'``).

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if ``struct`` reports an unpacking error.

        Returns:
            int if data is not None, otherwise None
        """
        if value is not None:
            try:
                # '>i' always yields a 1-tuple; unpack its only element.
                (number,) = _struct.unpack('>i', value)
            except _struct.error as err:
                raise SerializationError(str(err))
            return number

        # A Kafka Null stays None.
        return None

333 

334 

class StringSerializer(Serializer):
    """
    Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``.

    Note:
        None objects are represented as Kafka Null.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_
    """  # noqa: E501

    def __init__(self, codec='utf_8'):
        self.codec = codec

    def __call__(self, obj, ctx=None):
        """
        Serializes a str to bytes using the configured codec.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Raises:
            SerializationError: if obj cannot be encoded with the
                configured codec.

        Returns:
            serialized bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            return obj.encode(self.codec)
        except UnicodeError as e:
            # str.encode reports failures as UnicodeError (typically
            # UnicodeEncodeError), never struct.error; translate them into
            # the error type this package documents.
            raise SerializationError(str(e)) from e

382 

383 

class StringDeserializer(Deserializer):
    """
    Deserializes a str from bytes per the configured codec. Defaults to
    ``utf_8``.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_
    """  # noqa: E501

    def __init__(self, codec='utf_8'):
        self.codec = codec

    def __call__(self, value, ctx=None):
        """
        Deserializes bytes to a str using the configured codec.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if value cannot be decoded with the
                configured codec.

        Returns:
            unicode if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return value.decode(self.codec)
        except UnicodeError as e:
            # bytes.decode reports failures as UnicodeError (typically
            # UnicodeDecodeError), never struct.error; translate them into
            # the error type this package documents.
            raise SerializationError(str(e)) from e