1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from typing import Any, Dict, List, Optional
20
21from confluent_kafka.cimpl import Consumer as _ConsumerImpl
22from confluent_kafka.cimpl import Message
23
24from .error import ConsumeError, KeyDeserializationError, ValueDeserializationError
25from .serialization import MessageField, SerializationContext
26
27
class DeserializingConsumer(_ConsumerImpl):
    """
    A high level Kafka consumer with deserialization capabilities.

    `This class is experimental and likely to be removed, or subject to incompatible API
    changes in future versions of the library. To avoid breaking changes on upgrading, we
    recommend using deserializers directly.`

    Derived from the :py:class:`Consumer` class, overriding the :py:func:`Consumer.poll`
    method to add deserialization capabilities. :py:func:`Consumer.consume` is not
    supported by this class and raises :py:exc:`NotImplementedError`.

    Additional configuration properties:

    +-------------------------+---------------------+-----------------------------------------------------+
    | Property Name           | Type                | Description                                         |
    +=========================+=====================+=====================================================+
    |                         |                     | Callable(bytes, SerializationContext) -> obj        |
    | ``key.deserializer``    | callable            |                                                     |
    |                         |                     | Deserializer used for message keys.                 |
    +-------------------------+---------------------+-----------------------------------------------------+
    |                         |                     | Callable(bytes, SerializationContext) -> obj        |
    | ``value.deserializer``  | callable            |                                                     |
    |                         |                     | Deserializer used for message values.               |
    +-------------------------+---------------------+-----------------------------------------------------+

    Deserializers for string, integer and double (:py:class:`StringDeserializer`, :py:class:`IntegerDeserializer`
    and :py:class:`DoubleDeserializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
    namespace.

    Deserializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufDeserializer`, :py:class:`JSONDeserializer`
    and :py:class:`AvroDeserializer`) with Confluent Schema Registry integration are supplied out-of-the-box
    in the ``confluent_kafka.schema_registry`` namespace.

    See Also:
        - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
        - `CONFIGURATION.md <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
        - `STATISTICS.md <https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb.
        - The :py:class:`Consumer` class for inherited methods.

    Args:
        conf (dict): DeserializingConsumer configuration.

    Raises:
        ValueError: if configuration validation fails
    """  # noqa: E501

    def __init__(self, conf: Dict[str, Any]) -> None:
        # Work on a copy so the caller's configuration dict is left untouched.
        conf_copy = conf.copy()
        # The deserializer entries are consumed here rather than forwarded to
        # the underlying client configuration.
        self._key_deserializer = conf_copy.pop('key.deserializer', None)
        self._value_deserializer = conf_copy.pop('value.deserializer', None)

        super().__init__(conf_copy)

    def poll(self, timeout: float = -1) -> Optional[Message]:
        """
        Consume a single message, calling any pending callbacks, and return it
        with its key and value passed through the configured deserializers.

        Args:
            timeout (float): Maximum time in seconds to block waiting for a message.
                The default of ``-1`` blocks indefinitely.

        Returns:
            :py:class:`Message` or None on timeout

        Raises:
            ConsumeError: If an error was encountered while polling.

            TypeError: If the polled message has no topic.

            ValueDeserializationError: If an error occurs during value deserialization.
                The deserializer's exception is chained as ``__cause__``.

            KeyDeserializationError: If an error occurs during key deserialization.
                The deserializer's exception is chained as ``__cause__``.
        """

        msg = super().poll(timeout)
        if msg is None:
            return None

        error = msg.error()
        if error is not None:
            raise ConsumeError(error, kafka_message=msg)

        topic = msg.topic()
        if topic is None:
            raise TypeError("Message topic is None")
        headers = msg.headers()

        # The value is deserialized before the key, so a message whose key and
        # value are both malformed surfaces as a ValueDeserializationError.
        value = msg.value()
        if self._value_deserializer is not None:
            # Each field gets its own context so a deserializer that keeps a
            # reference to its context never sees the field change underneath it.
            value_ctx = SerializationContext(topic, MessageField.VALUE, headers)
            try:
                value = self._value_deserializer(value, value_ctx)
            except Exception as se:
                raise ValueDeserializationError(exception=se, kafka_message=msg) from se

        key = msg.key()
        if self._key_deserializer is not None:
            key_ctx = SerializationContext(topic, MessageField.KEY, headers)
            try:
                key = self._key_deserializer(key, key_ctx)
            except Exception as se:
                raise KeyDeserializationError(exception=se, kafka_message=msg) from se

        msg.set_key(key)
        msg.set_value(value)
        return msg

    def consume(self, num_messages: int = 1, timeout: float = -1) -> List[Message]:
        """
        :py:func:`Consumer.consume` is not implemented for this class; use
        :py:func:`DeserializingConsumer.poll` instead.

        Args:
            num_messages (int): Ignored.
            timeout (float): Ignored.

        Raises:
            NotImplementedError: Always.
        """

        raise NotImplementedError(
            "DeserializingConsumer.consume is not implemented; "
            "use DeserializingConsumer.poll instead"
        )