Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/__init__.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

49 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from .deserializing_consumer import DeserializingConsumer 

20from .serializing_producer import SerializingProducer 

21from .error import KafkaException, KafkaError 

22from ._model import ( 

23 Node, # noqa: F401 

24 ConsumerGroupTopicPartitions, 

25 ConsumerGroupState, 

26 ConsumerGroupType, 

27 TopicCollection, 

28 TopicPartitionInfo, 

29 IsolationLevel, 

30) 

31import os 

32from .cimpl import ( 

33 Producer, 

34 Consumer, 

35 Message, 

36 TopicPartition, 

37 Uuid, 

38 libversion, 

39 version, 

40 TIMESTAMP_NOT_AVAILABLE, 

41 TIMESTAMP_CREATE_TIME, 

42 TIMESTAMP_LOG_APPEND_TIME, 

43 OFFSET_BEGINNING, 

44 OFFSET_END, 

45 OFFSET_STORED, 

46 OFFSET_INVALID, 

47) 

48 

49__all__ = [ 

50 "admin", 

51 "Consumer", 

52 "experimental", 

53 "KafkaError", 

54 "KafkaException", 

55 "kafkatest", 

56 "libversion", 

57 "Message", 

58 "OFFSET_BEGINNING", 

59 "OFFSET_END", 

60 "OFFSET_INVALID", 

61 "OFFSET_STORED", 

62 "Producer", 

63 "DeserializingConsumer", 

64 "SerializingProducer", 

65 "TIMESTAMP_CREATE_TIME", 

66 "TIMESTAMP_LOG_APPEND_TIME", 

67 "TIMESTAMP_NOT_AVAILABLE", 

68 "TopicPartition", 

69 "Node", 

70 "ConsumerGroupTopicPartitions", 

71 "ConsumerGroupState", 

72 "ConsumerGroupType", 

73 "Uuid", 

74 "IsolationLevel", 

75 "TopicCollection", 

76 "TopicPartitionInfo", 

77] 

78 

79 

80__version__ = version() 

81 

82 

83class ThrottleEvent(object): 

84 """ 

85 ThrottleEvent contains details about a throttled request. 

86 Set up a throttle callback by setting the ``throttle_cb`` configuration 

87 property to a callable that takes a ThrottleEvent object as its only argument. 

88 The callback will be triggered from poll(), consume() or flush() when a request 

89 has been throttled by the broker. 

90 

91 This class is typically not user instantiated. 

92 

93 :ivar str broker_name: The hostname of the broker which throttled the request 

94 :ivar int broker_id: The broker id 

95 :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request 

96 """ 

97 

98 def __init__(self, broker_name, broker_id, throttle_time): 

99 self.broker_name = broker_name 

100 self.broker_id = broker_id 

101 self.throttle_time = throttle_time 

102 

103 def __str__(self): 

104 return "{}/{} throttled for {} ms".format( 

105 self.broker_name, self.broker_id, int(self.throttle_time * 1000) 

106 ) 

107 

108 

109def _resolve_plugins(plugins): 

110 """Resolve embedded plugins from the wheel's library directory. 

111 

112 For internal module use only. 

113 

114 :param str plugins: The plugin.library.paths value 

115 """ 

116 from sys import platform 

117 

118 # Location of __init__.py and the embedded library directory 

119 basedir = os.path.dirname(__file__) 

120 

121 if platform in ("win32", "cygwin"): 

122 paths_sep = ";" 

123 ext = ".dll" 

124 libdir = basedir 

125 elif platform in ("linux", "linux2"): 

126 paths_sep = ":" 

127 ext = ".so" 

128 libdir = os.path.join(basedir, ".libs") 

129 elif platform == "darwin": 

130 paths_sep = ":" 

131 ext = ".dylib" 

132 libdir = os.path.join(basedir, ".dylibs") 

133 else: 

134 # Unknown platform, there are probably no embedded plugins. 

135 return plugins 

136 

137 if not os.path.isdir(libdir): 

138 # No embedded library directory, probably not a wheel installation. 

139 return plugins 

140 

141 resolved = [] 

142 for plugin in plugins.split(paths_sep): 

143 if "/" in plugin or "\\" in plugin: 

144 # Path specified, leave unchanged 

145 resolved.append(plugin) 

146 continue 

147 

148 # See if the plugin can be found in the wheel's 

149 # embedded library directory. 

150 # The user might not have supplied a file extension, so try both. 

151 good = None 

152 for file in [plugin, plugin + ext]: 

153 fpath = os.path.join(libdir, file) 

154 if os.path.isfile(fpath): 

155 good = fpath 

156 break 

157 

158 if good is not None: 

159 resolved.append(good) 

160 else: 

161 resolved.append(plugin) 

162 

163 return paths_sep.join(resolved)