Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/__init__.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

49 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from .deserializing_consumer import DeserializingConsumer 

20from .serializing_producer import SerializingProducer 

21from .error import KafkaException, KafkaError 

22from ._model import ( 

23 Node, # noqa: F401 

24 ConsumerGroupTopicPartitions, 

25 ConsumerGroupState, 

26 ConsumerGroupType, 

27 TopicCollection, 

28 TopicPartitionInfo, 

29 IsolationLevel, 

30 ElectionType 

31) 

32import os 

33from .cimpl import ( 

34 Producer, 

35 Consumer, 

36 Message, 

37 TopicPartition, 

38 Uuid, 

39 libversion, 

40 version, 

41 murmur2, 

42 consistent, 

43 fnv1a, 

44 TIMESTAMP_NOT_AVAILABLE, 

45 TIMESTAMP_CREATE_TIME, 

46 TIMESTAMP_LOG_APPEND_TIME, 

47 OFFSET_BEGINNING, 

48 OFFSET_END, 

49 OFFSET_STORED, 

50 OFFSET_INVALID, 

51) 

52 

# Names exported by ``from confluent_kafka import *``.
__all__ = [
    "admin", "Consumer", "experimental", "KafkaError", "KafkaException", "kafkatest",
    "libversion", "version", "murmur2", "consistent", "fnv1a",
    "Message",
    "OFFSET_BEGINNING", "OFFSET_END", "OFFSET_INVALID", "OFFSET_STORED",
    "Producer", "DeserializingConsumer", "SerializingProducer",
    "TIMESTAMP_CREATE_TIME", "TIMESTAMP_LOG_APPEND_TIME", "TIMESTAMP_NOT_AVAILABLE",
    "TopicPartition", "Node",
    "ConsumerGroupTopicPartitions", "ConsumerGroupState", "ConsumerGroupType",
    "Uuid", "IsolationLevel", "TopicCollection", "TopicPartitionInfo", "ElectionType",
]


# Set from version(), which is imported from .cimpl above.
__version__ = version()

90 

91 

class ThrottleEvent:
    """
    Details about a single request that was throttled by the broker.

    To receive these events, set the ``throttle_cb`` configuration property
    to a callable that takes a ThrottleEvent object as its only argument.
    The callback will be triggered from poll(), consume() or flush() when a
    request has been throttled by the broker.

    This class is typically not user instantiated.

    :ivar str broker_name: The hostname of the broker which throttled the request
    :ivar int broker_id: The broker id
    :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
    """

    def __init__(self, broker_name: str, broker_id: int, throttle_time: float) -> None:
        self.broker_name = broker_name
        self.broker_id = broker_id
        self.throttle_time = throttle_time

    def __str__(self) -> str:
        # throttle_time is kept in seconds; display it as whole milliseconds
        # (int() truncates any fractional millisecond).
        millis = int(self.throttle_time * 1000)
        return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, millis)

117 

118 

119def _resolve_plugins(plugins: str) -> str: 

120 """ Resolve embedded plugins from the wheel's library directory. 

121 

122 For internal module use only. 

123 

124 :param str plugins: The plugin.library.paths value 

125 """ 

126 from sys import platform 

127 

128 # Location of __init__.py and the embedded library directory 

129 basedir = os.path.dirname(__file__) 

130 

131 if platform in ("win32", "cygwin"): 

132 paths_sep = ";" 

133 ext = ".dll" 

134 libdir = basedir 

135 elif platform in ("linux", "linux2"): 

136 paths_sep = ":" 

137 ext = ".so" 

138 libdir = os.path.join(basedir, ".libs") 

139 elif platform == "darwin": 

140 paths_sep = ":" 

141 ext = ".dylib" 

142 libdir = os.path.join(basedir, ".dylibs") 

143 else: 

144 # Unknown platform, there are probably no embedded plugins. 

145 return plugins 

146 

147 if not os.path.isdir(libdir): 

148 # No embedded library directory, probably not a wheel installation. 

149 return plugins 

150 

151 resolved = [] 

152 for plugin in plugins.split(paths_sep): 

153 if "/" in plugin or "\\" in plugin: 

154 # Path specified, leave unchanged 

155 resolved.append(plugin) 

156 continue 

157 

158 # See if the plugin can be found in the wheel's 

159 # embedded library directory. 

160 # The user might not have supplied a file extension, so try both. 

161 good = None 

162 for file in [plugin, plugin + ext]: 

163 fpath = os.path.join(libdir, file) 

164 if os.path.isfile(fpath): 

165 good = fpath 

166 break 

167 

168 if good is not None: 

169 resolved.append(good) 

170 else: 

171 resolved.append(plugin) 

172 

173 return paths_sep.join(resolved)