1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from .deserializing_consumer import DeserializingConsumer
20from .serializing_producer import SerializingProducer
21from .error import KafkaException, KafkaError
22from ._model import (
23 Node, # noqa: F401
24 ConsumerGroupTopicPartitions,
25 ConsumerGroupState,
26 ConsumerGroupType,
27 TopicCollection,
28 TopicPartitionInfo,
29 IsolationLevel,
30 ElectionType
31)
32import os
33from .cimpl import (
34 Producer,
35 Consumer,
36 Message,
37 TopicPartition,
38 Uuid,
39 libversion,
40 version,
41 murmur2,
42 consistent,
43 fnv1a,
44 TIMESTAMP_NOT_AVAILABLE,
45 TIMESTAMP_CREATE_TIME,
46 TIMESTAMP_LOG_APPEND_TIME,
47 OFFSET_BEGINNING,
48 OFFSET_END,
49 OFFSET_STORED,
50 OFFSET_INVALID,
51)
52
# Public names of this package: this list controls what
# ``from <package> import *`` exposes.
# NOTE(review): "admin", "experimental" and "kafkatest" are not imported
# anywhere in this module's visible imports; presumably they are submodules
# of the package -- confirm against the package layout.
__all__ = [
    "admin",
    "Consumer",
    "experimental",
    "KafkaError",
    "KafkaException",
    "kafkatest",
    "libversion",
    "version",
    "murmur2",
    "consistent",
    "fnv1a",
    "Message",
    "OFFSET_BEGINNING",
    "OFFSET_END",
    "OFFSET_INVALID",
    "OFFSET_STORED",
    "Producer",
    "DeserializingConsumer",
    "SerializingProducer",
    "TIMESTAMP_CREATE_TIME",
    "TIMESTAMP_LOG_APPEND_TIME",
    "TIMESTAMP_NOT_AVAILABLE",
    "TopicPartition",
    "Node",
    "ConsumerGroupTopicPartitions",
    "ConsumerGroupState",
    "ConsumerGroupType",
    "Uuid",
    "IsolationLevel",
    "TopicCollection",
    "TopicPartitionInfo",
    "ElectionType"
]


# Package version, taken from the version() function imported from .cimpl.
__version__ = version()
90
91
class ThrottleEvent(object):
    """
    Describes a single request that was throttled by a broker.

    To receive these events, set the ``throttle_cb`` configuration property
    to a callable accepting one ThrottleEvent argument. The callback is
    triggered from poll(), consume() or flush() when a request has been
    throttled by the broker.

    This class is typically not user instantiated.

    :ivar str broker_name: The hostname of the broker which throttled the request
    :ivar int broker_id: The broker id
    :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
    """

    def __init__(self, broker_name: str, broker_id: int, throttle_time: float) -> None:
        # Plain value holder: the arguments are stored as given, unvalidated.
        self.throttle_time = throttle_time
        self.broker_id = broker_id
        self.broker_name = broker_name

    def __str__(self) -> str:
        # throttle_time is kept in seconds; the text form reports whole
        # milliseconds, truncated towards zero by int().
        millis = int(self.throttle_time * 1000)
        template = "{}/{} throttled for {} ms"
        return template.format(self.broker_name, self.broker_id, millis)
117
118
def _resolve_plugins(plugins: str) -> str:
    """ Map bare plugin names onto libraries embedded with this package.

    For internal module use only.

    ``plugins`` is split on the platform's path-list separator. An entry
    containing ``/`` or ``\\`` is treated as a path and left unchanged.
    Any other entry is looked up in the embedded library directory, first
    under its exact name and then with the platform's shared-library
    extension appended; the first existing file replaces the entry,
    otherwise the entry is left unchanged.

    The input is returned untouched on an unrecognised platform or when
    the embedded library directory does not exist.

    :param str plugins: The plugin.library.paths value
    :returns: The plugin.library.paths value with resolvable entries
              replaced by their full file paths.
    :rtype: str
    """
    import sys

    # Directory containing __init__.py; embedded libraries live relative to it.
    package_dir = os.path.dirname(__file__)

    # sys.platform -> (list separator, library extension, sub-directory).
    # An empty sub-directory means the libraries sit in the package directory.
    layouts = {
        "win32": (";", ".dll", ""),
        "cygwin": (";", ".dll", ""),
        "linux": (":", ".so", ".libs"),
        "linux2": (":", ".so", ".libs"),
        "darwin": (":", ".dylib", ".dylibs"),
    }
    layout = layouts.get(sys.platform)
    if layout is None:
        # Unknown platform, there are probably no embedded plugins.
        return plugins

    separator, suffix, subdir = layout
    library_dir = os.path.join(package_dir, subdir) if subdir else package_dir
    if not os.path.isdir(library_dir):
        # No embedded library directory, probably not a wheel installation.
        return plugins

    def locate(entry):
        # Explicit paths are the caller's business; pass them through.
        if "/" in entry or "\\" in entry:
            return entry
        # The user might not have supplied a file extension, so try the
        # name as given first and then with the extension appended.
        for candidate in (entry, entry + suffix):
            candidate_path = os.path.join(library_dir, candidate)
            if os.path.isfile(candidate_path):
                return candidate_path
        # Not embedded: leave the entry unchanged.
        return entry

    return separator.join(locate(entry) for entry in plugins.split(separator))