Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/deserializing_consumer.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

37 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from typing import Any, Dict, List, Optional 

20 

21from confluent_kafka.cimpl import Consumer as _ConsumerImpl, Message 

22from .error import (ConsumeError, 

23 KeyDeserializationError, 

24 ValueDeserializationError) 

25from .serialization import (SerializationContext, 

26 MessageField) 

27 

28 

class DeserializingConsumer(_ConsumerImpl):
    """
    A high level Kafka consumer with deserialization capabilities.

    `This class is experimental and likely to be removed, or subject to incompatible API
    changes in future versions of the library. To avoid breaking changes on upgrading, we
    recommend using deserializers directly.`

    Derived from the :py:class:`Consumer` class, overriding the :py:func:`Consumer.poll`
    method to add deserialization capabilities.

    Additional configuration properties:

    +-------------------------+---------------------+-----------------------------------------------------+
    | Property Name           | Type                | Description                                         |
    +=========================+=====================+=====================================================+
    |                         |                     | Callable(bytes, SerializationContext) -> obj        |
    | ``key.deserializer``    | callable            |                                                     |
    |                         |                     | Deserializer used for message keys.                 |
    +-------------------------+---------------------+-----------------------------------------------------+
    |                         |                     | Callable(bytes, SerializationContext) -> obj        |
    | ``value.deserializer``  | callable            |                                                     |
    |                         |                     | Deserializer used for message values.               |
    +-------------------------+---------------------+-----------------------------------------------------+

    Deserializers for string, integer and double (:py:class:`StringDeserializer`, :py:class:`IntegerDeserializer`
    and :py:class:`DoubleDeserializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
    namespace.

    Deserializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufDeserializer`, :py:class:`JSONDeserializer`
    and :py:class:`AvroDeserializer`) with Confluent Schema Registry integration are supplied out-of-the-box
    in the ``confluent_kafka.schema_registry`` namespace.

    See Also:
        - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
        - `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
        - `STATISTICS.md <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb
        - The :py:class:`Consumer` class for inherited methods.

    Args:
        conf (dict): DeserializingConsumer configuration.

    Raises:
        ValueError: if configuration validation fails
    """  # noqa: E501

    def __init__(self, conf: Dict[str, Any]) -> None:
        # Work on a shallow copy so the caller's dict is never mutated;
        # the deserializer entries are stripped out before the remaining
        # configuration is handed to the underlying C consumer, which
        # would reject unknown keys.
        config = dict(conf)
        self._key_deserializer = config.pop('key.deserializer', None)
        self._value_deserializer = config.pop('value.deserializer', None)
        super(DeserializingConsumer, self).__init__(config)

    def poll(self, timeout: float = -1) -> Optional[Message]:
        """
        Consume messages and calls callbacks.

        Args:
            timeout (float): Maximum time to block waiting for message(Seconds).

        Returns:
            :py:class:`Message` or None on timeout

        Raises:
            KeyDeserializationError: If an error occurs during key deserialization.

            ValueDeserializationError: If an error occurs during value deserialization.

            ConsumeError: If an error was encountered while polling.
        """
        message = super(DeserializingConsumer, self).poll(timeout)
        if message is None:
            # Poll timed out without a message; propagate the None.
            return None

        err = message.error()
        if err is not None:
            # Surface broker/client-level errors as an exception instead of
            # returning an error-bearing message.
            raise ConsumeError(err, kafka_message=message)

        # One context is shared for both fields; it starts out targeting the
        # value and is retargeted to the key below.
        context = SerializationContext(message.topic(), MessageField.VALUE, message.headers())

        # Deserialize the value first, mirroring the original implementation.
        value = message.value()
        deserialize_value = self._value_deserializer
        if deserialize_value is not None:
            try:
                value = deserialize_value(value, context)
            except Exception as exc:
                raise ValueDeserializationError(exception=exc, kafka_message=message)

        # Then the key, with the context switched to the KEY field.
        context.field = MessageField.KEY
        key = message.key()
        deserialize_key = self._key_deserializer
        if deserialize_key is not None:
            try:
                key = deserialize_key(key, context)
            except Exception as exc:
                raise KeyDeserializationError(exception=exc, kafka_message=message)

        # Replace the raw bytes on the message with the deserialized objects.
        message.set_key(key)
        message.set_value(value)
        return message

    def consume(self, num_messages: int = 1, timeout: float = -1) -> List[Message]:
        """
        :py:func:`Consumer.consume` not implemented, use
        :py:func:`DeserializingConsumer.poll` instead
        """
        raise NotImplementedError