Coverage Report

Created: 2025-11-07 08:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/lib/librdkafka-2.10.1/src/rdkafka_broker.h
Line
Count
Source
1
/*
2
 * librdkafka - Apache Kafka C library
3
 *
4
 * Copyright (c) 2012,2022, Magnus Edenhill
5
 *               2023 Confluent Inc.
6
 * All rights reserved.
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions are met:
10
 *
11
 * 1. Redistributions of source code must retain the above copyright notice,
12
 *    this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright notice,
14
 *    this list of conditions and the following disclaimer in the documentation
15
 *    and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 */
29
30
#ifndef _RDKAFKA_BROKER_H_
31
#define _RDKAFKA_BROKER_H_
32
33
#include "rdkafka_feature.h"
34
35
36
extern const char *rd_kafka_broker_state_names[];
37
extern const char *rd_kafka_secproto_names[];
38
39
40
/**
41
 * @enum Broker states
42
 */
43
typedef enum {
44
        RD_KAFKA_BROKER_STATE_INIT,
45
        RD_KAFKA_BROKER_STATE_DOWN,
46
        RD_KAFKA_BROKER_STATE_TRY_CONNECT,
47
        RD_KAFKA_BROKER_STATE_CONNECT,
48
        RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE,
49
        RD_KAFKA_BROKER_STATE_AUTH_LEGACY,
50
51
        /* Any state >= STATE_UP means the Kafka protocol layer
52
         * is operational (to some degree). */
53
        RD_KAFKA_BROKER_STATE_UP,
54
        RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
55
        RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE,
56
        RD_KAFKA_BROKER_STATE_AUTH_REQ,
57
        RD_KAFKA_BROKER_STATE_REAUTH,
58
} rd_kafka_broker_state_t;
59
60
/**
61
 * @struct Broker state monitor.
62
 *
63
 * @warning The monitor object lifetime should be the same as
64
 *          the rd_kafka_t object, not shorter.
65
 */
66
typedef struct rd_kafka_broker_monitor_s {
67
        TAILQ_ENTRY(rd_kafka_broker_monitor_s) rkbmon_link; /**< rkb_monitors*/
68
        struct rd_kafka_broker_s *rkbmon_rkb; /**< Broker being monitored. */
69
        rd_kafka_q_t *rkbmon_q;               /**< Queue to enqueue op on. */
70
71
        /** Callback triggered on the monitoree's op handler thread.
72
         *   Do note that the callback might be triggered even after
73
         *   it has been deleted due to the queueing nature of op queues. */
74
        void (*rkbmon_cb)(rd_kafka_broker_t *rkb);
75
} rd_kafka_broker_monitor_t;
76
77
78
/**
79
 * @struct Broker instance
80
 */
81
struct rd_kafka_broker_s { /* rd_kafka_broker_t */
82
        TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;
83
84
        int32_t rkb_nodeid; /**< Broker Node Id, read only. */
85
0
#define RD_KAFKA_NODEID_UA -1
86
87
        rd_sockaddr_list_t *rkb_rsal;
88
        rd_ts_t rkb_ts_rsal_last;
89
        const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */
90
91
        rd_kafka_transport_t *rkb_transport;
92
93
        uint32_t rkb_corrid;
94
        int rkb_connid; /* Connection id, increased by
95
                         * one for each connection by
96
                         * this broker. Used as a safe-guard
97
                         * to help troubleshooting buffer
98
                         * problems across disconnects. */
99
100
        rd_kafka_q_t *rkb_ops;
101
102
        mtx_t rkb_lock;
103
104
        int rkb_blocking_max_ms; /* Maximum IO poll blocking
105
                                  * time. */
106
107
        /* Toppars handled by this broker */
108
        TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
109
        int rkb_toppar_cnt;
110
111
        /* Active toppars that are eligible for:
112
         *  - (consumer) fetching due to underflow
113
         *  - (producer) producing
114
         *
115
         * The circleq provides round-robin scheduling for both cases.
116
         */
117
        CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars;
118
        int rkb_active_toppar_cnt;
119
        rd_kafka_toppar_t *rkb_active_toppar_next; /* Next 'first' toppar
120
                                                    * in fetch list.
121
                                                    * This is used for
122
                                                    * round-robin. */
123
124
125
        rd_kafka_cgrp_t *rkb_cgrp;
126
127
        rd_ts_t rkb_ts_fetch_backoff;
128
        int rkb_fetching;
129
130
        rd_kafka_broker_state_t rkb_state; /**< Current broker state */
131
132
        rd_ts_t rkb_ts_state;                 /* Timestamp of last
133
                                               * state change */
134
        rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan
135
                                               * interval. */
136
137
        rd_atomic32_t rkb_blocking_request_cnt; /* The number of
138
                                                 * in-flight blocking
139
                                                 * requests.
140
                                                 * A blocking request is
141
                                                 * one that is known to
142
                                                 * possibly block on the
143
                                                 * broker for longer than
144
                                                 * the typical processing
145
                                                 * time, e.g.:
146
                                                 * JoinGroup, SyncGroup */
147
148
        int rkb_features; /* Protocol features supported
149
                           * by this broker.
150
                           * See RD_KAFKA_FEATURE_* in
151
                           * rdkafka_feature.h */
152
153
        struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
154
                                                      * (MUST be sorted) */
155
        size_t rkb_ApiVersions_cnt;
156
        rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long
157
                                                  * the fallback proto
158
                                                  * will be used after
159
                                                  * ApiVersionRequest
160
                                                  * failure. */
161
162
        rd_kafka_confsource_t rkb_source;
163
        struct {
164
                rd_atomic64_t tx_bytes;
165
                rd_atomic64_t tx; /**< Kafka requests */
166
                rd_atomic64_t tx_err;
167
                rd_atomic64_t tx_retries;
168
                rd_atomic64_t req_timeouts; /* Accumulated value */
169
170
                rd_atomic64_t rx_bytes;
171
                rd_atomic64_t rx; /**< Kafka responses */
172
                rd_atomic64_t rx_err;
173
                rd_atomic64_t rx_corrid_err; /* CorrId misses */
174
                rd_atomic64_t rx_partial;    /* Partial messages received
175
                                              * and dropped. */
176
                rd_atomic64_t zbuf_grow;     /* Compression/decompression buffer
177
                                                grows needed */
178
                rd_atomic64_t buf_grow;      /* rkbuf grows needed */
179
                rd_atomic64_t wakeups;       /* Poll wakeups */
180
181
                rd_atomic32_t connects; /**< Connection attempts,
182
                                         *   successful or not. */
183
184
                rd_atomic32_t disconnects; /**< Disconnects.
185
                                            *   Always peer-triggered. */
186
187
                rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type
188
                                                        *   counter */
189
190
                rd_atomic64_t ts_send; /**< Timestamp of last send */
191
                rd_atomic64_t ts_recv; /**< Timestamp of last receive */
192
        } rkb_c;
193
194
        struct {
195
                struct {
196
                        int32_t connects; /**< Connection attempts,
197
                                           *   successful or not. */
198
                } rkb_historic_c;
199
200
                struct {
201
                        rd_avg_t rkb_avg_rtt;      /* Current RTT avg */
202
                        rd_avg_t rkb_avg_throttle; /* Current throttle avg */
203
                        rd_avg_t
204
                            rkb_avg_outbuf_latency;       /**< Current latency
205
                                                           *   between buf_enq0
206
                                                           *   and writing to socket
207
                                                           */
208
                        rd_avg_t rkb_avg_fetch_latency;   /**< Current fetch
209
                                                           *   latency avg */
210
                        rd_avg_t rkb_avg_produce_latency; /**< Current produce
211
                                                           *   latency avg */
212
                } rd_avg_current;
213
214
                struct {
215
                        rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */
216
                        rd_avg_t
217
                            rkb_avg_throttle; /**< Rolled over throttle avg */
218
                        rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf
219
                                                          *   latency avg */
220
                        rd_avg_t rkb_avg_fetch_latency;  /**< Rolled over fetch
221
                                                          *   latency avg */
222
                        rd_avg_t
223
                            rkb_avg_produce_latency; /**< Rolled over produce
224
                                                      *   latency avg */
225
                } rd_avg_rollover;
226
        } rkb_telemetry;
227
228
        int rkb_req_timeouts; /* Current value */
229
230
        thrd_t rkb_thread;
231
232
        rd_refcnt_t rkb_refcnt;
233
234
        rd_kafka_t *rkb_rk;
235
236
        rd_kafka_buf_t *rkb_recv_buf;
237
238
        int rkb_max_inflight; /* Maximum number of in-flight
239
                               * requests to broker.
240
                               * Compared to rkb_waitresps length.*/
241
        rd_kafka_bufq_t rkb_outbufs;
242
        rd_kafka_bufq_t rkb_waitresps;
243
        rd_kafka_bufq_t rkb_retrybufs;
244
245
        rd_avg_t rkb_avg_int_latency;    /* Current internal latency period*/
246
        rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
247
                                          *   between buf_enq0
248
                                          *   and writing to socket
249
                                          */
250
        rd_avg_t rkb_avg_rtt;            /* Current RTT period */
251
        rd_avg_t rkb_avg_throttle;       /* Current throttle period */
252
253
        /* These are all protected by rkb_lock */
254
        char rkb_name[RD_KAFKA_NODENAME_SIZE];     /* Displ name */
255
        char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
256
        uint16_t rkb_port;                         /* TCP port */
257
        char *rkb_origname;                        /* Original
258
                                                    * host name */
259
        int rkb_nodename_epoch;                    /**< Bumped each time
260
                                                    *   the nodename is changed.
261
                                                    *   Compared to
262
                                                    *   rkb_connect_epoch
263
                                                    *   to trigger a reconnect
264
                                                    *   for logical broker
265
                                                    *   when the nodename is
266
                                                    *   updated. */
267
        int rkb_connect_epoch;                     /**< The value of
268
                                                    *   rkb_nodename_epoch at the
269
                                                    *   last connection attempt.
270
                                                    */
271
272
        /* Logging name is a copy of rkb_name, protected by its own mutex */
273
        char *rkb_logname;
274
        mtx_t rkb_logname_lock;
275
276
        rd_socket_t rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake
277
                                       * up from IO-wait when
278
                                       * queues have content. */
279
280
        /** Current, exponentially increased, reconnect backoff. */
281
        int rkb_reconnect_backoff_ms;
282
283
        /** Absolute timestamp of next allowed reconnect. */
284
        rd_ts_t rkb_ts_reconnect;
285
286
        /** Absolute time of last connection attempt. */
287
        rd_ts_t rkb_ts_connect;
288
289
        /** True if a reauthentication is in progress. */
290
        rd_bool_t rkb_reauth_in_progress;
291
292
        /** Persistent connection demand is tracked by
293
         *   a counter for each type of demand.
294
         *   The broker thread will maintain a persistent connection
295
         *   if any of the counters are non-zero, and revert to
296
         *   on-demand mode when they all reach zero.
297
         *   After incrementing any of the counters a broker wakeup
298
         *   should be signalled to expedite handling. */
299
        struct {
300
                /** Producer: partitions are being produced to.
301
                 *   Consumer: partitions are being fetched from.
302
                 *
303
                 *   Counter is maintained by the broker handler thread
304
                 *   itself, no need for atomic/locking.
305
                 *   Is reset to 0 on each producer|consumer_serve() loop
306
                 *   and updated according to current need, which
307
                 *   will trigger a state transition to
308
                 *   TRY_CONNECT if a connection is needed. */
309
                int internal;
310
311
                /** Consumer: Broker is the group coordinator.
312
                 *   Counter is maintained by cgrp logic in
313
                 *   rdkafka main thread.
314
                 *
315
                 *   Producer: Broker is the transaction coordinator.
316
                 *   Counter is maintained by rdkafka_idempotence.c.
317
                 *
318
                 *   All: A coord_req_t is waiting for this broker to come up.
319
                 */
320
321
                rd_atomic32_t coord;
322
        } rkb_persistconn;
323
324
        /** Currently registered state monitors.
325
         *   @locks rkb_lock */
326
        TAILQ_HEAD(, rd_kafka_broker_monitor_s) rkb_monitors;
327
328
        /** Coordinator request's broker monitor.
329
         *   Will trigger the coord_req fsm on broker state change. */
330
        rd_kafka_broker_monitor_t rkb_coord_monitor;
331
332
        rd_kafka_secproto_t rkb_proto;
333
334
        int rkb_down_reported; /* Down event reported */
335
#if WITH_SASL_CYRUS
336
        rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr;
337
#endif
338
339
340
        /*
341
         * Log suppression
342
         */
343
        struct {
344
                /** Log: compression type not supported by broker. */
345
                rd_interval_t unsupported_compression;
346
347
                /** Log: KIP-62 not supported by broker. */
348
                rd_interval_t unsupported_kip62;
349
350
                /** Log: KIP-345 not supported by broker. */
351
                rd_interval_t unsupported_kip345;
352
353
                /** Log & Error: identical broker_fail() errors. */
354
                rd_interval_t fail_error;
355
        } rkb_suppress;
356
357
        /** Last error. This is used to suppress repeated logs. */
358
        struct {
359
                char errstr[512];        /**< Last error string */
360
                rd_kafka_resp_err_t err; /**< Last error code */
361
                int cnt;                 /**< Number of identical errors */
362
        } rkb_last_err;
363
364
365
        rd_kafka_timer_t rkb_sasl_reauth_tmr;
366
367
        /** > 0 if this broker thread is terminating */
368
        rd_atomic32_t termination_in_progress;
369
};
370
371
0
#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
372
#define rd_kafka_broker_keep_fl(FUNC, LINE, RKB)                               \
373
0
        rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt)
374
0
#define rd_kafka_broker_lock(rkb)   mtx_lock(&(rkb)->rkb_lock)
375
0
#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)
376
377
378
/**
379
 * @brief Locks broker, acquires the state, unlocks, and returns
380
 *        the state.
381
 * @locks broker_lock MUST NOT be held.
382
 * @locality any
383
 */
384
static RD_INLINE RD_UNUSED rd_kafka_broker_state_t
385
0
rd_kafka_broker_get_state(rd_kafka_broker_t *rkb) {
386
0
        rd_kafka_broker_state_t state;
387
0
        rd_kafka_broker_lock(rkb);
388
0
        state = rkb->rkb_state;
389
0
        rd_kafka_broker_unlock(rkb);
390
0
        return state;
391
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_get_state
Unexecuted instantiation: snappy.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdavl.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_get_state
Unexecuted instantiation: rdlog.c:rd_kafka_broker_get_state
392
393
394
395
/**
396
 * @returns true if the broker state is UP
397
 */
398
0
#define rd_kafka_broker_state_is_up(state) ((state) == RD_KAFKA_BROKER_STATE_UP)
399
400
/**
401
 * @returns true if the broker state is DOWN
402
 */
403
#define rd_kafka_broker_state_is_down(state)                                   \
404
0
        ((state) == RD_KAFKA_BROKER_STATE_DOWN)
405
406
/**
407
 * @returns true if the error is a broker destroy error, because of
408
 *          termination or because of decommissioning.
409
 */
410
#define rd_kafka_broker_is_any_err_destroy(err)                                \
411
0
        ((err) == RD_KAFKA_RESP_ERR__DESTROY ||                                \
412
0
         (err) == RD_KAFKA_RESP_ERR__DESTROY_BROKER)
413
414
415
#define rd_kafka_broker_or_instance_terminating(rkb)                           \
416
0
        (rd_kafka_broker_termination_in_progress(rkb) ||                       \
417
0
         rd_kafka_terminating((rkb)->rkb_rk))
418
419
/**
420
 * @returns true if the broker connection is up, else false.
421
 * @locks broker_lock MUST NOT be held.
422
 * @locality any
423
 */
424
static RD_UNUSED RD_INLINE rd_bool_t
425
0
rd_kafka_broker_is_up(rd_kafka_broker_t *rkb) {
426
0
        rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb);
427
0
        return rd_kafka_broker_state_is_up(state);
428
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_is_up
Unexecuted instantiation: snappy.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdavl.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_is_up
Unexecuted instantiation: rdlog.c:rd_kafka_broker_is_up
429
430
/**
431
 * @returns true if this broker thread is terminating
432
 * @locality any
433
 */
434
static RD_UNUSED RD_INLINE rd_bool_t
435
0
rd_kafka_broker_termination_in_progress(rd_kafka_broker_t *rkb) {
436
0
        return rd_atomic32_get(&rkb->termination_in_progress) > 0;
437
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: snappy.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdavl.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_termination_in_progress
Unexecuted instantiation: rdlog.c:rd_kafka_broker_termination_in_progress
438
439
/**
440
 * @brief Broker comparator
441
 */
442
static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp(const void *_a,
443
0
                                                   const void *_b) {
444
0
        const rd_kafka_broker_t *a = _a, *b = _b;
445
0
        return RD_CMP(a, b);
446
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_cmp
Unexecuted instantiation: snappy.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdavl.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_cmp
Unexecuted instantiation: rdlog.c:rd_kafka_broker_cmp
447
448
449
/**
450
 * @returns true if broker supports \p features, else false.
451
 */
452
static RD_UNUSED int rd_kafka_broker_supports(rd_kafka_broker_t *rkb,
453
0
                                              int features) {
454
0
        const rd_bool_t do_lock = !thrd_is_current(rkb->rkb_thread);
455
0
        int r;
456
457
0
        if (do_lock)
458
0
                rd_kafka_broker_lock(rkb);
459
460
0
        r = (rkb->rkb_features & features) == features;
461
462
0
        if (do_lock)
463
0
                rd_kafka_broker_unlock(rkb);
464
0
        return r;
465
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_supports
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_supports
Unexecuted instantiation: snappy.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_supports
Unexecuted instantiation: rdavl.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_supports
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_supports
Unexecuted instantiation: rdlog.c:rd_kafka_broker_supports
466
467
int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
468
                                             int16_t ApiKey,
469
                                             int16_t minver,
470
                                             int16_t maxver,
471
                                             int *featuresp);
472
473
int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb,
474
                                              int16_t ApiKey,
475
                                              int16_t minver,
476
                                              int16_t maxver,
477
                                              int *featuresp,
478
                                              rd_bool_t do_lock);
479
480
rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func,
481
                                                      int line,
482
                                                      rd_kafka_t *rk,
483
                                                      int32_t nodeid,
484
                                                      int state,
485
                                                      rd_bool_t do_connect);
486
487
#define rd_kafka_broker_find_by_nodeid0(rk, nodeid, state, do_connect)         \
488
0
        rd_kafka_broker_find_by_nodeid0_fl(__FUNCTION__, __LINE__, rk, nodeid, \
489
0
                                           state, do_connect)
490
#define rd_kafka_broker_find_by_nodeid(rk, nodeid)                             \
491
0
        rd_kafka_broker_find_by_nodeid0(rk, nodeid, -1, rd_false)
492
493
494
/**
495
 * Filter out brokers that don't support Idempotent Producer.
496
 */
497
static RD_INLINE RD_UNUSED int
498
0
rd_kafka_broker_filter_non_idempotent(rd_kafka_broker_t *rkb, void *opaque) {
499
0
        return !(rkb->rkb_features & RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER);
500
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: snappy.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdavl.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_filter_non_idempotent
Unexecuted instantiation: rdlog.c:rd_kafka_broker_filter_non_idempotent
501
502
503
rd_kafka_broker_t *rd_kafka_broker_any(rd_kafka_t *rk,
504
                                       int state,
505
                                       int (*filter)(rd_kafka_broker_t *rkb,
506
                                                     void *opaque),
507
                                       void *opaque,
508
                                       const char *reason);
509
rd_kafka_broker_t *rd_kafka_broker_any_up(rd_kafka_t *rk,
510
                                          int *filtered_cnt,
511
                                          int (*filter)(rd_kafka_broker_t *rkb,
512
                                                        void *opaque),
513
                                          void *opaque,
514
                                          const char *reason);
515
rd_kafka_broker_t *rd_kafka_broker_any_usable(rd_kafka_t *rk,
516
                                              int timeout_ms,
517
                                              rd_dolock_t do_lock,
518
                                              int features,
519
                                              const char *reason);
520
521
rd_kafka_broker_t *
522
rd_kafka_broker_prefer(rd_kafka_t *rk, int32_t broker_id, int state);
523
524
rd_kafka_broker_t *rd_kafka_broker_get_async(rd_kafka_t *rk,
525
                                             int32_t broker_id,
526
                                             int state,
527
                                             rd_kafka_enq_once_t *eonce);
528
529
rd_list_t *rd_kafka_brokers_get_nodeids_async(rd_kafka_t *rk,
530
                                              rd_kafka_enq_once_t *eonce);
531
532
rd_kafka_broker_t *
533
rd_kafka_broker_controller(rd_kafka_t *rk, int state, rd_ts_t abs_timeout);
534
rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk,
535
                                                    int state,
536
                                                    rd_kafka_enq_once_t *eonce);
537
538
int rd_kafka_brokers_add0(rd_kafka_t *rk,
539
                          const char *brokerlist,
540
                          rd_bool_t is_bootstrap_server_list);
541
void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state);
542
543
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
544
                          int level,
545
                          rd_kafka_resp_err_t err,
546
                          const char *fmt,
547
                          ...) RD_FORMAT(printf, 4, 5);
548
549
void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb,
550
                                 rd_kafka_resp_err_t err,
551
                                 const char *errstr);
552
553
void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb);
554
555
#define rd_kafka_broker_destroy(rkb)                                           \
556
0
        rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt,                           \
557
0
                                 rd_kafka_broker_destroy_final(rkb))
558
559
560
void rd_kafka_broker_update(rd_kafka_t *rk,
561
                            rd_kafka_secproto_t proto,
562
                            const struct rd_kafka_metadata_broker *mdb,
563
                            rd_kafka_broker_t **rkbp);
564
rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk,
565
                                       rd_kafka_confsource_t source,
566
                                       rd_kafka_secproto_t proto,
567
                                       const char *name,
568
                                       uint16_t port,
569
                                       int32_t nodeid);
570
571
rd_kafka_broker_t *rd_kafka_broker_add_logical(rd_kafka_t *rk,
572
                                               const char *name);
573
574
/** @define returns true if broker is logical. No locking is needed. */
575
0
#define RD_KAFKA_BROKER_IS_LOGICAL(rkb) ((rkb)->rkb_source == RD_KAFKA_LOGICAL)
576
577
void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,
578
                                  rd_kafka_broker_t *from_rkb);
579
580
void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb);
581
void rd_kafka_broker_connect_done(rd_kafka_broker_t *rkb, const char *errstr);
582
583
int rd_kafka_send(rd_kafka_broker_t *rkb);
584
int rd_kafka_recv(rd_kafka_broker_t *rkb);
585
586
#define rd_kafka_dr_msgq(rkt, rkmq, err)                                       \
587
0
        rd_kafka_dr_msgq0(rkt, rkmq, err, NULL /*no produce result*/)
588
589
void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt,
590
                       rd_kafka_msgq_t *rkmq,
591
                       rd_kafka_resp_err_t err,
592
                       const rd_kafka_Produce_result_t *presult);
593
594
void rd_kafka_dr_implicit_ack(rd_kafka_broker_t *rkb,
595
                              rd_kafka_toppar_t *rktp,
596
                              uint64_t last_msgid);
597
598
void rd_kafka_broker_buf_enq1(rd_kafka_broker_t *rkb,
599
                              rd_kafka_buf_t *rkbuf,
600
                              rd_kafka_resp_cb_t *resp_cb,
601
                              void *opaque);
602
603
void rd_kafka_broker_buf_enq_replyq(rd_kafka_broker_t *rkb,
604
                                    rd_kafka_buf_t *rkbuf,
605
                                    rd_kafka_replyq_t replyq,
606
                                    rd_kafka_resp_cb_t *resp_cb,
607
                                    void *opaque);
608
609
void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
610
611
612
rd_kafka_broker_t *rd_kafka_broker_internal(rd_kafka_t *rk);
613
614
void msghdr_print(rd_kafka_t *rk,
615
                  const char *what,
616
                  const struct msghdr *msg,
617
                  int hexdump);
618
619
int32_t rd_kafka_broker_id(rd_kafka_broker_t *rkb);
620
const char *rd_kafka_broker_name(rd_kafka_broker_t *rkb);
621
void rd_kafka_broker_wakeup(rd_kafka_broker_t *rkb, const char *reason);
622
int rd_kafka_all_brokers_wakeup(rd_kafka_t *rk,
623
                                int min_state,
624
                                const char *reason);
625
626
void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason);
627
628
void rd_kafka_broker_purge_queues(rd_kafka_broker_t *rkb,
629
                                  int purge_flags,
630
                                  rd_kafka_replyq_t replyq);
631
632
int rd_kafka_brokers_get_state_version(rd_kafka_t *rk);
633
int rd_kafka_brokers_wait_state_change(rd_kafka_t *rk,
634
                                       int stored_version,
635
                                       int timeout_ms);
636
int rd_kafka_brokers_wait_state_change_async(rd_kafka_t *rk,
637
                                             int stored_version,
638
                                             rd_kafka_enq_once_t *eonce);
639
void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk);
640
641
rd_kafka_broker_t *rd_kafka_broker_random0(const char *func,
642
                                           int line,
643
                                           rd_kafka_t *rk,
644
                                           rd_bool_t is_up,
645
                                           int state,
646
                                           int *filtered_cnt,
647
                                           int (*filter)(rd_kafka_broker_t *rk,
648
                                                         void *opaque),
649
                                           void *opaque);
650
651
#define rd_kafka_broker_random(rk, state, filter, opaque)                      \
652
0
        rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_false, state,   \
653
0
                                NULL, filter, opaque)
654
655
#define rd_kafka_broker_random_up(rk, filter, opaque)                          \
656
0
        rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_true,           \
657
0
                                RD_KAFKA_BROKER_STATE_UP, NULL, filter,        \
658
0
                                opaque)
659
660
661
662
/**
663
 * Updates the current toppar active round-robin next pointer.
664
 */
665
static RD_INLINE RD_UNUSED void
666
rd_kafka_broker_active_toppar_next(rd_kafka_broker_t *rkb,
667
0
                                   rd_kafka_toppar_t *sugg_next) {
668
0
        if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) ||
669
0
            (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars))
670
0
                rkb->rkb_active_toppar_next = NULL;
671
0
        else if (sugg_next)
672
0
                rkb->rkb_active_toppar_next = sugg_next;
673
0
        else
674
0
                rkb->rkb_active_toppar_next =
675
0
                    CIRCLEQ_FIRST(&rkb->rkb_active_toppars);
676
0
}
Unexecuted instantiation: rdkafka.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdunittest.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: snappy.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdavl.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_active_toppar_next
Unexecuted instantiation: rdlog.c:rd_kafka_broker_active_toppar_next
677
678
679
void rd_kafka_broker_active_toppar_add(rd_kafka_broker_t *rkb,
680
                                       rd_kafka_toppar_t *rktp,
681
                                       const char *reason);
682
683
void rd_kafka_broker_active_toppar_del(rd_kafka_broker_t *rkb,
684
                                       rd_kafka_toppar_t *rktp,
685
                                       const char *reason);
686
687
688
void rd_kafka_broker_schedule_connection(rd_kafka_broker_t *rkb);
689
690
void rd_kafka_broker_persistent_connection_add(rd_kafka_broker_t *rkb,
691
                                               rd_atomic32_t *acntp);
692
693
void rd_kafka_broker_persistent_connection_del(rd_kafka_broker_t *rkb,
694
                                               rd_atomic32_t *acntp);
695
696
697
void rd_kafka_broker_monitor_add(rd_kafka_broker_monitor_t *rkbmon,
698
                                 rd_kafka_broker_t *rkb,
699
                                 rd_kafka_q_t *rkq,
700
                                 void (*callback)(rd_kafka_broker_t *rkb));
701
702
void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon);
703
704
void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb,
705
                                        int64_t connections_max_reauth_ms);
706
707
void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *rkb);
708
709
void rd_kafka_broker_decommission(rd_kafka_t *rk,
710
                                  rd_kafka_broker_t *rkb,
711
                                  rd_list_t *wait_thrds);
712
713
int unittest_broker(void);
714
715
#endif /* _RDKAFKA_BROKER_H_ */