1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from confluent_kafka.cimpl import Producer as _ProducerImpl
20from .serialization import (MessageField,
21 SerializationContext)
22from .error import (KeySerializationError,
23 ValueSerializationError)
24
25
class SerializingProducer(_ProducerImpl):
    """
    A high level Kafka producer with serialization capabilities.

    `This class is experimental and likely to be removed, or subject to incompatible API
    changes in future versions of the library. To avoid breaking changes on upgrading, we
    recommend using serializers directly.`

    Derived from the :py:class:`Producer` class, overriding the :py:func:`Producer.produce`
    method to add serialization capabilities.

    Additional configuration properties:

    +-------------------------+---------------------+-----------------------------------------------------+
    | Property Name           | Type                | Description                                         |
    +=========================+=====================+=====================================================+
    |                         |                     | Callable(obj, SerializationContext) -> bytes        |
    | ``key.serializer``      | callable            |                                                     |
    |                         |                     | Serializer used for message keys.                   |
    +-------------------------+---------------------+-----------------------------------------------------+
    |                         |                     | Callable(obj, SerializationContext) -> bytes        |
    | ``value.serializer``    | callable            |                                                     |
    |                         |                     | Serializer used for message values.                 |
    +-------------------------+---------------------+-----------------------------------------------------+

    Serializers for string, integer and double (:py:class:`StringSerializer`, :py:class:`IntegerSerializer`
    and :py:class:`DoubleSerializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
    namespace.

    Serializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufSerializer`, :py:class:`JSONSerializer`
    and :py:class:`AvroSerializer`) with Confluent Schema Registry integration are supplied out-of-the-box
    in the ``confluent_kafka.schema_registry`` namespace.

    See Also:
        - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
        - `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
        - `STATISTICS.md <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb
        - The :py:class:`Producer` class for inherited methods.

    Args:
        conf (dict): SerializingProducer configuration. The ``key.serializer``
            and ``value.serializer`` properties are consumed by this class;
            all remaining properties are passed through to the underlying
            :py:class:`Producer`.
    """  # noqa E501

    def __init__(self, conf):
        # Copy so the serializer keys can be popped without mutating the
        # caller's dict; the underlying Producer rejects unknown properties.
        conf_copy = conf.copy()

        self._key_serializer = conf_copy.pop('key.serializer', None)
        self._value_serializer = conf_copy.pop('value.serializer', None)

        super(SerializingProducer, self).__init__(conf_copy)

    def produce(self, topic, key=None, value=None, partition=-1,
                on_delivery=None, timestamp=0, headers=None):
        """
        Produce a message.

        This is an asynchronous operation. An application may use the
        ``on_delivery`` argument to pass a function (or lambda) that will be
        called from :py:func:`SerializingProducer.poll` when the message has
        been successfully delivered or permanently fails delivery.

        Note:
            Currently message headers are not supported on the message returned to
            the callback. The ``msg.headers()`` will return None even if the
            original message had headers set.

        Args:
            topic (str): Topic to produce message to.

            key (object, optional): Message payload key.

            value (object, optional): Message payload value.

            partition (int, optional): Partition to produce to, else the
                configured built-in partitioner will be used.

            on_delivery (callable(KafkaError, Message), optional): Delivery
                report callback. Called as a side effect of
                :py:func:`SerializingProducer.poll` or
                :py:func:`SerializingProducer.flush` on successful or
                failed delivery.

            timestamp (float, optional): Message timestamp (CreateTime) in
                milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0).
                Default value is current time.

            headers (dict, optional): Message headers. The header key must be
                a str while the value must be binary, unicode or None. (Requires
                broker version >= 0.11.0.0)

        Raises:
            BufferError: if the internal producer message queue is full.
                (``queue.buffering.max.messages`` exceeded). If this happens
                the application should call :py:func:`SerializingProducer.poll`
                and try again.

            KeySerializationError: If an error occurs during key serialization.

            ValueSerializationError: If an error occurs during value serialization.

            KafkaException: For all other errors
        """
        # Each field gets its own SerializationContext. The previous
        # implementation reused one context and mutated its ``field``
        # attribute, so a serializer that held on to its context (e.g. for
        # deferred schema lookups) would see KEY silently become VALUE.
        if self._key_serializer is not None:
            try:
                key = self._key_serializer(
                    key, SerializationContext(topic, MessageField.KEY, headers))
            except Exception as se:
                raise KeySerializationError(se)

        if self._value_serializer is not None:
            try:
                value = self._value_serializer(
                    value, SerializationContext(topic, MessageField.VALUE, headers))
            except Exception as se:
                raise ValueSerializationError(se)

        super(SerializingProducer, self).produce(topic, value, key,
                                                 headers=headers,
                                                 partition=partition,
                                                 timestamp=timestamp,
                                                 on_delivery=on_delivery)