Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/confluent_kafka/serialization/__init__.py: 42%

74 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18import struct as _struct 

19from confluent_kafka.error import KafkaException 

20 

21__all__ = ['Deserializer', 

22 'IntegerDeserializer', 

23 'IntegerSerializer', 

24 'DoubleDeserializer', 

25 'DoubleSerializer', 

26 'StringDeserializer', 

27 'StringSerializer', 

28 'MessageField', 

29 'SerializationContext', 

30 'SerializationError', 

31 'Serializer'] 

32 

33 

34class MessageField(object): 

35 """ 

36 Enum like object for identifying Message fields. 

37 

38 Attributes: 

39 KEY (str): Message key 

40 

41 VALUE (str): Message value 

42 """ 

43 

44 NONE = 'none' 

45 KEY = 'key' 

46 VALUE = 'value' 

47 

48 

49class SerializationContext(object): 

50 """ 

51 SerializationContext provides additional context to the 

52 serializer/deserializer about the data it's serializing/deserializing. 

53 

54 Args: 

55 topic (str): Topic data is being produce to or consumed from. 

56 

57 field (MessageField): Describes what part of the message is 

58 being serialized. 

59 

60 headers (list): List of message header tuples. Defaults to None. 

61 """ 

62 

63 def __init__(self, topic, field, headers=None): 

64 self.topic = topic 

65 self.field = field 

66 self.headers = headers 

67 

68 

69class SerializationError(KafkaException): 

70 """Generic error from serializer package""" 

71 pass 

72 

73 

74class Serializer(object): 

75 """ 

76 Extensible class from which all Serializer implementations derive. 

77 Serializers instruct Kafka clients on how to convert Python objects 

78 to bytes. 

79 

80 See built-in implementations, listed below, for an example of how to 

81 extend this class. 

82 

83 Note: 

84 This class is not directly instantiable. The derived classes must be 

85 used instead. 

86 

87 The following implementations are provided by this module. 

88 

89 Note: 

90 Unless noted elsewhere all numeric types are signed and serialization 

91 is big-endian. 

92 

93 .. list-table:: 

94 :header-rows: 1 

95 

96 * - Name 

97 - Type 

98 - Binary Format 

99 * - DoubleSerializer 

100 - float 

101 - IEEE 764 binary64 

102 * - IntegerSerializer 

103 - int 

104 - int32 

105 * - StringSerializer 

106 - unicode 

107 - unicode(encoding) 

108 """ 

109 

110 __slots__ = [] 

111 

112 def __call__(self, obj, ctx=None): 

113 """ 

114 Converts obj to bytes. 

115 

116 Args: 

117 obj (object): object to be serialized 

118 

119 ctx (SerializationContext): Metadata pertaining to the serialization 

120 operation 

121 

122 Raises: 

123 SerializerError if an error occurs during serialization 

124 

125 Returns: 

126 bytes if obj is not None, otherwise None 

127 """ 

128 

129 raise NotImplementedError 

130 

131 

132class Deserializer(object): 

133 """ 

134 Extensible class from which all Deserializer implementations derive. 

135 Deserializers instruct Kafka clients on how to convert bytes to objects. 

136 

137 See built-in implementations, listed below, for an example of how to 

138 extend this class. 

139 

140 Note: 

141 This class is not directly instantiable. The derived classes must be 

142 used instead. 

143 

144 The following implementations are provided by this module. 

145 

146 Note: 

147 Unless noted elsewhere all numeric types are signed and 

148 serialization is big-endian. 

149 

150 .. list-table:: 

151 :header-rows: 1 

152 

153 * - Name 

154 - Type 

155 - Binary Format 

156 * - DoubleDeserializer 

157 - float 

158 - IEEE 764 binary64 

159 * - IntegerDeserializer 

160 - int 

161 - int32 

162 * - StringDeserializer 

163 - unicode 

164 - unicode(encoding) 

165 """ 

166 

167 __slots__ = [] 

168 

169 def __call__(self, value, ctx=None): 

170 """ 

171 Convert bytes to object 

172 

173 Args: 

174 value (bytes): bytes to be deserialized 

175 

176 ctx (SerializationContext): Metadata pertaining to the serialization 

177 operation 

178 

179 Raises: 

180 SerializerError if an error occurs during deserialization 

181 

182 Returns: 

183 object if data is not None, otherwise None 

184 """ 

185 

186 raise NotImplementedError 

187 

188 

189class DoubleSerializer(Serializer): 

190 """ 

191 Serializes float to IEEE 764 binary64. 

192 

193 See Also: 

194 `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_ 

195 

196 """ # noqa: E501 

197 

198 def __call__(self, obj, ctx=None): 

199 """ 

200 Args: 

201 obj (object): object to be serialized 

202 

203 ctx (SerializationContext): Metadata pertaining to the serialization 

204 operation 

205 

206 Note: 

207 None objects are represented as Kafka Null. 

208 

209 Raises: 

210 SerializerError if an error occurs during serialization. 

211 

212 Returns: 

213 IEEE 764 binary64 bytes if obj is not None, otherwise None 

214 """ 

215 

216 if obj is None: 

217 return None 

218 

219 try: 

220 return _struct.pack('>d', obj) 

221 except _struct.error as e: 

222 raise SerializationError(str(e)) 

223 

224 

225class DoubleDeserializer(Deserializer): 

226 """ 

227 Deserializes float to IEEE 764 binary64. 

228 

229 See Also: 

230 `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_ 

231 """ # noqa: E501 

232 

233 def __call__(self, value, ctx=None): 

234 """ 

235 Deserializes float from IEEE 764 binary64 bytes. 

236 

237 Args: 

238 value (bytes): bytes to be deserialized 

239 

240 ctx (SerializationContext): Metadata pertaining to the serialization 

241 operation 

242 

243 Raises: 

244 SerializerError if an error occurs during deserialization. 

245 

246 Returns: 

247 float if data is not None, otherwise None 

248 """ 

249 

250 if value is None: 

251 return None 

252 

253 try: 

254 return _struct.unpack('>d', value)[0] 

255 except _struct.error as e: 

256 raise SerializationError(str(e)) 

257 

258 

259class IntegerSerializer(Serializer): 

260 """ 

261 Serializes int to int32 bytes. 

262 

263 See Also: 

264 `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_ 

265 """ # noqa: E501 

266 

267 def __call__(self, obj, ctx=None): 

268 """ 

269 Serializes int as int32 bytes. 

270 

271 Args: 

272 obj (object): object to be serialized 

273 

274 ctx (SerializationContext): Metadata pertaining to the serialization 

275 operation 

276 

277 Note: 

278 None objects are represented as Kafka Null. 

279 

280 Raises: 

281 SerializerError if an error occurs during serialization 

282 

283 Returns: 

284 int32 bytes if obj is not None, else None 

285 """ 

286 

287 if obj is None: 

288 return None 

289 

290 try: 

291 return _struct.pack('>i', obj) 

292 except _struct.error as e: 

293 raise SerializationError(str(e)) 

294 

295 

296class IntegerDeserializer(Deserializer): 

297 """ 

298 Deserializes int to int32 bytes. 

299 

300 See Also: 

301 `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_ 

302 """ # noqa: E501 

303 

304 def __call__(self, value, ctx=None): 

305 """ 

306 Deserializes int from int32 bytes. 

307 

308 Args: 

309 value (bytes): bytes to be deserialized 

310 

311 ctx (SerializationContext): Metadata pertaining to the serialization 

312 operation 

313 

314 Raises: 

315 SerializerError if an error occurs during deserialization. 

316 

317 Returns: 

318 int if data is not None, otherwise None 

319 """ 

320 

321 if value is None: 

322 return None 

323 

324 try: 

325 return _struct.unpack('>i', value)[0] 

326 except _struct.error as e: 

327 raise SerializationError(str(e)) 

328 

329 

330class StringSerializer(Serializer): 

331 """ 

332 Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. 

333 

334 Note: 

335 None objects are represented as Kafka Null. 

336 

337 Args: 

338 codec (str, optional): encoding scheme. Defaults to utf_8 

339 

340 See Also: 

341 `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ 

342 

343 `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_ 

344 """ # noqa: E501 

345 

346 def __init__(self, codec='utf_8'): 

347 self.codec = codec 

348 

349 def __call__(self, obj, ctx=None): 

350 """ 

351 Serializes a str(py2:unicode) to bytes. 

352 

353 Compatibility Note: 

354 Python 2 str objects must be converted to unicode objects. 

355 Python 3 all str objects are already unicode objects. 

356 

357 Args: 

358 obj (object): object to be serialized 

359 

360 ctx (SerializationContext): Metadata pertaining to the serialization 

361 operation 

362 

363 Raises: 

364 SerializerError if an error occurs during serialization. 

365 

366 Returns: 

367 serialized bytes if obj is not None, otherwise None 

368 """ 

369 

370 if obj is None: 

371 return None 

372 

373 try: 

374 return obj.encode(self.codec) 

375 except _struct.error as e: 

376 raise SerializationError(str(e)) 

377 

378 

379class StringDeserializer(Deserializer): 

380 """ 

381 Deserializes a str(py2:unicode) from bytes. 

382 

383 Args: 

384 codec (str, optional): encoding scheme. Defaults to utf_8 

385 

386 See Also: 

387 `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ 

388 

389 `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_ 

390 """ # noqa: E501 

391 

392 def __init__(self, codec='utf_8'): 

393 self.codec = codec 

394 

395 def __call__(self, value, ctx=None): 

396 """ 

397 Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. 

398 

399 Compatibility Note: 

400 Python 2 str objects must be converted to unicode objects by the 

401 application prior to using this serializer. 

402 

403 Python 3 all str objects are already unicode objects. 

404 

405 Args: 

406 value (bytes): bytes to be deserialized 

407 

408 ctx (SerializationContext): Metadata pertaining to the serialization 

409 operation 

410 

411 Raises: 

412 SerializerError if an error occurs during deserialization. 

413 

414 Returns: 

415 unicode if data is not None, otherwise None 

416 """ 

417 

418 if value is None: 

419 return None 

420 

421 try: 

422 return value.decode(self.codec) 

423 except _struct.error as e: 

424 raise SerializationError(str(e))