1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from typing import Optional
20
21from confluent_kafka.cimpl import KafkaException, KafkaError, Message
22from confluent_kafka.serialization import SerializationError
23
24
25class _KafkaClientError(KafkaException):
26 """
27 Wraps all errors encountered by a Kafka Client
28
29 Args:
30 kafka_error (KafkaError): KafkaError instance.
31
32 exception(Exception, optional): The original exception
33
34 kafka_message (Message, optional): The Kafka Message returned
35 by the broker.
36 """
37
38 def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None,
39 kafka_message: Optional[Message] = None) -> None:
40 super(_KafkaClientError, self).__init__(kafka_error)
41 self.exception = exception
42 self.kafka_message = kafka_message
43
44 @property
45 def code(self) -> int:
46 return self.args[0].code()
47
48 @property
49 def name(self) -> str:
50 return self.args[0].name()
51
52
53class ConsumeError(_KafkaClientError):
54 """
55 Wraps all errors encountered during the consumption of a message.
56
57 Note:
58 In the event of a serialization error the original message
59 contents may be retrieved from the ``kafka_message`` attribute.
60
61 Args:
62 kafka_error (KafkaError): KafkaError instance.
63
64 exception(Exception, optional): The original exception
65
66 kafka_message (Message, optional): The Kafka Message
67 returned by the broker.
68
69 """
70
71 def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None,
72 kafka_message: Optional[Message] = None) -> None:
73 super(ConsumeError, self).__init__(kafka_error, exception, kafka_message)
74
75
76class KeyDeserializationError(ConsumeError, SerializationError):
77 """
78 Wraps all errors encountered during the deserialization of a Kafka
79 Message's key.
80
81 Args:
82 exception(Exception, optional): The original exception
83
84 kafka_message (Message, optional): The Kafka Message returned
85 by the broker.
86
87 """
88
89 def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
90 super(KeyDeserializationError, self).__init__(
91 KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception)),
92 exception=exception, kafka_message=kafka_message)
93
94
95class ValueDeserializationError(ConsumeError, SerializationError):
96 """
97 Wraps all errors encountered during the deserialization of a Kafka
98 Message's value.
99
100 Args:
101 exception(Exception, optional): The original exception
102
103 kafka_message (Message, optional): The Kafka Message returned
104 by the broker.
105
106 """
107
108 def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
109 super(ValueDeserializationError, self).__init__(
110 KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception)),
111 exception=exception, kafka_message=kafka_message)
112
113
114class ProduceError(_KafkaClientError):
115 """
116 Wraps all errors encountered when Producing messages.
117
118 Args:
119 kafka_error (KafkaError): KafkaError instance.
120
121 exception(Exception, optional): The original exception.
122 """
123
124 def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None) -> None:
125 super(ProduceError, self).__init__(kafka_error, exception, None)
126
127
128class KeySerializationError(ProduceError, SerializationError):
129 """
130 Wraps all errors encountered during the serialization of a Message key.
131
132 Args:
133 exception (Exception): The exception that occurred during serialization.
134 """
135
136 def __init__(self, exception: Optional[Exception] = None) -> None:
137 super(KeySerializationError, self).__init__(
138 KafkaError(KafkaError._KEY_SERIALIZATION, str(exception)),
139 exception=exception)
140
141
142class ValueSerializationError(ProduceError, SerializationError):
143 """
144 Wraps all errors encountered during the serialization of a Message value.
145
146 Args:
147 exception (Exception): The exception that occurred during serialization.
148 """
149
150 def __init__(self, exception: Optional[Exception] = None) -> None:
151 super(ValueSerializationError, self).__init__(
152 KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception)),
153 exception=exception)