Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/confluent_kafka/__init__.py: 23%
48 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright 2020 Confluent Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
19from .deserializing_consumer import DeserializingConsumer
20from .serializing_producer import SerializingProducer
21from .error import KafkaException, KafkaError
22from ._model import Node, ConsumerGroupTopicPartitions, ConsumerGroupState
24from .cimpl import (Producer,
25 Consumer,
26 Message,
27 TopicPartition,
28 libversion,
29 version,
30 TIMESTAMP_NOT_AVAILABLE,
31 TIMESTAMP_CREATE_TIME,
32 TIMESTAMP_LOG_APPEND_TIME,
33 OFFSET_BEGINNING,
34 OFFSET_END,
35 OFFSET_STORED,
36 OFFSET_INVALID)
38__all__ = ['admin', 'Consumer',
39 'KafkaError', 'KafkaException',
40 'kafkatest', 'libversion', 'Message',
41 'OFFSET_BEGINNING', 'OFFSET_END', 'OFFSET_INVALID', 'OFFSET_STORED',
42 'Producer', 'DeserializingConsumer',
43 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
44 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
45 'ConsumerGroupTopicPartitions', 'ConsumerGroupState']
47__version__ = version()[0]
50class ThrottleEvent(object):
51 """
52 ThrottleEvent contains details about a throttled request.
53 Set up a throttle callback by setting the ``throttle_cb`` configuration
54 property to a callable that takes a ThrottleEvent object as its only argument.
55 The callback will be triggered from poll(), consume() or flush() when a request
56 has been throttled by the broker.
58 This class is typically not user instantiated.
60 :ivar str broker_name: The hostname of the broker which throttled the request
61 :ivar int broker_id: The broker id
62 :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
63 """
65 def __init__(self, broker_name,
66 broker_id,
67 throttle_time):
68 self.broker_name = broker_name
69 self.broker_id = broker_id
70 self.throttle_time = throttle_time
72 def __str__(self):
73 return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
74 int(self.throttle_time * 1000))
77def _resolve_plugins(plugins):
78 """ Resolve embedded plugins from the wheel's library directory.
80 For internal module use only.
82 :param str plugins: The plugin.library.paths value
83 """
84 import os
85 from sys import platform
87 # Location of __init__.py and the embedded library directory
88 basedir = os.path.dirname(__file__)
90 if platform in ('win32', 'cygwin'):
91 paths_sep = ';'
92 ext = '.dll'
93 libdir = basedir
94 elif platform in ('linux', 'linux2'):
95 paths_sep = ':'
96 ext = '.so'
97 libdir = os.path.join(basedir, '.libs')
98 elif platform == 'darwin':
99 paths_sep = ':'
100 ext = '.dylib'
101 libdir = os.path.join(basedir, '.dylibs')
102 else:
103 # Unknown platform, there are probably no embedded plugins.
104 return plugins
106 if not os.path.isdir(libdir):
107 # No embedded library directory, probably not a wheel installation.
108 return plugins
110 resolved = []
111 for plugin in plugins.split(paths_sep):
112 if '/' in plugin or '\\' in plugin:
113 # Path specified, leave unchanged
114 resolved.append(plugin)
115 continue
117 # See if the plugin can be found in the wheel's
118 # embedded library directory.
119 # The user might not have supplied a file extension, so try both.
120 good = None
121 for file in [plugin, plugin + ext]:
122 fpath = os.path.join(libdir, file)
123 if os.path.isfile(fpath):
124 good = fpath
125 break
127 if good is not None:
128 resolved.append(good)
129 else:
130 resolved.append(plugin)
132 return paths_sep.join(resolved)