Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/__init__.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

49 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from .deserializing_consumer import DeserializingConsumer 

20from .serializing_producer import SerializingProducer 

21from .error import KafkaException, KafkaError 

22from ._model import (Node, # noqa: F401 

23 ConsumerGroupTopicPartitions, 

24 ConsumerGroupState, 

25 ConsumerGroupType, 

26 TopicCollection, 

27 TopicPartitionInfo, 

28 IsolationLevel, 

29 ElectionType) 

30 

31from .cimpl import (Producer, 

32 Consumer, 

33 Message, 

34 TopicPartition, 

35 Uuid, 

36 libversion, 

37 version, 

38 TIMESTAMP_NOT_AVAILABLE, 

39 TIMESTAMP_CREATE_TIME, 

40 TIMESTAMP_LOG_APPEND_TIME, 

41 OFFSET_BEGINNING, 

42 OFFSET_END, 

43 OFFSET_STORED, 

44 OFFSET_INVALID) 

45 

# Public names exported by ``from confluent_kafka import *``.
# 'admin' and 'kafkatest' are listed by name only; nothing in this module
# imports them (presumably they are subpackages -- confirm against the
# package layout).
# 'ElectionType' is included because it is imported from ``._model`` above
# together with the other re-exported model classes.
__all__ = [
    'admin',
    'Consumer',
    'KafkaError',
    'KafkaException',
    'kafkatest',
    'libversion',
    'Message',
    'OFFSET_BEGINNING',
    'OFFSET_END',
    'OFFSET_INVALID',
    'OFFSET_STORED',
    'Producer',
    'DeserializingConsumer',
    'SerializingProducer',
    'TIMESTAMP_CREATE_TIME',
    'TIMESTAMP_LOG_APPEND_TIME',
    'TIMESTAMP_NOT_AVAILABLE',
    'TopicPartition',
    'Node',
    'ConsumerGroupTopicPartitions',
    'ConsumerGroupState',
    'ConsumerGroupType',
    'Uuid',
    'IsolationLevel',
    'TopicCollection',
    'TopicPartitionInfo',
    'ElectionType',
]

# Package version: the first element of the value returned by version().
__version__ = version()[0]

58 

59 

class ThrottleEvent(object):
    """
    ThrottleEvent contains details about a throttled request.
    Set up a throttle callback by setting the ``throttle_cb`` configuration
    property to a callable that takes a ThrottleEvent object as its only argument.
    The callback will be triggered from poll(), consume() or flush() when a request
    has been throttled by the broker.

    This class is typically not user instantiated.

    :ivar str broker_name: The hostname of the broker which throttled the request
    :ivar int broker_id: The broker id
    :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
    """

    def __init__(self, broker_name,
                 broker_id,
                 throttle_time):
        """
        Store the throttle details on the instance.

        :param str broker_name: The hostname of the broker which throttled the request
        :param int broker_id: The broker id
        :param float throttle_time: The throttle duration in seconds
        """
        self.broker_name = broker_name
        self.broker_id = broker_id
        self.throttle_time = throttle_time

    def __str__(self):
        """
        Return ``"<broker_name>/<broker_id> throttled for <N> ms"``.

        The duration is converted from seconds to whole milliseconds.
        """
        # Round to the nearest millisecond instead of truncating: the float
        # multiplication can land just below the intended integer
        # (1.001 * 1000 == 1000.9999999999999), which int() alone would
        # report as one millisecond too few.
        throttle_ms = int(round(self.throttle_time * 1000))
        return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
                                                  throttle_ms)

85 

86 

def _resolve_plugins(plugins):
    """ Resolve embedded plugins from the wheel's library directory.

    For internal module use only.

    Each entry of the list that is a bare library name (no ``/`` or ``\\``)
    is looked up in the embedded library directory next to this file, first
    as given and then with the platform's shared-library extension appended.
    Entries that are found are replaced by their full path; entries that
    contain a path separator or cannot be found are left unchanged.

    The value is returned unmodified when the platform is not recognised or
    when the embedded library directory does not exist.

    :param str plugins: The plugin.library.paths value
    :returns: The plugin.library.paths value with embedded plugins resolved
    :rtype: str
    """
    import os
    from sys import platform

    # librdkafka documents plugin.library.paths as a ';'-separated list on
    # every platform, so the list is split and re-joined on ';' regardless
    # of the OS. Using ':' on Unix would leave ';'-separated lists
    # unresolved and re-join resolved entries into a single bogus path.
    paths_sep = ';'

    # Location of __init__.py and the embedded library directory
    basedir = os.path.dirname(__file__)

    if platform in ('win32', 'cygwin'):
        ext = '.dll'
        libdir = basedir
    elif platform in ('linux', 'linux2'):
        ext = '.so'
        libdir = os.path.join(basedir, '.libs')
    elif platform == 'darwin':
        ext = '.dylib'
        libdir = os.path.join(basedir, '.dylibs')
    else:
        # Unknown platform, there are probably no embedded plugins.
        return plugins

    if not os.path.isdir(libdir):
        # No embedded library directory, probably not a wheel installation.
        return plugins

    resolved = []
    for plugin in plugins.split(paths_sep):
        if '/' in plugin or '\\' in plugin:
            # Path specified, leave unchanged
            resolved.append(plugin)
            continue

        # See if the plugin can be found in the wheel's
        # embedded library directory.
        # The user might not have supplied a file extension, so try both.
        candidates = (os.path.join(libdir, name) for name in (plugin, plugin + ext))
        found = next((path for path in candidates if os.path.isfile(path)), None)

        # Fall back to the entry as given when no embedded file matches.
        resolved.append(plugin if found is None else found)

    return paths_sep.join(resolved)