Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/confluent_kafka/__init__.py: 23%

48 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18 

19from .deserializing_consumer import DeserializingConsumer 

20from .serializing_producer import SerializingProducer 

21from .error import KafkaException, KafkaError 

22from ._model import Node, ConsumerGroupTopicPartitions, ConsumerGroupState 

23 

24from .cimpl import (Producer, 

25 Consumer, 

26 Message, 

27 TopicPartition, 

28 libversion, 

29 version, 

30 TIMESTAMP_NOT_AVAILABLE, 

31 TIMESTAMP_CREATE_TIME, 

32 TIMESTAMP_LOG_APPEND_TIME, 

33 OFFSET_BEGINNING, 

34 OFFSET_END, 

35 OFFSET_STORED, 

36 OFFSET_INVALID) 

37 

38__all__ = ['admin', 'Consumer', 

39 'KafkaError', 'KafkaException', 

40 'kafkatest', 'libversion', 'Message', 

41 'OFFSET_BEGINNING', 'OFFSET_END', 'OFFSET_INVALID', 'OFFSET_STORED', 

42 'Producer', 'DeserializingConsumer', 

43 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', 

44 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node', 

45 'ConsumerGroupTopicPartitions', 'ConsumerGroupState'] 

46 

47__version__ = version()[0] 

48 

49 

50class ThrottleEvent(object): 

51 """ 

52 ThrottleEvent contains details about a throttled request. 

53 Set up a throttle callback by setting the ``throttle_cb`` configuration 

54 property to a callable that takes a ThrottleEvent object as its only argument. 

55 The callback will be triggered from poll(), consume() or flush() when a request 

56 has been throttled by the broker. 

57 

58 This class is typically not user instantiated. 

59 

60 :ivar str broker_name: The hostname of the broker which throttled the request 

61 :ivar int broker_id: The broker id 

62 :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request 

63 """ 

64 

65 def __init__(self, broker_name, 

66 broker_id, 

67 throttle_time): 

68 self.broker_name = broker_name 

69 self.broker_id = broker_id 

70 self.throttle_time = throttle_time 

71 

72 def __str__(self): 

73 return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, 

74 int(self.throttle_time * 1000)) 

75 

76 

77def _resolve_plugins(plugins): 

78 """ Resolve embedded plugins from the wheel's library directory. 

79 

80 For internal module use only. 

81 

82 :param str plugins: The plugin.library.paths value 

83 """ 

84 import os 

85 from sys import platform 

86 

87 # Location of __init__.py and the embedded library directory 

88 basedir = os.path.dirname(__file__) 

89 

90 if platform in ('win32', 'cygwin'): 

91 paths_sep = ';' 

92 ext = '.dll' 

93 libdir = basedir 

94 elif platform in ('linux', 'linux2'): 

95 paths_sep = ':' 

96 ext = '.so' 

97 libdir = os.path.join(basedir, '.libs') 

98 elif platform == 'darwin': 

99 paths_sep = ':' 

100 ext = '.dylib' 

101 libdir = os.path.join(basedir, '.dylibs') 

102 else: 

103 # Unknown platform, there are probably no embedded plugins. 

104 return plugins 

105 

106 if not os.path.isdir(libdir): 

107 # No embedded library directory, probably not a wheel installation. 

108 return plugins 

109 

110 resolved = [] 

111 for plugin in plugins.split(paths_sep): 

112 if '/' in plugin or '\\' in plugin: 

113 # Path specified, leave unchanged 

114 resolved.append(plugin) 

115 continue 

116 

117 # See if the plugin can be found in the wheel's 

118 # embedded library directory. 

119 # The user might not have supplied a file extension, so try both. 

120 good = None 

121 for file in [plugin, plugin + ext]: 

122 fpath = os.path.join(libdir, file) 

123 if os.path.isfile(fpath): 

124 good = fpath 

125 break 

126 

127 if good is not None: 

128 resolved.append(good) 

129 else: 

130 resolved.append(plugin) 

131 

132 return paths_sep.join(resolved)