1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from typing import Optional
20
21from confluent_kafka.cimpl import KafkaError, KafkaException, Message
22from confluent_kafka.serialization import SerializationError
23
24
class _KafkaClientError(KafkaException):
    """
    Common base for the errors raised by the Kafka clients in this module.

    The ``kafka_error`` is handed to the parent exception's constructor,
    while the optional original exception and Kafka message are kept on
    the instance as the ``exception`` and ``kafka_message`` attributes.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.
    """

    def __init__(
        self,
        kafka_error: KafkaError,
        exception: Optional[Exception] = None,
        kafka_message: Optional[Message] = None,
    ) -> None:
        # Only the KafkaError is forwarded to the parent constructor; the
        # remaining context is stored as plain attributes.
        super().__init__(kafka_error)
        self.kafka_message = kafka_message
        self.exception = exception

    @property
    def code(self) -> int:
        """Return ``code()`` of the first exception argument (the KafkaError)."""
        wrapped_error = self.args[0]
        return wrapped_error.code()

    @property
    def name(self) -> str:
        """Return ``name()`` of the first exception argument (the KafkaError)."""
        wrapped_error = self.args[0]
        return wrapped_error.name()
52
53
class ConsumeError(_KafkaClientError):
    """
    Error raised for any failure while consuming a message.

    Note:
        In the event of a serialization error the original message
        contents may be retrieved from the ``kafka_message`` attribute.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message
            returned by the broker.

    """

    def __init__(
        self,
        kafka_error: KafkaError,
        exception: Optional[Exception] = None,
        kafka_message: Optional[Message] = None,
    ) -> None:
        # Pure delegation: every argument is passed through unchanged.
        super().__init__(
            kafka_error,
            exception=exception,
            kafka_message=kafka_message,
        )
76
77
class KeyDeserializationError(ConsumeError, SerializationError):
    """
    Wraps all errors encountered during the deserialization of a Kafka
    Message's key.

    The underlying KafkaError is created with the
    ``KafkaError._KEY_DESERIALIZATION`` code. Its reason is the string form
    of ``exception`` when one is supplied; otherwise no reason is passed.

    Args:
        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.

    """

    def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
        # ``str(None)`` would make the error reason the literal text "None";
        # pass no reason instead so KafkaError can supply its own description.
        reason = None if exception is None else str(exception)
        super().__init__(
            KafkaError(KafkaError._KEY_DESERIALIZATION, reason),
            exception=exception,
            kafka_message=kafka_message,
        )
97
98
class ValueDeserializationError(ConsumeError, SerializationError):
    """
    Wraps all errors encountered during the deserialization of a Kafka
    Message's value.

    The underlying KafkaError is created with the
    ``KafkaError._VALUE_DESERIALIZATION`` code. Its reason is the string form
    of ``exception`` when one is supplied; otherwise no reason is passed.

    Args:
        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.

    """

    def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
        # ``str(None)`` would make the error reason the literal text "None";
        # pass no reason instead so KafkaError can supply its own description.
        reason = None if exception is None else str(exception)
        super().__init__(
            KafkaError(KafkaError._VALUE_DESERIALIZATION, reason),
            exception=exception,
            kafka_message=kafka_message,
        )
118
119
class ProduceError(_KafkaClientError):
    """
    Error raised for any failure while producing messages.

    No Kafka message is associated with this error: the parent
    constructor is always given ``None`` for ``kafka_message``.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception (Exception, optional): The original exception.
    """

    def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None) -> None:
        super().__init__(
            kafka_error,
            exception=exception,
            kafka_message=None,
        )
132
133
class KeySerializationError(ProduceError, SerializationError):
    """
    Wraps all errors encountered during the serialization of a Message key.

    The underlying KafkaError is created with the
    ``KafkaError._KEY_SERIALIZATION`` code. Its reason is the string form
    of ``exception`` when one is supplied; otherwise no reason is passed.

    Args:
        exception (Exception, optional): The exception that occurred during
            serialization.
    """

    def __init__(self, exception: Optional[Exception] = None) -> None:
        # ``str(None)`` would make the error reason the literal text "None";
        # pass no reason instead so KafkaError can supply its own description.
        reason = None if exception is None else str(exception)
        super().__init__(
            KafkaError(KafkaError._KEY_SERIALIZATION, reason),
            exception=exception,
        )
146
147
class ValueSerializationError(ProduceError, SerializationError):
    """
    Wraps all errors encountered during the serialization of a Message value.

    The underlying KafkaError is created with the
    ``KafkaError._VALUE_SERIALIZATION`` code. Its reason is the string form
    of ``exception`` when one is supplied; otherwise no reason is passed.

    Args:
        exception (Exception, optional): The exception that occurred during
            serialization.
    """

    def __init__(self, exception: Optional[Exception] = None) -> None:
        # ``str(None)`` would make the error reason the literal text "None";
        # pass no reason instead so KafkaError can supply its own description.
        reason = None if exception is None else str(exception)
        super().__init__(
            KafkaError(KafkaError._VALUE_SERIALIZATION, reason),
            exception=exception,
        )