1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import struct as _struct
19from enum import Enum
20
21from confluent_kafka.error import KafkaException
22
# Public API of this module: the names exported by a star-import.
__all__ = ['Deserializer',
           'IntegerDeserializer',
           'IntegerSerializer',
           'DoubleDeserializer',
           'DoubleSerializer',
           'StringDeserializer',
           'StringSerializer',
           'MessageField',
           'SerializationContext',
           'SerializationError',
           'Serializer']
34
35
class MessageField(str, Enum):
    """
    String-valued enum naming the part of a Message being handled.

    Because the enum also derives from ``str``, every member compares equal
    to its plain string value.

    Attributes:
        NONE (str): ``'none'``; presumably used when no specific field
            applies (not referenced elsewhere in this module).

        KEY (str): Message key

        VALUE (str): Message value
    """

    NONE = 'none'
    KEY = 'key'
    VALUE = 'value'

    def __str__(self) -> str:
        # Render as the bare value ('key') rather than 'MessageField.KEY'.
        raw = self.value
        return str(raw)
52
53
class SerializationContext(object):
    """
    Extra information handed to a serializer/deserializer describing the
    data it is asked to serialize/deserialize.

    The constructor stores its arguments unchanged as the attributes
    ``topic``, ``field`` and ``headers``.

    Args:
        topic (str): Topic data is being produced to or consumed from.

        field (MessageField): Describes what part of the message is
            being serialized.

        headers (list): List of message header tuples. Defaults to None.
    """

    def __init__(self, topic, field, headers=None):
        self.topic, self.field, self.headers = topic, field, headers
72
73
class SerializationError(KafkaException):
    """
    Generic error from serializer package.

    Adds no behavior of its own; it exists so serialization failures can be
    caught separately from other ``KafkaException`` errors.
    """
77
78
class Serializer(object):
    """
    Base class from which all Serializer implementations derive.

    A Serializer tells a Kafka client how to convert a Python object to
    bytes. Subclasses provide the conversion by overriding ``__call__``;
    the built-in implementations listed below serve as examples.

    Note:
        This base class only defines the interface: calling an instance of
        it raises ``NotImplementedError``. Use a derived class instead.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere all numeric types are signed and serialization
        is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleSerializer
          - float
          - IEEE 754 binary64
        * - IntegerSerializer
          - int
          - int32
        * - StringSerializer
          - unicode
          - unicode(encoding)
    """

    __slots__ = []

    def __call__(self, obj, ctx=None):
        """
        Converts obj to bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during serialization
                (contract for subclasses)

            NotImplementedError: always, when invoked on this base class

        Returns:
            bytes if obj is not None, otherwise None
        """
        # Abstract hook: concrete serializers must override this method.
        raise NotImplementedError()
135
136
class Deserializer(object):
    """
    Base class from which all Deserializer implementations derive.

    A Deserializer tells a Kafka client how to convert bytes to a Python
    object. Subclasses provide the conversion by overriding ``__call__``;
    the built-in implementations listed below serve as examples.

    Note:
        This base class only defines the interface: calling an instance of
        it raises ``NotImplementedError``. Use a derived class instead.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere all numeric types are signed and
        serialization is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleDeserializer
          - float
          - IEEE 754 binary64
        * - IntegerDeserializer
          - int
          - int32
        * - StringDeserializer
          - unicode
          - unicode(encoding)
    """

    __slots__ = []

    def __call__(self, value, ctx=None):
        """
        Convert bytes to object

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during deserialization
                (contract for subclasses)

            NotImplementedError: always, when invoked on this base class

        Returns:
            object if data is not None, otherwise None
        """
        # Abstract hook: concrete deserializers must override this method.
        raise NotImplementedError()
192
193
class DoubleSerializer(Serializer):
    """
    Serializes float to big-endian IEEE 754 binary64.

    See Also:
        `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Packs obj as 8 big-endian IEEE 754 binary64 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if obj cannot be packed as a double.

        Returns:
            IEEE 754 binary64 bytes if obj is not None, otherwise None
        """
        if obj is None:
            return None

        try:
            packed = _struct.pack('>d', obj)
        except _struct.error as err:
            # Surface struct failures as the package's own error type.
            raise SerializationError(str(err))
        else:
            return packed
228
229
class DoubleDeserializer(Deserializer):
    """
    Deserializes float from big-endian IEEE 754 binary64 bytes.

    See Also:
        `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Unpacks a float from IEEE 754 binary64 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if value cannot be unpacked as a double
                (``struct.error``, e.g. a buffer of the wrong size).

        Returns:
            float if data is not None, otherwise None
        """
        if value is None:
            return None

        try:
            # '>d' yields a 1-tuple; destructure it to get the float.
            (number,) = _struct.unpack('>d', value)
        except _struct.error as err:
            raise SerializationError(str(err))
        else:
            return number
262
263
class IntegerSerializer(Serializer):
    """
    Serializes int to big-endian signed int32 bytes.

    See Also:
        `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_
    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Packs obj as 4 big-endian int32 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if obj cannot be packed as an int32
                (``struct.error``, e.g. a value out of range).

        Returns:
            int32 bytes if obj is not None, else None
        """
        if obj is None:
            return None

        try:
            packed = _struct.pack('>i', obj)
        except _struct.error as err:
            # Surface struct failures as the package's own error type.
            raise SerializationError(str(err))
        else:
            return packed
299
300
class IntegerDeserializer(Deserializer):
    """
    Deserializes int from big-endian signed int32 bytes.

    See Also:
        `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Unpacks an int from int32 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if value cannot be unpacked as an int32
                (``struct.error``, e.g. a buffer of the wrong size).

        Returns:
            int if data is not None, otherwise None
        """
        if value is None:
            return None

        try:
            # '>i' yields a 1-tuple; destructure it to get the int.
            (number,) = _struct.unpack('>i', value)
        except _struct.error as err:
            raise SerializationError(str(err))
        else:
            return number
333
334
class StringSerializer(Serializer):
    """
    Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``.

    Note:
        None objects are represented as Kafka Null.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_
    """  # noqa: E501

    def __init__(self, codec='utf_8'):
        self.codec = codec

    def __call__(self, obj, ctx=None):
        """
        Serializes a str(py2:unicode) to bytes.

        Compatibility Note:
            Python 2 str objects must be converted to unicode objects.
            Python 3 all str objects are already unicode objects.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Raises:
            SerializationError: if obj cannot be encoded with the configured
                codec, the codec is unknown, or obj has no ``encode`` method.

        Returns:
            serialized bytes if obj is not None, otherwise None
        """
        if obj is None:
            return None

        try:
            return obj.encode(self.codec)
        except (AttributeError, LookupError, UnicodeError) as e:
            # str.encode never raises struct.error. It raises UnicodeError
            # for unencodable text and LookupError for an unknown codec;
            # AttributeError means obj is not a string at all. Wrap these
            # so callers get the documented SerializationError.
            raise SerializationError(str(e)) from e
382
383
class StringDeserializer(Deserializer):
    """
    Deserializes a str(py2:unicode) from bytes.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_
    """  # noqa: E501

    def __init__(self, codec='utf_8'):
        self.codec = codec

    def __call__(self, value, ctx=None):
        """
        Deserializes bytes to unicode per the configured codec. Defaults to ``utf_8``.

        Compatibility Note:
            Python 2 returns a unicode object.

            Python 3 returns a str object (all str objects are unicode).

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if value cannot be decoded with the
                configured codec, the codec is unknown, or value has no
                ``decode`` method.

        Returns:
            unicode if data is not None, otherwise None
        """  # noqa: E501
        if value is None:
            return None

        try:
            return value.decode(self.codec)
        except (AttributeError, LookupError, UnicodeError) as e:
            # bytes.decode never raises struct.error. It raises UnicodeError
            # for undecodable bytes and LookupError for an unknown codec;
            # AttributeError means value is not bytes-like. Wrap these so
            # callers get the documented SerializationError.
            raise SerializationError(str(e)) from e