1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from .deserializing_consumer import DeserializingConsumer
20from .serializing_producer import SerializingProducer
21from .error import KafkaException, KafkaError
22from ._model import (
23 Node, # noqa: F401
24 ConsumerGroupTopicPartitions,
25 ConsumerGroupState,
26 ConsumerGroupType,
27 TopicCollection,
28 TopicPartitionInfo,
29 IsolationLevel,
30)
31import os
32from .cimpl import (
33 Producer,
34 Consumer,
35 Message,
36 TopicPartition,
37 Uuid,
38 libversion,
39 version,
40 TIMESTAMP_NOT_AVAILABLE,
41 TIMESTAMP_CREATE_TIME,
42 TIMESTAMP_LOG_APPEND_TIME,
43 OFFSET_BEGINNING,
44 OFFSET_END,
45 OFFSET_STORED,
46 OFFSET_INVALID,
47)
48
# Public names of the package: this list controls what
# ``from <package> import *`` exposes.
# NOTE(review): "admin", "experimental" and "kafkatest" are not imported in
# the visible part of this file -- presumably they are submodules of this
# package; verify.
__all__ = [
    "admin",
    "Consumer",
    "experimental",
    "KafkaError",
    "KafkaException",
    "kafkatest",
    "libversion",
    "Message",
    "OFFSET_BEGINNING",
    "OFFSET_END",
    "OFFSET_INVALID",
    "OFFSET_STORED",
    "Producer",
    "DeserializingConsumer",
    "SerializingProducer",
    "TIMESTAMP_CREATE_TIME",
    "TIMESTAMP_LOG_APPEND_TIME",
    "TIMESTAMP_NOT_AVAILABLE",
    "TopicPartition",
    "Node",
    "ConsumerGroupTopicPartitions",
    "ConsumerGroupState",
    "ConsumerGroupType",
    "Uuid",
    "IsolationLevel",
    "TopicCollection",
    "TopicPartitionInfo",
]


# Package version, taken from whatever ``version()`` (imported from ``.cimpl``)
# returns at import time.
# NOTE(review): the exact return type of ``version()`` is not visible here -- confirm.
__version__ = version()
81
82
class ThrottleEvent(object):
    """
    Describes a single request that the broker throttled.

    To receive these events, set the ``throttle_cb`` configuration property
    to a callable accepting one ThrottleEvent argument. The callback is
    triggered from poll(), consume() or flush() whenever the broker has
    throttled a request.

    Instances are normally created for you rather than by user code.

    :ivar str broker_name: The hostname of the broker which throttled the request
    :ivar int broker_id: The broker id
    :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
    """

    def __init__(self, broker_name, broker_id, throttle_time):
        # Plain value holder: the three arguments are stored unmodified.
        self.throttle_time = throttle_time
        self.broker_id = broker_id
        self.broker_name = broker_name

    def __str__(self):
        # throttle_time is kept in seconds; it is shown as whole
        # milliseconds (fraction truncated by int()).
        parts = (
            self.broker_name,
            self.broker_id,
            int(self.throttle_time * 1000),
        )
        return "{}/{} throttled for {} ms".format(*parts)
107
108
109def _resolve_plugins(plugins):
110 """Resolve embedded plugins from the wheel's library directory.
111
112 For internal module use only.
113
114 :param str plugins: The plugin.library.paths value
115 """
116 from sys import platform
117
118 # Location of __init__.py and the embedded library directory
119 basedir = os.path.dirname(__file__)
120
121 if platform in ("win32", "cygwin"):
122 paths_sep = ";"
123 ext = ".dll"
124 libdir = basedir
125 elif platform in ("linux", "linux2"):
126 paths_sep = ":"
127 ext = ".so"
128 libdir = os.path.join(basedir, ".libs")
129 elif platform == "darwin":
130 paths_sep = ":"
131 ext = ".dylib"
132 libdir = os.path.join(basedir, ".dylibs")
133 else:
134 # Unknown platform, there are probably no embedded plugins.
135 return plugins
136
137 if not os.path.isdir(libdir):
138 # No embedded library directory, probably not a wheel installation.
139 return plugins
140
141 resolved = []
142 for plugin in plugins.split(paths_sep):
143 if "/" in plugin or "\\" in plugin:
144 # Path specified, leave unchanged
145 resolved.append(plugin)
146 continue
147
148 # See if the plugin can be found in the wheel's
149 # embedded library directory.
150 # The user might not have supplied a file extension, so try both.
151 good = None
152 for file in [plugin, plugin + ext]:
153 fpath = os.path.join(libdir, file)
154 if os.path.isfile(fpath):
155 good = fpath
156 break
157
158 if good is not None:
159 resolved.append(good)
160 else:
161 resolved.append(plugin)
162
163 return paths_sep.join(resolved)