1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from typing import Any, Dict, Optional
20
21from confluent_kafka.cimpl import Producer as _ProducerImpl
22from .serialization import (MessageField,
23 SerializationContext)
24from .error import (KeySerializationError,
25 ValueSerializationError)
26from ._types import HeadersType, DeliveryCallback
27
28
class SerializingProducer(_ProducerImpl):
    """
    A high level Kafka producer with serialization capabilities.

    `This class is experimental and likely to be removed, or subject to incompatible API
    changes in future versions of the library. To avoid breaking changes on upgrading, we
    recommend using serializers directly.`

    Derived from the :py:class:`Producer` class, overriding the :py:func:`Producer.produce`
    method to add serialization capabilities.

    Additional configuration properties:

    +-------------------------+---------------------+-----------------------------------------------------+
    | Property Name           | Type                | Description                                         |
    +=========================+=====================+=====================================================+
    |                         |                     | Callable(obj, SerializationContext) -> bytes        |
    | ``key.serializer``      | callable            |                                                     |
    |                         |                     | Serializer used for message keys.                   |
    +-------------------------+---------------------+-----------------------------------------------------+
    |                         |                     | Callable(obj, SerializationContext) -> bytes        |
    | ``value.serializer``    | callable            |                                                     |
    |                         |                     | Serializer used for message values.                 |
    +-------------------------+---------------------+-----------------------------------------------------+

    Serializers for string, integer and double (:py:class:`StringSerializer`, :py:class:`IntegerSerializer`
    and :py:class:`DoubleSerializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
    namespace.

    Serializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufSerializer`, :py:class:`JSONSerializer`
    and :py:class:`AvroSerializer`) with Confluent Schema Registry integration are supplied out-of-the-box
    in the ``confluent_kafka.schema_registry`` namespace.

    See Also:
        - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
        - `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
        - `STATISTICS.md <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb
        - The :py:class:`Producer` class for inherited methods.

    Args:
        conf (dict): SerializingProducer configuration. The ``key.serializer``
            and ``value.serializer`` properties are consumed by this class; all
            remaining properties are passed through to the underlying
            :py:class:`Producer`.
    """  # noqa E501

    def __init__(self, conf: Dict[str, Any]) -> None:
        # Copy so the caller's dict is not mutated when the serializer
        # properties are popped out below.
        conf_copy = conf.copy()

        # Serializers are handled at this layer; librdkafka must not see
        # these keys or it will reject the configuration.
        self._key_serializer = conf_copy.pop('key.serializer', None)
        self._value_serializer = conf_copy.pop('value.serializer', None)

        super(SerializingProducer, self).__init__(conf_copy)

    def produce(  # type: ignore[override]
        self, topic: str, key: Any = None, value: Any = None, partition: int = -1,
        on_delivery: Optional[DeliveryCallback] = None, timestamp: int = 0,
        headers: Optional[HeadersType] = None
    ) -> None:
        """
        Produce a message.

        This is an asynchronous operation. An application may use the
        ``on_delivery`` argument to pass a function (or lambda) that will be
        called from :py:func:`SerializingProducer.poll` when the message has
        been successfully delivered or permanently fails delivery.

        Note:
            Currently message headers are not supported on the message returned to
            the callback. The ``msg.headers()`` will return None even if the
            original message had headers set.

        Args:
            topic (str): Topic to produce message to.

            key (object, optional): Message payload key.

            value (object, optional): Message payload value.

            partition (int, optional): Partition to produce to, else the
                configured built-in partitioner will be used.

            on_delivery (callable(KafkaError, Message), optional): Delivery
                report callback. Called as a side effect of
                :py:func:`SerializingProducer.poll` or
                :py:func:`SerializingProducer.flush` on successful or
                failed delivery.

            timestamp (int, optional): Message timestamp (CreateTime) in
                milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0).
                Default value is current time.

            headers (dict, optional): Message headers. The header key must be
                a str while the value must be binary, unicode or None. (Requires
                broker version >= 0.11.0.0)

        Raises:
            BufferError: if the internal producer message queue is full.
                (``queue.buffering.max.messages`` exceeded). If this happens
                the application should call :py:func:`SerializingProducer.poll`
                and try again.

            KeySerializationError: If an error occurs during key serialization.

            ValueSerializationError: If an error occurs during value serialization.

            KafkaException: For all other errors
        """

        # One context is reused for both fields; only ``field`` changes
        # between the key and value serialization passes.
        ctx = SerializationContext(topic, MessageField.KEY, headers)
        if self._key_serializer is not None:
            try:
                key = self._key_serializer(key, ctx)
            except Exception as se:
                # Chain the cause so the original serializer traceback
                # is preserved for the caller.
                raise KeySerializationError(se) from se
        ctx.field = MessageField.VALUE
        if self._value_serializer is not None:
            try:
                value = self._value_serializer(value, ctx)
            except Exception as se:
                raise ValueSerializationError(se) from se

        # Note the underlying C producer's positional order is
        # (topic, value, key).
        super(SerializingProducer, self).produce(topic, value, key,
                                                 headers=headers,
                                                 partition=partition,
                                                 timestamp=timestamp,
                                                 on_delivery=on_delivery)