1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from typing import Any, Dict, Optional
20
21from confluent_kafka.cimpl import Producer as _ProducerImpl
22
23from ._types import DeliveryCallback, HeadersType
24from .error import KeySerializationError, ValueSerializationError
25from .serialization import MessageField, SerializationContext
26
27
class SerializingProducer(_ProducerImpl):
    """
    A high level Kafka producer with serialization capabilities.

    `This class is experimental and likely to be removed, or subject to incompatible API
    changes in future versions of the library. To avoid breaking changes on upgrading, we
    recommend using serializers directly.`

    Derived from the :py:class:`Producer` class, overriding the :py:func:`Producer.produce`
    method to add serialization capabilities.

    Additional configuration properties:

    +-------------------------+---------------------+-----------------------------------------------------+
    | Property Name           | Type                | Description                                         |
    +=========================+=====================+=====================================================+
    |                         |                     | Callable(obj, SerializationContext) -> bytes        |
    | ``key.serializer``      | callable            |                                                     |
    |                         |                     | Serializer used for message keys.                   |
    +-------------------------+---------------------+-----------------------------------------------------+
    |                         |                     | Callable(obj, SerializationContext) -> bytes        |
    | ``value.serializer``    | callable            |                                                     |
    |                         |                     | Serializer used for message values.                 |
    +-------------------------+---------------------+-----------------------------------------------------+

    Serializers for string, integer and double (:py:class:`StringSerializer`, :py:class:`IntegerSerializer`
    and :py:class:`DoubleSerializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
    namespace.

    Serializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufSerializer`, :py:class:`JSONSerializer`
    and :py:class:`AvroSerializer`) with Confluent Schema Registry integration are supplied out-of-the-box
    in the ``confluent_kafka.schema_registry`` namespace.

    See Also:
        - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
        - `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
        - `STATISTICS.md <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb
        - The :py:class:`Producer` class for inherited methods.

    Args:
        conf (dict): SerializingProducer configuration.
    """  # noqa E501

    def __init__(self, conf: Dict[str, Any]) -> None:
        # Copy so the caller's dict is never mutated by the pops below.
        conf_copy = conf.copy()

        # Pop the serializer callables out of the config so only
        # librdkafka-recognized properties reach the underlying C producer.
        self._key_serializer = conf_copy.pop('key.serializer', None)
        self._value_serializer = conf_copy.pop('value.serializer', None)

        super(SerializingProducer, self).__init__(conf_copy)

    def produce(  # type: ignore[override]
        self,
        topic: str,
        key: Any = None,
        value: Any = None,
        partition: int = -1,
        on_delivery: Optional[DeliveryCallback] = None,
        timestamp: int = 0,
        headers: Optional[HeadersType] = None,
    ) -> None:
        """
        Produce a message.

        This is an asynchronous operation. An application may use the
        ``on_delivery`` argument to pass a function (or lambda) that will be
        called from :py:func:`SerializingProducer.poll` when the message has
        been successfully delivered or permanently fails delivery.

        Note:
            Currently message headers are not supported on the message returned to
            the callback. The ``msg.headers()`` will return None even if the
            original message had headers set.

        Args:
            topic (str): Topic to produce message to.

            key (object, optional): Message payload key.

            value (object, optional): Message payload value.

            partition (int, optional): Partition to produce to, else the
                configured built-in partitioner will be used.

            on_delivery (callable(KafkaError, Message), optional): Delivery
                report callback. Called as a side effect of
                :py:func:`SerializingProducer.poll` or
                :py:func:`SerializingProducer.flush` on successful or
                failed delivery.

            timestamp (int, optional): Message timestamp (CreateTime) in
                milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0).
                Default value is current time.

            headers (dict, optional): Message headers. The header key must be
                a str while the value must be binary, unicode or None. (Requires
                broker version >= 0.11.0.0)

        Raises:
            BufferError: if the internal producer message queue is full.
                (``queue.buffering.max.messages`` exceeded). If this happens
                the application should call :py:func:`SerializingProducer.poll`
                and try again.

            KeySerializationError: If an error occurs during key serialization.

            ValueSerializationError: If an error occurs during value serialization.

            KafkaException: For all other errors
        """

        # The same context object is reused for key and value serialization;
        # only the ``field`` attribute differs between the two passes.
        ctx = SerializationContext(topic, MessageField.KEY, headers)
        if self._key_serializer is not None:
            try:
                key = self._key_serializer(key, ctx)
            except Exception as se:
                # Chain the underlying exception (PEP 3134) so the root
                # cause and its traceback are preserved for debugging.
                raise KeySerializationError(se) from se
        ctx.field = MessageField.VALUE
        if self._value_serializer is not None:
            try:
                value = self._value_serializer(value, ctx)
            except Exception as se:
                raise ValueSerializationError(se) from se

        super(SerializingProducer, self).produce(
            topic, value, key, headers=headers, partition=partition, timestamp=timestamp, on_delivery=on_delivery
        )