/src/fluent-bit/lib/librdkafka-2.10.1/src/rdkafka_broker.h
Line | Count | Source |
1 | | /* |
2 | | * librdkafka - Apache Kafka C library |
3 | | * |
4 | | * Copyright (c) 2012,2022, Magnus Edenhill |
5 | | * 2023 Confluent Inc. |
6 | | * All rights reserved. |
7 | | * |
8 | | * Redistribution and use in source and binary forms, with or without |
9 | | * modification, are permitted provided that the following conditions are met: |
10 | | * |
11 | | * 1. Redistributions of source code must retain the above copyright notice, |
12 | | * this list of conditions and the following disclaimer. |
13 | | * 2. Redistributions in binary form must reproduce the above copyright notice, |
14 | | * this list of conditions and the following disclaimer in the documentation |
15 | | * and/or other materials provided with the distribution. |
16 | | * |
17 | | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | | * POSSIBILITY OF SUCH DAMAGE. |
28 | | */ |
29 | | |
30 | | #ifndef _RDKAFKA_BROKER_H_ |
31 | | #define _RDKAFKA_BROKER_H_ |
32 | | |
33 | | #include "rdkafka_feature.h" |
34 | | |
35 | | |
36 | | extern const char *rd_kafka_broker_state_names[]; |
37 | | extern const char *rd_kafka_secproto_names[]; |
38 | | |
39 | | |
40 | | /** |
41 | | * @enum Broker states |
42 | | */ |
43 | | typedef enum { |
44 | | RD_KAFKA_BROKER_STATE_INIT, |
45 | | RD_KAFKA_BROKER_STATE_DOWN, |
46 | | RD_KAFKA_BROKER_STATE_TRY_CONNECT, |
47 | | RD_KAFKA_BROKER_STATE_CONNECT, |
48 | | RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE, |
49 | | RD_KAFKA_BROKER_STATE_AUTH_LEGACY, |
50 | | |
51 | | /* Any state >= STATE_UP means the Kafka protocol layer |
52 | | * is operational (to some degree). */ |
53 | | RD_KAFKA_BROKER_STATE_UP, |
54 | | RD_KAFKA_BROKER_STATE_APIVERSION_QUERY, |
55 | | RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE, |
56 | | RD_KAFKA_BROKER_STATE_AUTH_REQ, |
57 | | RD_KAFKA_BROKER_STATE_REAUTH, |
58 | | } rd_kafka_broker_state_t; |
59 | | |
60 | | /** |
61 | | * @struct Broker state monitor. |
62 | | * |
63 | | * @warning The monitor object lifetime should be the same as |
64 | | * the rd_kafka_t object, not shorter. |
65 | | */ |
66 | | typedef struct rd_kafka_broker_monitor_s { |
67 | | TAILQ_ENTRY(rd_kafka_broker_monitor_s) rkbmon_link; /**< rkb_monitors*/ |
68 | | struct rd_kafka_broker_s *rkbmon_rkb; /**< Broker being monitored. */ |
69 | | rd_kafka_q_t *rkbmon_q; /**< Queue to enqueue op on. */ |
70 | | |
71 | | /**< Callback triggered on the monitoree's op handler thread. |
72 | | * Do note that the callback might be triggered even after |
73 | | * it has been deleted due to the queueing nature of op queues. */ |
74 | | void (*rkbmon_cb)(rd_kafka_broker_t *rkb); |
75 | | } rd_kafka_broker_monitor_t; |
76 | | |
77 | | |
78 | | /** |
79 | | * @struct Broker instance |
80 | | */ |
81 | | struct rd_kafka_broker_s { /* rd_kafka_broker_t */ |
82 | | TAILQ_ENTRY(rd_kafka_broker_s) rkb_link; |
83 | | |
84 | | int32_t rkb_nodeid; /**< Broker Node Id, read only. */ |
85 | 0 | #define RD_KAFKA_NODEID_UA -1 |
86 | | |
87 | | rd_sockaddr_list_t *rkb_rsal; |
88 | | rd_ts_t rkb_ts_rsal_last; |
89 | | const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */ |
90 | | |
91 | | rd_kafka_transport_t *rkb_transport; |
92 | | |
93 | | uint32_t rkb_corrid; |
94 | | int rkb_connid; /* Connection id, increased by |
95 | | * one for each connection by |
96 | | * this broker. Used as a safe-guard |
97 | | * to help troubleshooting buffer |
98 | | * problems across disconnects. */ |
99 | | |
100 | | rd_kafka_q_t *rkb_ops; |
101 | | |
102 | | mtx_t rkb_lock; |
103 | | |
104 | | int rkb_blocking_max_ms; /* Maximum IO poll blocking |
105 | | * time. */ |
106 | | |
107 | | /* Toppars handled by this broker */ |
108 | | TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars; |
109 | | int rkb_toppar_cnt; |
110 | | |
111 | | /* Active toppars that are eligible for: |
112 | | * - (consumer) fetching due to underflow |
113 | | * - (producer) producing |
114 | | * |
115 | | * The circleq provides round-robin scheduling for both cases. |
116 | | */ |
117 | | CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars; |
118 | | int rkb_active_toppar_cnt; |
119 | | rd_kafka_toppar_t *rkb_active_toppar_next; /* Next 'first' toppar |
120 | | * in fetch list. |
121 | | * This is used for |
122 | | * round-robin. */ |
123 | | |
124 | | |
125 | | rd_kafka_cgrp_t *rkb_cgrp; |
126 | | |
127 | | rd_ts_t rkb_ts_fetch_backoff; |
128 | | int rkb_fetching; |
129 | | |
130 | | rd_kafka_broker_state_t rkb_state; /**< Current broker state */ |
131 | | |
132 | | rd_ts_t rkb_ts_state; /* Timestamp of last |
133 | | * state change */ |
134 | | rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan |
135 | | * interval. */ |
136 | | |
137 | | rd_atomic32_t rkb_blocking_request_cnt; /* The number of |
138 | | * in-flight blocking |
139 | | * requests. |
140 | | * A blocking request is |
141 | | * one that is known to |
142 | | * possibly block on the |
143 | | * broker for longer than |
144 | | * the typical processing |
145 | | * time, e.g.: |
146 | | * JoinGroup, SyncGroup */ |
147 | | |
148 | | int rkb_features; /* Protocol features supported |
149 | | * by this broker. |
150 | | * See RD_KAFKA_FEATURE_* in |
151 | | * rdkafka_proto.h */ |
152 | | |
153 | | struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs |
154 | | * (MUST be sorted) */ |
155 | | size_t rkb_ApiVersions_cnt; |
156 | | rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long |
157 | | * the fallback proto |
158 | | * will be used after |
159 | | * ApiVersionRequest |
160 | | * failure. */ |
161 | | |
162 | | rd_kafka_confsource_t rkb_source; |
163 | | struct { |
164 | | rd_atomic64_t tx_bytes; |
165 | | rd_atomic64_t tx; /**< Kafka requests */ |
166 | | rd_atomic64_t tx_err; |
167 | | rd_atomic64_t tx_retries; |
168 | | rd_atomic64_t req_timeouts; /* Accumulated value */ |
169 | | |
170 | | rd_atomic64_t rx_bytes; |
171 | | rd_atomic64_t rx; /**< Kafka responses */ |
172 | | rd_atomic64_t rx_err; |
173 | | rd_atomic64_t rx_corrid_err; /* CorrId misses */ |
174 | | rd_atomic64_t rx_partial; /* Partial messages received |
175 | | * and dropped. */ |
176 | | rd_atomic64_t zbuf_grow; /* Compression/decompression buffer |
177 | | grows needed */ |
178 | | rd_atomic64_t buf_grow; /* rkbuf grows needed */ |
179 | | rd_atomic64_t wakeups; /* Poll wakeups */ |
180 | | |
181 | | rd_atomic32_t connects; /**< Connection attempts, |
182 | | * successful or not. */ |
183 | | |
184 | | rd_atomic32_t disconnects; /**< Disconnects. |
185 | | * Always peer-triggered. */ |
186 | | |
187 | | rd_atomic64_t reqtype[RD_KAFKAP__NUM]; /**< Per request-type |
188 | | * counter */ |
189 | | |
190 | | rd_atomic64_t ts_send; /**< Timestamp of last send */ |
191 | | rd_atomic64_t ts_recv; /**< Timestamp of last receive */ |
192 | | } rkb_c; |
193 | | |
194 | | struct { |
195 | | struct { |
196 | | int32_t connects; /**< Connection attempts, |
197 | | * successful or not. */ |
198 | | } rkb_historic_c; |
199 | | |
200 | | struct { |
201 | | rd_avg_t rkb_avg_rtt; /* Current RTT avg */ |
202 | | rd_avg_t rkb_avg_throttle; /* Current throttle avg */ |
203 | | rd_avg_t |
204 | | rkb_avg_outbuf_latency; /**< Current latency |
205 | | * between buf_enq0 |
206 | | * and writing to socket |
207 | | */ |
208 | | rd_avg_t rkb_avg_fetch_latency; /**< Current fetch |
209 | | * latency avg */ |
210 | | rd_avg_t rkb_avg_produce_latency; /**< Current produce |
211 | | * latency avg */ |
212 | | } rd_avg_current; |
213 | | |
214 | | struct { |
215 | | rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */ |
216 | | rd_avg_t |
217 | | rkb_avg_throttle; /**< Rolled over throttle avg */ |
218 | | rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf |
219 | | * latency avg */ |
220 | | rd_avg_t rkb_avg_fetch_latency; /**< Rolled over fetch |
221 | | * latency avg */ |
222 | | rd_avg_t |
223 | | rkb_avg_produce_latency; /**< Rolled over produce |
224 | | * latency avg */ |
225 | | } rd_avg_rollover; |
226 | | } rkb_telemetry; |
227 | | |
228 | | int rkb_req_timeouts; /* Current value */ |
229 | | |
230 | | thrd_t rkb_thread; |
231 | | |
232 | | rd_refcnt_t rkb_refcnt; |
233 | | |
234 | | rd_kafka_t *rkb_rk; |
235 | | |
236 | | rd_kafka_buf_t *rkb_recv_buf; |
237 | | |
238 | | int rkb_max_inflight; /* Maximum number of in-flight |
239 | | * requests to broker. |
240 | | * Compared to rkb_waitresps length.*/ |
241 | | rd_kafka_bufq_t rkb_outbufs; |
242 | | rd_kafka_bufq_t rkb_waitresps; |
243 | | rd_kafka_bufq_t rkb_retrybufs; |
244 | | |
245 | | rd_avg_t rkb_avg_int_latency; /* Current internal latency period*/ |
246 | | rd_avg_t rkb_avg_outbuf_latency; /**< Current latency |
247 | | * between buf_enq0 |
248 | | * and writing to socket |
249 | | */ |
250 | | rd_avg_t rkb_avg_rtt; /* Current RTT period */ |
251 | | rd_avg_t rkb_avg_throttle; /* Current throttle period */ |
252 | | |
253 | | /* These are all protected by rkb_lock */ |
254 | | char rkb_name[RD_KAFKA_NODENAME_SIZE]; /* Displ name */ |
255 | | char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/ |
256 | | uint16_t rkb_port; /* TCP port */ |
257 | | char *rkb_origname; /* Original |
258 | | * host name */ |
259 | | int rkb_nodename_epoch; /**< Bumped each time |
260 | | * the nodename is changed. |
261 | | * Compared to |
262 | | * rkb_connect_epoch |
263 | | * to trigger a reconnect |
264 | | * for logical broker |
265 | | * when the nodename is |
266 | | * updated. */ |
267 | | int rkb_connect_epoch; /**< The value of |
268 | | * rkb_nodename_epoch at the |
269 | | * last connection attempt. |
270 | | */ |
271 | | |
272 | | /* Logging name is a copy of rkb_name, protected by its own mutex */ |
273 | | char *rkb_logname; |
274 | | mtx_t rkb_logname_lock; |
275 | | |
276 | | rd_socket_t rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake |
277 | | * up from IO-wait when |
278 | | * queues have content. */ |
279 | | |
280 | | /**< Current, exponentially increased, reconnect backoff. */ |
281 | | int rkb_reconnect_backoff_ms; |
282 | | |
283 | | /**< Absolute timestamp of next allowed reconnect. */ |
284 | | rd_ts_t rkb_ts_reconnect; |
285 | | |
286 | | /** Absolute time of last connection attempt. */ |
287 | | rd_ts_t rkb_ts_connect; |
288 | | |
289 | | /** True if a reauthentication is in progress. */ |
290 | | rd_bool_t rkb_reauth_in_progress; |
291 | | |
292 | | /**< Persistent connection demand is tracked by |
293 | | * a counter for each type of demand. |
294 | | * The broker thread will maintain a persistent connection |
295 | | * if any of the counters are non-zero, and revert to |
296 | | * on-demand mode when they all reach zero. |
297 | | * After incrementing any of the counters a broker wakeup |
298 | | * should be signalled to expedite handling. */ |
299 | | struct { |
300 | | /**< Producer: partitions are being produced to. |
301 | | * Consumer: partitions are being fetched from. |
302 | | * |
303 | | * Counter is maintained by the broker handler thread |
304 | | * itself, no need for atomic/locking. |
305 | | * Is reset to 0 on each producer|consumer_serve() loop |
306 | | * and updated according to current need, which |
307 | | * will trigger a state transition to |
308 | | * TRY_CONNECT if a connection is needed. */ |
309 | | int internal; |
310 | | |
311 | | /**< Consumer: Broker is the group coordinator. |
312 | | * Counter is maintained by cgrp logic in |
313 | | * rdkafka main thread. |
314 | | * |
315 | | * Producer: Broker is the transaction coordinator. |
316 | | * Counter is maintained by rdkafka_idempotence.c. |
317 | | * |
318 | | * All: A coord_req_t is waiting for this broker to come up. |
319 | | */ |
320 | | |
321 | | rd_atomic32_t coord; |
322 | | } rkb_persistconn; |
323 | | |
324 | | /**< Currently registered state monitors. |
325 | | * @locks rkb_lock */ |
326 | | TAILQ_HEAD(, rd_kafka_broker_monitor_s) rkb_monitors; |
327 | | |
328 | | /**< Coordinator request's broker monitor. |
329 | | * Will trigger the coord_req fsm on broker state change. */ |
330 | | rd_kafka_broker_monitor_t rkb_coord_monitor; |
331 | | |
332 | | rd_kafka_secproto_t rkb_proto; |
333 | | |
334 | | int rkb_down_reported; /* Down event reported */ |
335 | | #if WITH_SASL_CYRUS |
336 | | rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr; |
337 | | #endif |
338 | | |
339 | | |
340 | | /* |
341 | | * Log suppression |
342 | | */ |
343 | | struct { |
344 | | /**< Log: compression type not supported by broker. */ |
345 | | rd_interval_t unsupported_compression; |
346 | | |
347 | | /**< Log: KIP-62 not supported by broker. */ |
348 | | rd_interval_t unsupported_kip62; |
349 | | |
350 | | /**< Log: KIP-345 not supported by broker. */ |
351 | | rd_interval_t unsupported_kip345; |
352 | | |
353 | | /**< Log & Error: identical broker_fail() errors. */ |
354 | | rd_interval_t fail_error; |
355 | | } rkb_suppress; |
356 | | |
357 | | /** Last error. This is used to suppress repeated logs. */ |
358 | | struct { |
359 | | char errstr[512]; /**< Last error string */ |
360 | | rd_kafka_resp_err_t err; /**< Last error code */ |
361 | | int cnt; /**< Number of identical errors */ |
362 | | } rkb_last_err; |
363 | | |
364 | | |
365 | | rd_kafka_timer_t rkb_sasl_reauth_tmr; |
366 | | |
367 | | /** > 0 if this broker thread is terminating */ |
368 | | rd_atomic32_t termination_in_progress; |
369 | | }; |
370 | | |
371 | 0 | #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt) |
372 | | #define rd_kafka_broker_keep_fl(FUNC, LINE, RKB) \ |
373 | 0 | rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt) |
374 | 0 | #define rd_kafka_broker_lock(rkb) mtx_lock(&(rkb)->rkb_lock) |
375 | 0 | #define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock) |
376 | | |
377 | | |
378 | | /** |
379 | | * @brief Locks broker, acquires the states, unlocks, and returns |
380 | | * the state. |
381 | | * @locks broker_lock MUST NOT be held. |
382 | | * @locality any |
383 | | */ |
384 | | static RD_INLINE RD_UNUSED rd_kafka_broker_state_t |
385 | 0 | rd_kafka_broker_get_state(rd_kafka_broker_t *rkb) { |
386 | 0 | rd_kafka_broker_state_t state; |
387 | 0 | rd_kafka_broker_lock(rkb); |
388 | 0 | state = rkb->rkb_state; |
389 | 0 | rd_kafka_broker_unlock(rkb); |
390 | 0 | return state; |
391 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_get_state Unexecuted instantiation: rdunittest.c:rd_kafka_broker_get_state Unexecuted instantiation: snappy.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_get_state Unexecuted instantiation: rdavl.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_get_state Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_get_state Unexecuted instantiation: rdlog.c:rd_kafka_broker_get_state |
392 | | |
393 | | |
394 | | |
395 | | /** |
396 | | * @returns true if the broker state is UP |
397 | | */ |
398 | 0 | #define rd_kafka_broker_state_is_up(state) ((state) == RD_KAFKA_BROKER_STATE_UP) |
399 | | |
400 | | /** |
401 | | * @returns true if the broker state is DOWN |
402 | | */ |
403 | | #define rd_kafka_broker_state_is_down(state) \ |
404 | 0 | ((state) == RD_KAFKA_BROKER_STATE_DOWN) |
405 | | |
406 | | /** |
407 | | * @returns true if the error is a broker destroy error, because of |
408 | | * termination or because of decommissioning. |
409 | | */ |
410 | | #define rd_kafka_broker_is_any_err_destroy(err) \ |
411 | 0 | ((err) == RD_KAFKA_RESP_ERR__DESTROY || \ |
412 | 0 | (err) == RD_KAFKA_RESP_ERR__DESTROY_BROKER) |
413 | | |
414 | | |
415 | | #define rd_kafka_broker_or_instance_terminating(rkb) \ |
416 | 0 | (rd_kafka_broker_termination_in_progress(rkb) || \ |
417 | 0 | rd_kafka_terminating((rkb)->rkb_rk)) |
418 | | |
419 | | /** |
420 | | * @returns true if the broker connection is up, else false. |
421 | | * @locks broker_lock MUST NOT be held. |
422 | | * @locality any |
423 | | */ |
424 | | static RD_UNUSED RD_INLINE rd_bool_t |
425 | 0 | rd_kafka_broker_is_up(rd_kafka_broker_t *rkb) { |
426 | 0 | rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb); |
427 | 0 | return rd_kafka_broker_state_is_up(state); |
428 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_is_up Unexecuted instantiation: rdunittest.c:rd_kafka_broker_is_up Unexecuted instantiation: snappy.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_is_up Unexecuted instantiation: rdavl.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_is_up Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_is_up Unexecuted instantiation: rdlog.c:rd_kafka_broker_is_up |
429 | | |
430 | | /** |
431 | | * @returns true if the broker needs a persistent connection |
432 | | * @locality any |
433 | | */ |
434 | | static RD_UNUSED RD_INLINE rd_bool_t |
435 | 0 | rd_kafka_broker_termination_in_progress(rd_kafka_broker_t *rkb) { |
436 | 0 | return rd_atomic32_get(&rkb->termination_in_progress) > 0; |
437 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdunittest.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: snappy.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdavl.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_termination_in_progress Unexecuted instantiation: rdlog.c:rd_kafka_broker_termination_in_progress |
438 | | |
439 | | /** |
440 | | * @brief Broker comparator |
441 | | */ |
442 | | static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp(const void *_a, |
443 | 0 | const void *_b) { |
444 | 0 | const rd_kafka_broker_t *a = _a, *b = _b; |
445 | 0 | return RD_CMP(a, b); |
446 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_cmp Unexecuted instantiation: rdunittest.c:rd_kafka_broker_cmp Unexecuted instantiation: snappy.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_cmp Unexecuted instantiation: rdavl.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_cmp Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_cmp Unexecuted instantiation: rdlog.c:rd_kafka_broker_cmp |
447 | | |
448 | | |
449 | | /** |
450 | | * @returns true if broker supports \p features, else false. |
451 | | */ |
452 | | static RD_UNUSED int rd_kafka_broker_supports(rd_kafka_broker_t *rkb, |
453 | 0 | int features) { |
454 | 0 | const rd_bool_t do_lock = !thrd_is_current(rkb->rkb_thread); |
455 | 0 | int r; |
456 | |
|
457 | 0 | if (do_lock) |
458 | 0 | rd_kafka_broker_lock(rkb); |
459 | |
|
460 | 0 | r = (rkb->rkb_features & features) == features; |
461 | |
|
462 | 0 | if (do_lock) |
463 | 0 | rd_kafka_broker_unlock(rkb); |
464 | 0 | return r; |
465 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_supports Unexecuted instantiation: rdunittest.c:rd_kafka_broker_supports Unexecuted instantiation: snappy.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_supports Unexecuted instantiation: rdavl.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_supports Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_supports Unexecuted instantiation: rdlog.c:rd_kafka_broker_supports |
466 | | |
467 | | int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb, |
468 | | int16_t ApiKey, |
469 | | int16_t minver, |
470 | | int16_t maxver, |
471 | | int *featuresp); |
472 | | |
473 | | int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb, |
474 | | int16_t ApiKey, |
475 | | int16_t minver, |
476 | | int16_t maxver, |
477 | | int *featuresp, |
478 | | rd_bool_t do_lock); |
479 | | |
480 | | rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func, |
481 | | int line, |
482 | | rd_kafka_t *rk, |
483 | | int32_t nodeid, |
484 | | int state, |
485 | | rd_bool_t do_connect); |
486 | | |
487 | | #define rd_kafka_broker_find_by_nodeid0(rk, nodeid, state, do_connect) \ |
488 | 0 | rd_kafka_broker_find_by_nodeid0_fl(__FUNCTION__, __LINE__, rk, nodeid, \ |
489 | 0 | state, do_connect) |
490 | | #define rd_kafka_broker_find_by_nodeid(rk, nodeid) \ |
491 | 0 | rd_kafka_broker_find_by_nodeid0(rk, nodeid, -1, rd_false) |
492 | | |
493 | | |
494 | | /** |
495 | | * Filter out brokers that don't support Idempotent Producer. |
496 | | */ |
497 | | static RD_INLINE RD_UNUSED int |
498 | 0 | rd_kafka_broker_filter_non_idempotent(rd_kafka_broker_t *rkb, void *opaque) { |
499 | 0 | return !(rkb->rkb_features & RD_KAFKA_FEATURE_IDEMPOTENT_PRODUCER); |
500 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdunittest.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: snappy.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdavl.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_filter_non_idempotent Unexecuted instantiation: rdlog.c:rd_kafka_broker_filter_non_idempotent |
501 | | |
502 | | |
503 | | rd_kafka_broker_t *rd_kafka_broker_any(rd_kafka_t *rk, |
504 | | int state, |
505 | | int (*filter)(rd_kafka_broker_t *rkb, |
506 | | void *opaque), |
507 | | void *opaque, |
508 | | const char *reason); |
509 | | rd_kafka_broker_t *rd_kafka_broker_any_up(rd_kafka_t *rk, |
510 | | int *filtered_cnt, |
511 | | int (*filter)(rd_kafka_broker_t *rkb, |
512 | | void *opaque), |
513 | | void *opaque, |
514 | | const char *reason); |
515 | | rd_kafka_broker_t *rd_kafka_broker_any_usable(rd_kafka_t *rk, |
516 | | int timeout_ms, |
517 | | rd_dolock_t do_lock, |
518 | | int features, |
519 | | const char *reason); |
520 | | |
521 | | rd_kafka_broker_t * |
522 | | rd_kafka_broker_prefer(rd_kafka_t *rk, int32_t broker_id, int state); |
523 | | |
524 | | rd_kafka_broker_t *rd_kafka_broker_get_async(rd_kafka_t *rk, |
525 | | int32_t broker_id, |
526 | | int state, |
527 | | rd_kafka_enq_once_t *eonce); |
528 | | |
529 | | rd_list_t *rd_kafka_brokers_get_nodeids_async(rd_kafka_t *rk, |
530 | | rd_kafka_enq_once_t *eonce); |
531 | | |
532 | | rd_kafka_broker_t * |
533 | | rd_kafka_broker_controller(rd_kafka_t *rk, int state, rd_ts_t abs_timeout); |
534 | | rd_kafka_broker_t *rd_kafka_broker_controller_async(rd_kafka_t *rk, |
535 | | int state, |
536 | | rd_kafka_enq_once_t *eonce); |
537 | | |
538 | | int rd_kafka_brokers_add0(rd_kafka_t *rk, |
539 | | const char *brokerlist, |
540 | | rd_bool_t is_bootstrap_server_list); |
541 | | void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state); |
542 | | |
543 | | void rd_kafka_broker_fail(rd_kafka_broker_t *rkb, |
544 | | int level, |
545 | | rd_kafka_resp_err_t err, |
546 | | const char *fmt, |
547 | | ...) RD_FORMAT(printf, 4, 5); |
548 | | |
549 | | void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb, |
550 | | rd_kafka_resp_err_t err, |
551 | | const char *errstr); |
552 | | |
553 | | void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb); |
554 | | |
555 | | #define rd_kafka_broker_destroy(rkb) \ |
556 | 0 | rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt, \ |
557 | 0 | rd_kafka_broker_destroy_final(rkb)) |
558 | | |
559 | | |
560 | | void rd_kafka_broker_update(rd_kafka_t *rk, |
561 | | rd_kafka_secproto_t proto, |
562 | | const struct rd_kafka_metadata_broker *mdb, |
563 | | rd_kafka_broker_t **rkbp); |
564 | | rd_kafka_broker_t *rd_kafka_broker_add(rd_kafka_t *rk, |
565 | | rd_kafka_confsource_t source, |
566 | | rd_kafka_secproto_t proto, |
567 | | const char *name, |
568 | | uint16_t port, |
569 | | int32_t nodeid); |
570 | | |
571 | | rd_kafka_broker_t *rd_kafka_broker_add_logical(rd_kafka_t *rk, |
572 | | const char *name); |
573 | | |
574 | | /** @define returns true if broker is logical. No locking is needed. */ |
575 | 0 | #define RD_KAFKA_BROKER_IS_LOGICAL(rkb) ((rkb)->rkb_source == RD_KAFKA_LOGICAL) |
576 | | |
577 | | void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb, |
578 | | rd_kafka_broker_t *from_rkb); |
579 | | |
580 | | void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb); |
581 | | void rd_kafka_broker_connect_done(rd_kafka_broker_t *rkb, const char *errstr); |
582 | | |
583 | | int rd_kafka_send(rd_kafka_broker_t *rkb); |
584 | | int rd_kafka_recv(rd_kafka_broker_t *rkb); |
585 | | |
586 | | #define rd_kafka_dr_msgq(rkt, rkmq, err) \ |
587 | 0 | rd_kafka_dr_msgq0(rkt, rkmq, err, NULL /*no produce result*/) |
588 | | |
589 | | void rd_kafka_dr_msgq0(rd_kafka_topic_t *rkt, |
590 | | rd_kafka_msgq_t *rkmq, |
591 | | rd_kafka_resp_err_t err, |
592 | | const rd_kafka_Produce_result_t *presult); |
593 | | |
594 | | void rd_kafka_dr_implicit_ack(rd_kafka_broker_t *rkb, |
595 | | rd_kafka_toppar_t *rktp, |
596 | | uint64_t last_msgid); |
597 | | |
598 | | void rd_kafka_broker_buf_enq1(rd_kafka_broker_t *rkb, |
599 | | rd_kafka_buf_t *rkbuf, |
600 | | rd_kafka_resp_cb_t *resp_cb, |
601 | | void *opaque); |
602 | | |
603 | | void rd_kafka_broker_buf_enq_replyq(rd_kafka_broker_t *rkb, |
604 | | rd_kafka_buf_t *rkbuf, |
605 | | rd_kafka_replyq_t replyq, |
606 | | rd_kafka_resp_cb_t *resp_cb, |
607 | | void *opaque); |
608 | | |
609 | | void rd_kafka_broker_buf_retry(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf); |
610 | | |
611 | | |
612 | | rd_kafka_broker_t *rd_kafka_broker_internal(rd_kafka_t *rk); |
613 | | |
614 | | void msghdr_print(rd_kafka_t *rk, |
615 | | const char *what, |
616 | | const struct msghdr *msg, |
617 | | int hexdump); |
618 | | |
619 | | int32_t rd_kafka_broker_id(rd_kafka_broker_t *rkb); |
620 | | const char *rd_kafka_broker_name(rd_kafka_broker_t *rkb); |
621 | | void rd_kafka_broker_wakeup(rd_kafka_broker_t *rkb, const char *reason); |
622 | | int rd_kafka_all_brokers_wakeup(rd_kafka_t *rk, |
623 | | int min_state, |
624 | | const char *reason); |
625 | | |
626 | | void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason); |
627 | | |
628 | | void rd_kafka_broker_purge_queues(rd_kafka_broker_t *rkb, |
629 | | int purge_flags, |
630 | | rd_kafka_replyq_t replyq); |
631 | | |
632 | | int rd_kafka_brokers_get_state_version(rd_kafka_t *rk); |
633 | | int rd_kafka_brokers_wait_state_change(rd_kafka_t *rk, |
634 | | int stored_version, |
635 | | int timeout_ms); |
636 | | int rd_kafka_brokers_wait_state_change_async(rd_kafka_t *rk, |
637 | | int stored_version, |
638 | | rd_kafka_enq_once_t *eonce); |
639 | | void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk); |
640 | | |
641 | | rd_kafka_broker_t *rd_kafka_broker_random0(const char *func, |
642 | | int line, |
643 | | rd_kafka_t *rk, |
644 | | rd_bool_t is_up, |
645 | | int state, |
646 | | int *filtered_cnt, |
647 | | int (*filter)(rd_kafka_broker_t *rk, |
648 | | void *opaque), |
649 | | void *opaque); |
650 | | |
651 | | #define rd_kafka_broker_random(rk, state, filter, opaque) \ |
652 | 0 | rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_false, state, \ |
653 | 0 | NULL, filter, opaque) |
654 | | |
655 | | #define rd_kafka_broker_random_up(rk, filter, opaque) \ |
656 | 0 | rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_true, \ |
657 | 0 | RD_KAFKA_BROKER_STATE_UP, NULL, filter, \ |
658 | 0 | opaque) |
659 | | |
660 | | |
661 | | |
662 | | /** |
663 | | * Updates the current toppar active round-robin next pointer. |
664 | | */ |
665 | | static RD_INLINE RD_UNUSED void |
666 | | rd_kafka_broker_active_toppar_next(rd_kafka_broker_t *rkb, |
667 | 0 | rd_kafka_toppar_t *sugg_next) { |
668 | 0 | if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) || |
669 | 0 | (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars)) |
670 | 0 | rkb->rkb_active_toppar_next = NULL; |
671 | 0 | else if (sugg_next) |
672 | 0 | rkb->rkb_active_toppar_next = sugg_next; |
673 | 0 | else |
674 | 0 | rkb->rkb_active_toppar_next = |
675 | 0 | CIRCLEQ_FIRST(&rkb->rkb_active_toppars); |
676 | 0 | } Unexecuted instantiation: rdkafka.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_assignor.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_broker.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_buf.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_cgrp.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_conf.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_feature.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_metadata.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_metadata_cache.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_msg.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_offset.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_op.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_partition.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_pattern.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_queue.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_range_assignor.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_request.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_roundrobin_assignor.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_sasl.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_sasl_plain.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_sticky_assignor.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_subscription.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_assignment.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_timer.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_topic.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_transport.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_interceptor.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_header.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_admin.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_aux.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_background.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_idempotence.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_txnmgr.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_cert.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_coord.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_mock.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_mock_handlers.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_mock_cgrp.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_error.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_fetcher.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_telemetry.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_telemetry_decode.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_telemetry_encode.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdunittest.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: snappy.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_ssl.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_plugin.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdavl.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_event.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_lz4.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_msgset_reader.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdkafka_msgset_writer.c:rd_kafka_broker_active_toppar_next Unexecuted instantiation: rdlog.c:rd_kafka_broker_active_toppar_next |
677 | | |
678 | | |
679 | | void rd_kafka_broker_active_toppar_add(rd_kafka_broker_t *rkb, |
680 | | rd_kafka_toppar_t *rktp, |
681 | | const char *reason); |
682 | | |
683 | | void rd_kafka_broker_active_toppar_del(rd_kafka_broker_t *rkb, |
684 | | rd_kafka_toppar_t *rktp, |
685 | | const char *reason); |
686 | | |
687 | | |
688 | | void rd_kafka_broker_schedule_connection(rd_kafka_broker_t *rkb); |
689 | | |
690 | | void rd_kafka_broker_persistent_connection_add(rd_kafka_broker_t *rkb, |
691 | | rd_atomic32_t *acntp); |
692 | | |
693 | | void rd_kafka_broker_persistent_connection_del(rd_kafka_broker_t *rkb, |
694 | | rd_atomic32_t *acntp); |
695 | | |
696 | | |
697 | | void rd_kafka_broker_monitor_add(rd_kafka_broker_monitor_t *rkbmon, |
698 | | rd_kafka_broker_t *rkb, |
699 | | rd_kafka_q_t *rkq, |
700 | | void (*callback)(rd_kafka_broker_t *rkb)); |
701 | | |
702 | | void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon); |
703 | | |
704 | | void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb, |
705 | | int64_t connections_max_reauth_ms); |
706 | | |
707 | | void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *rkb); |
708 | | |
709 | | void rd_kafka_broker_decommission(rd_kafka_t *rk, |
710 | | rd_kafka_broker_t *rkb, |
711 | | rd_list_t *wait_thrds); |
712 | | |
713 | | int unittest_broker(void); |
714 | | |
715 | | #endif /* _RDKAFKA_BROKER_H_ */ |