Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/error.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from typing import Optional 

20 

21from confluent_kafka.cimpl import KafkaException, KafkaError, Message 

22from confluent_kafka.serialization import SerializationError 

23 

24 

class _KafkaClientError(KafkaException):
    """
    Common base for errors raised by a Kafka client.

    The wrapped ``KafkaError`` is handed to ``KafkaException`` and is
    read back through ``self.args[0]`` by the ``code`` and ``name``
    properties.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception(Exception, optional): The original exception

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.
    """

    def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None,
                 kafka_message: Optional[Message] = None) -> None:
        super().__init__(kafka_error)
        # Keep the originating exception and message available to handlers.
        self.exception, self.kafka_message = exception, kafka_message

    @property
    def code(self) -> int:
        """Result of ``code()`` on the wrapped KafkaError."""
        wrapped_error = self.args[0]
        return wrapped_error.code()

    @property
    def name(self) -> str:
        """Result of ``name()`` on the wrapped KafkaError."""
        wrapped_error = self.args[0]
        return wrapped_error.name()

51 

52 

class ConsumeError(_KafkaClientError):
    """
    Error raised for any failure while consuming a message.

    Note:
        When the failure is a serialization error, the original message
        contents remain available through the ``kafka_message``
        attribute.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception(Exception, optional): The original exception

        kafka_message (Message, optional): The Kafka Message
            returned by the broker.

    """

    def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None,
                 kafka_message: Optional[Message] = None) -> None:
        # All three values are forwarded unchanged to the base class.
        super().__init__(kafka_error, exception, kafka_message)

74 

75 

class KeyDeserializationError(ConsumeError, SerializationError):
    """
    Error raised when deserializing the key of a Kafka Message fails.

    The underlying ``KafkaError`` is created with
    ``KafkaError._KEY_DESERIALIZATION`` and ``str(exception)`` (which is
    the text ``"None"`` when no exception is supplied).

    Args:
        exception(Exception, optional): The original exception

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.

    """

    def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
        wrapped_error = KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception))
        super().__init__(wrapped_error, exception=exception, kafka_message=kafka_message)

93 

94 

class ValueDeserializationError(ConsumeError, SerializationError):
    """
    Error raised when deserializing the value of a Kafka Message fails.

    The underlying ``KafkaError`` is created with
    ``KafkaError._VALUE_DESERIALIZATION`` and ``str(exception)`` (which
    is the text ``"None"`` when no exception is supplied).

    Args:
        exception(Exception, optional): The original exception

        kafka_message (Message, optional): The Kafka Message returned
            by the broker.

    """

    def __init__(self, exception: Optional[Exception] = None, kafka_message: Optional[Message] = None) -> None:
        wrapped_error = KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception))
        super().__init__(wrapped_error, exception=exception, kafka_message=kafka_message)

112 

113 

class ProduceError(_KafkaClientError):
    """
    Error raised for any failure while producing messages.

    No Kafka message is attached: the base class always receives
    ``None`` for ``kafka_message``.

    Args:
        kafka_error (KafkaError): KafkaError instance.

        exception(Exception, optional): The original exception.
    """

    def __init__(self, kafka_error: KafkaError, exception: Optional[Exception] = None) -> None:
        # Third positional argument is the (absent) kafka_message.
        super().__init__(kafka_error, exception, None)

126 

127 

class KeySerializationError(ProduceError, SerializationError):
    """
    Error raised when serializing a Message key fails.

    The underlying ``KafkaError`` is created with
    ``KafkaError._KEY_SERIALIZATION`` and ``str(exception)`` (which is
    the text ``"None"`` when no exception is supplied).

    Args:
        exception (Exception, optional): The exception that occurred
            during serialization.
    """

    def __init__(self, exception: Optional[Exception] = None) -> None:
        wrapped_error = KafkaError(KafkaError._KEY_SERIALIZATION, str(exception))
        super().__init__(wrapped_error, exception=exception)

140 

141 

class ValueSerializationError(ProduceError, SerializationError):
    """
    Error raised when serializing a Message value fails.

    The underlying ``KafkaError`` is created with
    ``KafkaError._VALUE_SERIALIZATION`` and ``str(exception)`` (which is
    the text ``"None"`` when no exception is supplied).

    Args:
        exception (Exception, optional): The exception that occurred
            during serialization.
    """

    def __init__(self, exception: Optional[Exception] = None) -> None:
        wrapped_error = KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception))
        super().__init__(wrapped_error, exception=exception)

153 exception=exception)