Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/confluent_kafka/serialization/__init__.py: 42%
74 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import struct as _struct
19from confluent_kafka.error import KafkaException
# Names exported by a star-import of this module.
__all__ = [
    'Deserializer',
    'IntegerDeserializer',
    'IntegerSerializer',
    'DoubleDeserializer',
    'DoubleSerializer',
    'StringDeserializer',
    'StringSerializer',
    'MessageField',
    'SerializationContext',
    'SerializationError',
    'Serializer',
]
class MessageField(object):
    """
    Enum-like namespace of string constants naming the parts of a Message.

    Attributes:
        NONE (str): The value ``'none'``; presumably means that no
            particular message field applies.

        KEY (str): Message key

        VALUE (str): Message value
    """

    NONE = 'none'
    KEY = 'key'
    VALUE = 'value'
class SerializationContext(object):
    """
    Carries extra information handed to a serializer/deserializer about the
    data it is asked to serialize/deserialize.

    Args:
        topic (str): Topic the data is being produced to or consumed from.

        field (MessageField): Identifies which part of the message is
            being serialized.

        headers (list): List of message header tuples. Defaults to None.
    """

    def __init__(self, topic, field, headers=None):
        # The arguments are stored unchanged as public attributes.
        self.topic, self.field, self.headers = topic, field, headers
class SerializationError(KafkaException):
    """Generic error raised by the serialization package."""
class Serializer(object):
    """
    Base class extended by every Serializer implementation.
    A Serializer tells a Kafka client how to turn a Python object
    into bytes.

    The built-in implementations listed below show how to extend
    this class.

    Note:
        Calling an instance of this base class raises
        ``NotImplementedError``; use one of the derived classes instead.

    This module provides the following implementations.

    Note:
        Unless noted elsewhere all numeric types are signed and serialization
        is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleSerializer
          - float
          - IEEE 754 binary64
        * - IntegerSerializer
          - int
          - int32
        * - StringSerializer
          - unicode
          - unicode(encoding)
    """

    __slots__ = []

    def __call__(self, obj, ctx=None):
        """
        Convert ``obj`` to bytes. Derived classes must override this method.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during serialization

            NotImplementedError: always, when invoked on this base class

        Returns:
            bytes if obj is not None, otherwise None
        """

        raise NotImplementedError
class Deserializer(object):
    """
    Base class extended by every Deserializer implementation.
    A Deserializer tells a Kafka client how to turn bytes into objects.

    The built-in implementations listed below show how to extend
    this class.

    Note:
        Calling an instance of this base class raises
        ``NotImplementedError``; use one of the derived classes instead.

    This module provides the following implementations.

    Note:
        Unless noted elsewhere all numeric types are signed and
        serialization is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleDeserializer
          - float
          - IEEE 754 binary64
        * - IntegerDeserializer
          - int
          - int32
        * - StringDeserializer
          - unicode
          - unicode(encoding)
    """

    __slots__ = []

    def __call__(self, value, ctx=None):
        """
        Convert bytes to an object. Derived classes must override this method.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during deserialization

            NotImplementedError: always, when invoked on this base class

        Returns:
            object if value is not None, otherwise None
        """

        raise NotImplementedError
class DoubleSerializer(Serializer):
    """
    Serializes float to IEEE 754 binary64.

    See Also:
        `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Pack ``obj`` as a big-endian 8-byte double.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this implementation.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if ``struct`` cannot pack ``obj``.

        Returns:
            IEEE 754 binary64 bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            packed = _struct.pack('>d', obj)
        except _struct.error as err:
            # Surface packing failures as the package's own error type.
            raise SerializationError(str(err))
        return packed
class DoubleDeserializer(Deserializer):
    """
    Deserializes float from IEEE 754 binary64.

    See Also:
        `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Unpack a float from big-endian IEEE 754 binary64 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this implementation.

        Raises:
            SerializationError: if ``struct`` cannot unpack ``value``.

        Returns:
            float if value is not None, otherwise None
        """

        if value is None:
            return None

        try:
            # '>d' yields exactly one item, so unpack the 1-tuple directly.
            (number,) = _struct.unpack('>d', value)
        except _struct.error as err:
            raise SerializationError(str(err))
        return number
class IntegerSerializer(Serializer):
    """
    Serializes int to int32 bytes.

    See Also:
        `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_
    """  # noqa: E501

    def __call__(self, obj, ctx=None):
        """
        Pack ``obj`` as a big-endian signed 4-byte integer.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this implementation.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if ``struct`` cannot pack ``obj``.

        Returns:
            int32 bytes if obj is not None, else None
        """

        if obj is None:
            return None

        try:
            packed = _struct.pack('>i', obj)
        except _struct.error as err:
            # Surface packing failures as the package's own error type.
            raise SerializationError(str(err))
        return packed
class IntegerDeserializer(Deserializer):
    """
    Deserializes int from int32 bytes.

    See Also:
        `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value, ctx=None):
        """
        Unpack an int from big-endian signed 4-byte (int32) bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this implementation.

        Raises:
            SerializationError: if ``struct`` cannot unpack ``value``.

        Returns:
            int if value is not None, otherwise None
        """

        if value is None:
            return None

        try:
            # '>i' yields exactly one item, so unpack the 1-tuple directly.
            (number,) = _struct.unpack('>i', value)
        except _struct.error as err:
            raise SerializationError(str(err))
        return number
class StringSerializer(Serializer):
    """
    Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``.

    Note:
        None objects are represented as Kafka Null.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_
    """  # noqa: E501

    def __init__(self, codec='utf_8'):
        self.codec = codec

    def __call__(self, obj, ctx=None):
        """
        Serializes a str(py2:unicode) to bytes.

        Compatibility Note:
            Python 2 str objects must be converted to unicode objects.
            Python 3 all str objects are already unicode objects.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this implementation.

        Raises:
            SerializationError: if ``obj`` cannot be encoded with the
                configured codec.

        Returns:
            serialized bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            return obj.encode(self.codec)
        except UnicodeError as e:
            # encode() reports unencodable text with UnicodeEncodeError (a
            # UnicodeError subclass), never struct.error, so this is the
            # exception that must be translated to honour the documented
            # SerializationError contract.
            raise SerializationError(str(e))
class StringDeserializer(Deserializer):
    """
    Deserializes a str(py2:unicode) from bytes.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_
    """  # noqa: E501

    def __init__(self, codec='utf_8'):
        self.codec = codec

    def __call__(self, value, ctx=None):
        """
        Deserializes bytes to unicode per the configured codec. Defaults to ``utf_8``.

        Compatibility Note:
            Python 2 returns a unicode object.

            Python 3 all str objects are already unicode objects.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this implementation.

        Raises:
            SerializationError: if ``value`` cannot be decoded with the
                configured codec.

        Returns:
            unicode if value is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return value.decode(self.codec)
        except UnicodeError as e:
            # decode() reports malformed input with UnicodeDecodeError (a
            # UnicodeError subclass), never struct.error, so this is the
            # exception that must be translated to honour the documented
            # SerializationError contract.
            raise SerializationError(str(e))