1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import struct as _struct
19from enum import Enum
20from typing import Any, List, Optional
21
22from confluent_kafka.error import KafkaException
23from confluent_kafka._types import HeadersType
24
# Names exported by ``from <this module> import *`` (the public API).
__all__ = [
    'Deserializer',
    'IntegerDeserializer',
    'IntegerSerializer',
    'DoubleDeserializer',
    'DoubleSerializer',
    'StringDeserializer',
    'StringSerializer',
    'MessageField',
    'SerializationContext',
    'SerializationError',
    'Serializer',
]
36
37
class MessageField(str, Enum):
    """
    Enum like object for identifying Message fields.

    Because the enum mixes in ``str``, every member is also a ``str`` and
    compares equal to its string value (e.g. ``MessageField.KEY == 'key'``).

    Attributes:
        NONE (str): Member whose value is ``'none'``

        KEY (str): Message key

        VALUE (str): Message value
    """

    NONE = 'none'
    KEY = 'key'
    VALUE = 'value'

    def __str__(self) -> str:
        # Render just the underlying value ('key') rather than the default
        # enum representation ('MessageField.KEY').
        field_value = self.value
        return str(field_value)
54
55
class SerializationContext(object):
    """
    Extra metadata handed to a serializer/deserializer describing the data
    it is currently serializing or deserializing.

    Args:
        topic (str): Topic the data is being produced to or consumed from.

        field (MessageField): Which part of the message is being
            (de)serialized.

        headers (list, optional): Message headers (see ``HeadersType``).
            Defaults to None.
    """

    def __init__(self, topic: str, field: MessageField, headers: Optional[HeadersType] = None) -> None:
        # Stored verbatim; no validation or copying is performed.
        self.topic, self.field, self.headers = topic, field, headers
74
75
class SerializationError(KafkaException):
    """
    Generic error raised by the serializer and deserializer implementations
    of this package.
    """
79
80
class Serializer(object):
    """
    Extensible class from which all Serializer implementations derive.
    Serializers instruct Kafka clients on how to convert Python objects
    to bytes.

    See built-in implementations, listed below, for an example of how to
    extend this class.

    Note:
        This base class does not implement serialization itself: calling an
        instance of it raises ``NotImplementedError``. Use a derived class
        that overrides ``__call__``.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere all numeric types are signed and serialization
        is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleSerializer
          - float
          - IEEE 754 binary64
        * - IntegerSerializer
          - int
          - int32
        * - StringSerializer
          - str
          - str encoded with the configured codec
    """

    # An empty __slots__ keeps this base class from adding a per-instance
    # __dict__; subclasses that do not declare __slots__ still get one.
    __slots__: List[str] = []

    def __call__(self, obj: Any, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Converts obj to bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during serialization.
            NotImplementedError when called on this base class directly.

        Returns:
            bytes if obj is not None, otherwise None
        """

        raise NotImplementedError
137
138
class Deserializer(object):
    """
    Extensible class from which all Deserializer implementations derive.
    Deserializers instruct Kafka clients on how to convert bytes to objects.

    See built-in implementations, listed below, for an example of how to
    extend this class.

    Note:
        This base class does not implement deserialization itself: calling
        an instance of it raises ``NotImplementedError``. Use a derived class
        that overrides ``__call__``.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere all numeric types are signed and
        serialization is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleDeserializer
          - float
          - IEEE 754 binary64
        * - IntegerDeserializer
          - int
          - int32
        * - StringDeserializer
          - str
          - str encoded with the configured codec
    """

    # An empty __slots__ keeps this base class from adding a per-instance
    # __dict__; subclasses that do not declare __slots__ still get one.
    __slots__: List[str] = []

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Any:
        """
        Convert bytes to object

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization.
            NotImplementedError when called on this base class directly.

        Returns:
            object if data is not None, otherwise None
        """

        raise NotImplementedError
194
195
class DoubleSerializer(Serializer):
    """
    Serializes float to IEEE 754 binary64.

    See Also:
        `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj: Optional[float], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes float as big-endian IEEE 754 binary64 bytes (8 bytes).

        Args:
            obj (float): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if an error occurs during serialization
            (e.g. obj is not a real number).

        Returns:
            IEEE 754 binary64 bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            # '>d': big-endian, standard-size 8-byte IEEE 754 double.
            return _struct.pack('>d', obj)
        except _struct.error as e:
            # Chain explicitly so the struct failure is reported as the cause.
            raise SerializationError(str(e)) from e
230
231
class DoubleDeserializer(Deserializer):
    """
    Deserializes float from IEEE 754 binary64 bytes.

    See Also:
        `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[float]:
        """
        Deserializes float from big-endian IEEE 754 binary64 bytes.

        Args:
            value (bytes): bytes to be deserialized; must be exactly 8 bytes
                long.

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization
            (e.g. value is not exactly 8 bytes long).

        Returns:
            float if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            # struct.unpack requires the buffer length to match '>d' (8 bytes).
            return _struct.unpack('>d', value)[0]
        except _struct.error as e:
            # Chain explicitly so the struct failure is reported as the cause.
            raise SerializationError(str(e)) from e
264
265
class IntegerSerializer(Serializer):
    """
    Serializes int to int32 bytes.

    See Also:
        `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_
    """  # noqa: E501

    def __call__(self, obj: Optional[int], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes int as big-endian signed int32 bytes (4 bytes).

        Args:
            obj (int): object to be serialized; must lie within the int32
                range [-2**31, 2**31 - 1].

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError if an error occurs during serialization
            (e.g. obj is outside the int32 range or is not an integer).

        Returns:
            int32 bytes if obj is not None, else None
        """

        if obj is None:
            return None

        try:
            # '>i': big-endian, standard-size 4-byte signed integer.
            return _struct.pack('>i', obj)
        except _struct.error as e:
            # Chain explicitly so the struct failure is reported as the cause.
            raise SerializationError(str(e)) from e
301
302
class IntegerDeserializer(Deserializer):
    """
    Deserializes int from int32 bytes.

    See Also:
        `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[int]:
        """
        Deserializes int from big-endian signed int32 bytes.

        Args:
            value (bytes): bytes to be deserialized; must be exactly 4 bytes
                long.

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if an error occurs during deserialization
            (e.g. value is not exactly 4 bytes long).

        Returns:
            int if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            # struct.unpack requires the buffer length to match '>i' (4 bytes).
            return _struct.unpack('>i', value)[0]
        except _struct.error as e:
            # Chain explicitly so the struct failure is reported as the cause.
            raise SerializationError(str(e)) from e
335
336
class StringSerializer(Serializer):
    """
    Serializes str to bytes per the configured codec. Defaults to ``utf_8``.

    Note:
        None objects are represented as Kafka Null.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_
    """  # noqa: E501

    def __init__(self, codec: str = 'utf_8') -> None:
        # The codec name is not validated here; an unknown codec is reported
        # as a SerializationError on the first call with a non-None obj.
        self.codec = codec

    def __call__(self, obj: Optional[str], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes a str to bytes using the configured codec.

        Args:
            obj (str): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if obj cannot be encoded with the configured
            codec, or if the codec is unknown / not a text encoding.

        Returns:
            serialized bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            return obj.encode(self.codec)
        except (UnicodeError, LookupError) as e:
            # str.encode never raises struct.error: encoding failures surface
            # as UnicodeEncodeError (a UnicodeError) and unknown or non-text
            # codec names as LookupError.
            raise SerializationError(str(e)) from e
384
385
class StringDeserializer(Deserializer):
    """
    Deserializes a str from bytes per the configured codec. Defaults to
    ``utf_8``.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_
    """  # noqa: E501

    def __init__(self, codec: str = 'utf_8') -> None:
        # The codec name is not validated here; an unknown codec is reported
        # as a SerializationError on the first call with non-None value.
        self.codec = codec

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[str]:
        """
        Deserializes bytes to a str using the configured codec.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation

        Raises:
            SerializationError if value cannot be decoded with the configured
            codec, or if the codec is unknown / not a text encoding.

        Returns:
            str if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return value.decode(self.codec)
        except (UnicodeError, LookupError) as e:
            # bytes.decode never raises struct.error: decoding failures surface
            # as UnicodeDecodeError (a UnicodeError) and unknown or non-text
            # codec names as LookupError.
            raise SerializationError(str(e)) from e