Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/__init__.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19import os 

20 

21from ._model import Node # noqa: F401 

22from ._model import ( 

23 ConsumerGroupState, 

24 ConsumerGroupTopicPartitions, 

25 ConsumerGroupType, 

26 ElectionType, 

27 IsolationLevel, 

28 TopicCollection, 

29 TopicPartitionInfo, 

30) 

31from .cimpl import ( 

32 OFFSET_BEGINNING, 

33 OFFSET_END, 

34 OFFSET_INVALID, 

35 OFFSET_STORED, 

36 TIMESTAMP_CREATE_TIME, 

37 TIMESTAMP_LOG_APPEND_TIME, 

38 TIMESTAMP_NOT_AVAILABLE, 

39 Consumer, 

40 Message, 

41 Producer, 

42 TopicPartition, 

43 Uuid, 

44 consistent, 

45 fnv1a, 

46 libversion, 

47 murmur2, 

48 version, 

49) 

50from .deserializing_consumer import DeserializingConsumer 

51from .error import KafkaError, KafkaException 

52from .serializing_producer import SerializingProducer 

53 

54__all__ = [ 

55 "admin", 

56 "Consumer", 

57 "experimental", 

58 "KafkaError", 

59 "KafkaException", 

60 "kafkatest", 

61 "libversion", 

62 "version", 

63 "murmur2", 

64 "consistent", 

65 "fnv1a", 

66 "Message", 

67 "OFFSET_BEGINNING", 

68 "OFFSET_END", 

69 "OFFSET_INVALID", 

70 "OFFSET_STORED", 

71 "Producer", 

72 "DeserializingConsumer", 

73 "SerializingProducer", 

74 "TIMESTAMP_CREATE_TIME", 

75 "TIMESTAMP_LOG_APPEND_TIME", 

76 "TIMESTAMP_NOT_AVAILABLE", 

77 "TopicPartition", 

78 "Node", 

79 "ConsumerGroupTopicPartitions", 

80 "ConsumerGroupState", 

81 "ConsumerGroupType", 

82 "Uuid", 

83 "IsolationLevel", 

84 "TopicCollection", 

85 "TopicPartitionInfo", 

86 "ElectionType", 

87] 

88 

89 

90__version__ = version() 

91 

92 

93class ThrottleEvent(object): 

94 """ 

95 ThrottleEvent contains details about a throttled request. 

96 Set up a throttle callback by setting the ``throttle_cb`` configuration 

97 property to a callable that takes a ThrottleEvent object as its only argument. 

98 The callback will be triggered from poll(), consume() or flush() when a request 

99 has been throttled by the broker. 

100 

101 This class is typically not user instantiated. 

102 

103 :ivar str broker_name: The hostname of the broker which throttled the request 

104 :ivar int broker_id: The broker id 

105 :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request 

106 """ 

107 

108 def __init__(self, broker_name: str, broker_id: int, throttle_time: float) -> None: 

109 self.broker_name = broker_name 

110 self.broker_id = broker_id 

111 self.throttle_time = throttle_time 

112 

113 def __str__(self) -> str: 

114 return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, int(self.throttle_time * 1000)) 

115 

116 

117def _resolve_plugins(plugins: str) -> str: 

118 """Resolve embedded plugins from the wheel's library directory. 

119 

120 For internal module use only. 

121 

122 :param str plugins: The plugin.library.paths value 

123 """ 

124 from sys import platform 

125 

126 # Location of __init__.py and the embedded library directory 

127 basedir = os.path.dirname(__file__) 

128 

129 if platform in ("win32", "cygwin"): 

130 paths_sep = ";" 

131 ext = ".dll" 

132 libdir = basedir 

133 elif platform in ("linux", "linux2"): 

134 paths_sep = ":" 

135 ext = ".so" 

136 libdir = os.path.join(basedir, ".libs") 

137 elif platform == "darwin": 

138 paths_sep = ":" 

139 ext = ".dylib" 

140 libdir = os.path.join(basedir, ".dylibs") 

141 else: 

142 # Unknown platform, there are probably no embedded plugins. 

143 return plugins 

144 

145 if not os.path.isdir(libdir): 

146 # No embedded library directory, probably not a wheel installation. 

147 return plugins 

148 

149 resolved = [] 

150 for plugin in plugins.split(paths_sep): 

151 if "/" in plugin or "\\" in plugin: 

152 # Path specified, leave unchanged 

153 resolved.append(plugin) 

154 continue 

155 

156 # See if the plugin can be found in the wheel's 

157 # embedded library directory. 

158 # The user might not have supplied a file extension, so try both. 

159 good = None 

160 for file in [plugin, plugin + ext]: 

161 fpath = os.path.join(libdir, file) 

162 if os.path.isfile(fpath): 

163 good = fpath 

164 break 

165 

166 if good is not None: 

167 resolved.append(good) 

168 else: 

169 resolved.append(plugin) 

170 

171 return paths_sep.join(resolved)