1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from .deserializing_consumer import DeserializingConsumer
20from .serializing_producer import SerializingProducer
21from .error import KafkaException, KafkaError
22from ._model import (Node, # noqa: F401
23 ConsumerGroupTopicPartitions,
24 ConsumerGroupState,
25 ConsumerGroupType,
26 TopicCollection,
27 TopicPartitionInfo,
28 IsolationLevel,
29 ElectionType)
30
31from .cimpl import (Producer,
32 Consumer,
33 Message,
34 TopicPartition,
35 Uuid,
36 libversion,
37 version,
38 TIMESTAMP_NOT_AVAILABLE,
39 TIMESTAMP_CREATE_TIME,
40 TIMESTAMP_LOG_APPEND_TIME,
41 OFFSET_BEGINNING,
42 OFFSET_END,
43 OFFSET_STORED,
44 OFFSET_INVALID)
45
46__all__ = ['admin', 'Consumer',
47 'KafkaError', 'KafkaException',
48 'kafkatest', 'libversion', 'Message',
49 'OFFSET_BEGINNING', 'OFFSET_END', 'OFFSET_INVALID', 'OFFSET_STORED',
50 'Producer', 'DeserializingConsumer',
51 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
52 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
53 'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
54 'ConsumerGroupType', 'Uuid',
55 'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']
56
57__version__ = version()[0]
58
59
60class ThrottleEvent(object):
61 """
62 ThrottleEvent contains details about a throttled request.
63 Set up a throttle callback by setting the ``throttle_cb`` configuration
64 property to a callable that takes a ThrottleEvent object as its only argument.
65 The callback will be triggered from poll(), consume() or flush() when a request
66 has been throttled by the broker.
67
68 This class is typically not user instantiated.
69
70 :ivar str broker_name: The hostname of the broker which throttled the request
71 :ivar int broker_id: The broker id
72 :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
73 """
74
75 def __init__(self, broker_name,
76 broker_id,
77 throttle_time):
78 self.broker_name = broker_name
79 self.broker_id = broker_id
80 self.throttle_time = throttle_time
81
82 def __str__(self):
83 return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
84 int(self.throttle_time * 1000))
85
86
87def _resolve_plugins(plugins):
88 """ Resolve embedded plugins from the wheel's library directory.
89
90 For internal module use only.
91
92 :param str plugins: The plugin.library.paths value
93 """
94 import os
95 from sys import platform
96
97 # Location of __init__.py and the embedded library directory
98 basedir = os.path.dirname(__file__)
99
100 if platform in ('win32', 'cygwin'):
101 paths_sep = ';'
102 ext = '.dll'
103 libdir = basedir
104 elif platform in ('linux', 'linux2'):
105 paths_sep = ':'
106 ext = '.so'
107 libdir = os.path.join(basedir, '.libs')
108 elif platform == 'darwin':
109 paths_sep = ':'
110 ext = '.dylib'
111 libdir = os.path.join(basedir, '.dylibs')
112 else:
113 # Unknown platform, there are probably no embedded plugins.
114 return plugins
115
116 if not os.path.isdir(libdir):
117 # No embedded library directory, probably not a wheel installation.
118 return plugins
119
120 resolved = []
121 for plugin in plugins.split(paths_sep):
122 if '/' in plugin or '\\' in plugin:
123 # Path specified, leave unchanged
124 resolved.append(plugin)
125 continue
126
127 # See if the plugin can be found in the wheel's
128 # embedded library directory.
129 # The user might not have supplied a file extension, so try both.
130 good = None
131 for file in [plugin, plugin + ext]:
132 fpath = os.path.join(libdir, file)
133 if os.path.isfile(fpath):
134 good = fpath
135 break
136
137 if good is not None:
138 resolved.append(good)
139 else:
140 resolved.append(plugin)
141
142 return paths_sep.join(resolved)