1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from confluent_kafka.cimpl import KafkaException, KafkaError
19from confluent_kafka.serialization import SerializationError
20
21
22class _KafkaClientError(KafkaException):
23 """
24 Wraps all errors encountered by a Kafka Client
25
26 Args:
27 kafka_error (KafkaError): KafkaError instance.
28
29 exception(Exception, optional): The original exception
30
31 kafka_message (Message, optional): The Kafka Message returned
32 by the broker.
33 """
34
35 def __init__(self, kafka_error, exception=None, kafka_message=None):
36 super(_KafkaClientError, self).__init__(kafka_error)
37 self.exception = exception
38 self.kafka_message = kafka_message
39
40 @property
41 def code(self):
42 return self.args[0].code()
43
44 @property
45 def name(self):
46 return self.args[0].name()
47
48
49class ConsumeError(_KafkaClientError):
50 """
51 Wraps all errors encountered during the consumption of a message.
52
53 Note:
54 In the event of a serialization error the original message
55 contents may be retrieved from the ``kafka_message`` attribute.
56
57 Args:
58 kafka_error (KafkaError): KafkaError instance.
59
60 exception(Exception, optional): The original exception
61
62 kafka_message (Message, optional): The Kafka Message
63 returned by the broker.
64
65 """
66
67 def __init__(self, kafka_error, exception=None, kafka_message=None):
68 super(ConsumeError, self).__init__(kafka_error, exception, kafka_message)
69
70
71class KeyDeserializationError(ConsumeError, SerializationError):
72 """
73 Wraps all errors encountered during the deserialization of a Kafka
74 Message's key.
75
76 Args:
77 exception(Exception, optional): The original exception
78
79 kafka_message (Message, optional): The Kafka Message returned
80 by the broker.
81
82 """
83
84 def __init__(self, exception=None, kafka_message=None):
85 super(KeyDeserializationError, self).__init__(
86 KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception)),
87 exception=exception, kafka_message=kafka_message)
88
89
90class ValueDeserializationError(ConsumeError, SerializationError):
91 """
92 Wraps all errors encountered during the deserialization of a Kafka
93 Message's value.
94
95 Args:
96 exception(Exception, optional): The original exception
97
98 kafka_message (Message, optional): The Kafka Message returned
99 by the broker.
100
101 """
102
103 def __init__(self, exception=None, kafka_message=None):
104 super(ValueDeserializationError, self).__init__(
105 KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception)),
106 exception=exception, kafka_message=kafka_message)
107
108
109class ProduceError(_KafkaClientError):
110 """
111 Wraps all errors encountered when Producing messages.
112
113 Args:
114 kafka_error (KafkaError): KafkaError instance.
115
116 exception(Exception, optional): The original exception.
117 """
118
119 def __init__(self, kafka_error, exception=None):
120 super(ProduceError, self).__init__(kafka_error, exception, None)
121
122
123class KeySerializationError(ProduceError, SerializationError):
124 """
125 Wraps all errors encountered during the serialization of a Message key.
126
127 Args:
128 exception (Exception): The exception that occurred during serialization.
129 """
130
131 def __init__(self, exception=None):
132 super(KeySerializationError, self).__init__(
133 KafkaError(KafkaError._KEY_SERIALIZATION, str(exception)),
134 exception=exception)
135
136
137class ValueSerializationError(ProduceError, SerializationError):
138 """
139 Wraps all errors encountered during the serialization of a Message value.
140
141 Args:
142 exception (Exception): The exception that occurred during serialization.
143 """
144
145 def __init__(self, exception=None):
146 super(ValueSerializationError, self).__init__(
147 KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception)),
148 exception=exception)