1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20
21from ._model import Node # noqa: F401
22from ._model import (
23 ConsumerGroupState,
24 ConsumerGroupTopicPartitions,
25 ConsumerGroupType,
26 ElectionType,
27 IsolationLevel,
28 TopicCollection,
29 TopicPartitionInfo,
30)
31from .cimpl import (
32 OFFSET_BEGINNING,
33 OFFSET_END,
34 OFFSET_INVALID,
35 OFFSET_STORED,
36 TIMESTAMP_CREATE_TIME,
37 TIMESTAMP_LOG_APPEND_TIME,
38 TIMESTAMP_NOT_AVAILABLE,
39 Consumer,
40 Message,
41 Producer,
42 TopicPartition,
43 Uuid,
44 consistent,
45 fnv1a,
46 libversion,
47 murmur2,
48 version,
49)
50from .deserializing_consumer import DeserializingConsumer
51from .error import KafkaError, KafkaException
52from .serializing_producer import SerializingProducer
53
# Names exported by ``from <package> import *``.
# NOTE(review): "admin", "experimental" and "kafkatest" are not imported in
# this module's visible import block; presumably they are submodules that the
# star-import machinery loads on demand -- confirm they exist in the package.
__all__ = [
    "admin",
    "Consumer",
    "experimental",
    "KafkaError",
    "KafkaException",
    "kafkatest",
    "libversion",
    "version",
    "murmur2",
    "consistent",
    "fnv1a",
    "Message",
    "OFFSET_BEGINNING",
    "OFFSET_END",
    "OFFSET_INVALID",
    "OFFSET_STORED",
    "Producer",
    "DeserializingConsumer",
    "SerializingProducer",
    "TIMESTAMP_CREATE_TIME",
    "TIMESTAMP_LOG_APPEND_TIME",
    "TIMESTAMP_NOT_AVAILABLE",
    "TopicPartition",
    "Node",
    "ConsumerGroupTopicPartitions",
    "ConsumerGroupState",
    "ConsumerGroupType",
    "Uuid",
    "IsolationLevel",
    "TopicCollection",
    "TopicPartitionInfo",
    "ElectionType",
]


# Package version, taken from the result of calling ``version()`` imported
# from ``.cimpl`` once at import time.
# NOTE(review): the shape of that return value is not visible here -- confirm.
__version__ = version()
91
92
class ThrottleEvent(object):
    """
    ThrottleEvent contains details about a throttled request.
    Set up a throttle callback by setting the ``throttle_cb`` configuration
    property to a callable that takes a ThrottleEvent object as its only argument.
    The callback will be triggered from poll(), consume() or flush() when a request
    has been throttled by the broker.

    This class is typically not user instantiated.

    :ivar str broker_name: The hostname of the broker which throttled the request
    :ivar int broker_id: The broker id
    :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
    """

    def __init__(self, broker_name: str, broker_id: int, throttle_time: float) -> None:
        """Store the broker identity and the throttle duration (in seconds)."""
        self.broker_name = broker_name
        self.broker_id = broker_id
        self.throttle_time = throttle_time

    def __str__(self) -> str:
        """Return ``"<broker_name>/<broker_id> throttled for <N> ms"``.

        The duration is converted from seconds to whole milliseconds.
        """
        # Round to the nearest millisecond instead of truncating: binary
        # floating point makes e.g. 1.001 * 1000 == 1000.9999999999999, which
        # int() would report as 1000 ms rather than 1001 ms.
        millis = round(self.throttle_time * 1000)
        return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, millis)
115
116
def _resolve_plugins(plugins: str) -> str:
    """Resolve embedded plugins from the wheel's library directory.

    For internal module use only.

    Each entry of ``plugins`` that is a bare library name (no directory
    component) is looked up in the library directory bundled next to this
    module, first as given and then with the platform's shared-library
    extension appended. Entries that are found are replaced by their full
    path; all other entries are passed through unchanged.

    :param str plugins: The plugin.library.paths value
    :returns: The plugin.library.paths value with embedded plugins replaced
              by their full paths; the input unchanged if the platform is
              unknown or no embedded library directory exists.
    :rtype: str
    """
    from sys import platform

    # librdkafka documents plugin.library.paths as ';'-separated on every
    # platform (':' would also clash with Windows drive letters), so the same
    # separator is used for splitting and re-joining everywhere.
    paths_sep = ";"

    # Location of __init__.py and the embedded library directory
    basedir = os.path.dirname(__file__)

    if platform in ("win32", "cygwin"):
        ext = ".dll"
        libdir = basedir
    elif platform in ("linux", "linux2"):
        ext = ".so"
        libdir = os.path.join(basedir, ".libs")
    elif platform == "darwin":
        ext = ".dylib"
        libdir = os.path.join(basedir, ".dylibs")
    else:
        # Unknown platform, there are probably no embedded plugins.
        return plugins

    if not os.path.isdir(libdir):
        # No embedded library directory, probably not a wheel installation.
        return plugins

    resolved = []
    for plugin in plugins.split(paths_sep):
        if not plugin or "/" in plugin or "\\" in plugin:
            # Empty entry or explicit path: leave unchanged
            resolved.append(plugin)
            continue

        # See if the plugin can be found in the wheel's embedded library
        # directory. The user might not have supplied a file extension,
        # so try the name as given first, then with the extension added.
        candidates = (os.path.join(libdir, name) for name in (plugin, plugin + ext))
        resolved.append(next((path for path in candidates if os.path.isfile(path)), plugin))

    return paths_sep.join(resolved)