Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/error.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from typing import Optional 

20 

21from confluent_kafka.cimpl import KafkaError, KafkaException, Message 

22from confluent_kafka.serialization import SerializationError 

23 

24 

class _KafkaClientError(KafkaException):
    """
    Common base class for the errors raised by a Kafka Client.

    The KafkaError is passed to the ``KafkaException`` initializer; the
    ``code`` and ``name`` properties delegate to ``self.args[0]``.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception (Exception, optional): The original exception, kept on
            the ``exception`` attribute.

        kafka_message (Message, optional): The Kafka Message returned by
            the broker, kept on the ``kafka_message`` attribute.
    """

    def __init__(
        self,
        kafka_error: KafkaError,
        exception: Optional[Exception] = None,
        kafka_message: Optional[Message] = None,
    ) -> None:
        super().__init__(kafka_error)
        self.kafka_message = kafka_message
        self.exception = exception

    @property
    def code(self) -> int:
        """Value returned by ``code()`` on the first exception argument."""
        wrapped = self.args[0]
        return wrapped.code()

    @property
    def name(self) -> str:
        """Value returned by ``name()`` on the first exception argument."""
        wrapped = self.args[0]
        return wrapped.name()

52 

53 

class ConsumeError(_KafkaClientError):
    """
    Error type covering every failure met while consuming a message.

    Note:
        When the failure is a serialization error, the original message
        contents can still be read from the ``kafka_message`` attribute.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message returned by
            the broker.
    """

    def __init__(
        self,
        kafka_error: KafkaError,
        exception: Optional[Exception] = None,
        kafka_message: Optional[Message] = None,
    ) -> None:
        # Nothing consumer-specific to store; forward everything to the base.
        super().__init__(kafka_error, exception=exception, kafka_message=kafka_message)

76 

77 

class KeyDeserializationError(ConsumeError, SerializationError):
    """
    Wraps all errors encountered during the deserialization of a Kafka
    Message's key.

    The underlying KafkaError always carries the
    ``KafkaError._KEY_DESERIALIZATION`` code. Its reason is ``str(exception)``
    when an exception is supplied; otherwise the KafkaError is built from the
    code alone.

    Args:
        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.
    """

    def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
        # Only pass a reason when there is an exception to describe:
        # str(None) would make the literal text "None" the error message.
        if exception is None:
            kafka_error = KafkaError(KafkaError._KEY_DESERIALIZATION)
        else:
            kafka_error = KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception))
        super().__init__(kafka_error, exception=exception, kafka_message=kafka_message)

97 

98 

class ValueDeserializationError(ConsumeError, SerializationError):
    """
    Wraps all errors encountered during the deserialization of a Kafka
    Message's value.

    The underlying KafkaError always carries the
    ``KafkaError._VALUE_DESERIALIZATION`` code. Its reason is
    ``str(exception)`` when an exception is supplied; otherwise the KafkaError
    is built from the code alone.

    Args:
        exception (Exception, optional): The original exception.

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.
    """

    def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
        # Only pass a reason when there is an exception to describe:
        # str(None) would make the literal text "None" the error message.
        if exception is None:
            kafka_error = KafkaError(KafkaError._VALUE_DESERIALIZATION)
        else:
            kafka_error = KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception))
        super().__init__(kafka_error, exception=exception, kafka_message=kafka_message)

118 

119 

class ProduceError(_KafkaClientError):
    """
    Error type covering every failure met while producing messages.

    No Kafka Message is associated with a produce error, so the base class
    is always initialized with ``kafka_message`` set to ``None``.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception (Exception, optional): The original exception.
    """

    def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None) -> None:
        super().__init__(kafka_error, exception=exception, kafka_message=None)

132 

133 

class KeySerializationError(ProduceError, SerializationError):
    """
    Wraps all errors encountered during the serialization of a Message key.

    The underlying KafkaError always carries the
    ``KafkaError._KEY_SERIALIZATION`` code. Its reason is ``str(exception)``
    when an exception is supplied; otherwise the KafkaError is built from the
    code alone.

    Args:
        exception (Exception, optional): The exception that occurred during
            serialization.
    """

    def __init__(self, exception: Optional[Exception] = None) -> None:
        # Only pass a reason when there is an exception to describe:
        # str(None) would make the literal text "None" the error message.
        if exception is None:
            kafka_error = KafkaError(KafkaError._KEY_SERIALIZATION)
        else:
            kafka_error = KafkaError(KafkaError._KEY_SERIALIZATION, str(exception))
        super().__init__(kafka_error, exception=exception)

146 

147 

class ValueSerializationError(ProduceError, SerializationError):
    """
    Wraps all errors encountered during the serialization of a Message value.

    The underlying KafkaError always carries the
    ``KafkaError._VALUE_SERIALIZATION`` code. Its reason is ``str(exception)``
    when an exception is supplied; otherwise the KafkaError is built from the
    code alone.

    Args:
        exception (Exception, optional): The exception that occurred during
            serialization.
    """

    def __init__(self, exception: Optional[Exception] = None) -> None:
        # Only pass a reason when there is an exception to describe:
        # str(None) would make the literal text "None" the error message.
        if exception is None:
            kafka_error = KafkaError(KafkaError._VALUE_SERIALIZATION)
        else:
            kafka_error = KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception))
        super().__init__(kafka_error, exception=exception)