#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import struct as _struct
from enum import Enum
from typing import Any, List, Optional

from confluent_kafka._types import HeadersType
from confluent_kafka.error import KafkaException

__all__ = [
    'Deserializer',
    'IntegerDeserializer',
    'IntegerSerializer',
    'DoubleDeserializer',
    'DoubleSerializer',
    'StringDeserializer',
    'StringSerializer',
    'MessageField',
    'SerializationContext',
    'SerializationError',
    'Serializer',
]


class MessageField(str, Enum):
    """
    Enum for identifying Message fields.

    Attributes:
        KEY (str): Message key

        VALUE (str): Message value
    """

    NONE = 'none'
    KEY = 'key'
    VALUE = 'value'

    def __str__(self) -> str:
        return str(self.value)


class SerializationContext(object):
    """
    SerializationContext provides additional context to the
    serializer/deserializer about the data it's serializing/deserializing.

    Args:
        topic (str): Topic the data is being produced to or consumed from.

        field (MessageField): Describes what part of the message is
            being serialized.

        headers (list): List of message header tuples. Defaults to None.
    """

    def __init__(self, topic: str, field: MessageField, headers: Optional[HeadersType] = None) -> None:
        self.topic = topic
        self.field = field
        self.headers = headers

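# A minimal sketch (not part of the library API) of how a SerializationContext
# might be constructed by hand; the topic name and header tuple below are
# purely hypothetical.
def _example_serialization_context() -> SerializationContext:
    return SerializationContext(
        topic='example-topic',
        field=MessageField.VALUE,
        headers=[('trace-id', b'abc-123')],
    )
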

class SerializationError(KafkaException):
    """Generic error from serializer package"""

    pass


class Serializer(object):
    """
    Extensible class from which all Serializer implementations derive.
    Serializers instruct Kafka clients on how to convert Python objects
    to bytes.

    See built-in implementations, listed below, for an example of how to
    extend this class.

    Note:
        This class is not directly instantiable. The derived classes must be
        used instead.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere, all numeric types are signed and serialization
        is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleSerializer
          - float
          - IEEE 754 binary64
        * - IntegerSerializer
          - int
          - int32
        * - StringSerializer
          - unicode
          - unicode(encoding)
    """

    __slots__: List[str] = []

    def __call__(self, obj: Any, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Converts obj to bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during serialization

        Returns:
            bytes if obj is not None, otherwise None
        """

        raise NotImplementedError

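# A minimal sketch (purely illustrative, not shipped with the library) of how
# Serializer can be extended: a hypothetical boolean serializer that encodes a
# bool as a single byte, mirroring the error handling of the built-ins below.
class _ExampleBooleanSerializer(Serializer):
    def __call__(self, obj: Optional[bool], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        if obj is None:
            return None
        try:
            return _struct.pack('>?', obj)
        except _struct.error as e:
            raise SerializationError(str(e))
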

class Deserializer(object):
    """
    Extensible class from which all Deserializer implementations derive.
    Deserializers instruct Kafka clients on how to convert bytes to objects.

    See built-in implementations, listed below, for an example of how to
    extend this class.

    Note:
        This class is not directly instantiable. The derived classes must be
        used instead.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere, all numeric types are signed and
        serialization is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleDeserializer
          - float
          - IEEE 754 binary64
        * - IntegerDeserializer
          - int
          - int32
        * - StringDeserializer
          - unicode
          - unicode(encoding)
    """

    __slots__: List[str] = []

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Any:
        """
        Converts bytes to an object.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization

        Returns:
            object if data is not None, otherwise None
        """

        raise NotImplementedError

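# The matching sketch for the Deserializer side (again purely illustrative):
# a hypothetical boolean deserializer that unpacks the single byte written by
# the _ExampleBooleanSerializer above.
class _ExampleBooleanDeserializer(Deserializer):
    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[bool]:
        if value is None:
            return None
        try:
            return _struct.unpack('>?', value)[0]
        except _struct.error as e:
            raise SerializationError(str(e))
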

class DoubleSerializer(Serializer):
    """
    Serializes float to IEEE 754 binary64.

    See Also:
        `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj: Optional[float], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes float as IEEE 754 binary64 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if an error occurs during serialization.

        Returns:
            IEEE 754 binary64 bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            return _struct.pack('>d', obj)
        except _struct.error as e:
            raise SerializationError(str(e))

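# A small sketch (illustrative only) confirming the wire format described
# above: DoubleSerializer produces 8 big-endian IEEE 754 binary64 bytes, and
# None passes through as Kafka Null (None).
def _example_double_serializer_format() -> None:
    serializer = DoubleSerializer()
    data = serializer(1.0)
    assert data == b'\x3f\xf0\x00\x00\x00\x00\x00\x00'  # big-endian encoding of 1.0
    assert len(data) == 8
    assert serializer(None) is None
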

class DoubleDeserializer(Deserializer):
    """
    Deserializes float from IEEE 754 binary64 bytes.

    See Also:
        `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[float]:
        """
        Deserializes float from IEEE 754 binary64 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization.

        Returns:
            float if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return _struct.unpack('>d', value)[0]
        except _struct.error as e:
            raise SerializationError(str(e))

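# A round-trip sketch (illustrative only) pairing DoubleSerializer with
# DoubleDeserializer; the topic name used for the context is hypothetical.
def _example_double_round_trip() -> None:
    ctx = SerializationContext('example-topic', MessageField.VALUE)
    data = DoubleSerializer()(3.14, ctx)
    assert DoubleDeserializer()(data, ctx) == 3.14
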

class IntegerSerializer(Serializer):
    """
    Serializes int to int32 bytes.

    See Also:
        `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_
    """  # noqa: E501

    def __call__(self, obj: Optional[int], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes int as int32 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if an error occurs during serialization

        Returns:
            int32 bytes if obj is not None, else None
        """

        if obj is None:
            return None

        try:
            return _struct.pack('>i', obj)
        except _struct.error as e:
            raise SerializationError(str(e))

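# A small sketch (illustrative only) of the int32 constraint noted above:
# values outside the signed 32-bit range make struct.pack fail, which
# IntegerSerializer surfaces as a SerializationError.
def _example_integer_serializer_range() -> None:
    serializer = IntegerSerializer()
    assert serializer(2147483647) == b'\x7f\xff\xff\xff'  # int32 max, big-endian
    try:
        serializer(2147483648)  # one past int32 max
    except SerializationError:
        pass
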

class IntegerDeserializer(Deserializer):
    """
    Deserializes int from int32 bytes.

    See Also:
        `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[int]:
        """
        Deserializes int from int32 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization.

        Returns:
            int if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return _struct.unpack('>i', value)[0]
        except _struct.error as e:
            raise SerializationError(str(e))

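# A round-trip sketch (illustrative only) pairing IntegerSerializer with
# IntegerDeserializer, here for a message key; the topic name is hypothetical.
def _example_integer_round_trip() -> None:
    ctx = SerializationContext('example-topic', MessageField.KEY)
    data = IntegerSerializer()(42, ctx)
    assert len(data) == 4  # int32 is always four bytes
    assert IntegerDeserializer()(data, ctx) == 42
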

class StringSerializer(Serializer):
    """
    Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``.

    Note:
        None objects are represented as Kafka Null.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_
    """  # noqa: E501

    def __init__(self, codec: str = 'utf_8') -> None:
        self.codec = codec

    def __call__(self, obj: Optional[str], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes a str (Python 2: unicode) to bytes.

        Compatibility Note:
            Python 2 str objects must be converted to unicode objects by the
            application prior to using this serializer.

            In Python 3 all str objects are already unicode objects.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during serialization.

        Returns:
            serialized bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            return obj.encode(self.codec)
        except (UnicodeError, LookupError) as e:
            # str.encode() raises UnicodeError/LookupError, not struct.error
            raise SerializationError(str(e))

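# A small sketch (illustrative only) of codec selection: the same text encodes
# differently under utf_8 and utf_16, and None is passed through as Kafka Null.
def _example_string_serializer_codecs() -> None:
    assert StringSerializer('utf_8')('héllo') == 'héllo'.encode('utf_8')
    assert StringSerializer('utf_16')('héllo') == 'héllo'.encode('utf_16')
    assert StringSerializer()(None) is None
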

class StringDeserializer(Deserializer):
    """
    Deserializes a str (Python 2: unicode) from bytes.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_
    """  # noqa: E501

    def __init__(self, codec: str = 'utf_8') -> None:
        self.codec = codec

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[str]:
        """
        Deserializes bytes to a str per the configured codec. Defaults to ``utf_8``.

        Compatibility Note:
            Python 2 str objects must be converted to unicode objects by the
            application prior to using this deserializer.

            In Python 3 all str objects are already unicode objects.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization.

        Returns:
            unicode if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return value.decode(self.codec)
        except (UnicodeError, LookupError) as e:
            # bytes.decode() raises UnicodeError/LookupError, not struct.error
            raise SerializationError(str(e))

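# A final round-trip sketch (illustrative only): StringSerializer and
# StringDeserializer invert each other as long as both sides agree on the
# codec; the topic name below is hypothetical.
def _example_string_round_trip() -> None:
    ctx = SerializationContext('example-topic', MessageField.VALUE)
    data = StringSerializer('utf_8')('héllo wörld', ctx)
    assert StringDeserializer('utf_8')(data, ctx) == 'héllo wörld'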