Coverage Report

Created: 2024-07-15 06:17

/src/fluent-bit/lib/librdkafka-2.4.0/src/rdkafka.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * librdkafka - Apache Kafka C library
3
 *
4
 * Copyright (c) 2012-2022, Magnus Edenhill
5
 *               2023, Confluent Inc.
6
 * All rights reserved.
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions are met:
10
 *
11
 * 1. Redistributions of source code must retain the above copyright notice,
12
 *    this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright notice,
14
 *    this list of conditions and the following disclaimer in the documentation
15
 *    and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 */
29
30
31
#define _GNU_SOURCE
32
#include <errno.h>
33
#include <string.h>
34
#include <stdarg.h>
35
#include <signal.h>
36
#include <stdlib.h>
37
#include <sys/stat.h>
38
#if !_WIN32
39
#include <sys/types.h>
40
#include <dirent.h>
41
#endif
42
43
#include "rdkafka_int.h"
44
#include "rdkafka_msg.h"
45
#include "rdkafka_broker.h"
46
#include "rdkafka_topic.h"
47
#include "rdkafka_partition.h"
48
#include "rdkafka_offset.h"
49
#include "rdkafka_transport.h"
50
#include "rdkafka_cgrp.h"
51
#include "rdkafka_assignor.h"
52
#include "rdkafka_request.h"
53
#include "rdkafka_event.h"
54
#include "rdkafka_error.h"
55
#include "rdkafka_sasl.h"
56
#include "rdkafka_interceptor.h"
57
#include "rdkafka_idempotence.h"
58
#include "rdkafka_sasl_oauthbearer.h"
59
#if WITH_OAUTHBEARER_OIDC
60
#include "rdkafka_sasl_oauthbearer_oidc.h"
61
#endif
62
#if WITH_SSL
63
#include "rdkafka_ssl.h"
64
#endif
65
66
#include "rdtime.h"
67
#include "rdmap.h"
68
#include "crc32c.h"
69
#include "rdunittest.h"
70
71
#ifdef _WIN32
72
#include <sys/types.h>
73
#include <sys/timeb.h>
74
#endif
75
76
#define CJSON_HIDE_SYMBOLS
77
#include "cJSON.h"
78
79
#if WITH_CURL
80
#include "rdhttp.h"
81
#endif
82
83
84
static once_flag rd_kafka_global_init_once  = ONCE_FLAG_INIT;
85
static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;
86
87
/**
88
 * @brief Global counter+lock for all active librdkafka instances
89
 */
90
mtx_t rd_kafka_global_lock;
91
int rd_kafka_global_cnt;
92
93
94
/**
95
 * Last API error code, per thread.
96
 * Shared among all rd_kafka_t instances.
97
 */
98
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
99
100
101
/**
102
 * Current number of threads created by rdkafka.
103
 * This is used in regression tests.
104
 */
105
rd_atomic32_t rd_kafka_thread_cnt_curr;
106
0
int rd_kafka_thread_cnt(void) {
107
0
        return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
108
0
}
109
110
/**
111
 * Current thread's log name (TLS)
112
 */
113
char RD_TLS rd_kafka_thread_name[64] = "app";
114
115
0
void rd_kafka_set_thread_name(const char *fmt, ...) {
116
0
        va_list ap;
117
118
0
        va_start(ap, fmt);
119
0
        rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), fmt,
120
0
                     ap);
121
0
        va_end(ap);
122
0
}
123
124
/**
125
 * @brief Current thread's system name (TLS)
126
 *
127
 * Note the name must be 15 characters or less, because it is passed to
128
 * pthread_setname_np on Linux which imposes this limit.
129
 */
130
static char RD_TLS rd_kafka_thread_sysname[16] = "app";
131
132
0
void rd_kafka_set_thread_sysname(const char *fmt, ...) {
133
0
        va_list ap;
134
135
0
        va_start(ap, fmt);
136
0
        rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
137
0
                     fmt, ap);
138
0
        va_end(ap);
139
140
0
        thrd_setname(rd_kafka_thread_sysname);
141
0
}
142
143
0
static void rd_kafka_global_init0(void) {
144
0
        cJSON_Hooks json_hooks = {.malloc_fn = rd_malloc, .free_fn = rd_free};
145
146
0
        mtx_init(&rd_kafka_global_lock, mtx_plain);
147
#if ENABLE_DEVEL
148
        rd_atomic32_init(&rd_kafka_op_cnt, 0);
149
#endif
150
0
        rd_crc32c_global_init();
151
0
#if WITH_SSL
152
        /* The configuration interface might need to use
153
         * OpenSSL to parse keys, prior to any rd_kafka_t
154
         * object has been created. */
155
0
        rd_kafka_ssl_init();
156
0
#endif
157
158
0
        cJSON_InitHooks(&json_hooks);
159
160
#if WITH_CURL
161
        rd_http_global_init();
162
#endif
163
0
}
164
165
/**
166
 * @brief Initialize once per process
167
 */
168
0
void rd_kafka_global_init(void) {
169
0
        call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
170
0
}
171
172
173
/**
174
 * @brief Seed the PRNG with current_time.milliseconds
175
 */
176
0
static void rd_kafka_global_srand(void) {
177
0
        struct timeval tv;
178
179
0
        rd_gettimeofday(&tv, NULL);
180
181
0
        srand((unsigned int)(tv.tv_usec / 1000));
182
0
}
183
184
185
/**
186
 * @returns the current number of active librdkafka instances
187
 */
188
0
static int rd_kafka_global_cnt_get(void) {
189
0
        int r;
190
0
        mtx_lock(&rd_kafka_global_lock);
191
0
        r = rd_kafka_global_cnt;
192
0
        mtx_unlock(&rd_kafka_global_lock);
193
0
        return r;
194
0
}
195
196
197
/**
198
 * @brief Increase counter for active librdkafka instances.
199
 * If this is the first instance the global constructors will be called, if any.
200
 */
201
0
static void rd_kafka_global_cnt_incr(void) {
202
0
        mtx_lock(&rd_kafka_global_lock);
203
0
        rd_kafka_global_cnt++;
204
0
        if (rd_kafka_global_cnt == 1) {
205
0
                rd_kafka_transport_init();
206
0
#if WITH_SSL
207
0
                rd_kafka_ssl_init();
208
0
#endif
209
0
                rd_kafka_sasl_global_init();
210
0
        }
211
0
        mtx_unlock(&rd_kafka_global_lock);
212
0
}
213
214
/**
215
 * @brief Decrease counter for active librdkafka instances.
216
 * If this counter reaches 0 the global destructors will be called, if any.
217
 */
218
0
static void rd_kafka_global_cnt_decr(void) {
219
0
        mtx_lock(&rd_kafka_global_lock);
220
0
        rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
221
0
        rd_kafka_global_cnt--;
222
0
        if (rd_kafka_global_cnt == 0) {
223
0
                rd_kafka_sasl_global_term();
224
0
#if WITH_SSL
225
0
                rd_kafka_ssl_term();
226
0
#endif
227
0
        }
228
0
        mtx_unlock(&rd_kafka_global_lock);
229
0
}
230
231
232
/**
233
 * Wait for all rd_kafka_t objects to be destroyed.
234
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
235
 * timeout was reached.
236
 */
237
0
int rd_kafka_wait_destroyed(int timeout_ms) {
238
0
        rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
239
240
0
        while (rd_kafka_thread_cnt() > 0 || rd_kafka_global_cnt_get() > 0) {
241
0
                if (rd_clock() >= timeout) {
242
0
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
243
0
                                                ETIMEDOUT);
244
0
                        return -1;
245
0
                }
246
0
                rd_usleep(25000, NULL); /* 25ms */
247
0
        }
248
249
0
        return 0;
250
0
}
251
252
static void rd_kafka_log_buf(const rd_kafka_conf_t *conf,
253
                             const rd_kafka_t *rk,
254
                             int level,
255
                             int ctx,
256
                             const char *fac,
257
0
                             const char *buf) {
258
0
        if (level > conf->log_level)
259
0
                return;
260
0
        else if (rk && conf->log_queue) {
261
0
                rd_kafka_op_t *rko;
262
263
0
                if (!rk->rk_logq)
264
0
                        return; /* Terminating */
265
266
0
                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
267
0
                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
268
0
                rko->rko_u.log.level = level;
269
0
                rd_strlcpy(rko->rko_u.log.fac, fac, sizeof(rko->rko_u.log.fac));
270
0
                rko->rko_u.log.str = rd_strdup(buf);
271
0
                rko->rko_u.log.ctx = ctx;
272
0
                rd_kafka_q_enq(rk->rk_logq, rko);
273
274
0
        } else if (conf->log_cb) {
275
0
                conf->log_cb(rk, level, fac, buf);
276
0
        }
277
0
}
278
279
/**
280
 * @brief Logger
281
 *
282
 * @remark conf must be set, but rk may be NULL
283
 */
284
void rd_kafka_log0(const rd_kafka_conf_t *conf,
285
                   const rd_kafka_t *rk,
286
                   const char *extra,
287
                   int level,
288
                   int ctx,
289
                   const char *fac,
290
                   const char *fmt,
291
0
                   ...) {
292
0
        char buf[2048];
293
0
        va_list ap;
294
0
        unsigned int elen = 0;
295
0
        unsigned int of   = 0;
296
297
0
        if (level > conf->log_level)
298
0
                return;
299
300
0
        if (conf->log_thread_name) {
301
0
                elen = rd_snprintf(buf, sizeof(buf),
302
0
                                   "[thrd:%s]: ", rd_kafka_thread_name);
303
0
                if (unlikely(elen >= sizeof(buf)))
304
0
                        elen = sizeof(buf);
305
0
                of = elen;
306
0
        }
307
308
0
        if (extra) {
309
0
                elen = rd_snprintf(buf + of, sizeof(buf) - of, "%s: ", extra);
310
0
                if (unlikely(elen >= sizeof(buf) - of))
311
0
                        elen = sizeof(buf) - of;
312
0
                of += elen;
313
0
        }
314
315
0
        va_start(ap, fmt);
316
0
        rd_vsnprintf(buf + of, sizeof(buf) - of, fmt, ap);
317
0
        va_end(ap);
318
319
0
        rd_kafka_log_buf(conf, rk, level, ctx, fac, buf);
320
0
}
321
322
rd_kafka_resp_err_t
323
rd_kafka_oauthbearer_set_token(rd_kafka_t *rk,
324
                               const char *token_value,
325
                               int64_t md_lifetime_ms,
326
                               const char *md_principal_name,
327
                               const char **extensions,
328
                               size_t extension_size,
329
                               char *errstr,
330
0
                               size_t errstr_size) {
331
#if WITH_SASL_OAUTHBEARER
332
        return rd_kafka_oauthbearer_set_token0(
333
            rk, token_value, md_lifetime_ms, md_principal_name, extensions,
334
            extension_size, errstr, errstr_size);
335
#else
336
0
        rd_snprintf(errstr, errstr_size,
337
0
                    "librdkafka not built with SASL OAUTHBEARER support");
338
0
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
339
0
#endif
340
0
}
341
342
rd_kafka_resp_err_t rd_kafka_oauthbearer_set_token_failure(rd_kafka_t *rk,
343
0
                                                           const char *errstr) {
344
#if WITH_SASL_OAUTHBEARER
345
        return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
346
#else
347
0
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
348
0
#endif
349
0
}
350
351
void rd_kafka_log_print(const rd_kafka_t *rk,
352
                        int level,
353
                        const char *fac,
354
0
                        const char *buf) {
355
0
        int secs, msecs;
356
0
        struct timeval tv;
357
0
        rd_gettimeofday(&tv, NULL);
358
0
        secs  = (int)tv.tv_sec;
359
0
        msecs = (int)(tv.tv_usec / 1000);
360
0
        fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n", level, secs, msecs, fac,
361
0
                rk ? rk->rk_name : "", buf);
362
0
}
363
364
void rd_kafka_log_syslog(const rd_kafka_t *rk,
365
                         int level,
366
                         const char *fac,
367
0
                         const char *buf) {
368
#if WITH_SYSLOG
369
        static int initialized = 0;
370
371
        if (!initialized)
372
                openlog("rdkafka", LOG_PID | LOG_CONS, LOG_USER);
373
374
        syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
375
#else
376
0
        rd_assert(!*"syslog support not enabled in this build");
377
0
#endif
378
0
}
379
380
void rd_kafka_set_logger(rd_kafka_t *rk,
381
                         void (*func)(const rd_kafka_t *rk,
382
                                      int level,
383
                                      const char *fac,
384
0
                                      const char *buf)) {
385
0
#if !WITH_SYSLOG
386
0
        if (func == rd_kafka_log_syslog)
387
0
                rd_assert(!*"syslog support not enabled in this build");
388
0
#endif
389
0
        rk->rk_conf.log_cb = func;
390
0
}
391
392
0
void rd_kafka_set_log_level(rd_kafka_t *rk, int level) {
393
0
        rk->rk_conf.log_level = level;
394
0
}
395
396
397
398
0
static const char *rd_kafka_type2str(rd_kafka_type_t type) {
399
0
        static const char *types[] = {
400
0
            [RD_KAFKA_PRODUCER] = "producer",
401
0
            [RD_KAFKA_CONSUMER] = "consumer",
402
0
        };
403
0
        return types[type];
404
0
}
405
406
#define _ERR_DESC(ENUM, DESC)                                                  \
407
        [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = {ENUM, &(#ENUM)[18] /*pfx*/, DESC}
408
409
static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
410
    _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
411
    _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG, "Local: Bad message format"),
412
    _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
413
              "Local: Invalid compressed data"),
414
    _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY, "Local: Broker handle destroyed"),
415
    _ERR_DESC(
416
        RD_KAFKA_RESP_ERR__FAIL,
417
        "Local: Communication failure with broker"),  // FIXME: too specific
418
    _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT, "Local: Broker transport failure"),
419
    _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
420
              "Local: Critical system resource failure"),
421
    _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE, "Local: Host resolution failure"),
422
    _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, "Local: Message timed out"),
423
    _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF, "Broker: No more messages"),
424
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION, "Local: Unknown partition"),
425
    _ERR_DESC(RD_KAFKA_RESP_ERR__FS, "Local: File or filesystem error"),
426
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC, "Local: Unknown topic"),
427
    _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
428
              "Local: All broker connections are down"),
429
    _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
430
              "Local: Invalid argument or configuration"),
431
    _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT, "Local: Timed out"),
432
    _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL, "Local: Queue full"),
433
    _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF, "Local: ISR count insufficient"),
434
    _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE, "Local: Broker node update"),
435
    _ERR_DESC(RD_KAFKA_RESP_ERR__SSL, "Local: SSL error"),
436
    _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD, "Local: Waiting for coordinator"),
437
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP, "Local: Unknown group"),
438
    _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS, "Local: Operation in progress"),
439
    _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
440
              "Local: Previous operation in progress"),
441
    _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
442
              "Local: Existing subscription"),
443
    _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, "Local: Assign partitions"),
444
    _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, "Local: Revoke partitions"),
445
    _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT, "Local: Conflicting use"),
446
    _ERR_DESC(RD_KAFKA_RESP_ERR__STATE, "Local: Erroneous state"),
447
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL, "Local: Unknown protocol"),
448
    _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, "Local: Not implemented"),
449
    _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
450
              "Local: Authentication failure"),
451
    _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET, "Local: No offset stored"),
452
    _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED, "Local: Outdated"),
453
    _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, "Local: Timed out in queue"),
454
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
455
              "Local: Required feature not supported by broker"),
456
    _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE, "Local: Awaiting cache update"),
457
    _ERR_DESC(RD_KAFKA_RESP_ERR__INTR, "Local: Operation interrupted"),
458
    _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
459
              "Local: Key serialization error"),
460
    _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
461
              "Local: Value serialization error"),
462
    _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
463
              "Local: Key deserialization error"),
464
    _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
465
              "Local: Value deserialization error"),
466
    _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL, "Local: Partial response"),
467
    _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY, "Local: Read-only object"),
468
    _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT, "Local: No such entry"),
469
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW, "Local: Read underflow"),
470
    _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE, "Local: Invalid type"),
471
    _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY, "Local: Retry operation"),
472
    _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE, "Local: Purged in queue"),
473
    _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT, "Local: Purged in flight"),
474
    _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL, "Local: Fatal error"),
475
    _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT, "Local: Inconsistent state"),
476
    _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
477
              "Local: Gap-less ordering would not be guaranteed "
478
              "if proceeding"),
479
    _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
480
              "Local: Maximum application poll interval "
481
              "(max.poll.interval.ms) exceeded"),
482
    _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_BROKER, "Local: Unknown broker"),
483
    _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
484
              "Local: Functionality not configured"),
485
    _ERR_DESC(RD_KAFKA_RESP_ERR__FENCED,
486
              "Local: This instance has been fenced by a newer instance"),
487
    _ERR_DESC(RD_KAFKA_RESP_ERR__APPLICATION,
488
              "Local: Application generated error"),
489
    _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST,
490
              "Local: Group partition assignment lost"),
491
    _ERR_DESC(RD_KAFKA_RESP_ERR__NOOP, "Local: No operation performed"),
492
    _ERR_DESC(RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET,
493
              "Local: No offset to automatically reset to"),
494
    _ERR_DESC(RD_KAFKA_RESP_ERR__LOG_TRUNCATION,
495
              "Local: Partition log truncation detected"),
496
    _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_DIFFERENT_RECORD,
497
              "Local: an invalid record in the same batch caused "
498
              "the failure of this message too."),
499
500
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN, "Unknown broker error"),
501
    _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR, "Success"),
502
    _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
503
              "Broker: Offset out of range"),
504
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG, "Broker: Invalid message"),
505
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
506
              "Broker: Unknown topic or partition"),
507
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
508
              "Broker: Invalid message size"),
509
    _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
510
              "Broker: Leader not available"),
511
    _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
512
              "Broker: Not leader for partition"),
513
    _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, "Broker: Request timed out"),
514
    _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
515
              "Broker: Broker not available"),
516
    _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
517
              "Broker: Replica not available"),
518
    _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
519
              "Broker: Message size too large"),
520
    _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
521
              "Broker: StaleControllerEpochCode"),
522
    _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
523
              "Broker: Offset metadata string too large"),
524
    _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
525
              "Broker: Broker disconnected before response received"),
526
    _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
527
              "Broker: Coordinator load in progress"),
528
    _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
529
              "Broker: Coordinator not available"),
530
    _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR, "Broker: Not coordinator"),
531
    _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION, "Broker: Invalid topic"),
532
    _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
533
              "Broker: Message batch larger than configured server "
534
              "segment size"),
535
    _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
536
              "Broker: Not enough in-sync replicas"),
537
    _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
538
              "Broker: Message(s) written to insufficient number of "
539
              "in-sync replicas"),
540
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
541
              "Broker: Invalid required acks value"),
542
    _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
543
              "Broker: Specified group generation id is not valid"),
544
    _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
545
              "Broker: Inconsistent group protocol"),
546
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID, "Broker: Invalid group.id"),
547
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, "Broker: Unknown member"),
548
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
549
              "Broker: Invalid session timeout"),
550
    _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
551
              "Broker: Group rebalance in progress"),
552
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
553
              "Broker: Commit offset data size is not valid"),
554
    _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
555
              "Broker: Topic authorization failed"),
556
    _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
557
              "Broker: Group authorization failed"),
558
    _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
559
              "Broker: Cluster authorization failed"),
560
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP, "Broker: Invalid timestamp"),
561
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
562
              "Broker: Unsupported SASL mechanism"),
563
    _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
564
              "Broker: Request not valid in current SASL state"),
565
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
566
              "Broker: API version not supported"),
567
    _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
568
              "Broker: Topic already exists"),
569
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
570
              "Broker: Invalid number of partitions"),
571
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
572
              "Broker: Invalid replication factor"),
573
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
574
              "Broker: Invalid replica assignment"),
575
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
576
              "Broker: Configuration is invalid"),
577
    _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
578
              "Broker: Not controller for cluster"),
579
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST, "Broker: Invalid request"),
580
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
581
              "Broker: Message format on broker does not support request"),
582
    _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION, "Broker: Policy violation"),
583
    _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
584
              "Broker: Broker received an out of order sequence number"),
585
    _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
586
              "Broker: Broker received a duplicate sequence number"),
587
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
588
              "Broker: Producer attempted an operation with an old epoch"),
589
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
590
              "Broker: Producer attempted a transactional operation in "
591
              "an invalid state"),
592
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
593
              "Broker: Producer attempted to use a producer id which is "
594
              "not currently assigned to its transactional id"),
595
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
596
              "Broker: Transaction timeout is larger than the maximum "
597
              "value allowed by the broker's max.transaction.timeout.ms"),
598
    _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
599
              "Broker: Producer attempted to update a transaction while "
600
              "another concurrent operation on the same transaction was "
601
              "ongoing"),
602
    _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
603
              "Broker: Indicates that the transaction coordinator sending "
604
              "a WriteTxnMarker is no longer the current coordinator for "
605
              "a given producer"),
606
    _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
607
              "Broker: Transactional Id authorization failed"),
608
    _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
609
              "Broker: Security features are disabled"),
610
    _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
611
              "Broker: Operation not attempted"),
612
    _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
613
              "Broker: Disk error when trying to access log file on disk"),
614
    _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
615
              "Broker: The user-specified log directory is not found "
616
              "in the broker config"),
617
    _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
618
              "Broker: SASL Authentication failed"),
619
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
620
              "Broker: Unknown Producer Id"),
621
    _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
622
              "Broker: Partition reassignment is in progress"),
623
    _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
624
              "Broker: Delegation Token feature is not enabled"),
625
    _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
626
              "Broker: Delegation Token is not found on server"),
627
    _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
628
              "Broker: Specified Principal is not valid Owner/Renewer"),
629
    _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
630
              "Broker: Delegation Token requests are not allowed on "
631
              "this connection"),
632
    _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
633
              "Broker: Delegation Token authorization failed"),
634
    _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
635
              "Broker: Delegation Token is expired"),
636
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
637
              "Broker: Supplied principalType is not supported"),
638
    _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
639
              "Broker: The group is not empty"),
640
    _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
641
              "Broker: The group id does not exist"),
642
    _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
643
              "Broker: The fetch session ID was not found"),
644
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
645
              "Broker: The fetch session epoch is invalid"),
646
    _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
647
              "Broker: No matching listener"),
648
    _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
649
              "Broker: Topic deletion is disabled"),
650
    _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
651
              "Broker: Leader epoch is older than broker epoch"),
652
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
653
              "Broker: Leader epoch is newer than broker epoch"),
654
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
655
              "Broker: Unsupported compression type"),
656
    _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
657
              "Broker: Broker epoch has changed"),
658
    _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
659
              "Broker: Leader high watermark is not caught up"),
660
    _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
661
              "Broker: Group member needs a valid member ID"),
662
    _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
663
              "Broker: Preferred leader was not available"),
664
    _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
665
              "Broker: Consumer group has reached maximum size"),
666
    _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
667
              "Broker: Static consumer fenced by other consumer with same "
668
              "group.instance.id"),
669
    _ERR_DESC(RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE,
670
              "Broker: Eligible partition leaders are not available"),
671
    _ERR_DESC(RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED,
672
              "Broker: Leader election not needed for topic partition"),
673
    _ERR_DESC(RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS,
674
              "Broker: No partition reassignment is in progress"),
675
    _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC,
676
              "Broker: Deleting offsets of a topic while the consumer "
677
              "group is subscribed to it"),
678
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_RECORD,
679
              "Broker: Broker failed to validate record"),
680
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
681
              "Broker: There are unstable offsets that need to be cleared"),
682
    _ERR_DESC(RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED,
683
              "Broker: Throttling quota has been exceeded"),
684
    _ERR_DESC(RD_KAFKA_RESP_ERR_PRODUCER_FENCED,
685
              "Broker: There is a newer producer with the same "
686
              "transactionalId which fences the current one"),
687
    _ERR_DESC(RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND,
688
              "Broker: Request illegally referred to resource that "
689
              "does not exist"),
690
    _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE,
691
              "Broker: Request illegally referred to the same resource "
692
              "twice"),
693
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL,
694
              "Broker: Requested credential would not meet criteria for "
695
              "acceptability"),
696
    _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET,
697
              "Broker: Indicates that the either the sender or recipient "
698
              "of a voter-only request is not one of the expected voters"),
699
    _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION,
700
              "Broker: Invalid update version"),
701
    _ERR_DESC(RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED,
702
              "Broker: Unable to update finalized features due to "
703
              "server error"),
704
    _ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
705
              "Broker: Request principal deserialization failed during "
706
              "forwarding"),
707
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID, "Broker: Unknown topic id"),
708
    _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
709
              "Broker: The member epoch is fenced by the group coordinator"),
710
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNRELEASED_INSTANCE_ID,
711
              "Broker: The instance ID is still used by another member in the "
712
              "consumer group"),
713
    _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_ASSIGNOR,
714
              "Broker: The assignor or its version range is not supported by "
715
              "the consumer group"),
716
    _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
717
              "Broker: The member epoch is stale"),
718
    _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
719
720
721
void rd_kafka_get_err_descs(const struct rd_kafka_err_desc **errdescs,
722
0
                            size_t *cntp) {
723
0
        *errdescs = rd_kafka_err_descs;
724
0
        *cntp     = RD_ARRAYSIZE(rd_kafka_err_descs);
725
0
}
726
727
728
0
const char *rd_kafka_err2str(rd_kafka_resp_err_t err) {
        static RD_TLS char fallback[32];
        int i = err - RD_KAFKA_RESP_ERR__BEGIN;

        /* In-range codes with a registered description return the
         * table entry directly; anything else gets a thread-local
         * "Err-<code>?" placeholder string. */
        if (likely(err > RD_KAFKA_RESP_ERR__BEGIN &&
                   err < RD_KAFKA_RESP_ERR_END_ALL &&
                   rd_kafka_err_descs[i].desc != NULL))
                return rd_kafka_err_descs[i].desc;

        rd_snprintf(fallback, sizeof(fallback), "Err-%i?", err);
        return fallback;
}
741
742
743
0
const char *rd_kafka_err2name(rd_kafka_resp_err_t err) {
        static RD_TLS char fallback[32];
        int i = err - RD_KAFKA_RESP_ERR__BEGIN;

        /* Validity is checked via .desc (same check as err2str) but the
         * symbolic .name is returned; unknown codes get a thread-local
         * "ERR_<code>?" placeholder. */
        if (likely(err > RD_KAFKA_RESP_ERR__BEGIN &&
                   err < RD_KAFKA_RESP_ERR_END_ALL &&
                   rd_kafka_err_descs[i].desc != NULL))
                return rd_kafka_err_descs[i].name;

        rd_snprintf(fallback, sizeof(fallback), "ERR_%i?", err);
        return fallback;
}
756
757
758
0
rd_kafka_resp_err_t rd_kafka_last_error(void) {
759
0
        return rd_kafka_last_error_code;
760
0
}
761
762
763
0
rd_kafka_resp_err_t rd_kafka_errno2err(int errnox) {
764
0
        switch (errnox) {
765
0
        case EINVAL:
766
0
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
767
768
0
        case EBUSY:
769
0
                return RD_KAFKA_RESP_ERR__CONFLICT;
770
771
0
        case ENOENT:
772
0
                return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
773
774
0
        case ESRCH:
775
0
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
776
777
0
        case ETIMEDOUT:
778
0
                return RD_KAFKA_RESP_ERR__TIMED_OUT;
779
780
0
        case EMSGSIZE:
781
0
                return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
782
783
0
        case ENOBUFS:
784
0
                return RD_KAFKA_RESP_ERR__QUEUE_FULL;
785
786
0
        case ECANCELED:
787
0
                return RD_KAFKA_RESP_ERR__FATAL;
788
789
0
        default:
790
0
                return RD_KAFKA_RESP_ERR__FAIL;
791
0
        }
792
0
}
793
794
795
rd_kafka_resp_err_t
796
0
rd_kafka_fatal_error(rd_kafka_t *rk, char *errstr, size_t errstr_size) {
797
0
        rd_kafka_resp_err_t err;
798
799
0
        if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
800
0
                rd_kafka_rdlock(rk);
801
0
                rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
802
0
                rd_kafka_rdunlock(rk);
803
0
        }
804
805
0
        return err;
806
0
}
807
808
809
/**
 * @brief Sets the fatal error for this instance.
 *
 * @param do_lock RD_DO_LOCK: rd_kafka_wrlock() will be acquired and released,
 *                RD_DONT_LOCK: caller must hold rd_kafka_wrlock().
 *
 * @returns 1 if the error was set, or 0 if a previous fatal error
 *          has already been set on this instance.
 *
 * @locality any
 * @locks none
 */
821
int rd_kafka_set_fatal_error0(rd_kafka_t *rk,
                              rd_dolock_t do_lock,
                              rd_kafka_resp_err_t err,
                              const char *fmt,
                              ...) {
        va_list ap;
        char buf[512];

        if (do_lock)
                rd_kafka_wrlock(rk);
        rk->rk_fatal.cnt++;
        /* Only the first fatal error sticks: later ones are counted above
         * but otherwise suppressed with a debug log. */
        if (rd_atomic32_get(&rk->rk_fatal.err)) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                rd_kafka_dbg(rk, GENERIC, "FATAL",
                             "Suppressing subsequent fatal error: %s",
                             rd_kafka_err2name(err));
                return 0;
        }

        rd_atomic32_set(&rk->rk_fatal.err, err);

        /* Format the human-readable error string (truncated to buf size)
         * and store a heap copy; freed in rd_kafka_destroy_final(). */
        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);
        rk->rk_fatal.errstr = rd_strdup(buf);

        if (do_lock)
                rd_kafka_wrunlock(rk);

        /* If there is an error callback or event handler we
         * also log the fatal error as it happens.
         * If there is no error callback the error event
         * will be automatically logged, and this check here
         * prevents us from duplicate logs. */
        if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
                rd_kafka_log(rk, LOG_EMERG, "FATAL", "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);
        else
                rd_kafka_dbg(rk, ALL, "FATAL", "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);

        /* Indicate to the application that a fatal error was raised,
         * the app should use rd_kafka_fatal_error() to extract the
         * fatal error code itself.
         * For the high-level consumer we propagate the error as a
         * consumer error so it is returned from consumer_poll(),
         * while for all other client types (the producer) we propagate to
         * the standard error handler (typically error_cb). */
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
                rd_kafka_consumer_err(
                    rk->rk_cgrp->rkcg_q, RD_KAFKA_NODEID_UA,
                    RD_KAFKA_RESP_ERR__FATAL, 0, NULL, NULL,
                    RD_KAFKA_OFFSET_INVALID, "Fatal error: %s: %s",
                    rd_kafka_err2str(err), rk->rk_fatal.errstr);
        else
                rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
                                "Fatal error: %s: %s", rd_kafka_err2str(err),
                                rk->rk_fatal.errstr);


        /* Tell rdkafka main thread to purge producer queues, but not
         * in-flight since we'll want proper delivery status for transmitted
         * requests.
         * Need NON_BLOCKING to avoid dead-lock if user is
         * calling purge() at the same time, which could be
         * waiting for this broker thread to handle its
         * OP_PURGE request. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
                rko->rko_u.purge.flags =
                    RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_NON_BLOCKING;
                rd_kafka_q_enq(rk->rk_ops, rko);
        }

        return 1;
}
898
899
900
/**
 * @returns a copy of the current fatal error, if any, else NULL.
 *
 * @locks_acquired rd_kafka_rdlock(rk)
 */
905
0
rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk) {
        rd_kafka_error_t *error = NULL;
        rd_kafka_resp_err_t fatal_err;

        fatal_err = rd_atomic32_get(&rk->rk_fatal.err);
        if (fatal_err) {
                /* Read the error string under the read lock since it is
                 * written under the write lock. */
                rd_kafka_rdlock(rk);
                error = rd_kafka_error_new_fatal(fatal_err, "%s",
                                                 rk->rk_fatal.errstr);
                rd_kafka_rdunlock(rk);
        }

        return error; /* NULL if no fatal error was raised */
}
918
919
920
/**
 * @brief Test helper: raise a fatal error on \p rk with the given
 *        \p err and \p reason.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the fatal error was set, or
 *          RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if a fatal error was
 *          already set previously.
 */
rd_kafka_resp_err_t rd_kafka_test_fatal_error(rd_kafka_t *rk,
                                              rd_kafka_resp_err_t err,
                                              const char *reason) {
        int raised =
            rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason);

        return raised ? RD_KAFKA_RESP_ERR_NO_ERROR
                      : RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
}
928
929
930
931
/**
 * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
 *
 * @locality application thread
 */
936
0
void rd_kafka_destroy_final(rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_kafka_terminating(rk));

        /* Synchronize state: the empty wrlock/wrunlock pair presumably
         * ensures no other thread still holds rk_lock before teardown
         * begins — NOTE(review): confirm against rk_lock usage. */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

        rd_kafka_assignors_term(rk);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                rd_kafka_assignment_destroy(rk);
                if (rk->rk_consumer.q)
                        rd_kafka_q_destroy(rk->rk_consumer.q);
        }

        /* Purge op-queues */
        rd_kafka_q_destroy_owner(rk->rk_rep);
        rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
        if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_ssl_ctx_term(rk);
        }
        rd_list_destroy(&rk->rk_conf.ssl.loaded_providers);
#endif

        /* It is not safe to log after this point. */
        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        /* Producer-only synchronization primitives (curr_msgs). */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                cnd_destroy(&rk->rk_curr_msgs.cnd);
                mtx_destroy(&rk->rk_curr_msgs.lock);
        }

        /* Free the fatal error string allocated by
         * rd_kafka_set_fatal_error0(), if any. */
        if (rk->rk_fatal.errstr) {
                rd_free(rk->rk_fatal.errstr);
                rk->rk_fatal.errstr = NULL;
        }

        cnd_destroy(&rk->rk_broker_state_change_cnd);
        mtx_destroy(&rk->rk_broker_state_change_lock);

        mtx_destroy(&rk->rk_suppress.sparse_connect_lock);

        cnd_destroy(&rk->rk_init_cnd);
        mtx_destroy(&rk->rk_init_lock);

        if (rk->rk_full_metadata)
                rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
        rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

        mtx_destroy(&rk->rk_conf.sasl.lock);
        rwlock_destroy(&rk->rk_lock);

        /* Free the handle itself, then decrement the global instance
         * counter last. */
        rd_free(rk);
        rd_kafka_global_cnt_decr();
}
1021
1022
1023
0
/**
 * @brief Application-side termination: signals all internal threads to
 *        terminate, joins the internal main thread (unless _F_IMMEDIATE),
 *        and finally calls rd_kafka_destroy_final().
 *
 * @param flags RD_KAFKA_DESTROY_F_.. flags from the public destroy API.
 *
 * @locality application thread (must NOT be a librdkafka-owned thread)
 */
static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) {
        thrd_t thrd;
#ifndef _WIN32
        int term_sig = rk->rk_conf.term_sig;
#endif
        int res;
        char flags_str[256];
        static const char *rd_kafka_destroy_flags_names[] = {
            "Terminate", "DestroyCalled", "Immediate", "NoConsumerClose", NULL};

        /* Fatal errors and _F_IMMEDIATE also sets .._NO_CONSUMER_CLOSE */
        if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE ||
            rd_kafka_fatal_error_code(rk))
                flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;

        rd_flags2str(flags_str, sizeof(flags_str), rd_kafka_destroy_flags_names,
                     flags);
        rd_kafka_dbg(rk, ALL, "DESTROY",
                     "Terminating instance "
                     "(destroy flags %s (0x%x))",
                     flags ? flags_str : "none", flags);

        /* If producer still has messages in queue the application
         * is terminating the producer without first calling flush() or purge()
         * which is a common new user mistake, so hint the user of proper
         * shutdown semantics. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                unsigned int tot_cnt;
                size_t tot_size;

                rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

                if (tot_cnt > 0)
                        rd_kafka_log(rk, LOG_WARNING, "TERMINATE",
                                     "Producer terminating with %u message%s "
                                     "(%" PRIusz
                                     " byte%s) still in "
                                     "queue or transit: "
                                     "use flush() to wait for "
                                     "outstanding message delivery",
                                     tot_cnt, tot_cnt > 1 ? "s" : "", tot_size,
                                     tot_size > 1 ? "s" : "");
        }

        /* Make sure destroy is not called from a librdkafka thread
         * since this will most likely cause a deadlock.
         * FIXME: include broker threads (for log_cb) */
        if (thrd_is_current(rk->rk_thread) ||
            thrd_is_current(rk->rk_background.thread)) {
                rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
                             "Application bug: "
                             "rd_kafka_destroy() called from "
                             "librdkafka owned thread");
                rd_kafka_assert(NULL,
                                !*"Application bug: "
                                "calling rd_kafka_destroy() from "
                                "librdkafka owned thread is prohibited");
        }

        /* Before signaling for general termination, set the destroy
         * flags to hint cgrp how to shut down. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags | RD_KAFKA_DESTROY_F_DESTROY_CALLED);

        /* The legacy/simple consumer lacks an API to close down the consumer*/
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Terminating consumer group handler");
                rd_kafka_consumer_close(rk);
        }

        /* With the consumer closed, terminate the rest of librdkafka. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags | RD_KAFKA_DESTROY_F_TERMINATE);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        /* Snapshot the main thread handle under the lock so it can be
         * joined below after the instance starts tearing down. */
        rd_kafka_wrlock(rk);
        thrd = rk->rk_thread;
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to internal main thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
        /* Interrupt main kafka thread to speed up termination. */
        if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Joining internal main thread");

        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}
1132
1133
1134
/* NOTE: Must only be called by the application.
 *       librdkafka itself must use rd_kafka_destroy0(). */
1136
0
/**
 * @brief Public destructor: terminate and free \p rk with default
 *        destroy flags (0).
 */
void rd_kafka_destroy(rd_kafka_t *rk) {
        rd_kafka_destroy_app(rk, 0);
}
1139
1140
0
/**
 * @brief Public destructor with explicit RD_KAFKA_DESTROY_F_.. \p flags.
 */
void rd_kafka_destroy_flags(rd_kafka_t *rk, int flags) {
        rd_kafka_destroy_app(rk, flags);
}
1143
1144
1145
/**
 * Main destructor for rd_kafka_t.
 *
 * Locality: rdkafka main thread or application thread during rd_kafka_new()
 */
1150
0
static void rd_kafka_destroy_internal(rd_kafka_t *rk) {
        rd_kafka_topic_t *rkt, *rkt_tmp;
        rd_kafka_broker_t *rkb, *rkb_tmp;
        rd_list_t wait_thrds;
        thrd_t *thrd;
        int i;

        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");

        /* Trigger any state-change waiters (which should check the
         * terminate flag whenever they wake up). */
        rd_kafka_brokers_broadcast_state_change(rk);

        /* Terminate and join the background queue thread first, if any. */
        if (rk->rk_background.thread) {
                int res;
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the receiver. */
                rd_kafka_q_enq(rk->rk_background.q,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rd_kafka_dbg(rk, ALL, "DESTROY",
                             "Waiting for background queue thread "
                             "to terminate");
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* Call on_destroy() interceptors */
        rd_kafka_interceptors_on_destroy(rk);

        /* Brokers pick up on rk_terminate automatically. */

        /* List of (broker) threads to join to synchronize termination */
        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);

        rd_kafka_wrlock(rk);

        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
        /* Decommission all topics.
         * The lock is dropped around each removal since
         * rd_kafka_topic_partitions_remove() must not be called with
         * rk_lock held (re-acquired for the next list step). */
        TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
                rd_kafka_wrunlock(rk);
                rd_kafka_topic_partitions_remove(rkt);
                rd_kafka_wrlock(rk);
        }

        /* Decommission brokers.
         * Broker thread holds a refcount and detects when broker refcounts
         * reaches 1 and then decommissions itself. */
        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
                /* Add broker's thread to wait_thrds list for later joining */
                thrd  = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
                rd_kafka_wrunlock(rk);

                rd_kafka_dbg(rk, BROKER, "DESTROY", "Sending TERMINATE to %s",
                             rd_kafka_broker_name(rkb));
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the broker thread. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
                /* Interrupt IO threads to speed up termination. */
                if (rk->rk_conf.term_sig)
                        pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

                rd_kafka_broker_destroy(rkb);

                rd_kafka_wrlock(rk);
        }

        if (rk->rk_clusterid) {
                rd_free(rk->rk_clusterid);
                rk->rk_clusterid = NULL;
        }

        /* Destroy coord requests */
        rd_kafka_coord_reqs_term(rk);

        /* Destroy the coordinator cache */
        rd_kafka_coord_cache_destroy(&rk->rk_coord_cache);

        /* Purge metadata cache.
         * #3279:
         * We mustn't call cache_destroy() here since there might be outstanding
         * broker rkos that hold references to the metadata cache lock,
         * and these brokers are destroyed below. So to avoid a circular
         * dependency refcnt deadlock we first purge the cache here
         * and destroy it after the brokers are destroyed. */
        rd_kafka_metadata_cache_purge(rk, rd_true /*observers too*/);

        rd_kafka_wrunlock(rk);

        mtx_lock(&rk->rk_broker_state_change_lock);
        /* Purge broker state change waiters */
        rd_list_destroy(&rk->rk_broker_state_change_waiters);
        mtx_unlock(&rk->rk_broker_state_change_lock);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                if (rk->rk_consumer.q)
                        rd_kafka_q_disable(rk->rk_consumer.q);
        }

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Purging reply queue");

        /* Purge op-queue */
        rd_kafka_q_disable(rk->rk_rep);
        rd_kafka_q_purge(rk->rk_rep);

        /* Loose our special reference to the internal broker. */
        mtx_lock(&rk->rk_internal_rkb_lock);
        if ((rkb = rk->rk_internal_rkb)) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Decommissioning internal broker");

                /* Send op to trigger queue wake-up. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rk->rk_internal_rkb = NULL;
                thrd                = rd_malloc(sizeof(*thrd));
                *thrd               = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
        }
        mtx_unlock(&rk->rk_internal_rkb_lock);
        /* Drop the reference outside the lock. */
        if (rkb)
                rd_kafka_broker_destroy(rkb);


        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Join %d broker thread(s)",
                     rd_list_cnt(&wait_thrds));

        /* Join broker threads (join failures are deliberately ignored) */
        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
                int res;
                if (thrd_join(*thrd, &res) != thrd_success)
                        ;
                rd_free(thrd);
        }

        rd_list_destroy(&wait_thrds);

        /* Destroy mock cluster */
        if (rk->rk_mock.cluster)
                rd_kafka_mock_cluster_destroy(rk->rk_mock.cluster);

        if (rd_atomic32_get(&rk->rk_mock.cluster_cnt) > 0) {
                rd_kafka_log(rk, LOG_EMERG, "MOCK",
                             "%d mock cluster(s) still active: "
                             "must be explicitly destroyed with "
                             "rd_kafka_mock_cluster_destroy() prior to "
                             "terminating the rd_kafka_t instance",
                             (int)rd_atomic32_get(&rk->rk_mock.cluster_cnt));
                rd_assert(!*"All mock clusters must be destroyed prior to "
                          "rd_kafka_t destroy");
        }

        /* Destroy metadata cache (purged above, see #3279 note) */
        rd_kafka_wrlock(rk);
        rd_kafka_metadata_cache_destroy(rk);
        rd_kafka_wrunlock(rk);
}
1314
1315
/**
 * @brief Buffer state for the stats emitter.
 */
1318
/* Growable output buffer used while building the statistics JSON blob;
 * written to exclusively via the _st_printf() macro below. */
struct _stats_emit {
        char *buf;   /* Pointer to allocated buffer */
        size_t size; /* Current allocated size of buf */
        size_t of;   /* Current write-offset in buf */
};
1323
1324
1325
/* Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
 * current scope. */
1327
/* Appends formatted output at st->of, growing st->buf as needed.
 * The buffer is grown in a loop (rather than a single doubling) so that
 * one output larger than twice the remaining space cannot truncate while
 * still advancing st->of past the end of the buffer, which would make the
 * next _rem negative (and huge when passed as a size_t). */
#define _st_printf(...)                                                          \
        do {                                                                     \
                ssize_t _r;                                                      \
                ssize_t _rem = st->size - st->of;                                \
                _r           = rd_snprintf(st->buf + st->of, _rem, __VA_ARGS__); \
                while (_r >= _rem) {                                             \
                        st->size *= 2;                                           \
                        _rem    = st->size - st->of;                             \
                        st->buf = rd_realloc(st->buf, st->size);                 \
                        _r = rd_snprintf(st->buf + st->of, _rem, __VA_ARGS__);   \
                }                                                                \
                st->of += _r;                                                    \
        } while (0)
1340
1341
/* Running totals accumulated across brokers and partitions while
 * emitting statistics; field comments name the source JSON field. */
struct _stats_total {
        int64_t tx;          /**< broker.tx */
        int64_t tx_bytes;    /**< broker.tx_bytes */
        int64_t rx;          /**< broker.rx */
        int64_t rx_bytes;    /**< broker.rx_bytes */
        int64_t txmsgs;      /**< partition.txmsgs */
        int64_t txmsg_bytes; /**< partition.txbytes */
        int64_t rxmsgs;      /**< partition.rxmsgs */
        int64_t rxmsg_bytes; /**< partition.rxbytes */
};
1351
1352
1353
1354
/**
 * @brief Rollover and emit an average window.
 */
1357
/**
 * @brief Roll over \p src_avg into a local snapshot and emit it as a
 *        JSON object named \p name into the stats buffer \p st.
 */
static RD_INLINE void rd_kafka_stats_emit_avg(struct _stats_emit *st,
                                              const char *name,
                                              rd_avg_t *src_avg) {
        rd_avg_t avg;

        /* Rollover resets src_avg's window; avg holds the finished window. */
        rd_avg_rollover(&avg, src_avg);
        _st_printf(
            "\"%s\": {"
            " \"min\":%" PRId64
            ","
            " \"max\":%" PRId64
            ","
            " \"avg\":%" PRId64
            ","
            " \"sum\":%" PRId64
            ","
            " \"stddev\": %" PRId64
            ","
            " \"p50\": %" PRId64
            ","
            " \"p75\": %" PRId64
            ","
            " \"p90\": %" PRId64
            ","
            " \"p95\": %" PRId64
            ","
            " \"p99\": %" PRId64
            ","
            " \"p99_99\": %" PRId64
            ","
            " \"outofrange\": %" PRId64
            ","
            " \"hdrsize\": %" PRId32
            ","
            " \"cnt\":%i "
            "}, ",
            name, avg.ra_v.minv, avg.ra_v.maxv, avg.ra_v.avg, avg.ra_v.sum,
            (int64_t)avg.ra_hist.stddev, avg.ra_hist.p50, avg.ra_hist.p75,
            avg.ra_hist.p90, avg.ra_hist.p95, avg.ra_hist.p99,
            avg.ra_hist.p99_99, avg.ra_hist.oor, avg.ra_hist.hdrsize,
            avg.ra_v.cnt);
        rd_avg_destroy(&avg);
}
1400
1401
/**
 * Emit stats for a single toppar (topic+partition).
 */
1404
static RD_INLINE void rd_kafka_stats_emit_toppar(struct _stats_emit *st,
1405
                                                 struct _stats_total *total,
1406
                                                 rd_kafka_toppar_t *rktp,
1407
0
                                                 int first) {
1408
0
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1409
0
        int64_t end_offset;
1410
0
        int64_t consumer_lag        = -1;
1411
0
        int64_t consumer_lag_stored = -1;
1412
0
        struct offset_stats offs;
1413
0
        int32_t broker_id = -1;
1414
1415
0
        rd_kafka_toppar_lock(rktp);
1416
1417
0
        if (rktp->rktp_broker) {
1418
0
                rd_kafka_broker_lock(rktp->rktp_broker);
1419
0
                broker_id = rktp->rktp_broker->rkb_nodeid;
1420
0
                rd_kafka_broker_unlock(rktp->rktp_broker);
1421
0
        }
1422
1423
        /* Grab a copy of the latest finalized offset stats */
1424
0
        offs = rktp->rktp_offsets_fin;
1425
1426
0
        end_offset = (rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED)
1427
0
                         ? rktp->rktp_ls_offset
1428
0
                         : rktp->rktp_hi_offset;
1429
1430
        /* Calculate consumer_lag by using the highest offset
1431
         * of stored_offset (the last message passed to application + 1, or
1432
         * if enable.auto.offset.store=false the last message manually stored),
1433
         * or the committed_offset (the last message committed by this or
1434
         * another consumer).
1435
         * Using stored_offset allows consumer_lag to be up to date even if
1436
         * offsets are not (yet) committed.
1437
         */
1438
0
        if (end_offset != RD_KAFKA_OFFSET_INVALID) {
1439
0
                if (rktp->rktp_stored_pos.offset >= 0 &&
1440
0
                    rktp->rktp_stored_pos.offset <= end_offset)
1441
0
                        consumer_lag_stored =
1442
0
                            end_offset - rktp->rktp_stored_pos.offset;
1443
0
                if (rktp->rktp_committed_pos.offset >= 0 &&
1444
0
                    rktp->rktp_committed_pos.offset <= end_offset)
1445
0
                        consumer_lag =
1446
0
                            end_offset - rktp->rktp_committed_pos.offset;
1447
0
        }
1448
1449
0
        _st_printf(
1450
0
            "%s\"%" PRId32
1451
0
            "\": { "
1452
0
            "\"partition\":%" PRId32
1453
0
            ", "
1454
0
            "\"broker\":%" PRId32
1455
0
            ", "
1456
0
            "\"leader\":%" PRId32
1457
0
            ", "
1458
0
            "\"desired\":%s, "
1459
0
            "\"unknown\":%s, "
1460
0
            "\"msgq_cnt\":%i, "
1461
0
            "\"msgq_bytes\":%" PRIusz
1462
0
            ", "
1463
0
            "\"xmit_msgq_cnt\":%i, "
1464
0
            "\"xmit_msgq_bytes\":%" PRIusz
1465
0
            ", "
1466
0
            "\"fetchq_cnt\":%i, "
1467
0
            "\"fetchq_size\":%" PRIu64
1468
0
            ", "
1469
0
            "\"fetch_state\":\"%s\", "
1470
0
            "\"query_offset\":%" PRId64
1471
0
            ", "
1472
0
            "\"next_offset\":%" PRId64
1473
0
            ", "
1474
0
            "\"app_offset\":%" PRId64
1475
0
            ", "
1476
0
            "\"stored_offset\":%" PRId64
1477
0
            ", "
1478
0
            "\"stored_leader_epoch\":%" PRId32
1479
0
            ", "
1480
0
            "\"commited_offset\":%" PRId64
1481
0
            ", " /*FIXME: issue #80 */
1482
0
            "\"committed_offset\":%" PRId64
1483
0
            ", "
1484
0
            "\"committed_leader_epoch\":%" PRId32
1485
0
            ", "
1486
0
            "\"eof_offset\":%" PRId64
1487
0
            ", "
1488
0
            "\"lo_offset\":%" PRId64
1489
0
            ", "
1490
0
            "\"hi_offset\":%" PRId64
1491
0
            ", "
1492
0
            "\"ls_offset\":%" PRId64
1493
0
            ", "
1494
0
            "\"consumer_lag\":%" PRId64
1495
0
            ", "
1496
0
            "\"consumer_lag_stored\":%" PRId64
1497
0
            ", "
1498
0
            "\"leader_epoch\":%" PRId32
1499
0
            ", "
1500
0
            "\"txmsgs\":%" PRIu64
1501
0
            ", "
1502
0
            "\"txbytes\":%" PRIu64
1503
0
            ", "
1504
0
            "\"rxmsgs\":%" PRIu64
1505
0
            ", "
1506
0
            "\"rxbytes\":%" PRIu64
1507
0
            ", "
1508
0
            "\"msgs\": %" PRIu64
1509
0
            ", "
1510
0
            "\"rx_ver_drops\": %" PRIu64
1511
0
            ", "
1512
0
            "\"msgs_inflight\": %" PRId32
1513
0
            ", "
1514
0
            "\"next_ack_seq\": %" PRId32
1515
0
            ", "
1516
0
            "\"next_err_seq\": %" PRId32
1517
0
            ", "
1518
0
            "\"acked_msgid\": %" PRIu64 "} ",
1519
0
            first ? "" : ", ", rktp->rktp_partition, rktp->rktp_partition,
1520
0
            broker_id, rktp->rktp_leader_id,
1521
0
            (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) ? "true" : "false",
1522
0
            (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) ? "true" : "false",
1523
0
            rd_kafka_msgq_len(&rktp->rktp_msgq),
1524
0
            rd_kafka_msgq_size(&rktp->rktp_msgq),
1525
            /* FIXME: xmit_msgq is local to the broker thread. */
1526
0
            0, (size_t)0, rd_kafka_q_len(rktp->rktp_fetchq),
1527
0
            rd_kafka_q_size(rktp->rktp_fetchq),
1528
0
            rd_kafka_fetch_states[rktp->rktp_fetch_state],
1529
0
            rktp->rktp_query_pos.offset, offs.fetch_pos.offset,
1530
0
            rktp->rktp_app_pos.offset, rktp->rktp_stored_pos.offset,
1531
0
            rktp->rktp_stored_pos.leader_epoch,
1532
0
            rktp->rktp_committed_pos.offset, /* FIXME: issue #80 */
1533
0
            rktp->rktp_committed_pos.offset,
1534
0
            rktp->rktp_committed_pos.leader_epoch, offs.eof_offset,
1535
0
            rktp->rktp_lo_offset, rktp->rktp_hi_offset, rktp->rktp_ls_offset,
1536
0
            consumer_lag, consumer_lag_stored, rktp->rktp_leader_epoch,
1537
0
            rd_atomic64_get(&rktp->rktp_c.tx_msgs),
1538
0
            rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
1539
0
            rd_atomic64_get(&rktp->rktp_c.rx_msgs),
1540
0
            rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
1541
0
            rk->rk_type == RD_KAFKA_PRODUCER
1542
0
                ? rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs)
1543
0
                : rd_atomic64_get(
1544
0
                      &rktp->rktp_c.rx_msgs), /* legacy, same as rx_msgs */
1545
0
            rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
1546
0
            rd_atomic32_get(&rktp->rktp_msgs_inflight),
1547
0
            rktp->rktp_eos.next_ack_seq, rktp->rktp_eos.next_err_seq,
1548
0
            rktp->rktp_eos.acked_msgid);
1549
1550
0
        if (total) {
1551
0
                total->txmsgs += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
1552
0
                total->txmsg_bytes +=
1553
0
                    rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes);
1554
0
                total->rxmsgs += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
1555
0
                total->rxmsg_bytes +=
1556
0
                    rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes);
1557
0
        }
1558
1559
0
        rd_kafka_toppar_unlock(rktp);
1560
0
}
1561
1562
/**
 * @brief Emit broker request type stats.
 *
 * Emits a JSON object "req" mapping protocol request names to the number
 * of times each request type has been sent to \p rkb.
 *
 * The \c filter table suppresses entries:
 *  - filter[client_type] hides request types that this client type never
 *    sends (e.g. Fetch is hidden for producers, Produce for consumers);
 *  - filter[2] hides request types never sent by any client;
 *  - filter[3] hides (mostly Admin API) request types only when their
 *    counter is zero, so they show up once actually used.
 *
 * @param st  Stats emit context (target buffer used by _st_printf()).
 * @param rkb Broker whose per-request counters are emitted.
 */
static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
                                            rd_kafka_broker_t *rkb) {
        /* Filter out request types that will never be sent by the client. */
        static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
            [RD_KAFKA_PRODUCER] = {[RD_KAFKAP_Fetch]        = rd_true,
                                   [RD_KAFKAP_OffsetCommit] = rd_true,
                                   [RD_KAFKAP_OffsetFetch]  = rd_true,
                                   [RD_KAFKAP_JoinGroup]    = rd_true,
                                   [RD_KAFKAP_Heartbeat]    = rd_true,
                                   [RD_KAFKAP_LeaveGroup]   = rd_true,
                                   [RD_KAFKAP_SyncGroup]    = rd_true},
            [RD_KAFKA_CONSUMER] =
                {
                    [RD_KAFKAP_Produce]        = rd_true,
                    [RD_KAFKAP_InitProducerId] = rd_true,
                    /* Transactional producer */
                    [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
                    [RD_KAFKAP_AddOffsetsToTxn]    = rd_true,
                    [RD_KAFKAP_EndTxn]             = rd_true,
                    [RD_KAFKAP_TxnOffsetCommit]    = rd_true,
                },
            [2 /*any client type*/] =
                {
                    [RD_KAFKAP_UpdateMetadata]       = rd_true,
                    [RD_KAFKAP_ControlledShutdown]   = rd_true,
                    [RD_KAFKAP_LeaderAndIsr]         = rd_true,
                    [RD_KAFKAP_StopReplica]          = rd_true,
                    [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,

                    [RD_KAFKAP_WriteTxnMarkers] = rd_true,

                    [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
                    [RD_KAFKAP_DescribeLogDirs]     = rd_true,

                    [RD_KAFKAP_CreateDelegationToken]       = rd_true,
                    [RD_KAFKAP_RenewDelegationToken]        = rd_true,
                    [RD_KAFKAP_ExpireDelegationToken]       = rd_true,
                    [RD_KAFKAP_DescribeDelegationToken]     = rd_true,
                    [RD_KAFKAP_IncrementalAlterConfigs]     = rd_true,
                    [RD_KAFKAP_ElectLeaders]                = rd_true,
                    [RD_KAFKAP_AlterPartitionReassignments] = rd_true,
                    [RD_KAFKAP_ListPartitionReassignments]  = rd_true,
                    [RD_KAFKAP_AlterUserScramCredentials]   = rd_true,
                    [RD_KAFKAP_Vote]                        = rd_true,
                    [RD_KAFKAP_BeginQuorumEpoch]            = rd_true,
                    [RD_KAFKAP_EndQuorumEpoch]              = rd_true,
                    [RD_KAFKAP_DescribeQuorum]              = rd_true,
                    [RD_KAFKAP_AlterIsr]                    = rd_true,
                    [RD_KAFKAP_UpdateFeatures]              = rd_true,
                    [RD_KAFKAP_Envelope]                    = rd_true,
                    [RD_KAFKAP_FetchSnapshot]               = rd_true,
                    [RD_KAFKAP_BrokerHeartbeat]             = rd_true,
                    [RD_KAFKAP_UnregisterBroker]            = rd_true,
                    [RD_KAFKAP_AllocateProducerIds]         = rd_true,
                    [RD_KAFKAP_ConsumerGroupHeartbeat]      = rd_true,
                },
            [3 /*hide-unless-non-zero*/] = {
                /* Hide Admin requests unless they've been used */
                [RD_KAFKAP_CreateTopics]                 = rd_true,
                [RD_KAFKAP_DeleteTopics]                 = rd_true,
                [RD_KAFKAP_DeleteRecords]                = rd_true,
                [RD_KAFKAP_CreatePartitions]             = rd_true,
                [RD_KAFKAP_DescribeAcls]                 = rd_true,
                [RD_KAFKAP_CreateAcls]                   = rd_true,
                [RD_KAFKAP_DeleteAcls]                   = rd_true,
                [RD_KAFKAP_DescribeConfigs]              = rd_true,
                [RD_KAFKAP_AlterConfigs]                 = rd_true,
                [RD_KAFKAP_DeleteGroups]                 = rd_true,
                [RD_KAFKAP_ListGroups]                   = rd_true,
                [RD_KAFKAP_DescribeGroups]               = rd_true,
                [RD_KAFKAP_DescribeLogDirs]              = rd_true,
                [RD_KAFKAP_IncrementalAlterConfigs]      = rd_true,
                [RD_KAFKAP_AlterPartitionReassignments]  = rd_true,
                [RD_KAFKAP_ListPartitionReassignments]   = rd_true,
                [RD_KAFKAP_OffsetDelete]                 = rd_true,
                [RD_KAFKAP_DescribeClientQuotas]         = rd_true,
                [RD_KAFKAP_AlterClientQuotas]            = rd_true,
                [RD_KAFKAP_DescribeUserScramCredentials] = rd_true,
                [RD_KAFKAP_AlterUserScramCredentials]    = rd_true,
            }};
        int i;
        int cnt = 0; /* Number of entries emitted so far (comma control) */

        _st_printf("\"req\": { ");
        for (i = 0; i < RD_KAFKAP__NUM; i++) {
                int64_t v;

                /* Skip request types never sent by this client type
                 * or by any client type. */
                if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
                        continue;

                v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
                if (!v && filter[3][i])
                        continue; /* Filter out zero values */

                _st_printf("%s\"%s\": %" PRId64, cnt > 0 ? ", " : "",
                           rd_kafka_ApiKey2str(i), v);

                cnt++;
        }
        _st_printf(" }, ");
}
1666
1667
1668
/**
 * @brief Emit all statistics.
 *
 * Builds the full statistics JSON document for the client instance:
 * top-level fields, per-broker objects (including latency averages,
 * request-type counters and the broker's toppar list), per-topic objects
 * with per-partition stats, consumer group state (if any), idempotent/
 * transactional producer state (if enabled), any fatal error, and
 * instance-wide totals. The finished document is enqueued on the
 * application reply queue as an RD_KAFKA_OP_STATS op; ownership of the
 * JSON buffer is transferred to the op.
 *
 * Locking: takes the instance rdlock for the duration of the emission,
 * plus the per-broker and per-topic locks while traversing those lists.
 *
 * @param rk Client instance to emit statistics for.
 */
static void rd_kafka_stats_emit_all(rd_kafka_t *rk) {
        rd_kafka_broker_t *rkb;
        rd_kafka_topic_t *rkt;
        rd_ts_t now;
        rd_kafka_op_t *rko;
        unsigned int tot_cnt;
        size_t tot_size;
        rd_kafka_resp_err_t err;
        /* Initial buffer size; _st_printf presumably grows it on demand
         * (see the _st_printf definition elsewhere in this file). */
        struct _stats_emit stx    = {.size = 1024 * 10};
        struct _stats_emit *st    = &stx;
        struct _stats_total total = {0};

        st->buf = rd_malloc(st->size);


        rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
        rd_kafka_rdlock(rk);

        now = rd_clock();
        /* Top-level fields and opening of the "brokers" object. */
        _st_printf(
            "{ "
            "\"name\": \"%s\", "
            "\"client_id\": \"%s\", "
            "\"type\": \"%s\", "
            "\"ts\":%" PRId64
            ", "
            "\"time\":%lli, "
            "\"age\":%" PRId64
            ", "
            "\"replyq\":%i, "
            "\"msg_cnt\":%u, "
            "\"msg_size\":%" PRIusz
            ", "
            "\"msg_max\":%u, "
            "\"msg_size_max\":%" PRIusz
            ", "
            "\"simple_cnt\":%i, "
            "\"metadata_cache_cnt\":%i, "
            "\"brokers\":{ " /*open brokers*/,
            rk->rk_name, rk->rk_conf.client_id_str,
            rd_kafka_type2str(rk->rk_type), now, (signed long long)time(NULL),
            now - rk->rk_ts_created, rd_kafka_q_len(rk->rk_rep), tot_cnt,
            tot_size, rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
            rd_atomic32_get(&rk->rk_simple_cnt),
            rk->rk_metadata_cache.rkmc_cnt);


        /* Per-broker objects. */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_toppar_t *rktp;
                rd_ts_t txidle = -1, rxidle = -1; /* -1 = not applicable */

                rd_kafka_broker_lock(rkb);

                if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
                        /* Calculate tx and rx idle time in usecs */
                        txidle = rd_atomic64_get(&rkb->rkb_c.ts_send);
                        rxidle = rd_atomic64_get(&rkb->rkb_c.ts_recv);

                        if (txidle)
                                txidle = RD_MAX(now - txidle, 0);
                        else
                                txidle = -1;

                        if (rxidle)
                                rxidle = RD_MAX(now - rxidle, 0);
                        else
                                rxidle = -1;
                }

                /* Format fields and arguments below are positionally
                 * paired: keep both lists in sync when changing either. */
                _st_printf(
                    "%s\"%s\": { " /*open broker*/
                    "\"name\":\"%s\", "
                    "\"nodeid\":%" PRId32
                    ", "
                    "\"nodename\":\"%s\", "
                    "\"source\":\"%s\", "
                    "\"state\":\"%s\", "
                    "\"stateage\":%" PRId64
                    ", "
                    "\"outbuf_cnt\":%i, "
                    "\"outbuf_msg_cnt\":%i, "
                    "\"waitresp_cnt\":%i, "
                    "\"waitresp_msg_cnt\":%i, "
                    "\"tx\":%" PRIu64
                    ", "
                    "\"txbytes\":%" PRIu64
                    ", "
                    "\"txerrs\":%" PRIu64
                    ", "
                    "\"txretries\":%" PRIu64
                    ", "
                    "\"txidle\":%" PRId64
                    ", "
                    "\"req_timeouts\":%" PRIu64
                    ", "
                    "\"rx\":%" PRIu64
                    ", "
                    "\"rxbytes\":%" PRIu64
                    ", "
                    "\"rxerrs\":%" PRIu64
                    ", "
                    "\"rxcorriderrs\":%" PRIu64
                    ", "
                    "\"rxpartial\":%" PRIu64
                    ", "
                    "\"rxidle\":%" PRId64
                    ", "
                    "\"zbuf_grow\":%" PRIu64
                    ", "
                    "\"buf_grow\":%" PRIu64
                    ", "
                    "\"wakeups\":%" PRIu64
                    ", "
                    "\"connects\":%" PRId32
                    ", "
                    "\"disconnects\":%" PRId32 ", ",
                    rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
                    rkb->rkb_name, rkb->rkb_name, rkb->rkb_nodeid,
                    rkb->rkb_nodename, rd_kafka_confsource2str(rkb->rkb_source),
                    rd_kafka_broker_state_names[rkb->rkb_state],
                    rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
                    rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
                    rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
                    rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
                    rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
                    rd_atomic64_get(&rkb->rkb_c.tx),
                    rd_atomic64_get(&rkb->rkb_c.tx_bytes),
                    rd_atomic64_get(&rkb->rkb_c.tx_err),
                    rd_atomic64_get(&rkb->rkb_c.tx_retries), txidle,
                    rd_atomic64_get(&rkb->rkb_c.req_timeouts),
                    rd_atomic64_get(&rkb->rkb_c.rx),
                    rd_atomic64_get(&rkb->rkb_c.rx_bytes),
                    rd_atomic64_get(&rkb->rkb_c.rx_err),
                    rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
                    rd_atomic64_get(&rkb->rkb_c.rx_partial), rxidle,
                    rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
                    rd_atomic64_get(&rkb->rkb_c.buf_grow),
                    rd_atomic64_get(&rkb->rkb_c.wakeups),
                    rd_atomic32_get(&rkb->rkb_c.connects),
                    rd_atomic32_get(&rkb->rkb_c.disconnects));

                /* Accumulate instance-wide wire totals. */
                total.tx += rd_atomic64_get(&rkb->rkb_c.tx);
                total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes);
                total.rx += rd_atomic64_get(&rkb->rkb_c.rx);
                total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes);

                /* Per-broker latency/throttle averages. */
                rd_kafka_stats_emit_avg(st, "int_latency",
                                        &rkb->rkb_avg_int_latency);
                rd_kafka_stats_emit_avg(st, "outbuf_latency",
                                        &rkb->rkb_avg_outbuf_latency);
                rd_kafka_stats_emit_avg(st, "rtt", &rkb->rkb_avg_rtt);
                rd_kafka_stats_emit_avg(st, "throttle", &rkb->rkb_avg_throttle);

                rd_kafka_stats_emit_broker_reqs(st, rkb);

                /* Toppars currently delegated to this broker. */
                _st_printf("\"toppars\":{ " /*open toppars*/);

                TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
                        _st_printf(
                            "%s\"%.*s-%" PRId32
                            "\": { "
                            "\"topic\":\"%.*s\", "
                            "\"partition\":%" PRId32 "} ",
                            rktp == TAILQ_FIRST(&rkb->rkb_toppars) ? "" : ", ",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition);
                }

                rd_kafka_broker_unlock(rkb);

                _st_printf(
                    "} " /*close toppars*/
                    "} " /*close broker*/);
        }


        _st_printf(
            "}, " /* close "brokers" array */
            "\"topics\":{ ");

        /* Per-topic objects with per-partition stats. */
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
                int i, j;

                rd_kafka_topic_rdlock(rkt);
                _st_printf(
                    "%s\"%.*s\": { "
                    "\"topic\":\"%.*s\", "
                    "\"age\":%" PRId64
                    ", "
                    "\"metadata_age\":%" PRId64 ", ",
                    rkt == TAILQ_FIRST(&rk->rk_topics) ? "" : ", ",
                    RD_KAFKAP_STR_PR(rkt->rkt_topic),
                    RD_KAFKAP_STR_PR(rkt->rkt_topic),
                    (now - rkt->rkt_ts_create) / 1000,
                    rkt->rkt_ts_metadata ? (now - rkt->rkt_ts_metadata) / 1000
                                         : 0);

                rd_kafka_stats_emit_avg(st, "batchsize",
                                        &rkt->rkt_avg_batchsize);
                rd_kafka_stats_emit_avg(st, "batchcnt", &rkt->rkt_avg_batchcnt);

                _st_printf("\"partitions\":{ " /*open partitions*/);

                /* Known partitions, then desired-but-unavailable
                 * partitions, then the unassigned (UA) toppar.
                 * The i/j bookkeeping only tracks whether any partition
                 * has been emitted yet (comma control via first==0). */
                for (i = 0; i < rkt->rkt_partition_cnt; i++)
                        rd_kafka_stats_emit_toppar(st, &total, rkt->rkt_p[i],
                                                   i == 0);

                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, j)
                rd_kafka_stats_emit_toppar(st, &total, rktp, i + j == 0);

                i += j;

                if (rkt->rkt_ua)
                        rd_kafka_stats_emit_toppar(st, NULL, rkt->rkt_ua,
                                                   i++ == 0);

                rd_kafka_topic_rdunlock(rkt);

                _st_printf(
                    "} " /*close partitions*/
                    "} " /*close topic*/);
        }
        _st_printf("} " /*close topics*/);

        /* Consumer group state, if this is a group consumer. */
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                _st_printf(
                    ", \"cgrp\": { "
                    "\"state\": \"%s\", "
                    "\"stateage\": %" PRId64
                    ", "
                    "\"join_state\": \"%s\", "
                    "\"rebalance_age\": %" PRId64
                    ", "
                    "\"rebalance_cnt\": %d, "
                    "\"rebalance_reason\": \"%s\", "
                    "\"assignment_size\": %d }",
                    rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                    rkcg->rkcg_ts_statechange
                        ? (now - rkcg->rkcg_ts_statechange) / 1000
                        : 0,
                    rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                    rkcg->rkcg_c.ts_rebalance
                        ? (now - rkcg->rkcg_c.ts_rebalance) / 1000
                        : 0,
                    rkcg->rkcg_c.rebalance_cnt, rkcg->rkcg_c.rebalance_reason,
                    rkcg->rkcg_c.assignment_size);
        }

        /* Idempotent/transactional producer state. */
        if (rd_kafka_is_idempotent(rk)) {
                _st_printf(
                    ", \"eos\": { "
                    "\"idemp_state\": \"%s\", "
                    "\"idemp_stateage\": %" PRId64
                    ", "
                    "\"txn_state\": \"%s\", "
                    "\"txn_stateage\": %" PRId64
                    ", "
                    "\"txn_may_enq\": %s, "
                    "\"producer_id\": %" PRId64
                    ", "
                    "\"producer_epoch\": %hd, "
                    "\"epoch_cnt\": %d "
                    "}",
                    rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
                    (now - rk->rk_eos.ts_idemp_state) / 1000,
                    rd_kafka_txn_state2str(rk->rk_eos.txn_state),
                    (now - rk->rk_eos.ts_txn_state) / 1000,
                    rd_atomic32_get(&rk->rk_eos.txn_may_enq) ? "true" : "false",
                    rk->rk_eos.pid.id, rk->rk_eos.pid.epoch,
                    rk->rk_eos.epoch_cnt);
        }

        /* Fatal error, if one has been raised. */
        if ((err = rd_atomic32_get(&rk->rk_fatal.err)))
                _st_printf(
                    ", \"fatal\": { "
                    "\"error\": \"%s\", "
                    "\"reason\": \"%s\", "
                    "\"cnt\": %d "
                    "}",
                    rd_kafka_err2str(err), rk->rk_fatal.errstr,
                    rk->rk_fatal.cnt);

        rd_kafka_rdunlock(rk);

        /* Total counters */
        _st_printf(
            ", "
            "\"tx\":%" PRId64
            ", "
            "\"tx_bytes\":%" PRId64
            ", "
            "\"rx\":%" PRId64
            ", "
            "\"rx_bytes\":%" PRId64
            ", "
            "\"txmsgs\":%" PRId64
            ", "
            "\"txmsg_bytes\":%" PRId64
            ", "
            "\"rxmsgs\":%" PRId64
            ", "
            "\"rxmsg_bytes\":%" PRId64,
            total.tx, total.tx_bytes, total.rx, total.rx_bytes, total.txmsgs,
            total.txmsg_bytes, total.rxmsgs, total.rxmsg_bytes);

        _st_printf("}" /*close object*/);


        /* Enqueue op for application; the op takes ownership of the
         * JSON buffer (st->buf). */
        rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
        rko->rko_u.stats.json     = st->buf;
        rko->rko_u.stats.json_len = st->of;
        rd_kafka_q_enq(rk->rk_rep, rko);
}
1990
1991
1992
/**
1993
 * @brief 1 second generic timer.
1994
 *
1995
 * @locality rdkafka main thread
1996
 * @locks none
1997
 */
1998
0
static void rd_kafka_1s_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
1999
0
        rd_kafka_t *rk = rkts->rkts_rk;
2000
2001
        /* Scan topic state, message timeouts, etc. */
2002
0
        rd_kafka_topic_scan_all(rk, rd_clock());
2003
2004
        /* Sparse connections:
2005
         * try to maintain at least one connection to the cluster. */
2006
0
        if (rk->rk_conf.sparse_connections &&
2007
0
            rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
2008
0
                rd_kafka_connect_any(rk, "no cluster connection");
2009
2010
0
        rd_kafka_coord_cache_expire(&rk->rk_coord_cache);
2011
0
}
2012
2013
0
static void rd_kafka_stats_emit_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
2014
0
        rd_kafka_t *rk = rkts->rkts_rk;
2015
0
        rd_kafka_stats_emit_all(rk);
2016
0
}
2017
2018
2019
/**
2020
 * @brief Periodic metadata refresh callback
2021
 *
2022
 * @locality rdkafka main thread
2023
 */
2024
0
static void rd_kafka_metadata_refresh_cb(rd_kafka_timers_t *rkts, void *arg) {
2025
0
        rd_kafka_t *rk = rkts->rkts_rk;
2026
0
        rd_kafka_resp_err_t err;
2027
2028
        /* High-level consumer:
2029
         * We need to query both locally known topics and subscribed topics
2030
         * so that we can detect locally known topics changing partition
2031
         * count or disappearing, as well as detect previously non-existent
2032
         * subscribed topics now being available in the cluster. */
2033
0
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
2034
0
                err = rd_kafka_metadata_refresh_consumer_topics(
2035
0
                    rk, NULL, "periodic topic and broker list refresh");
2036
0
        else
2037
0
                err = rd_kafka_metadata_refresh_known_topics(
2038
0
                    rk, NULL, rd_true /*force*/,
2039
0
                    "periodic topic and broker list refresh");
2040
2041
2042
0
        if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC &&
2043
0
            rd_interval(&rk->rk_suppress.broker_metadata_refresh,
2044
0
                        10 * 1000 * 1000 /*10s*/, 0) > 0) {
2045
                /* If there are no (locally referenced) topics
2046
                 * to query, refresh the broker list.
2047
                 * This avoids getting idle-disconnected for clients
2048
                 * that have not yet referenced a topic and makes
2049
                 * sure such a client has an up to date broker list. */
2050
0
                rd_kafka_metadata_refresh_brokers(
2051
0
                    rk, NULL, "periodic broker list refresh");
2052
0
        }
2053
0
}
2054
2055
2056
2057
/**
2058
 * @brief Wait for background threads to initialize.
2059
 *
2060
 * @returns the number of background threads still not initialized.
2061
 *
2062
 * @locality app thread calling rd_kafka_new()
2063
 * @locks none
2064
 */
2065
0
static int rd_kafka_init_wait(rd_kafka_t *rk, int timeout_ms) {
2066
0
        struct timespec tspec;
2067
0
        int ret;
2068
2069
0
        rd_timeout_init_timespec(&tspec, timeout_ms);
2070
2071
0
        mtx_lock(&rk->rk_init_lock);
2072
0
        while (rk->rk_init_wait_cnt > 0 &&
2073
0
               cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock, &tspec) ==
2074
0
                   thrd_success)
2075
0
                ;
2076
0
        ret = rk->rk_init_wait_cnt;
2077
0
        mtx_unlock(&rk->rk_init_lock);
2078
2079
0
        return ret;
2080
0
}
2081
2082
2083
/**
 * Main loop for Kafka handler thread.
 *
 * Starts the periodic timers, signals rd_kafka_new() that initialization
 * is done, then serves the rk_ops queue and timers until the handle is
 * terminating (and, for consumers, until the cgrp has reached TERM state),
 * after which queues/timers are torn down and the handle internals destroyed.
 *
 * @locality internal main thread ("rdk:main")
 */
static int rd_kafka_thread_main(void *arg) {
        rd_kafka_t *rk                        = arg;
        rd_kafka_timer_t tmr_1s               = RD_ZERO_INIT;
        rd_kafka_timer_t tmr_stats_emit       = RD_ZERO_INIT;
        rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;

        rd_kafka_set_thread_name("main");
        rd_kafka_set_thread_sysname("rdk:main");

        rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);

        (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

        /* Acquire lock (which was held by thread creator during creation)
         * to synchronise state. */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        /* 1 second timer for topic scan and connection checking. */
        rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000,
                             rd_kafka_1s_tmr_cb, NULL);
        /* Optional periodic timers, enabled by configuration. */
        if (rk->rk_conf.stats_interval_ms)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
                                     rk->rk_conf.stats_interval_ms * 1000ll,
                                     rd_kafka_stats_emit_tmr_cb, NULL);
        if (rk->rk_conf.metadata_refresh_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
                                     rk->rk_conf.metadata_refresh_interval_ms *
                                         1000ll,
                                     rd_kafka_metadata_refresh_cb, NULL);

        /* Forward consumer group ops to this thread's op queue. */
        if (rk->rk_cgrp)
                rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);

        if (rd_kafka_is_idempotent(rk))
                rd_kafka_idemp_init(rk);

        /* Signal rd_kafka_init_wait() (in rd_kafka_new()) that this
         * thread has finished initializing. */
        mtx_lock(&rk->rk_init_lock);
        rk->rk_init_wait_cnt--;
        cnd_broadcast(&rk->rk_init_cnd);
        mtx_unlock(&rk->rk_init_lock);

        /* Serve ops and timers until termination; keep serving while ops
         * remain queued or (for consumers) until the cgrp terminates. */
        while (likely(!rd_kafka_terminating(rk) || rd_kafka_q_len(rk->rk_ops) ||
                      (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state !=
                                       RD_KAFKA_CGRP_STATE_TERM)))) {
                rd_ts_t sleeptime = rd_kafka_timers_next(
                    &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/);
                /* Use ceiling division to avoid calling serve with a 0 ms
                 * timeout in a tight loop until 1 ms has passed. */
                int timeout_ms = (sleeptime + 999) / 1000;
                rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
                if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
                        rd_kafka_cgrp_serve(rk->rk_cgrp);
                rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
        }

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Internal main thread terminating");

        if (rd_kafka_is_idempotent(rk))
                rd_kafka_idemp_term(rk);

        /* Stop accepting new ops and discard any that are still queued. */
        rd_kafka_q_disable(rk->rk_ops);
        rd_kafka_q_purge(rk->rk_ops);

        /* Stop the timers started above (stats timer only if it was
         * started; stopping the metadata timer is safe either way). */
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1);
        if (rk->rk_conf.stats_interval_ms)
                rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);

        /* Synchronise state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_MAIN);

        rd_kafka_destroy_internal(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Internal main thread termination done");

        rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);

        return 0;
}
2172
2173
2174
0
void rd_kafka_term_sig_handler(int sig) {
        /* Intentionally a no-op: this handler is installed (via sigaction()
         * in rd_kafka_new() when rk_conf.term_sig is set) solely so that a
         * handler exists for the configured termination signal. */
        (void)sig;
}
2177
2178
2179
/**
 * @brief Create and fully initialize a new client instance of type \p type.
 *
 * Takes ownership of \p app_conf on success (its base pointer is freed,
 * its fields live on inside the handle). On failure \p app_conf remains
 * owned by the caller (rk_conf is cleared to avoid double-free).
 *
 * @param type        RD_KAFKA_PRODUCER or RD_KAFKA_CONSUMER.
 * @param app_conf    optional configuration object (may be NULL).
 * @param errstr      optional human-readable error buffer.
 * @param errstr_size size of \p errstr.
 *
 * @returns the new handle, or NULL on error (see rd_kafka_last_error()).
 */
rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
                         rd_kafka_conf_t *app_conf,
                         char *errstr,
                         size_t errstr_size) {
        rd_kafka_t *rk;
        static rd_atomic32_t rkid; /* monotonically increasing instance id,
                                    * used in rk_name */
        rd_kafka_conf_t *conf;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        int ret_errno               = 0;
        const char *conf_err;
        char *group_remote_assignor_override = NULL;
#ifndef _WIN32
        sigset_t newset, oldset;
#endif
        char builtin_features[128];
        size_t bflen;

        rd_kafka_global_init();

        /* rd_kafka_new() takes ownership of the provided \p app_conf
         * object if rd_kafka_new() succeeds.
         * Since \p app_conf is optional we allocate a default configuration
         * object here if \p app_conf is NULL.
         * The configuration object itself is struct-copied later
         * leaving the default *conf pointer to be ready for freeing.
         * In case new() fails and app_conf was specified we will clear out
         * rk_conf to avoid double-freeing from destroy_internal() and the
         * user's eventual call to rd_kafka_conf_destroy().
         * This is all a bit tricky but that's the nature of
         * legacy interfaces. */
        if (!app_conf)
                conf = rd_kafka_conf_new();
        else
                conf = app_conf;

        /* Verify and finalize configuration */
        if ((conf_err = rd_kafka_conf_finalize(type, conf))) {
                /* Incompatible configuration settings */
                /* NOTE(review): errstr is not NULL-guarded here, unlike the
                 * later `if (errstr)` checks below — confirm callers always
                 * pass a buffer or that rd_snprintf tolerates NULL. */
                rd_snprintf(errstr, errstr_size, "%s", conf_err);
                if (!app_conf)
                        rd_kafka_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }


        rd_kafka_global_cnt_incr();

        /*
         * Set up the handle.
         */
        rk = rd_calloc(1, sizeof(*rk));

        rk->rk_type       = type;
        rk->rk_ts_created = rd_clock();

        /* Struct-copy the config object. */
        rk->rk_conf = *conf;
        if (!app_conf)
                rd_free(conf); /* Free the base config struct only,
                                * not its fields since they were copied to
                                * rk_conf just above. Those fields are
                                * freed from rd_kafka_destroy_internal()
                                * as the rk itself is destroyed. */

        /* Seed PRNG, don't bother about HAVE_RAND_R, since it is pretty cheap.
         */
        if (rk->rk_conf.enable_random_seed)
                call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);

        /* Call on_new() interceptors */
        rd_kafka_interceptors_on_new(rk, &rk->rk_conf);

        /* Initialize the handle's locks, condvars and book-keeping lists. */
        rwlock_init(&rk->rk_lock);
        mtx_init(&rk->rk_conf.sasl.lock, mtx_plain);
        mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);

        cnd_init(&rk->rk_broker_state_change_cnd);
        mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
        rd_list_init(&rk->rk_broker_state_change_waiters, 8,
                     rd_kafka_enq_once_trigger_destroy);

        cnd_init(&rk->rk_init_cnd);
        mtx_init(&rk->rk_init_lock, mtx_plain);

        rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
        rd_interval_init(&rk->rk_suppress.broker_metadata_refresh);
        rd_interval_init(&rk->rk_suppress.sparse_connect_random);
        mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);

        rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created);
        rd_atomic32_init(&rk->rk_flushing, 0);

        /* rk_rep: application-facing reply queue;
         * rk_ops: internal op queue served by the main thread. */
        rk->rk_rep             = rd_kafka_q_new(rk);
        rk->rk_ops             = rd_kafka_q_new(rk);
        rk->rk_ops->rkq_serve  = rd_kafka_poll_cb;
        rk->rk_ops->rkq_opaque = rk;

        if (rk->rk_conf.log_queue) {
                rk->rk_logq             = rd_kafka_q_new(rk);
                rk->rk_logq->rkq_serve  = rd_kafka_poll_cb;
                rk->rk_logq->rkq_opaque = rk;
        }

        TAILQ_INIT(&rk->rk_brokers);
        TAILQ_INIT(&rk->rk_topics);
        rd_kafka_timers_init(&rk->rk_timers, rk, rk->rk_ops);
        rd_kafka_metadata_cache_init(rk);
        rd_kafka_coord_cache_init(&rk->rk_coord_cache,
                                  rk->rk_conf.metadata_max_age_ms);
        rd_kafka_coord_reqs_init(rk);

        /* Determine delivery-report mode: callback takes precedence over
         * event-based DR; NONE if neither is configured. */
        if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
                rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
        else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
                rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
        else
                rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
        if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
                rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;

        /* Configured callbacks implicitly enable their event types. */
        if (rk->rk_conf.rebalance_cb)
                rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
        if (rk->rk_conf.offset_commit_cb)
                rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
        if (rk->rk_conf.error_cb)
                rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
#if WITH_SASL_OAUTHBEARER
        if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt &&
            !rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
                rd_kafka_conf_set_oauthbearer_token_refresh_cb(
                    &rk->rk_conf, rd_kafka_oauthbearer_unsecured_token);

        if (rk->rk_conf.sasl.oauthbearer.token_refresh_cb &&
            rk->rk_conf.sasl.oauthbearer.method !=
                RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC)
                rk->rk_conf.enabled_events |=
                    RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
#endif

#if WITH_OAUTHBEARER_OIDC
        if (rk->rk_conf.sasl.oauthbearer.method ==
                RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
            !rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
                rd_kafka_conf_set_oauthbearer_token_refresh_cb(
                    &rk->rk_conf, rd_kafka_oidc_token_refresh_cb);
#endif

        rk->rk_controllerid = -1;

        /* Admin client defaults */
        rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms;

        if (rk->rk_conf.debug)
                rk->rk_conf.log_level = LOG_DEBUG;

        /* Unique instance name: "<client.id>#<type>-<seq>" */
        rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
                    rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
                    rd_atomic32_add(&rkid, 1));

        /* Construct clientid kafka string */
        rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str, -1);

        /* Convert group.id to kafka string (may be NULL) */
        rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str, -1);

        /* Config fixups */
        rk->rk_conf.queued_max_msg_bytes =
            (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;

        /* Enable api.version.request=true if fallback.broker.version
         * indicates a supporting broker. */
        if (rd_kafka_ApiVersion_is_queryable(
                rk->rk_conf.broker_version_fallback))
                rk->rk_conf.api_version_request = 1;

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
                cnd_init(&rk->rk_curr_msgs.cnd);
                rk->rk_curr_msgs.max_cnt = rk->rk_conf.queue_buffering_max_msgs;
                /* Clamp the byte limit to SIZE_MAX to avoid overflow on
                 * platforms where size_t is narrower than the product. */
                if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes *
                        1024 >
                    (unsigned long long)SIZE_MAX) {
                        rk->rk_curr_msgs.max_size = SIZE_MAX;
                        rd_kafka_log(rk, LOG_WARNING, "QUEUESIZE",
                                     "queue.buffering.max.kbytes adjusted "
                                     "to system SIZE_MAX limit %" PRIusz
                                     " bytes",
                                     rk->rk_curr_msgs.max_size);
                } else {
                        rk->rk_curr_msgs.max_size =
                            (size_t)rk->rk_conf.queue_buffering_max_kbytes *
                            1024;
                }
        }

        if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
                ret_err   = RD_KAFKA_RESP_ERR__INVALID_ARG;
                ret_errno = EINVAL;
                goto fail;
        }

        if (!rk->rk_conf.group_remote_assignor) {
                rd_kafka_assignor_t *cooperative_assignor;

                /* Detect if chosen assignor is cooperative
                 * FIXME: remove this compatibility altogether
                 * and apply the breaking changes that will be required
                 * in next major version. */

                cooperative_assignor =
                    rd_kafka_assignor_find(rk, "cooperative-sticky");
                rk->rk_conf.partition_assignors_cooperative =
                    !rk->rk_conf.partition_assignors.rl_cnt ||
                    (cooperative_assignor &&
                     cooperative_assignor->rkas_enabled);

                if (rk->rk_conf.group_protocol ==
                    RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
                        /* Default remote assignor to the chosen local one.
                         * The override string is heap-allocated so it can be
                         * freed on the failure path when app_conf is set. */
                        if (rk->rk_conf.partition_assignors_cooperative) {
                                group_remote_assignor_override =
                                    rd_strdup("uniform");
                                rk->rk_conf.group_remote_assignor =
                                    group_remote_assignor_override;
                        } else {
                                rd_kafka_assignor_t *range_assignor =
                                    rd_kafka_assignor_find(rk, "range");
                                if (range_assignor &&
                                    range_assignor->rkas_enabled) {
                                        rd_kafka_log(
                                            rk, LOG_WARNING, "ASSIGNOR",
                                            "\"range\" assignor is sticky "
                                            "with group protocol CONSUMER");
                                        group_remote_assignor_override =
                                            rd_strdup("range");
                                        rk->rk_conf.group_remote_assignor =
                                            group_remote_assignor_override;
                                } else {
                                        rd_kafka_log(
                                            rk, LOG_WARNING, "ASSIGNOR",
                                            "roundrobin assignor isn't "
                                            "available "
                                            "with group protocol CONSUMER, "
                                            "using the \"uniform\" one. "
                                            "It's similar, "
                                            "but it's also sticky");
                                        group_remote_assignor_override =
                                            rd_strdup("uniform");
                                        rk->rk_conf.group_remote_assignor =
                                            group_remote_assignor_override;
                                }
                        }
                }
        } else {
                /* When users starts setting properties of the new protocol,
                 * they can only use incremental_assign/unassign. */
                rk->rk_conf.partition_assignors_cooperative = rd_true;
        }

        /* Create Mock cluster */
        rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
        if (rk->rk_conf.mock.broker_cnt > 0) {
                const char *mock_bootstraps;
                rk->rk_mock.cluster =
                    rd_kafka_mock_cluster_new(rk, rk->rk_conf.mock.broker_cnt);

                if (!rk->rk_mock.cluster) {
                        rd_snprintf(errstr, errstr_size,
                                    "Failed to create mock cluster, see logs");
                        ret_err   = RD_KAFKA_RESP_ERR__FAIL;
                        ret_errno = EINVAL;
                        goto fail;
                }

                /* NOTE(review): trailing comma operator below joins the
                 * assignment with the rd_kafka_log() call — harmless but
                 * likely intended to be a semicolon. */
                mock_bootstraps =
                    rd_kafka_mock_cluster_bootstraps(rk->rk_mock.cluster),
                rd_kafka_log(rk, LOG_NOTICE, "MOCK",
                             "Mock cluster enabled: "
                             "original bootstrap.servers and security.protocol "
                             "ignored and replaced with %s",
                             mock_bootstraps);

                /* Overwrite bootstrap.servers and connection settings */
                if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
                                      mock_bootstraps, NULL,
                                      0) != RD_KAFKA_CONF_OK)
                        rd_assert(!"failed to replace mock bootstrap.servers");

                if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
                                      "plaintext", NULL, 0) != RD_KAFKA_CONF_OK)
                        rd_assert(!"failed to reset mock security.protocol");

                rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;

                /* Apply default RTT to brokers */
                if (rk->rk_conf.mock.broker_rtt)
                        rd_kafka_mock_broker_set_rtt(
                            rk->rk_mock.cluster, -1 /*all brokers*/,
                            rk->rk_conf.mock.broker_rtt);
        }

        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
                /* Select SASL provider */
                if (rd_kafka_sasl_select_provider(rk, errstr, errstr_size) ==
                    -1) {
                        ret_err   = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }

                /* Initialize SASL provider */
                if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) {
                        /* Clear the provider so the fail path does not
                         * attempt rd_kafka_sasl_term() on a provider that
                         * never initialized. */
                        rk->rk_conf.sasl.provider = NULL;
                        ret_err   = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }
        }

#if WITH_SSL
        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
                /* Create SSL context */
                if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) {
                        ret_err   = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }
        }
#endif

        if (type == RD_KAFKA_CONSUMER) {
                rd_kafka_assignment_init(rk);

                if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
                        /* Create consumer group handle */
                        rk->rk_cgrp = rd_kafka_cgrp_new(
                            rk, rk->rk_conf.group_protocol, rk->rk_group_id,
                            rk->rk_client_id);
                        rk->rk_consumer.q =
                            rd_kafka_q_keep(rk->rk_cgrp->rkcg_q);
                } else {
                        /* Legacy consumer */
                        rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
                }

        } else if (type == RD_KAFKA_PRODUCER) {
                rk->rk_eos.transactional_id =
                    rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
        }

#ifndef _WIN32
        /* Block all signals in newly created threads.
         * To avoid race condition we block all signals in the calling
         * thread, which the new thread will inherit its sigmask from,
         * and then restore the original sigmask of the calling thread when
         * we're done creating the thread. */
        sigemptyset(&oldset);
        sigfillset(&newset);
        if (rk->rk_conf.term_sig) {
                struct sigaction sa_term = {.sa_handler =
                                                rd_kafka_term_sig_handler};
                sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
        }
        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
#endif

        /* Create background thread and queue if background_event_cb()
         * RD_KAFKA_EVENT_BACKGROUND has been enabled.
         * Do this before creating the main thread since after
         * the main thread is created it is no longer trivial to error
         * out from rd_kafka_new(). */
        if (rk->rk_conf.background_event_cb ||
            (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_BACKGROUND)) {
                rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
                rd_kafka_wrlock(rk);
                if (!rk->rk_background.q)
                        err = rd_kafka_background_thread_create(rk, errstr,
                                                                errstr_size);
                rd_kafka_wrunlock(rk);
                if (err)
                        goto fail;
        }

        /* Lock handle here to synchronise state, i.e., hold off
         * the thread until we've finalized the handle. */
        rd_kafka_wrlock(rk);

        /* Create handler thread.
         * rk_init_wait_cnt is incremented under rk_init_lock; the main
         * thread decrements it and broadcasts rk_init_cnd once ready. */
        mtx_lock(&rk->rk_init_lock);
        rk->rk_init_wait_cnt++;
        if ((thrd_create(&rk->rk_thread, rd_kafka_thread_main, rk)) !=
            thrd_success) {
                rk->rk_init_wait_cnt--;
                ret_err   = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                ret_errno = errno;
                if (errstr)
                        rd_snprintf(errstr, errstr_size,
                                    "Failed to create thread: %s (%i)",
                                    rd_strerror(errno), errno);
                mtx_unlock(&rk->rk_init_lock);
                rd_kafka_wrunlock(rk);
#ifndef _WIN32
                /* Restore sigmask of caller */
                pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
                goto fail;
        }

        mtx_unlock(&rk->rk_init_lock);
        rd_kafka_wrunlock(rk);

        /*
         * @warning `goto fail` is prohibited past this point
         */

        mtx_lock(&rk->rk_internal_rkb_lock);
        rk->rk_internal_rkb =
            rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL, RD_KAFKA_PROTO_PLAINTEXT,
                                "", 0, RD_KAFKA_NODEID_UA);
        mtx_unlock(&rk->rk_internal_rkb_lock);

        /* Add initial list of brokers from configuration */
        if (rk->rk_conf.brokerlist) {
                if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist,
                                          rd_true) == 0)
                        rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
                                        "No brokers configured");
        }

#ifndef _WIN32
        /* Restore sigmask of caller */
        pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif

        /* Wait for background threads to fully initialize so that
         * the client instance is fully functional at the time it is
         * returned from the constructor. */
        if (rd_kafka_init_wait(rk, 60 * 1000) != 0) {
                /* This should never happen unless there is a bug
                 * or the OS is not scheduling the background threads.
                 * Either case there is no point in handling this gracefully
                 * in the current state since the thread joins are likely
                 * to hang as well. */
                mtx_lock(&rk->rk_init_lock);
                rd_kafka_log(rk, LOG_CRIT, "INIT",
                             "Failed to initialize %s: "
                             "%d background thread(s) did not initialize "
                             "within 60 seconds",
                             rk->rk_name, rk->rk_init_wait_cnt);
                if (errstr)
                        rd_snprintf(errstr, errstr_size,
                                    "Timed out waiting for "
                                    "%d background thread(s) to initialize",
                                    rk->rk_init_wait_cnt);
                mtx_unlock(&rk->rk_init_lock);

                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                                        EDEADLK);
                return NULL;
        }

        rk->rk_initialized = 1;

        bflen = sizeof(builtin_features);
        if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
                              builtin_features, &bflen) != RD_KAFKA_CONF_OK)
                rd_snprintf(builtin_features, sizeof(builtin_features), "?");
        rd_kafka_dbg(rk, ALL, "INIT",
                     "librdkafka v%s (0x%x) %s initialized "
                     "(builtin.features %s, %s, debug 0x%x)",
                     rd_kafka_version_str(), rd_kafka_version(), rk->rk_name,
                     builtin_features, BUILT_WITH, rk->rk_conf.debug);

        /* Log warnings for deprecated configuration */
        rd_kafka_conf_warn(rk);

        /* Debug dump configuration */
        if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
                rd_kafka_anyconf_dump_dbg(rk, _RK_GLOBAL, &rk->rk_conf,
                                          "Client configuration");
                if (rk->rk_conf.topic_conf)
                        rd_kafka_anyconf_dump_dbg(
                            rk, _RK_TOPIC, rk->rk_conf.topic_conf,
                            "Default topic configuration");
        }

        /* Free user supplied conf's base pointer on success,
         * but not the actual allocated fields since the struct
         * will have been copied in its entirety above. */
        if (app_conf)
                rd_free(app_conf);
        rd_kafka_set_last_error(0, 0);

        return rk;

fail:
        /*
         * Error out and clean up
         */

        /*
         * Tell background thread to terminate and wait for it to return.
         */
        rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        if (rk->rk_background.thread) {
                int res;
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* If on_new() interceptors have been called we also need
         * to allow interceptor clean-up by calling on_destroy() */
        rd_kafka_interceptors_on_destroy(rk);

        /* If rk_conf is a struct-copy of the application configuration
         * we need to avoid rk_conf fields from being freed from
         * rd_kafka_destroy_internal() since they belong to app_conf.
         * However, there are some internal fields, such as interceptors,
         * that belong to rk_conf and thus needs to be cleaned up.
         * Legacy APIs, sigh.. */
        if (app_conf) {
                if (group_remote_assignor_override)
                        rd_free(group_remote_assignor_override);
                rd_kafka_assignors_term(rk);
                rd_kafka_interceptors_destroy(&rk->rk_conf);
                memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
        }

        rd_kafka_destroy_internal(rk);
        rd_kafka_destroy_final(rk);

        rd_kafka_set_last_error(ret_err, ret_errno);

        return NULL;
}
2722
2723
2724
2725
/**
2726
 * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
2727
 * friends) since it does not have an API for stopping the cgrp we will need to
2728
 * sort that out automatically in the background when all consumption
2729
 * has stopped.
2730
 *
2731
 * Returns 0 if a  High level consumer is already instantiated
2732
 * which means a Simple consumer cannot co-operate with it, else 1.
2733
 *
2734
 * A rd_kafka_t handle can never migrate from simple to high-level, or
2735
 * vice versa, so we dont need a ..consumer_del().
2736
 */
2737
0
int rd_kafka_simple_consumer_add(rd_kafka_t *rk) {
        int32_t cur = rd_atomic32_get(&rk->rk_simple_cnt);

        /* A negative count marks this instance as a high-level consumer,
         * with which a simple consumer cannot co-operate. */
        if (cur < 0)
                return 0;

        /* Register one more simple consumer usage. */
        return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
}
2743
2744
2745
2746
/**
2747
 * rktp fetch is split up in these parts:
2748
 *   * application side:
2749
 *   * broker side (handled by current leader broker thread for rktp):
2750
 *          - the fetch state, initial offset, etc.
2751
 *          - fetching messages, updating fetched offset, etc.
2752
 *          - offset commits
2753
 *
2754
 * Communication between the two are:
2755
 *    app side -> rdkafka main side: rktp_ops
2756
 *    broker thread -> app side: rktp_fetchq
2757
 *
2758
 * There is no shared state between these threads, instead
2759
 * state is communicated through the two op queues, and state synchronization
2760
 * is performed by version barriers.
2761
 *
2762
 */
2763
2764
static RD_UNUSED int rd_kafka_consume_start0(rd_kafka_topic_t *rkt,
2765
                                             int32_t partition,
2766
                                             int64_t offset,
2767
0
                                             rd_kafka_q_t *rkq) {
2768
0
        rd_kafka_toppar_t *rktp;
2769
2770
0
        if (partition < 0) {
2771
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2772
0
                                        ESRCH);
2773
0
                return -1;
2774
0
        }
2775
2776
0
        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
2777
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2778
0
                return -1;
2779
0
        }
2780
2781
0
        rd_kafka_topic_wrlock(rkt);
2782
0
        rktp = rd_kafka_toppar_desired_add(rkt, partition);
2783
0
        rd_kafka_topic_wrunlock(rkt);
2784
2785
        /* Verify offset */
2786
0
        if (offset == RD_KAFKA_OFFSET_BEGINNING ||
2787
0
            offset == RD_KAFKA_OFFSET_END ||
2788
0
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
2789
                /* logical offsets */
2790
2791
0
        } else if (offset == RD_KAFKA_OFFSET_STORED) {
2792
                /* offset manager */
2793
2794
0
                if (rkt->rkt_conf.offset_store_method ==
2795
0
                        RD_KAFKA_OFFSET_METHOD_BROKER &&
2796
0
                    RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
2797
                        /* Broker based offsets require a group id. */
2798
0
                        rd_kafka_toppar_destroy(rktp);
2799
0
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2800
0
                                                EINVAL);
2801
0
                        return -1;
2802
0
                }
2803
2804
0
        } else if (offset < 0) {
2805
0
                rd_kafka_toppar_destroy(rktp);
2806
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2807
0
                return -1;
2808
0
        }
2809
2810
0
        rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_FETCH_POS(offset, -1),
2811
0
                                       rkq, RD_KAFKA_NO_REPLYQ);
2812
2813
0
        rd_kafka_toppar_destroy(rktp);
2814
2815
0
        rd_kafka_set_last_error(0, 0);
2816
0
        return 0;
2817
0
}
2818
2819
2820
2821
/**
 * @brief Start consuming \p partition from \p offset, delivering
 *        messages on the partition's own fetch queue.
 */
int rd_kafka_consume_start(rd_kafka_topic_t *app_rkt,
                           int32_t partition,
                           int64_t offset) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);

        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
                     "Start consuming partition %" PRId32, partition);

        /* NULL queue: use the partition's default fetch queue. */
        return rd_kafka_consume_start0(rkt, partition, offset, NULL);
}
2829
2830
int rd_kafka_consume_start_queue(rd_kafka_topic_t *app_rkt,
2831
                                 int32_t partition,
2832
                                 int64_t offset,
2833
0
                                 rd_kafka_queue_t *rkqu) {
2834
0
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2835
2836
0
        return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
2837
0
}
2838
2839
2840
2841
0
/**
 * @brief Stop fetching for partition \p rktp and wait for the stop
 *        to be acknowledged by the toppar handler.
 *
 * @returns 0 on success, -1 on error (last_error is set).
 */
static RD_UNUSED int rd_kafka_consume_stop0(rd_kafka_toppar_t *rktp) {
        rd_kafka_q_t *replyq;
        rd_kafka_resp_err_t err;

        /* Remove the partition from the desired list. */
        rd_kafka_topic_wrlock(rktp->rktp_rkt);
        rd_kafka_toppar_lock(rktp);
        rd_kafka_toppar_desired_del(rktp);
        rd_kafka_toppar_unlock(rktp);
        rd_kafka_topic_wrunlock(rktp->rktp_rkt);

        /* Issue the stop op and wait (indefinitely) for its reply on
         * a temporary queue. */
        replyq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
        rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(replyq, 0));
        err = rd_kafka_q_wait_result(replyq, RD_POLL_INFINITE);
        rd_kafka_q_destroy_owner(replyq);

        rd_kafka_set_last_error(err, err ? EINVAL : 0);

        return err ? -1 : 0;
}
2863
2864
2865
0
/**
 * @brief Stop consuming \p partition of topic \p app_rkt.
 *
 * @returns 0 on success, -1 on error (last_error is set).
 */
int rd_kafka_consume_stop(rd_kafka_topic_t *app_rkt, int32_t partition) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
        rd_kafka_toppar_t *rktp;
        int ret;

        /* A concrete partition is required. */
        if (partition == RD_KAFKA_PARTITION_UA) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }

        /* Look the partition up among known, then desired, toppars. */
        rd_kafka_topic_wrlock(rkt);
        rktp = rd_kafka_toppar_get(rkt, partition, 0);
        if (!rktp)
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
        rd_kafka_topic_wrunlock(rkt);

        if (!rktp) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        ESRCH);
                return -1;
        }

        ret = rd_kafka_consume_stop0(rktp);
        /* set_last_error() called by stop0() */

        rd_kafka_toppar_destroy(rktp); /* refcnt from get()/desired_get() */

        return ret;
}
2892
2893
2894
2895
/**
 * @brief Seek \p partition to \p offset.
 *
 * With a non-zero \p timeout_ms the call blocks until the seek has been
 * performed (or the timeout expires), otherwise it is fire-and-forget.
 */
rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *app_rkt,
                                  int32_t partition,
                                  int64_t offset,
                                  int timeout_ms) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
        rd_kafka_toppar_t *rktp;
        rd_kafka_q_t *waitq         = NULL;
        rd_kafka_replyq_t replyq    = RD_KAFKA_NO_REPLYQ;
        rd_kafka_resp_err_t err;

        /* FIXME: simple consumer check */

        if (partition == RD_KAFKA_PARTITION_UA)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* Look the partition up among known, then desired, toppars. */
        rd_kafka_topic_rdlock(rkt);
        rktp = rd_kafka_toppar_get(rkt, partition, 0);
        if (!rktp)
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
        rd_kafka_topic_rdunlock(rkt);

        if (!rktp)
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        /* Blocking mode: set up a reply queue to wait on. */
        if (timeout_ms) {
                waitq  = rd_kafka_q_new(rkt->rkt_rk);
                replyq = RD_KAFKA_REPLYQ(waitq, 0);
        }

        err = rd_kafka_toppar_op_seek(rktp, RD_KAFKA_FETCH_POS(offset, -1),
                                      replyq);
        if (err) {
                if (waitq)
                        rd_kafka_q_destroy_owner(waitq);
                rd_kafka_toppar_destroy(rktp);
                return err;
        }

        rd_kafka_toppar_destroy(rktp); /* refcnt from get()/desired_get() */

        if (!waitq)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Wait for the seek outcome. */
        err = rd_kafka_q_wait_result(waitq, timeout_ms);
        rd_kafka_q_destroy_owner(waitq);
        return err;
}
2941
2942
2943
/**
 * @brief Seek multiple partitions to their per-partition offsets/positions
 *        and wait up to \p timeout_ms for the seeks to complete.
 *
 * Per-partition results are reported in each element's \c err field.
 * With \p timeout_ms == 0 the seeks are fire-and-forget.
 *
 * @returns NULL on success, else an error object (e.g. __INVALID_ARG,
 *          __TIMED_OUT, __DESTROY) that the caller must destroy.
 */
rd_kafka_error_t *
rd_kafka_seek_partitions(rd_kafka_t *rk,
                         rd_kafka_topic_partition_list_t *partitions,
                         int timeout_ms) {
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_topic_partition_t *rktpar;
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
        int cnt             = 0; /* Number of outstanding seek replies */

        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return rd_kafka_error_new(
                    RD_KAFKA_RESP_ERR__INVALID_ARG,
                    "Must only be used on consumer instance");

        if (!partitions || partitions->cnt == 0)
                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                          "partitions must be specified");

        /* Only create a reply queue when we intend to wait for results. */
        if (timeout_ms)
                tmpq = rd_kafka_q_new(rk);

        /* Fire off one seek op per known partition. */
        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                rd_kafka_toppar_t *rktp;
                rd_kafka_resp_err_t err;

                rktp = rd_kafka_toppar_get2(
                    rk, rktpar->topic, rktpar->partition,
                    rd_false /*no-ua-on-miss*/, rd_false /*no-create-on-miss*/);
                if (!rktp) {
                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        continue;
                }

                err = rd_kafka_toppar_op_seek(
                    rktp, rd_kafka_topic_partition_get_fetch_pos(rktpar),
                    RD_KAFKA_REPLYQ(tmpq, 0));
                if (err) {
                        rktpar->err = err;
                } else {
                        /* Reply pending: final err set when reply arrives. */
                        rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
                        cnt++;
                }

                rd_kafka_toppar_destroy(rktp); /* refcnt from toppar_get2() */
        }

        if (!timeout_ms)
                return NULL;


        /* Collect one reply per outstanding seek, updating the matching
         * list element's err field, until done or the timeout expires. */
        while (cnt > 0) {
                rd_kafka_op_t *rko;

                rko =
                    rd_kafka_q_pop(tmpq, rd_timeout_remains_us(abs_timeout), 0);
                if (!rko) {
                        rd_kafka_q_destroy_owner(tmpq);

                        return rd_kafka_error_new(
                            RD_KAFKA_RESP_ERR__TIMED_OUT,
                            "Timed out waiting for %d remaining partition "
                            "seek(s) to finish",
                            cnt);
                }

                if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
                        rd_kafka_q_destroy_owner(tmpq);
                        rd_kafka_op_destroy(rko);

                        return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
                                                  "Instance is terminating");
                }

                rd_assert(rko->rko_rktp);

                rktpar = rd_kafka_topic_partition_list_find(
                    partitions, rko->rko_rktp->rktp_rkt->rkt_topic->str,
                    rko->rko_rktp->rktp_partition);
                rd_assert(rktpar);

                rktpar->err = rko->rko_err;

                rd_kafka_op_destroy(rko);

                cnt--;
        }

        rd_kafka_q_destroy_owner(tmpq);

        return NULL;
}
3034
3035
3036
3037
static ssize_t rd_kafka_consume_batch0(rd_kafka_q_t *rkq,
3038
                                       int timeout_ms,
3039
                                       rd_kafka_message_t **rkmessages,
3040
0
                                       size_t rkmessages_size) {
3041
        /* Populate application's rkmessages array. */
3042
0
        return rd_kafka_q_serve_rkmessages(rkq, timeout_ms, rkmessages,
3043
0
                                           rkmessages_size);
3044
0
}
3045
3046
3047
ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *app_rkt,
3048
                               int32_t partition,
3049
                               int timeout_ms,
3050
                               rd_kafka_message_t **rkmessages,
3051
0
                               size_t rkmessages_size) {
3052
0
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
3053
0
        rd_kafka_toppar_t *rktp;
3054
0
        ssize_t cnt;
3055
3056
        /* Get toppar */
3057
0
        rd_kafka_topic_rdlock(rkt);
3058
0
        rktp = rd_kafka_toppar_get(rkt, partition, 0 /*no ua on miss*/);
3059
0
        if (unlikely(!rktp))
3060
0
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
3061
0
        rd_kafka_topic_rdunlock(rkt);
3062
3063
0
        if (unlikely(!rktp)) {
3064
                /* No such toppar known */
3065
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
3066
0
                                        ESRCH);
3067
0
                return -1;
3068
0
        }
3069
3070
        /* Populate application's rkmessages array. */
3071
0
        cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
3072
0
                                          rkmessages, rkmessages_size);
3073
3074
0
        rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
3075
3076
0
        rd_kafka_set_last_error(0, 0);
3077
3078
0
        return cnt;
3079
0
}
3080
3081
ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
3082
                                     int timeout_ms,
3083
                                     rd_kafka_message_t **rkmessages,
3084
0
                                     size_t rkmessages_size) {
3085
        /* Populate application's rkmessages array. */
3086
0
        return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms, rkmessages,
3087
0
                                       rkmessages_size);
3088
0
}
3089
3090
3091
/**
 * @brief Context carried through the queue-serve trampoline
 *        (rd_kafka_consume_cb) to the application's consume callback.
 */
struct consume_ctx {
        /** Application-supplied per-message callback */
        void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque);
        /** Application opaque passed back to consume_cb */
        void *opaque;
};
3095
3096
3097
/**
3098
 * Trampoline for application's consume_cb()
3099
 */
3100
static rd_kafka_op_res_t rd_kafka_consume_cb(rd_kafka_t *rk,
3101
                                             rd_kafka_q_t *rkq,
3102
                                             rd_kafka_op_t *rko,
3103
                                             rd_kafka_q_cb_type_t cb_type,
3104
0
                                             void *opaque) {
3105
0
        struct consume_ctx *ctx = opaque;
3106
0
        rd_kafka_message_t *rkmessage;
3107
3108
0
        if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
3109
0
            rko->rko_type == RD_KAFKA_OP_BARRIER) {
3110
0
                rd_kafka_op_destroy(rko);
3111
0
                return RD_KAFKA_OP_RES_HANDLED;
3112
0
        }
3113
3114
0
        rkmessage = rd_kafka_message_get(rko);
3115
3116
0
        rd_kafka_fetch_op_app_prepare(rk, rko);
3117
3118
0
        ctx->consume_cb(rkmessage, ctx->opaque);
3119
3120
0
        rd_kafka_op_destroy(rko);
3121
3122
0
        return RD_KAFKA_OP_RES_HANDLED;
3123
0
}
3124
3125
3126
3127
static rd_kafka_op_res_t rd_kafka_consume_callback0(
3128
    rd_kafka_q_t *rkq,
3129
    int timeout_ms,
3130
    int max_cnt,
3131
    void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque),
3132
0
    void *opaque) {
3133
0
        struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
3134
0
        rd_kafka_op_res_t res;
3135
3136
0
        if (timeout_ms)
3137
0
                rd_kafka_app_poll_blocking(rkq->rkq_rk);
3138
3139
0
        res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
3140
0
                               rd_kafka_consume_cb, &ctx);
3141
3142
0
        rd_kafka_app_polled(rkq->rkq_rk);
3143
3144
0
        return res;
3145
0
}
3146
3147
3148
int rd_kafka_consume_callback(rd_kafka_topic_t *app_rkt,
3149
                              int32_t partition,
3150
                              int timeout_ms,
3151
                              void (*consume_cb)(rd_kafka_message_t *rkmessage,
3152
                                                 void *opaque),
3153
0
                              void *opaque) {
3154
0
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
3155
0
        rd_kafka_toppar_t *rktp;
3156
0
        int r;
3157
3158
        /* Get toppar */
3159
0
        rd_kafka_topic_rdlock(rkt);
3160
0
        rktp = rd_kafka_toppar_get(rkt, partition, 0 /*no ua on miss*/);
3161
0
        if (unlikely(!rktp))
3162
0
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
3163
0
        rd_kafka_topic_rdunlock(rkt);
3164
3165
0
        if (unlikely(!rktp)) {
3166
                /* No such toppar known */
3167
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
3168
0
                                        ESRCH);
3169
0
                return -1;
3170
0
        }
3171
3172
0
        r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
3173
0
                                       rkt->rkt_conf.consume_callback_max_msgs,
3174
0
                                       consume_cb, opaque);
3175
3176
0
        rd_kafka_toppar_destroy(rktp);
3177
3178
0
        rd_kafka_set_last_error(0, 0);
3179
3180
0
        return r;
3181
0
}
3182
3183
3184
3185
int rd_kafka_consume_callback_queue(
3186
    rd_kafka_queue_t *rkqu,
3187
    int timeout_ms,
3188
    void (*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque),
3189
0
    void *opaque) {
3190
0
        return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
3191
0
                                          consume_cb, opaque);
3192
0
}
3193
3194
3195
/**
3196
 * Serve queue 'rkq' and return one message.
3197
 * By serving the queue it will also call any registered callbacks
3198
 * registered for matching events, this includes consumer_cb()
3199
 * in which case no message will be returned.
3200
 */
3201
static rd_kafka_message_t *
3202
0
rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
3203
0
        rd_kafka_op_t *rko;
3204
0
        rd_kafka_message_t *rkmessage = NULL;
3205
0
        rd_ts_t abs_timeout           = rd_timeout_init(timeout_ms);
3206
3207
0
        if (timeout_ms)
3208
0
                rd_kafka_app_poll_blocking(rk);
3209
3210
0
        rd_kafka_yield_thread = 0;
3211
0
        while ((
3212
0
            rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout), 0))) {
3213
0
                rd_kafka_op_res_t res;
3214
3215
0
                res =
3216
0
                    rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
3217
3218
0
                if (res == RD_KAFKA_OP_RES_PASS)
3219
0
                        break;
3220
3221
0
                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
3222
0
                             rd_kafka_yield_thread)) {
3223
                        /* Callback called rd_kafka_yield(), we must
3224
                         * stop dispatching the queue and return. */
3225
0
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
3226
0
                        rd_kafka_app_polled(rk);
3227
0
                        return NULL;
3228
0
                }
3229
3230
                /* Message was handled by callback. */
3231
0
                continue;
3232
0
        }
3233
3234
0
        if (!rko) {
3235
                /* Timeout reached with no op returned. */
3236
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
3237
0
                                        ETIMEDOUT);
3238
0
                rd_kafka_app_polled(rk);
3239
0
                return NULL;
3240
0
        }
3241
3242
0
        rd_kafka_assert(rk, rko->rko_type == RD_KAFKA_OP_FETCH ||
3243
0
                                rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
3244
3245
        /* Get rkmessage from rko */
3246
0
        rkmessage = rd_kafka_message_get(rko);
3247
3248
        /* Store offset, etc */
3249
0
        rd_kafka_fetch_op_app_prepare(rk, rko);
3250
3251
0
        rd_kafka_set_last_error(0, 0);
3252
3253
0
        rd_kafka_app_polled(rk);
3254
3255
0
        return rkmessage;
3256
0
}
3257
3258
rd_kafka_message_t *
3259
0
rd_kafka_consume(rd_kafka_topic_t *app_rkt, int32_t partition, int timeout_ms) {
3260
0
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
3261
0
        rd_kafka_toppar_t *rktp;
3262
0
        rd_kafka_message_t *rkmessage;
3263
3264
0
        rd_kafka_topic_rdlock(rkt);
3265
0
        rktp = rd_kafka_toppar_get(rkt, partition, 0 /*no ua on miss*/);
3266
0
        if (unlikely(!rktp))
3267
0
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
3268
0
        rd_kafka_topic_rdunlock(rkt);
3269
3270
0
        if (unlikely(!rktp)) {
3271
                /* No such toppar known */
3272
0
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
3273
0
                                        ESRCH);
3274
0
                return NULL;
3275
0
        }
3276
3277
0
        rkmessage =
3278
0
            rd_kafka_consume0(rkt->rkt_rk, rktp->rktp_fetchq, timeout_ms);
3279
3280
0
        rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
3281
3282
0
        return rkmessage;
3283
0
}
3284
3285
3286
/**
 * @brief Consume a single message from the application queue \p rkqu.
 */
rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
                                           int timeout_ms) {
        return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
}
3290
3291
3292
3293
0
/**
 * @brief Redirect the main reply queue to the consumer group queue so
 *        that rd_kafka_poll() also serves consumer events.
 */
rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk) {
        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);

        if (!rkcg)
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
3302
3303
3304
3305
0
/**
 * @brief High-level consumer poll: serve the consumer group queue and
 *        return one message (or NULL).
 */
rd_kafka_message_t *rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms) {
        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);

        if (unlikely(!rkcg)) {
                /* No consumer group: report the error as a message. */
                rd_kafka_message_t *errmsg = rd_kafka_message_new();
                errmsg->err                = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
                return errmsg;
        }

        return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
}
3316
3317
3318
/**
3319
 * @brief Consumer close.
3320
 *
3321
 * @param rkq The consumer group queue will be forwarded to this queue, which
3322
 *            which must be served (rebalance events) by the application/caller
3323
 *            until rd_kafka_consumer_closed() returns true.
3324
 *            If the consumer is not in a joined state, no rebalance events
3325
 *            will be emitted.
3326
 */
3327
static rd_kafka_error_t *rd_kafka_consumer_close_q(rd_kafka_t *rk,
3328
0
                                                   rd_kafka_q_t *rkq) {
3329
0
        rd_kafka_cgrp_t *rkcg;
3330
0
        rd_kafka_error_t *error = NULL;
3331
3332
0
        if (!(rkcg = rd_kafka_cgrp_get(rk)))
3333
0
                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
3334
0
                                          "Consume close called on non-group "
3335
0
                                          "consumer");
3336
3337
0
        if (rd_atomic32_get(&rkcg->rkcg_terminated))
3338
0
                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
3339
0
                                          "Consumer already closed");
3340
3341
        /* If a fatal error has been raised and this is an
3342
         * explicit consumer_close() from the application we return
3343
         * a fatal error. Otherwise let the "silent" no_consumer_close
3344
         * logic be performed to clean up properly. */
3345
0
        if (!rd_kafka_destroy_flags_no_consumer_close(rk) &&
3346
0
            (error = rd_kafka_get_fatal_error(rk)))
3347
0
                return error;
3348
3349
0
        rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
3350
0
                     "Closing consumer");
3351
3352
        /* Redirect cgrp queue to the rebalance queue to make sure all posted
3353
         * ops (e.g., rebalance callbacks) are served by
3354
         * the application/caller. */
3355
0
        rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
3356
3357
        /* Tell cgrp subsystem to terminate. A TERMINATE op will be posted
3358
         * on the rkq when done. */
3359
0
        rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
3360
3361
0
        return error;
3362
0
}
3363
3364
rd_kafka_error_t *rd_kafka_consumer_close_queue(rd_kafka_t *rk,
3365
0
                                                rd_kafka_queue_t *rkqu) {
3366
0
        if (!rkqu)
3367
0
                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
3368
0
                                          "Queue must be specified");
3369
0
        return rd_kafka_consumer_close_q(rk, rkqu->rkqu_q);
3370
0
}
3371
3372
0
/**
 * @brief Blocking consumer close: initiates termination of the consumer
 *        group and serves close events (rebalance callbacks, etc) on a
 *        temporary queue until the TERMINATE op arrives.
 *
 * @returns the termination result, __FATAL if a fatal error was raised,
 *          or NO_ERROR for the non-blocking (no_consumer_close) path.
 */
rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
        rd_kafka_error_t *error;
        /* Default outcome unless a TERMINATE op provides one. */
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        rd_kafka_q_t *rkq;

        /* Create a temporary reply queue to handle the TERMINATE reply op. */
        rkq = rd_kafka_q_new(rk);

        /* Initiate the close (async) */
        error = rd_kafka_consumer_close_q(rk, rkq);
        if (error) {
                err = rd_kafka_error_is_fatal(error)
                          ? RD_KAFKA_RESP_ERR__FATAL
                          : rd_kafka_error_code(error);
                rd_kafka_error_destroy(error);
                rd_kafka_q_destroy_owner(rkq);
                return err;
        }

        /* Disable the queue if termination is immediate or the user
         * does not want the blocking consumer_close() behaviour, this will
         * cause any ops posted for this queue (such as rebalance) to
         * be destroyed.
         */
        if (rd_kafka_destroy_flags_no_consumer_close(rk)) {
                rd_kafka_dbg(rk, CONSUMER, "CLOSE",
                             "Disabling and purging temporary queue to quench "
                             "close events");
                err = RD_KAFKA_RESP_ERR_NO_ERROR;
                rd_kafka_q_disable(rkq);
                /* Purge ops already enqueued */
                rd_kafka_q_purge(rkq);
        } else {
                rd_kafka_op_t *rko;
                rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Waiting for close events");
                /* Serve ops until the TERMINATE op (masking off flag bits)
                 * signals completion; its rko_err is the close result. */
                while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
                        rd_kafka_op_res_t res;
                        if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
                            RD_KAFKA_OP_TERMINATE) {
                                err = rko->rko_err;
                                rd_kafka_op_destroy(rko);
                                break;
                        }
                        /* Handle callbacks */
                        res = rd_kafka_poll_cb(rk, rkq, rko,
                                               RD_KAFKA_Q_CB_RETURN, NULL);
                        if (res == RD_KAFKA_OP_RES_PASS)
                                rd_kafka_op_destroy(rko);
                        /* Ignore YIELD, we need to finish */
                }
        }

        rd_kafka_q_destroy_owner(rkq);

        if (err)
                rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
                             "Consumer closed with error: %s",
                             rd_kafka_err2str(err));
        else
                rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
                             "Consumer closed");

        return err;
}
3436
3437
3438
0
/**
 * @brief Check whether the consumer has been fully closed.
 *
 * @returns 1 if the consumer group has terminated, else 0
 *          (also 0 for instances without a consumer group).
 */
int rd_kafka_consumer_closed(rd_kafka_t *rk) {
        if (unlikely(!rk->rk_cgrp))
                return 0;

        return rd_atomic32_get(&rk->rk_cgrp->rkcg_terminated);
}
3444
3445
3446
/**
 * @brief Retrieve the committed offsets for \p partitions from the
 *        group coordinator, retrying on coordinator/transport errors
 *        until \p timeout_ms expires.
 *
 * On success each element's offset is updated; on entry all offsets
 * are reset to RD_KAFKA_OFFSET_INVALID.
 */
rd_kafka_resp_err_t
rd_kafka_committed(rd_kafka_t *rk,
                   rd_kafka_topic_partition_list_t *partitions,
                   int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_resp_err_t err;
        rd_kafka_cgrp_t *rkcg;
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        if (!partitions)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        /* Set default offsets. */
        rd_kafka_topic_partition_list_reset_offsets(partitions,
                                                    RD_KAFKA_OFFSET_INVALID);

        /* Temporary queue for the OFFSET_FETCH reply. */
        rkq = rd_kafka_q_new(rk);

        /* Retry loop: re-issue the fetch on coordinator/transport errors
         * until the absolute timeout expires. */
        do {
                rd_kafka_op_t *rko;
                /* Snapshot broker state version so we can wait for a
                 * state change before retrying. */
                int state_version = rd_kafka_brokers_get_state_version(rk);

                rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
                rd_kafka_op_set_replyq(rko, rkq, NULL);

                /* Issue #827
                 * Copy partition list to avoid use-after-free if we time out
                 * here, the app frees the list, and then cgrp starts
                 * processing the op. */
                rko->rko_u.offset_fetch.partitions =
                    rd_kafka_topic_partition_list_copy(partitions);
                rko->rko_u.offset_fetch.require_stable_offsets =
                    rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
                rko->rko_u.offset_fetch.do_free = 1;

                /* Enqueue fails only if the cgrp is terminating. */
                if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
                        err = RD_KAFKA_RESP_ERR__DESTROY;
                        break;
                }

                rko =
                    rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout), 0);
                if (rko) {
                        if (!(err = rko->rko_err))
                                /* Success: copy offsets back to the
                                 * caller's list. */
                                rd_kafka_topic_partition_list_update(
                                    partitions,
                                    rko->rko_u.offset_fetch.partitions);
                        else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
                                  err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
                                 !rd_kafka_brokers_wait_state_change(
                                     rk, state_version,
                                     rd_timeout_remains(abs_timeout)))
                                /* Retryable error but no broker state
                                 * change within the timeout: give up. */
                                err = RD_KAFKA_RESP_ERR__TIMED_OUT;

                        rd_kafka_op_destroy(rko);
                } else
                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
                 err == RD_KAFKA_RESP_ERR__WAIT_COORD);

        rd_kafka_q_destroy_owner(rkq);

        return err;
}
3513
3514
3515
3516
rd_kafka_resp_err_t
3517
0
rd_kafka_position(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions) {
3518
0
        int i;
3519
3520
0
        for (i = 0; i < partitions->cnt; i++) {
3521
0
                rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
3522
0
                rd_kafka_toppar_t *rktp;
3523
3524
0
                if (!(rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
3525
0
                                                  rktpar->partition, 0, 1))) {
3526
0
                        rktpar->err    = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3527
0
                        rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3528
0
                        continue;
3529
0
                }
3530
3531
0
                rd_kafka_toppar_lock(rktp);
3532
0
                rd_kafka_topic_partition_set_from_fetch_pos(rktpar,
3533
0
                                                            rktp->rktp_app_pos);
3534
0
                rd_kafka_toppar_unlock(rktp);
3535
0
                rd_kafka_toppar_destroy(rktp);
3536
3537
0
                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3538
0
        }
3539
3540
0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
3541
0
}
3542
3543
3544
3545
/**
 * @brief Shared state between rd_kafka_query_watermark_offsets() and its
 *        response callback; lives on the caller's stack for the duration
 *        of the query.
 */
struct _query_wmark_offsets_state {
        rd_kafka_resp_err_t err; /* Final result; set by the response cb */
        const char *topic;       /* Queried topic name */
        int32_t partition;       /* Queried partition */
        int64_t offsets[2];      /* Low and high watermarks (order resolved
                                  * by the caller after both replies) */
        int offidx; /* next offset to set from response */
        rd_ts_t ts_end; /* Absolute query deadline */
        int state_version; /* Broker state version */
};
3554
3555
/**
 * @brief Response handler for the two ListOffsets requests issued by
 *        rd_kafka_query_watermark_offsets().
 *
 * Parses one ListOffsets reply into \p opaque (a
 * struct _query_wmark_offsets_state on the caller's stack), retrying the
 * request on transient transport errors while the caller's deadline allows.
 *
 * @param opaque Pointer to struct _query_wmark_offsets_state; only valid
 *               while the caller is still waiting (see __DESTROY below).
 */
static void rd_kafka_query_wmark_offsets_resp_cb(rd_kafka_t *rk,
                                                 rd_kafka_broker_t *rkb,
                                                 rd_kafka_resp_err_t err,
                                                 rd_kafka_buf_t *rkbuf,
                                                 rd_kafka_buf_t *request,
                                                 void *opaque) {
        struct _query_wmark_offsets_state *state;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;
        int actions = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when query_watermark..()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        offsets = rd_kafka_topic_partition_list_new(1);
        err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request, offsets,
                                          &actions);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Remove its cache in case the topic isn't a known topic. */
                rd_kafka_wrlock(rk);
                rd_kafka_metadata_cache_delete_by_name(rk, state->topic);
                rd_kafka_wrunlock(rk);
        }

        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                rd_kafka_topic_partition_list_destroy(offsets);
                return; /* Retrying */
        }

        /* Retry if no broker connection is available yet. */
        if (err == RD_KAFKA_RESP_ERR__TRANSPORT && rkb &&
            rd_kafka_brokers_wait_state_change(
                rkb->rkb_rk, state->state_version,
                rd_timeout_remains(state->ts_end))) {
                /* Retry */
                state->state_version   = rd_kafka_brokers_get_state_version(rk);
                request->rkbuf_retries = 0;
                if (rd_kafka_buf_retry(rkb, request)) {
                        rd_kafka_topic_partition_list_destroy(offsets);
                        return; /* Retry in progress */
                }
                /* FALLTHRU */
        }

        /* Locate our partition in the parsed response. */
        rktpar = rd_kafka_topic_partition_list_find(offsets, state->topic,
                                                    state->partition);
        if (!rktpar && err > RD_KAFKA_RESP_ERR__END) {
                /* Partition not seen in response,
                 * not a local error. */
                err = RD_KAFKA_RESP_ERR__BAD_MSG;
        } else if (rktpar) {
                if (rktpar->err)
                        err = rktpar->err;
                else
                        state->offsets[state->offidx] = rktpar->offset;
        }

        /* Count this reply; two replies (low + high) complete the query. */
        state->offidx++;

        if (err || state->offidx == 2) /* Error or Done */
                state->err = err;

        rd_kafka_topic_partition_list_destroy(offsets);
}
3625
3626
3627
/**
 * @brief Query the cluster for the low and high watermark offsets of a
 *        single topic+partition, blocking up to \p timeout_ms.
 *
 * @param low Output: low watermark offset.
 * @param high Output: high watermark offset.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, else the first error
 *          encountered (leader query, request, or response error).
 */
rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk,
                                                     const char *topic,
                                                     int32_t partition,
                                                     int64_t *low,
                                                     int64_t *high,
                                                     int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _query_wmark_offsets_state state;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_t *rktpar;
        struct rd_kafka_partition_leader *leader;
        rd_list_t leaders;
        rd_kafka_resp_err_t err;

        partitions = rd_kafka_topic_partition_list_new(1);
        rktpar =
            rd_kafka_topic_partition_list_add(partitions, topic, partition);

        rd_list_init(&leaders, partitions->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        /* Resolve the partition leader first; fails with a timeout or
         * metadata error if no leader can be found within timeout_ms. */
        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
                                                          &leaders, timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                rd_kafka_topic_partition_list_destroy(partitions);
                return err;
        }

        /* NOTE(review): element 0 is assumed non-NULL here since the leader
         * query succeeded for the single requested partition — confirm
         * query_leaders() guarantees a non-empty list on success. */
        leader = rd_list_elem(&leaders, 0);

        rkq = rd_kafka_q_new(rk);

        /* Due to KAFKA-1588 we need to send a request for each wanted offset,
         * in this case one for the low watermark and one for the high. */
        state.topic         = topic;
        state.partition     = partition;
        state.offsets[0]    = RD_KAFKA_OFFSET_BEGINNING;
        state.offsets[1]    = RD_KAFKA_OFFSET_END;
        state.offidx        = 0;
        state.err           = RD_KAFKA_RESP_ERR__IN_PROGRESS;
        state.ts_end        = ts_end;
        state.state_version = rd_kafka_brokers_get_state_version(rk);

        /* Request #1: earliest (low watermark) offset. */
        rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
        rd_kafka_ListOffsetsRequest(
            leader->rkb, partitions, RD_KAFKA_REPLYQ(rkq, 0),
            rd_kafka_query_wmark_offsets_resp_cb, timeout_ms, &state);

        /* Request #2: latest (high watermark) offset. */
        rktpar->offset = RD_KAFKA_OFFSET_END;
        rd_kafka_ListOffsetsRequest(
            leader->rkb, partitions, RD_KAFKA_REPLYQ(rkq, 0),
            rd_kafka_query_wmark_offsets_resp_cb, timeout_ms, &state);

        rd_kafka_topic_partition_list_destroy(partitions);
        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        /* NOTE(review): the serve loop polls with RD_POLL_INFINITE and relies
         * on the per-request timeout_ms to eventually fail the callbacks and
         * clear __IN_PROGRESS — confirm no path leaves state.err unset. */
        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                rd_kafka_q_serve(rkq, RD_POLL_INFINITE, 0,
                                 RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb,
                                 NULL);
        }

        rd_kafka_q_destroy_owner(rkq);

        if (state.err)
                return state.err;
        else if (state.offidx != 2)
                return RD_KAFKA_RESP_ERR__FAIL;

        /* We are not certain about the returned order. */
        if (state.offsets[0] < state.offsets[1]) {
                *low  = state.offsets[0];
                *high = state.offsets[1];
        } else {
                *low  = state.offsets[1];
                *high = state.offsets[0];
        }

        /* If partition is empty only one offset (the last) will be returned. */
        if (*low < 0 && *high >= 0)
                *low = *high;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
3714
3715
3716
rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t *rk,
3717
                                                   const char *topic,
3718
                                                   int32_t partition,
3719
                                                   int64_t *low,
3720
0
                                                   int64_t *high) {
3721
0
        rd_kafka_toppar_t *rktp;
3722
3723
0
        rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
3724
0
        if (!rktp)
3725
0
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3726
3727
0
        rd_kafka_toppar_lock(rktp);
3728
0
        *low  = rktp->rktp_lo_offset;
3729
0
        *high = rktp->rktp_hi_offset;
3730
0
        rd_kafka_toppar_unlock(rktp);
3731
3732
0
        rd_kafka_toppar_destroy(rktp);
3733
3734
0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
3735
0
}
3736
3737
3738
/**
 * @brief get_offsets_for_times() state
 *
 * Shared between rd_kafka_offsets_for_times() and its response callback;
 * lives on the caller's stack for the duration of the call.
 */
struct _get_offsets_for_times {
        rd_kafka_topic_partition_list_t *results; /* Accumulated results */
        rd_kafka_resp_err_t err; /* First error encountered, if any */
        int wait_reply;          /* Number of outstanding broker replies */
        int state_version;       /* Broker state version for retry checks */
        rd_ts_t ts_end;          /* Absolute deadline */
};
3748
3749
/**
 * @brief Handle OffsetRequest responses
 *
 * Parses one ListOffsets reply into the shared state's result list,
 * retrying on transient transport errors while the deadline allows,
 * and decrements the outstanding-reply counter when done.
 *
 * @param opaque Pointer to struct _get_offsets_for_times on the caller's
 *               stack; only valid while the caller is still waiting.
 */
static void rd_kafka_get_offsets_for_times_resp_cb(rd_kafka_t *rk,
                                                   rd_kafka_broker_t *rkb,
                                                   rd_kafka_resp_err_t err,
                                                   rd_kafka_buf_t *rkbuf,
                                                   rd_kafka_buf_t *request,
                                                   void *opaque) {
        struct _get_offsets_for_times *state;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when offsets_for_times()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
                                          state->results, NULL);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                return; /* Retrying */

        /* Retry if no broker connection is available yet. */
        if (err == RD_KAFKA_RESP_ERR__TRANSPORT && rkb &&
            rd_kafka_brokers_wait_state_change(
                rkb->rkb_rk, state->state_version,
                rd_timeout_remains(state->ts_end))) {
                /* Retry */
                state->state_version   = rd_kafka_brokers_get_state_version(rk);
                request->rkbuf_retries = 0;
                if (rd_kafka_buf_retry(rkb, request))
                        return; /* Retry in progress */
                /* FALLTHRU */
        }

        /* Keep only the first error encountered across all replies. */
        if (err && !state->err)
                state->err = err;

        state->wait_reply--;
}
3791
3792
3793
/**
 * @brief Look up the offsets matching the timestamps in \p offsets by
 *        sending a ListOffsets request to each partition leader, blocking
 *        up to \p timeout_ms.
 *
 * On success the entries in \p offsets are updated with the results.
 *
 * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if \p offsets is empty,
 *          RD_KAFKA_RESP_ERR__TIMED_OUT if not all replies arrived in time,
 *          else the first error encountered (or NO_ERROR).
 */
rd_kafka_resp_err_t
rd_kafka_offsets_for_times(rd_kafka_t *rk,
                           rd_kafka_topic_partition_list_t *offsets,
                           int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _get_offsets_for_times state = RD_ZERO_INIT;
        rd_ts_t ts_end                      = rd_timeout_init(timeout_ms);
        rd_list_t leaders;
        int i;
        rd_kafka_resp_err_t err;
        struct rd_kafka_partition_leader *leader;
        int tmout;

        if (offsets->cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        rd_list_init(&leaders, offsets->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        /* Group the requested partitions by their current leader broker. */
        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
                                                          timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                return err;
        }


        rkq = rd_kafka_q_new(rk);

        state.wait_reply = 0;
        state.results    = rd_kafka_topic_partition_list_new(offsets->cnt);

        /* For each leader send a request for its partitions */
        RD_LIST_FOREACH(leader, &leaders, i) {
                state.wait_reply++;
                rd_kafka_ListOffsetsRequest(
                    leader->rkb, leader->partitions, RD_KAFKA_REPLYQ(rkq, 0),
                    rd_kafka_get_offsets_for_times_resp_cb, timeout_ms, &state);
        }

        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.wait_reply > 0 &&
               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
                                 rd_kafka_poll_cb, NULL);

        /* Destroying the queue sends __DESTROY to any outstanding callbacks,
         * which must not touch 'state' after this point. */
        rd_kafka_q_destroy_owner(rkq);

        if (state.wait_reply > 0 && !state.err)
                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Then update the queried partitions. */
        if (!state.err)
                rd_kafka_topic_partition_list_update(offsets, state.results);

        rd_kafka_topic_partition_list_destroy(state.results);

        return state.err;
}
3854
3855
3856
/**
3857
 * @brief rd_kafka_poll() (and similar) op callback handler.
3858
 *        Will either call registered callback depending on cb_type and op type
3859
 *        or return op to application, if applicable (e.g., fetch message).
3860
 *
3861
 * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the
3862
 *          other res types (such as OP_RES_PASS).
3863
 *
3864
 * @locality any thread that serves op queues
3865
 */
3866
rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
                                   rd_kafka_q_t *rkq,
                                   rd_kafka_op_t *rko,
                                   rd_kafka_q_cb_type_t cb_type,
                                   void *opaque) {
        rd_kafka_msg_t *rkm;
        rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED;

        /* Special handling for events based on cb_type */
        if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko)) {
                /* Return-as-event requested. */
                return RD_KAFKA_OP_RES_PASS; /* Return as event */
        }

        /* Dispatch on op type.  Ops that return OP_RES_PASS are handed back
         * to the application; ops left as OP_RES_HANDLED are destroyed at
         * the bottom of this function. */
        switch ((int)rko->rko_type) {
        case RD_KAFKA_OP_FETCH:
                if (!rk->rk_conf.consume_cb ||
                    cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
                else {
                        struct consume_ctx ctx = {.consume_cb =
                                                      rk->rk_conf.consume_cb,
                                                  .opaque = rk->rk_conf.opaque};

                        return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
                }
                break;

        case RD_KAFKA_OP_REBALANCE:
                if (rk->rk_conf.rebalance_cb)
                        rk->rk_conf.rebalance_cb(
                            rk, rko->rko_err, rko->rko_u.rebalance.partitions,
                            rk->rk_conf.opaque);
                else {
                        /** If EVENT_REBALANCE is enabled but rebalance_cb
                         *  isn't, we need to perform a dummy assign for the
                         *  application. This might happen during termination
                         *  with consumer_close() */
                        rd_kafka_dbg(rk, CGRP, "UNASSIGN",
                                     "Forcing unassign of %d partition(s)",
                                     rko->rko_u.rebalance.partitions
                                         ? rko->rko_u.rebalance.partitions->cnt
                                         : 0);
                        rd_kafka_assign(rk, NULL);
                }
                break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
                if (!rko->rko_u.offset_commit.cb)
                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
                rko->rko_u.offset_commit.cb(rk, rko->rko_err,
                                            rko->rko_u.offset_commit.partitions,
                                            rko->rko_u.offset_commit.opaque);
                break;

        case RD_KAFKA_OP_FETCH_STOP | RD_KAFKA_OP_REPLY:
                /* Reply from toppar FETCH_STOP */
                rd_kafka_assignment_partition_stopped(rk, rko->rko_rktp);
                break;

        case RD_KAFKA_OP_CONSUMER_ERR:
                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
                 *   Consumer errors are returned to the application
                 *   as rkmessages, not error callbacks.
                 *
                 * rd_kafka_poll() (_Q_CB_GLOBAL):
                 *   convert to ERR op (fallthru)
                 */
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
                        /* return as message_t to application */
                        return RD_KAFKA_OP_RES_PASS;
                }
                /* FALLTHRU */

        case RD_KAFKA_OP_ERR:
                /* Deliver to the error callback if set, else log. */
                if (rk->rk_conf.error_cb)
                        rk->rk_conf.error_cb(rk, rko->rko_err,
                                             rko->rko_u.err.errstr,
                                             rk->rk_conf.opaque);
                else
                        rd_kafka_log(rk, LOG_ERR, "ERROR", "%s: %s",
                                     rk->rk_name, rko->rko_u.err.errstr);
                break;

        case RD_KAFKA_OP_DR:
                /* Delivery report:
                 * call application DR callback for each message. */
                while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
                        rd_kafka_message_t *rkmessage;

                        TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs, rkm,
                                     rkm_link);

                        rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);

                        if (likely(rk->rk_conf.dr_msg_cb != NULL)) {
                                rk->rk_conf.dr_msg_cb(rk, rkmessage,
                                                      rk->rk_conf.opaque);

                        } else if (rk->rk_conf.dr_cb) {
                                /* Legacy per-field DR callback. */
                                rk->rk_conf.dr_cb(
                                    rk, rkmessage->payload, rkmessage->len,
                                    rkmessage->err, rk->rk_conf.opaque,
                                    rkmessage->_private);
                        } else if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                                rd_kafka_log(
                                    rk, LOG_WARNING, "DRDROP",
                                    "Dropped delivery report for "
                                    "message to "
                                    "%s [%" PRId32
                                    "] (%s) with "
                                    "opaque %p: flush() or poll() "
                                    "should not be called when "
                                    "EVENT_DR is enabled",
                                    rd_kafka_topic_name(rkmessage->rkt),
                                    rkmessage->partition,
                                    rd_kafka_err2name(rkmessage->err),
                                    rkmessage->_private);
                        } else {
                                rd_assert(!*"BUG: neither a delivery report "
                                          "callback or EVENT_DR flag set");
                        }

                        rd_kafka_msg_destroy(rk, rkm);

                        if (unlikely(rd_kafka_yield_thread)) {
                                /* Callback called yield(),
                                 * re-enqueue the op (if there are any
                                 * remaining messages). */
                                if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.rkmq_msgs))
                                        rd_kafka_q_reenq(rkq, rko);
                                else
                                        rd_kafka_op_destroy(rko);
                                return RD_KAFKA_OP_RES_YIELD;
                        }
                }

                /* All messages consumed from the op's queue: reset it so
                 * op_destroy() below doesn't double-free. */
                rd_kafka_msgq_init(&rko->rko_u.dr.msgq);

                break;

        case RD_KAFKA_OP_THROTTLE:
                if (rk->rk_conf.throttle_cb)
                        rk->rk_conf.throttle_cb(
                            rk, rko->rko_u.throttle.nodename,
                            rko->rko_u.throttle.nodeid,
                            rko->rko_u.throttle.throttle_time,
                            rk->rk_conf.opaque);
                break;

        case RD_KAFKA_OP_STATS:
                /* Statistics */
                if (rk->rk_conf.stats_cb &&
                    rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
                                         rko->rko_u.stats.json_len,
                                         rk->rk_conf.opaque) == 1)
                        rko->rko_u.stats.json =
                            NULL; /* Application wanted json ptr */
                break;

        case RD_KAFKA_OP_LOG:
                if (likely(rk->rk_conf.log_cb &&
                           rk->rk_conf.log_level >= rko->rko_u.log.level))
                        rk->rk_conf.log_cb(rk, rko->rko_u.log.level,
                                           rko->rko_u.log.fac,
                                           rko->rko_u.log.str);
                break;

        case RD_KAFKA_OP_TERMINATE:
                /* nop: just a wake-up */
                res = RD_KAFKA_OP_RES_YIELD;
                rd_kafka_op_destroy(rko);
                break;

        case RD_KAFKA_OP_CREATETOPICS:
        case RD_KAFKA_OP_DELETETOPICS:
        case RD_KAFKA_OP_CREATEPARTITIONS:
        case RD_KAFKA_OP_ALTERCONFIGS:
        case RD_KAFKA_OP_INCREMENTALALTERCONFIGS:
        case RD_KAFKA_OP_DESCRIBECONFIGS:
        case RD_KAFKA_OP_DELETERECORDS:
        case RD_KAFKA_OP_DELETEGROUPS:
        case RD_KAFKA_OP_ADMIN_FANOUT:
        case RD_KAFKA_OP_CREATEACLS:
        case RD_KAFKA_OP_DESCRIBEACLS:
        case RD_KAFKA_OP_DELETEACLS:
        case RD_KAFKA_OP_LISTOFFSETS:
                /* Calls op_destroy() from worker callback,
                 * when the time comes. */
                res = rd_kafka_op_call(rk, rkq, rko);
                break;

        case RD_KAFKA_OP_ADMIN_RESULT:
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Don't handle here */

                /* Op is silently destroyed below */
                break;

        case RD_KAFKA_OP_TXN:
                /* Must only be handled by rdkafka main thread */
                rd_assert(thrd_is_current(rk->rk_thread));
                res = rd_kafka_op_call(rk, rkq, rko);
                break;

        case RD_KAFKA_OP_BARRIER:
                /* Barrier op: no action, destroyed below. */
                break;

        case RD_KAFKA_OP_PURGE:
                rd_kafka_purge(rk, rko->rko_u.purge.flags);
                break;

        default:
                /* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
                 * call it. */
                if (rko->rko_type & RD_KAFKA_OP_CB) {
                        res = rd_kafka_op_call(rk, rkq, rko);
                        break;
                }

                RD_BUG("Can't handle op type %s (0x%x)",
                       rd_kafka_op2str(rko->rko_type), rko->rko_type);
                break;
        }

        /* Ops fully handled here are owned by us and must be destroyed;
         * PASS/YIELD paths returned earlier or destroyed the op already. */
        if (res == RD_KAFKA_OP_RES_HANDLED)
                rd_kafka_op_destroy(rko);

        return res;
}
4099
4100
0
/**
 * @brief Serve the instance's main reply queue, dispatching registered
 *        callbacks via rd_kafka_poll_cb().
 *
 * @returns the number of ops/callbacks served.
 */
int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
        return rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                                RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
}
4107
4108
4109
0
/**
 * @brief Pop the next event from the queue, serving callback ops along
 *        the way.
 *
 * @returns the next event, or NULL on timeout.
 */
rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
        /* A NULL pop result (timeout) maps directly to a NULL event. */
        return rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
                                    RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb,
                                    NULL);
}
4121
4122
0
/**
 * @brief Serve the given queue, dispatching registered callbacks via
 *        rd_kafka_poll_cb().
 *
 * @returns the number of ops/callbacks served.
 */
int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
        return rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                                RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
}
4129
4130
4131
4132
/**
 * @brief Write a human-readable debug dump of a single toppar (topic
 *        partition) to \p fp, prefixed with \p indent on each line.
 *
 * Dumps broker/leader assignment, refcount, queued message counts and
 * cumulative transmit counters.
 */
static void
rd_kafka_toppar_dump(FILE *fp, const char *indent, rd_kafka_toppar_t *rktp) {

        fprintf(fp,
                "%s%.*s [%" PRId32
                "] broker %s, "
                "leader_id %s\n",
                indent, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                rktp->rktp_partition,
                rktp->rktp_broker ? rktp->rktp_broker->rkb_name : "none",
                rktp->rktp_leader ? rktp->rktp_leader->rkb_name : "none");
        fprintf(fp,
                "%s refcnt %i\n"
                "%s msgq:      %i messages\n"
                "%s xmit_msgq: %i messages\n"
                "%s total:     %" PRIu64 " messages, %" PRIu64 " bytes\n",
                indent, rd_refcnt_get(&rktp->rktp_refcnt), indent,
                rktp->rktp_msgq.rkmq_msg_cnt, indent,
                rktp->rktp_xmit_msgq.rkmq_msg_cnt, indent,
                rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes));
}
4154
4155
0
/**
 * @brief Write a human-readable debug dump of a broker and all its
 *        assigned toppars to \p fp.
 *
 * @param locks If non-zero, the broker lock is held around the dump;
 *              pass 0 when the caller already holds the necessary locks.
 */
static void rd_kafka_broker_dump(FILE *fp, rd_kafka_broker_t *rkb, int locks) {
        rd_kafka_toppar_t *rktp;

        if (locks)
                rd_kafka_broker_lock(rkb);
        fprintf(fp,
                " rd_kafka_broker_t %p: %s NodeId %" PRId32
                " in state %s (for %.3fs)\n",
                rkb, rkb->rkb_name, rkb->rkb_nodeid,
                rd_kafka_broker_state_names[rkb->rkb_state],
                rkb->rkb_ts_state
                    ? (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f
                    : 0.0f);
        fprintf(fp, "  refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
        fprintf(fp, "  outbuf_cnt: %i waitresp_cnt: %i\n",
                rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
                rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
        fprintf(fp,
                "  %" PRIu64 " messages sent, %" PRIu64
                " bytes, "
                "%" PRIu64 " errors, %" PRIu64
                " timeouts\n"
                "  %" PRIu64 " messages received, %" PRIu64
                " bytes, "
                "%" PRIu64
                " errors\n"
                "  %" PRIu64 " messageset transmissions were retried\n",
                rd_atomic64_get(&rkb->rkb_c.tx),
                rd_atomic64_get(&rkb->rkb_c.tx_bytes),
                rd_atomic64_get(&rkb->rkb_c.tx_err),
                rd_atomic64_get(&rkb->rkb_c.req_timeouts),
                rd_atomic64_get(&rkb->rkb_c.rx),
                rd_atomic64_get(&rkb->rkb_c.rx_bytes),
                rd_atomic64_get(&rkb->rkb_c.rx_err),
                rd_atomic64_get(&rkb->rkb_c.tx_retries));

        /* Per-toppar details for every partition delegated to this broker. */
        fprintf(fp, "  %i toppars:\n", rkb->rkb_toppar_cnt);
        TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
        rd_kafka_toppar_dump(fp, "   ", rktp);
        if (locks) {
                rd_kafka_broker_unlock(rkb);
        }
}
4198
4199
4200
0
/**
 * @brief Dump the state of handle \p rk (brokers, consumer group, topics,
 *        partitions and the metadata cache) to \p fp in human-readable
 *        form, for debugging.
 *
 * @param fp    output stream to write to.
 * @param rk    client instance to dump.
 * @param locks if non-zero, take the required locks; pass 0 only from
 *              crash paths where locks may already be held.
 */
static void rd_kafka_dump0(FILE *fp, rd_kafka_t *rk, int locks) {
        rd_kafka_broker_t *rkb;
        rd_kafka_topic_t *rkt;
        rd_kafka_toppar_t *rktp;
        int i;
        unsigned int tot_cnt;
        size_t tot_size;

        rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

        if (locks)
                rd_kafka_rdlock(rk);
#if ENABLE_DEVEL
        fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
#endif
        fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);

        fprintf(fp, " producer.msg_cnt %u (%" PRIusz " bytes)\n", tot_cnt,
                tot_size);
        fprintf(fp, " rk_rep reply queue: %i ops\n",
                rd_kafka_q_len(rk->rk_rep));

        fprintf(fp, " brokers:\n");
        if (locks)
                mtx_lock(&rk->rk_internal_rkb_lock);
        if (rk->rk_internal_rkb)
                rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
        if (locks)
                mtx_unlock(&rk->rk_internal_rkb_lock);

        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_dump(fp, rkb, locks);
        }

        fprintf(fp, " cgrp:\n");
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                fprintf(fp, "  %.*s in state %s, flags 0x%x\n",
                        RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                        rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                        rkcg->rkcg_flags);
                fprintf(fp, "   coord_id %" PRId32 ", broker %s\n",
                        rkcg->rkcg_coord_id,
                        rkcg->rkcg_curr_coord
                            ? rd_kafka_broker_name(rkcg->rkcg_curr_coord)
                            : "(none)");

                fprintf(fp, "  toppars:\n");
                RD_LIST_FOREACH(rktp, &rkcg->rkcg_toppars, i) {
                        fprintf(fp, "   %.*s [%" PRId32 "] in state %s\n",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_fetch_states[rktp->rktp_fetch_state]);
                }
        }

        fprintf(fp, " topics:\n");
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                fprintf(fp,
                        "  %.*s with %" PRId32
                        " partitions, state %s, "
                        "refcnt %i\n",
                        RD_KAFKAP_STR_PR(rkt->rkt_topic),
                        rkt->rkt_partition_cnt,
                        rd_kafka_topic_state_names[rkt->rkt_state],
                        rd_refcnt_get(&rkt->rkt_refcnt));
                if (rkt->rkt_ua)
                        rd_kafka_toppar_dump(fp, "   ", rkt->rkt_ua);
                /* FIX: was `if (rd_list_empty(..))`, which printed the
                 * "desired partitions" header only when the list was empty
                 * (iterating over nothing) and skipped it when there
                 * actually were desired partitions to report. */
                if (!rd_list_empty(&rkt->rkt_desp)) {
                        fprintf(fp, "   desired partitions:");
                        RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
                                fprintf(fp, " %" PRId32,
                                        rktp->rktp_partition);
                        }
                        fprintf(fp, "\n");
                }
        }

        fprintf(fp, "\n");
        rd_kafka_metadata_cache_dump(fp, rk);

        if (locks)
                rd_kafka_rdunlock(rk);
}
4282
4283
0
/**
 * @brief Public entry point: dump the full state of \p rk to \p fp,
 *        taking the required locks. A NULL \p rk produces no output.
 */
void rd_kafka_dump(FILE *fp, rd_kafka_t *rk) {
        if (!rk)
                return;

        rd_kafka_dump0(fp, rk, 1 /*locks*/);
}
4287
4288
4289
4290
0
/**
 * @brief Returns the name of the client instance.
 *
 * The returned string is owned by \p rk and remains valid for the
 * lifetime of the handle.
 */
const char *rd_kafka_name(const rd_kafka_t *rk) {
        return rk->rk_name;
}
4293
4294
0
/**
 * @brief Returns the client instance type (producer or consumer) as
 *        set at creation time.
 */
rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) {
        return rk->rk_type;
}
4297
4298
4299
0
char *rd_kafka_memberid(const rd_kafka_t *rk) {
4300
0
        rd_kafka_op_t *rko;
4301
0
        rd_kafka_cgrp_t *rkcg;
4302
0
        char *memberid;
4303
4304
0
        if (!(rkcg = rd_kafka_cgrp_get(rk)))
4305
0
                return NULL;
4306
4307
0
        rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
4308
0
        if (!rko)
4309
0
                return NULL;
4310
0
        memberid            = rko->rko_u.name.str;
4311
0
        rko->rko_u.name.str = NULL;
4312
0
        rd_kafka_op_destroy(rko);
4313
4314
0
        return memberid;
4315
0
}
4316
4317
4318
0
/**
 * @brief Returns the cluster id, waiting up to \p timeout_ms for it to
 *        become available through broker metadata.
 *
 * ClusterId is returned in Metadata >=V2 responses and cached on the
 * rk. If no cached value is available it means no metadata has been
 * received yet, or we're using a lower protocol version (e.g., lack of
 * api.version.request=true).
 *
 * @returns a newly allocated string (caller frees), or NULL if the
 *          timeout expired or the brokers report no cluster id.
 */
char *rd_kafka_clusterid(rd_kafka_t *rk, int timeout_ms) {
        const rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        for (;;) {
                char *clusterid = NULL;
                rd_bool_t metadata_seen;
                int remains_ms;

                /* Snapshot both the cached clusterid and whether any
                 * metadata has been received, under a single lock. */
                rd_kafka_rdlock(rk);
                if (rk->rk_clusterid)
                        clusterid = rd_strdup(rk->rk_clusterid);
                metadata_seen = rk->rk_ts_metadata > 0;
                rd_kafka_rdunlock(rk);

                if (clusterid)
                        return clusterid;

                if (metadata_seen) {
                        /* Metadata received but no clusterid: the broker
                         * is probably too old or
                         * api.version.request=false. */
                        return NULL;
                }

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return NULL;

                rd_kafka_metadata_cache_wait_change(rk, remains_ms);
        }
}
4358
4359
4360
0
/**
 * @brief Returns the broker id of the current controller, waiting up to
 *        \p timeout_ms for it to become known through broker metadata.
 *
 * @returns the controller broker id, or -1 if the timeout expired or
 *          the brokers report no controller id.
 */
int32_t rd_kafka_controllerid(rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ControllerId is returned in Metadata >=V1 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;
                int version;

                /* Capture the broker state version before checking the
                 * cache so a concurrent state change is not missed by
                 * the wait below. */
                version = rd_kafka_brokers_get_state_version(rk);

                rd_kafka_rdlock(rk);

                if (rk->rk_controllerid != -1) {
                        /* Cached controllerid available.
                         * FIX: copy the value while still holding the
                         * rdlock; previously it was re-read after
                         * rd_kafka_rdunlock(), an unlocked racy read. */
                        int32_t controllerid = rk->rk_controllerid;
                        rd_kafka_rdunlock(rk);
                        return controllerid;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no controllerid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return -1;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return -1;

                rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
        }

        return -1;
}
4402
4403
4404
0
/**
 * @brief Returns the application opaque pointer that was set in the
 *        instance configuration.
 */
void *rd_kafka_opaque(const rd_kafka_t *rk) {
        return rk->rk_conf.opaque;
}
4407
4408
4409
0
/**
 * @brief Returns the number of outstanding items: current messages
 *        plus ops on the main reply queue, plus ops on the background
 *        queue when one has been enabled.
 */
int rd_kafka_outq_len(rd_kafka_t *rk) {
        return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep) +
               (rk->rk_background.q ? rd_kafka_q_len(rk->rk_background.q) : 0);
}
4413
4414
4415
0
/**
 * @brief Waits until all outstanding messages are delivered (or failed),
 *        or \p timeout_ms elapses, whichever comes first.
 *
 * Producer-only API.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR when all messages were handled,
 *          RD_KAFKA_RESP_ERR__TIMED_OUT if messages remain after the
 *          timeout, or RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED on
 *          non-producer instances.
 */
rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms) {
        unsigned int msg_cnt = 0;

        /* flush() only makes sense for producers. */
        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        rd_kafka_yield_thread = 0;

        /* Set flushing flag on the producer for the duration of the
         * flush() call. This tells producer_serve() that the linger.ms
         * time should be considered immediate. */
        rd_atomic32_add(&rk->rk_flushing, 1);

        /* Wake up all broker threads to trigger the produce_serve() call.
         * If this flush() call finishes before the broker wakes up
         * then no flushing will be performed by that broker thread. */
        rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_UP, "flushing");

        if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                /* Application wants delivery reports as events rather
                 * than callbacks, we must thus not serve this queue
                 * with rd_kafka_poll() since that would trigger non-existent
                 * delivery report callbacks, which would result
                 * in the delivery reports being dropped.
                 * Instead we rely on the application to serve the event
                 * queue in another thread, so all we do here is wait
                 * for the current message count to reach zero. */
                rd_kafka_curr_msgs_wait_zero(rk, timeout_ms, &msg_cnt);

        } else {
                /* Standard poll interface.
                 *
                 * First poll call is non-blocking for the case
                 * where timeout_ms==RD_POLL_NOWAIT to make sure poll is
                 * called at least once. */
                rd_ts_t ts_end = rd_timeout_init(timeout_ms);
                int tmout      = RD_POLL_NOWAIT;
                int qlen       = 0;

                /* Poll until both the reply queue and the message count
                 * drain, the application yields, or the timeout expires. */
                do {
                        rd_kafka_poll(rk, tmout);
                        qlen    = rd_kafka_q_len(rk->rk_rep);
                        msg_cnt = rd_kafka_curr_msgs_cnt(rk);
                } while (qlen + msg_cnt > 0 && !rd_kafka_yield_thread &&
                         (tmout = rd_timeout_remains_limit(ts_end, 10)) !=
                             RD_POLL_NOWAIT);

                msg_cnt += qlen;
        }

        /* Clear the flushing flag set above. */
        rd_atomic32_sub(&rk->rk_flushing, 1);

        return msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT
                           : RD_KAFKA_RESP_ERR_NO_ERROR;
}
4470
4471
/**
4472
 * @brief Purge the partition message queue (according to \p purge_flags) for
4473
 *        all toppars.
4474
 *
4475
 * This is a necessity to avoid the race condition when a purge() is scheduled
4476
 * shortly in-between an rktp has been created but before it has been
4477
 * joined to a broker handler thread.
4478
 *
4479
 * The rktp_xmit_msgq is handled by the broker-thread purge.
4480
 *
4481
 * @returns the number of messages purged.
4482
 *
4483
 * @locks_required rd_kafka_*lock()
4484
 * @locks_acquired rd_kafka_topic_rdlock()
4485
 */
4486
0
static int rd_kafka_purge_toppars(rd_kafka_t *rk, int purge_flags) {
        rd_kafka_topic_t *rkt;
        int purged = 0;

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
                int i;

                rd_kafka_topic_rdlock(rkt);

                /* Known partitions. */
                for (i = 0; i < rkt->rkt_partition_cnt; i++) {
                        purged += rd_kafka_toppar_purge_queues(
                            rkt->rkt_p[i], purge_flags, rd_false /*!xmit*/);
                }

                /* Desired partitions. */
                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
                        purged += rd_kafka_toppar_purge_queues(
                            rktp, purge_flags, rd_false /*!xmit*/);
                }

                /* Unassigned (UA) partition, if any. */
                if (rkt->rkt_ua) {
                        purged += rd_kafka_toppar_purge_queues(
                            rkt->rkt_ua, purge_flags, rd_false /*!xmit*/);
                }

                rd_kafka_topic_rdunlock(rkt);
        }

        return purged;
}
4511
4512
4513
0
/**
 * @brief Purges queued and/or in-flight messages according to
 *        \p purge_flags (RD_KAFKA_PURGE_F_..).
 *
 * Producer-only API. Blocks for broker-thread completion unless
 * RD_KAFKA_PURGE_F_NON_BLOCKING is set.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
 *          RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED on non-producer instances,
 *          or RD_KAFKA_RESP_ERR__INVALID_ARG on unknown flags.
 */
rd_kafka_resp_err_t rd_kafka_purge(rd_kafka_t *rk, int purge_flags) {
        rd_kafka_broker_t *rkb;
        rd_kafka_q_t *tmpq = NULL;
        int waitcnt        = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        /* Check that future flags are not passed */
        if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* Nothing to purge */
        if (!purge_flags)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Set up a reply queue to wait for broker thread signalling
         * completion, unless non-blocking. */
        if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
                tmpq = rd_kafka_q_new(rk);

        rd_kafka_rdlock(rk);

        /* Purge msgq for all toppars. */
        rd_kafka_purge_toppars(rk, purge_flags);

        /* Send purge request to all broker threads */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_purge_queues(rkb, purge_flags,
                                             RD_KAFKA_REPLYQ(tmpq, 0));
                waitcnt++;
        }

        rd_kafka_rdunlock(rk);


        if (tmpq) {
                /* Wait for responses */
                while (waitcnt-- > 0)
                        rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);

                rd_kafka_q_destroy_owner(tmpq);
        }

        /* Purge messages for the UA(-1) partitions (which are not
         * handled by a broker thread) */
        if (purge_flags & RD_KAFKA_PURGE_F_QUEUE)
                rd_kafka_purge_ua_toppar_queues(rk);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4564
4565
4566
4567
/**
4568
 * @returns a csv string of purge flags in thread-local storage
4569
 */
4570
0
const char *rd_kafka_purge_flags2str(int flags) {
4571
0
        static const char *names[] = {"queue", "inflight", "non-blocking",
4572
0
                                      NULL};
4573
0
        static RD_TLS char ret[64];
4574
4575
0
        return rd_flags2str(ret, sizeof(ret), names, flags);
4576
0
}
4577
4578
4579
0
/**
 * @brief Returns the librdkafka version as a packed integer
 *        (RD_KAFKA_VERSION).
 */
int rd_kafka_version(void) {
        return RD_KAFKA_VERSION;
}
4582
4583
0
/**
 * @brief Returns the librdkafka version as a human-readable string,
 *        e.g. "1.2.3-RC1". Built lazily on first call and cached in
 *        thread-local storage.
 */
const char *rd_kafka_version_str(void) {
        static RD_TLS char ret[128];
        size_t of = 0, r;

        /* Already built by a previous call from this thread. */
        if (*ret)
                return ret;

#ifdef LIBRDKAFKA_GIT_VERSION
        /* Prefer the git-describe version, minus any leading 'v'. */
        if (*LIBRDKAFKA_GIT_VERSION) {
                of = rd_snprintf(ret, sizeof(ret), "%s",
                                 *LIBRDKAFKA_GIT_VERSION == 'v'
                                     ? &LIBRDKAFKA_GIT_VERSION[1]
                                     : LIBRDKAFKA_GIT_VERSION);
                if (of > sizeof(ret))
                        of = sizeof(ret);
        }
#endif

/* Append a formatted suffix to `ret`, clamping `of` to the buffer size. */
#define _my_sprintf(...)                                                       \
        do {                                                                   \
                r = rd_snprintf(ret + of, sizeof(ret) - of, __VA_ARGS__);      \
                if (r > sizeof(ret) - of)                                      \
                        r = sizeof(ret) - of;                                  \
                of += r;                                                       \
        } while (0)

        if (of == 0) {
                /* No git version: decode the packed integer version,
                 * top three bytes are major.minor.revision and the low
                 * byte is the pre-release number. */
                int ver  = rd_kafka_version();
                int prel = (ver & 0xff);
                _my_sprintf("%i.%i.%i", (ver >> 24) & 0xff, (ver >> 16) & 0xff,
                            (ver >> 8) & 0xff);
                if (prel != 0xff) {
                        /* pre-builds below 200 are just running numbers,
                         * above 200 are RC numbers. */
                        if (prel <= 200)
                                _my_sprintf("-pre%d", prel);
                        else
                                _my_sprintf("-RC%d", prel - 200);
                }
        }

#if ENABLE_DEVEL
        _my_sprintf("-devel");
#endif

#if WITHOUT_OPTIMIZATION
        _my_sprintf("-O0");
#endif

        return ret;
}
4634
4635
4636
/**
4637
 * Assert trampoline to print some debugging information on crash.
4638
 */
4639
void RD_NORETURN rd_kafka_crash(const char *file,
4640
                                int line,
4641
                                const char *function,
4642
                                rd_kafka_t *rk,
4643
0
                                const char *reason) {
4644
0
        fprintf(stderr, "*** %s:%i:%s: %s ***\n", file, line, function, reason);
4645
0
        if (rk)
4646
0
                rd_kafka_dump0(stderr, rk, 0 /*no locks*/);
4647
0
        abort();
4648
0
}
4649
4650
4651
4652
/**
 * @brief Shared state between rd_kafka_list_groups() and its
 *        per-broker ListGroups/DescribeGroups response callbacks.
 */
struct list_groups_state {
        rd_kafka_q_t *q;           /* Reply queue served while waiting. */
        rd_kafka_resp_err_t err;   /* Last error reported by a callback. */
        int wait_cnt;              /* Outstanding broker requests. */
        const char *desired_group; /* Optional single group to filter on,
                                    * or NULL for all groups. */
        struct rd_kafka_group_list *grplist; /* Result list being built. */
        int grplist_size;          /* Allocated capacity of
                                    * grplist->groups. */
};
4660
4661
/* Human-readable names for each consumer group state, indexed by the
 * rd_kafka_consumer_group_state_t code. */
static const char *rd_kafka_consumer_group_state_names[] = {
    "Unknown", "PreparingRebalance", "CompletingRebalance", "Stable", "Dead",
    "Empty"};
4664
4665
const char *
4666
0
rd_kafka_consumer_group_state_name(rd_kafka_consumer_group_state_t state) {
4667
0
        if (state < 0 || state >= RD_KAFKA_CONSUMER_GROUP_STATE__CNT)
4668
0
                return NULL;
4669
0
        return rd_kafka_consumer_group_state_names[state];
4670
0
}
4671
4672
rd_kafka_consumer_group_state_t
4673
0
rd_kafka_consumer_group_state_code(const char *name) {
4674
0
        size_t i;
4675
0
        for (i = 0; i < RD_KAFKA_CONSUMER_GROUP_STATE__CNT; i++) {
4676
0
                if (!rd_strcasecmp(rd_kafka_consumer_group_state_names[i],
4677
0
                                   name))
4678
0
                        return i;
4679
0
        }
4680
0
        return RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN;
4681
0
}
4682
4683
/**
 * @brief Handles a DescribeGroups response for rd_kafka_list_groups():
 *        parses each group (and its members) and appends it to the
 *        shared state's group list.
 *
 * Parse failures in the rd_kafka_buf_read_*() macros jump to the
 * err_parse label; other errors go to err. The final error (or 0) is
 * recorded in state->err.
 */
static void rd_kafka_DescribeGroups_resp_cb(rd_kafka_t *rk,
                                            rd_kafka_broker_t *rkb,
                                            rd_kafka_resp_err_t err,
                                            rd_kafka_buf_t *reply,
                                            rd_kafka_buf_t *request,
                                            void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int cnt;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope due to list_groups()
                 * timing out and returning. */
                return;
        }

        state = opaque;
        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i32(reply, &cnt);

        while (cnt-- > 0) {
                int16_t ErrorCode;
                rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
                int MemberCnt;
                struct rd_kafka_group_info *gi;

                if (state->grplist->group_cnt == state->grplist_size) {
                        /* Grow group array */
                        state->grplist_size *= 2;
                        state->grplist->groups =
                            rd_realloc(state->grplist->groups,
                                       state->grplist_size *
                                           sizeof(*state->grplist->groups));
                }

                gi = &state->grplist->groups[state->grplist->group_cnt++];
                memset(gi, 0, sizeof(*gi));

                rd_kafka_buf_read_i16(reply, &ErrorCode);
                rd_kafka_buf_read_str(reply, &Group);
                rd_kafka_buf_read_str(reply, &GroupState);
                rd_kafka_buf_read_str(reply, &ProtoType);
                rd_kafka_buf_read_str(reply, &Proto);
                rd_kafka_buf_read_i32(reply, &MemberCnt);

                /* Sanity-bound the member count before allocating to
                 * avoid huge allocations from a bogus response. */
                if (MemberCnt > 100000) {
                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
                        goto err;
                }

                /* Snapshot the originating broker's identity. */
                rd_kafka_broker_lock(rkb);
                gi->broker.id   = rkb->rkb_nodeid;
                gi->broker.host = rd_strdup(rkb->rkb_origname);
                gi->broker.port = rkb->rkb_port;
                rd_kafka_broker_unlock(rkb);

                gi->err           = ErrorCode;
                gi->group         = RD_KAFKAP_STR_DUP(&Group);
                gi->state         = RD_KAFKAP_STR_DUP(&GroupState);
                gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
                gi->protocol      = RD_KAFKAP_STR_DUP(&Proto);

                if (MemberCnt > 0)
                        gi->members =
                            rd_malloc(MemberCnt * sizeof(*gi->members));

                while (MemberCnt-- > 0) {
                        rd_kafkap_str_t MemberId, ClientId, ClientHost;
                        rd_kafkap_bytes_t Meta, Assignment;
                        struct rd_kafka_group_member_info *mi;

                        mi = &gi->members[gi->member_cnt++];
                        memset(mi, 0, sizeof(*mi));

                        rd_kafka_buf_read_str(reply, &MemberId);
                        rd_kafka_buf_read_str(reply, &ClientId);
                        rd_kafka_buf_read_str(reply, &ClientHost);
                        rd_kafka_buf_read_kbytes(reply, &Meta);
                        rd_kafka_buf_read_kbytes(reply, &Assignment);

                        mi->member_id   = RD_KAFKAP_STR_DUP(&MemberId);
                        mi->client_id   = RD_KAFKAP_STR_DUP(&ClientId);
                        mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);

                        if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
                                mi->member_metadata_size = 0;
                                mi->member_metadata      = NULL;
                        } else {
                                mi->member_metadata_size =
                                    RD_KAFKAP_BYTES_LEN(&Meta);
                                mi->member_metadata = rd_memdup(
                                    Meta.data, mi->member_metadata_size);
                        }

                        if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
                                mi->member_assignment_size = 0;
                                mi->member_assignment      = NULL;
                        } else {
                                mi->member_assignment_size =
                                    RD_KAFKAP_BYTES_LEN(&Assignment);
                                mi->member_assignment =
                                    rd_memdup(Assignment.data,
                                              mi->member_assignment_size);
                        }
                }
        }

err:
        state->err = err;
        return;

err_parse:
        state->err = reply->rkbuf_err;
}
4801
4802
/**
 * @brief Handles a ListGroups response for rd_kafka_list_groups():
 *        collects the (optionally filtered) group names and issues a
 *        DescribeGroups request for them on the same broker.
 *
 * Parse failures in the rd_kafka_buf_read_*() macros jump to the
 * err_parse label; other errors go to err. The final error (or 0) is
 * recorded in state->err.
 */
static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,
                                        rd_kafka_broker_t *rkb,
                                        rd_kafka_resp_err_t err,
                                        rd_kafka_buf_t *reply,
                                        rd_kafka_buf_t *request,
                                        void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode;
        char **grps = NULL;
        int cnt, grpcnt, i = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' is no longer in scope because
                 * list_groups() timed out and returned to the caller.
                 * We must not touch anything here but simply return. */
                return;
        }

        state = opaque;

        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(reply, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        rd_kafka_buf_read_i32(reply, &cnt);

        /* When looking for a single group we only need room for one
         * name; otherwise collect all of them. */
        if (state->desired_group)
                grpcnt = 1;
        else
                grpcnt = cnt;

        if (cnt == 0 || grpcnt == 0)
                return;

        grps = rd_malloc(sizeof(*grps) * grpcnt);

        while (cnt-- > 0) {
                rd_kafkap_str_t grp, proto;

                rd_kafka_buf_read_str(reply, &grp);
                rd_kafka_buf_read_str(reply, &proto);

                /* Skip groups not matching the requested filter. */
                if (state->desired_group &&
                    rd_kafkap_str_cmp_str(&grp, state->desired_group))
                        continue;

                grps[i++] = RD_KAFKAP_STR_DUP(&grp);

                if (i == grpcnt)
                        break;
        }

        if (i > 0) {
                rd_kafka_error_t *error;

                /* Follow up with a DescribeGroups request on the same
                 * broker for the collected group names. */
                state->wait_cnt++;
                error = rd_kafka_DescribeGroupsRequest(
                    rkb, 0, grps, i,
                    rd_false /* don't include authorized operations */,
                    RD_KAFKA_REPLYQ(state->q, 0),
                    rd_kafka_DescribeGroups_resp_cb, state);
                if (error) {
                        /* Request could not be sent: report the error
                         * through the same callback path. */
                        rd_kafka_DescribeGroups_resp_cb(
                            rk, rkb, rd_kafka_error_code(error), reply, request,
                            opaque);
                        rd_kafka_error_destroy(error);
                }

                while (i-- > 0)
                        rd_free(grps[i]);
        }


        rd_free(grps);

err:
        state->err = err;
        return;

err_parse:
        if (grps)
                rd_free(grps);
        state->err = reply->rkbuf_err;
}
4894
4895
/**
 * @brief Lists consumer groups known to the cluster, or only \p group
 *        when non-NULL, by querying every broker.
 *
 * @param rk         client instance.
 * @param group      optional single group name to look up, or NULL for
 *                   all groups.
 * @param grplistp   on success (or partial result) set to the resulting
 *                   list, which the caller must free with
 *                   rd_kafka_group_list_destroy().
 * @param timeout_ms maximum time to wait for broker responses.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
 *          RD_KAFKA_RESP_ERR__PARTIAL if some brokers did not respond
 *          in time but results were gathered,
 *          RD_KAFKA_RESP_ERR__TIMED_OUT or another error otherwise.
 */
rd_kafka_resp_err_t
rd_kafka_list_groups(rd_kafka_t *rk,
                     const char *group,
                     const struct rd_kafka_group_list **grplistp,
                     int timeout_ms) {
        rd_kafka_broker_t *rkb;
        int rkb_cnt                    = 0;
        struct list_groups_state state = RD_ZERO_INIT;
        rd_ts_t ts_end                 = rd_timeout_init(timeout_ms);

        /* Wait until metadata has been fetched from cluster so
         * that we have a full broker list.
         * This state only happens during initial client setup, after that
         * there'll always be a cached metadata copy. */
        while (1) {
                int state_version = rd_kafka_brokers_get_state_version(rk);
                rd_bool_t has_metadata;

                rd_kafka_rdlock(rk);
                has_metadata = rk->rk_ts_metadata != 0;
                rd_kafka_rdunlock(rk);

                if (has_metadata)
                        break;

                if (!rd_kafka_brokers_wait_state_change(
                        rk, state_version, rd_timeout_remains(ts_end)))
                        return RD_KAFKA_RESP_ERR__TIMED_OUT;
        }


        state.q             = rd_kafka_q_new(rk);
        state.desired_group = group;
        state.grplist       = rd_calloc(1, sizeof(*state.grplist));
        state.grplist_size  = group ? 1 : 32;

        state.grplist->groups =
            rd_malloc(state.grplist_size * sizeof(*state.grplist->groups));

        /* Query each broker for its list of groups */
        rd_kafka_rdlock(rk);
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_error_t *error;
                rd_kafka_broker_lock(rkb);
                /* Skip brokers without a node id and logical brokers. */
                if (rkb->rkb_nodeid == -1 || RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
                        rd_kafka_broker_unlock(rkb);
                        continue;
                }
                rd_kafka_broker_unlock(rkb);

                state.wait_cnt++;
                rkb_cnt++;
                error = rd_kafka_ListGroupsRequest(
                    rkb, 0, NULL, 0, RD_KAFKA_REPLYQ(state.q, 0),
                    rd_kafka_ListGroups_resp_cb, &state);
                if (error) {
                        /* Request could not be sent: report through the
                         * same callback path. */
                        rd_kafka_ListGroups_resp_cb(rk, rkb,
                                                    rd_kafka_error_code(error),
                                                    NULL, NULL, &state);
                        rd_kafka_error_destroy(error);
                }
        }
        rd_kafka_rdunlock(rk);

        if (rkb_cnt == 0) {
                /* No usable brokers to query. */
                state.err = RD_KAFKA_RESP_ERR__TRANSPORT;

        } else {
                int remains;

                /* Serve the reply queue until all responses arrived or
                 * the timeout expires. */
                while (state.wait_cnt > 0 &&
                       !rd_timeout_expired(
                           (remains = rd_timeout_remains(ts_end)))) {
                        rd_kafka_q_serve(state.q, remains, 0,
                                         RD_KAFKA_Q_CB_CALLBACK,
                                         rd_kafka_poll_cb, NULL);
                        /* Ignore yields */
                }
        }

        rd_kafka_q_destroy_owner(state.q);

        if (state.wait_cnt > 0 && !state.err) {
                /* Some brokers did not respond in time: report partial
                 * results if any were gathered. */
                if (state.grplist->group_cnt == 0)
                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
                else {
                        *grplistp = state.grplist;
                        return RD_KAFKA_RESP_ERR__PARTIAL;
                }
        }

        if (state.err)
                rd_kafka_group_list_destroy(state.grplist);
        else
                *grplistp = state.grplist;

        return state.err;
}
4993
4994
4995
0
void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist0) {
4996
0
        struct rd_kafka_group_list *grplist =
4997
0
            (struct rd_kafka_group_list *)grplist0;
4998
4999
0
        while (grplist->group_cnt-- > 0) {
5000
0
                struct rd_kafka_group_info *gi;
5001
0
                gi = &grplist->groups[grplist->group_cnt];
5002
5003
0
                if (gi->broker.host)
5004
0
                        rd_free(gi->broker.host);
5005
0
                if (gi->group)
5006
0
                        rd_free(gi->group);
5007
0
                if (gi->state)
5008
0
                        rd_free(gi->state);
5009
0
                if (gi->protocol_type)
5010
0
                        rd_free(gi->protocol_type);
5011
0
                if (gi->protocol)
5012
0
                        rd_free(gi->protocol);
5013
5014
0
                while (gi->member_cnt-- > 0) {
5015
0
                        struct rd_kafka_group_member_info *mi;
5016
0
                        mi = &gi->members[gi->member_cnt];
5017
5018
0
                        if (mi->member_id)
5019
0
                                rd_free(mi->member_id);
5020
0
                        if (mi->client_id)
5021
0
                                rd_free(mi->client_id);
5022
0
                        if (mi->client_host)
5023
0
                                rd_free(mi->client_host);
5024
0
                        if (mi->member_metadata)
5025
0
                                rd_free(mi->member_metadata);
5026
0
                        if (mi->member_assignment)
5027
0
                                rd_free(mi->member_assignment);
5028
0
                }
5029
5030
0
                if (gi->members)
5031
0
                        rd_free(gi->members);
5032
0
        }
5033
5034
0
        if (grplist->groups)
5035
0
                rd_free(grplist->groups);
5036
5037
0
        rd_free(grplist);
5038
0
}
5039
5040
5041
5042
0
const char *rd_kafka_get_debug_contexts(void) {
5043
0
        return RD_KAFKA_DEBUG_CONTEXTS;
5044
0
}
5045
5046
5047
0
/**
 * @brief Check whether \p path exists and refers to a directory.
 *
 * @param path Filesystem path to probe.
 *
 * @returns 1 if \p path is a directory, else 0 (including when the
 *          path cannot be stat():ed at all).
 */
int rd_kafka_path_is_dir(const char *path) {
#ifdef _WIN32
        struct _stat st;
        if (_stat(path, &st) != 0)
                return 0;
        return (st.st_mode & S_IFDIR) ? 1 : 0;
#else
        struct stat st;
        if (stat(path, &st) != 0)
                return 0;
        return S_ISDIR(st.st_mode) ? 1 : 0;
#endif
}
5056
5057
5058
/**
5059
 * @returns true if directory is empty or can't be accessed, else false.
5060
 */
5061
0
rd_bool_t rd_kafka_dir_is_empty(const char *path) {
5062
#if _WIN32
5063
        /* FIXME: Unsupported */
5064
        return rd_true;
5065
#else
5066
0
        DIR *dir;
5067
0
        struct dirent *d;
5068
#if defined(__sun)
5069
        struct stat st;
5070
        int ret = 0;
5071
#endif
5072
5073
0
        dir = opendir(path);
5074
0
        if (!dir)
5075
0
                return rd_true;
5076
5077
0
        while ((d = readdir(dir))) {
5078
5079
0
                if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
5080
0
                        continue;
5081
5082
#if defined(__sun)
5083
                ret = stat(d->d_name, &st);
5084
                if (ret != 0) {
5085
                        return rd_true;  // Can't be accessed
5086
                }
5087
                if (S_ISREG(st.st_mode) || S_ISDIR(st.st_mode) ||
5088
                    S_ISLNK(st.st_mode)) {
5089
#else
5090
0
                if (d->d_type == DT_REG || d->d_type == DT_LNK ||
5091
0
                    d->d_type == DT_DIR) {
5092
0
#endif
5093
0
                        closedir(dir);
5094
0
                        return rd_false;
5095
0
                }
5096
0
        }
5097
5098
0
        closedir(dir);
5099
0
        return rd_true;
5100
0
#endif
5101
0
}
5102
5103
5104
0
/**
 * @brief Allocate \p size bytes with librdkafka's allocator.
 *
 * @param rk Client instance; currently unused by the implementation.
 * @param size Number of bytes to allocate.
 *
 * @returns pointer to the allocated memory.
 */
void *rd_kafka_mem_malloc(rd_kafka_t *rk, size_t size) {
        void *ptr = rd_malloc(size);
        return ptr;
}
5107
5108
0
/**
 * @brief Allocate zero-initialized memory for \p num elements of
 *        \p size bytes with librdkafka's allocator.
 *
 * @param rk Client instance; currently unused by the implementation.
 *
 * @returns pointer to the allocated, zeroed memory.
 */
void *rd_kafka_mem_calloc(rd_kafka_t *rk, size_t num, size_t size) {
        void *ptr = rd_calloc(num, size);
        return ptr;
}
5111
5112
0
/**
 * @brief Free memory previously allocated by librdkafka's allocator.
 *
 * @param rk Client instance; currently unused by the implementation.
 * @param ptr Pointer to free.
 */
void rd_kafka_mem_free(rd_kafka_t *rk, void *ptr) {
        rd_free(ptr);
}
5115
5116
5117
0
/**
 * @returns the thread-local system errno value, exposed so bindings
 *          in other runtimes can read the C-level errno.
 */
int rd_kafka_errno(void) {
        int cur_errno = errno;
        return cur_errno;
}
5120
5121
0
/**
 * @brief Run librdkafka's built-in unittests.
 *
 * @returns the result of rd_unittest() (0 on success per the
 *          surrounding convention — confirm in rdunittest.c).
 */
int rd_kafka_unittest(void) {
        return rd_unittest();
}
5124
5125
5126
/**
5127
 * Creates a new UUID.
5128
 *
5129
 * @return A newly allocated UUID.
5130
 */
5131
rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits,
5132
0
                                   int64_t least_significant_bits) {
5133
0
        rd_kafka_Uuid_t *uuid        = rd_calloc(1, sizeof(rd_kafka_Uuid_t));
5134
0
        uuid->most_significant_bits  = most_significant_bits;
5135
0
        uuid->least_significant_bits = least_significant_bits;
5136
0
        return uuid;
5137
0
}
5138
5139
/**
5140
 * Returns a newly allocated copy of the given UUID.
5141
 *
5142
 * @param uuid UUID to copy.
5143
 * @return Copy of the provided UUID.
5144
 *
5145
 * @remark Dynamically allocated. Deallocate (free) after use.
5146
 */
5147
0
rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
5148
0
        rd_kafka_Uuid_t *copy_uuid = rd_kafka_Uuid_new(
5149
0
            uuid->most_significant_bits, uuid->least_significant_bits);
5150
0
        if (*uuid->base64str)
5151
0
                memcpy(copy_uuid->base64str, uuid->base64str, 23);
5152
0
        return copy_uuid;
5153
0
}
5154
5155
/**
5156
 * Returns a new non cryptographically secure UUIDv4 (random).
5157
 *
5158
 * @return A UUIDv4.
5159
 *
5160
 * @remark Must be freed after use using rd_kafka_Uuid_destroy().
5161
 */
5162
0
rd_kafka_Uuid_t rd_kafka_Uuid_random() {
5163
0
        int i;
5164
0
        unsigned char rand_values_bytes[16] = {0};
5165
0
        uint64_t *rand_values_uint64        = (uint64_t *)rand_values_bytes;
5166
0
        unsigned char *rand_values_app;
5167
0
        rd_kafka_Uuid_t ret = RD_KAFKA_UUID_ZERO;
5168
0
        for (i = 0; i < 16; i += 2) {
5169
0
                uint16_t rand_uint16 = (uint16_t)rd_jitter(0, INT16_MAX - 1);
5170
                /* No need to convert endianess here because it's still only
5171
                 * a random value. */
5172
0
                rand_values_app = (unsigned char *)&rand_uint16;
5173
0
                rand_values_bytes[i] |= rand_values_app[0];
5174
0
                rand_values_bytes[i + 1] |= rand_values_app[1];
5175
0
        }
5176
5177
0
        rand_values_bytes[6] &= 0x0f; /* clear version */
5178
0
        rand_values_bytes[6] |= 0x40; /* version 4 */
5179
0
        rand_values_bytes[8] &= 0x3f; /* clear variant */
5180
0
        rand_values_bytes[8] |= 0x80; /* IETF variant */
5181
5182
0
        ret.most_significant_bits  = be64toh(rand_values_uint64[0]);
5183
0
        ret.least_significant_bits = be64toh(rand_values_uint64[1]);
5184
0
        return ret;
5185
0
}
5186
5187
/**
5188
 * @brief Destroy the provided uuid.
5189
 *
5190
 * @param uuid UUID
5191
 */
5192
0
/**
 * @brief Destroy the provided uuid.
 *
 * @param uuid UUID allocated by rd_kafka_Uuid_new()/rd_kafka_Uuid_copy().
 */
void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
        rd_free(uuid);
}
5195
5196
/**
5197
 * @brief Computes canonical encoding for the given uuid string.
5198
 *        Mainly useful for testing.
5199
 *
5200
 * @param uuid UUID for which canonical encoding is required.
5201
 *
5202
 * @return canonical encoded string for the given UUID.
5203
 *
5204
 * @remark  Must be freed after use.
5205
 */
5206
0
const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid) {
5207
0
        int i, j;
5208
0
        unsigned char bytes[16];
5209
0
        char *ret = rd_calloc(37, sizeof(*ret));
5210
5211
0
        for (i = 0; i < 8; i++) {
5212
0
#if __BYTE_ORDER == __LITTLE_ENDIAN
5213
0
                j = 7 - i;
5214
#elif __BYTE_ORDER == __BIG_ENDIAN
5215
                j = i;
5216
#endif
5217
0
                bytes[i]     = (uuid->most_significant_bits >> (8 * j)) & 0xFF;
5218
0
                bytes[8 + i] = (uuid->least_significant_bits >> (8 * j)) & 0xFF;
5219
0
        }
5220
5221
0
        rd_snprintf(ret, 37,
5222
0
                    "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%"
5223
0
                    "02x%02x%02x",
5224
0
                    bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5],
5225
0
                    bytes[6], bytes[7], bytes[8], bytes[9], bytes[10],
5226
0
                    bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]);
5227
0
        return ret;
5228
0
}
5229
5230
0
const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
5231
0
        if (*uuid->base64str)
5232
0
                return uuid->base64str;
5233
5234
0
        rd_chariov_t in_base64;
5235
0
        char *out_base64_str;
5236
0
        char *uuid_bytes;
5237
0
        uint64_t input_uuid[2];
5238
5239
0
        input_uuid[0]  = htobe64(uuid->most_significant_bits);
5240
0
        input_uuid[1]  = htobe64(uuid->least_significant_bits);
5241
0
        uuid_bytes     = (char *)input_uuid;
5242
0
        in_base64.ptr  = uuid_bytes;
5243
0
        in_base64.size = sizeof(uuid->most_significant_bits) +
5244
0
                         sizeof(uuid->least_significant_bits);
5245
5246
0
        out_base64_str = rd_base64_encode_str(&in_base64);
5247
0
        if (!out_base64_str)
5248
0
                return NULL;
5249
5250
0
        rd_strlcpy((char *)uuid->base64str, out_base64_str,
5251
0
                   23 /* Removing extra ('=') padding */);
5252
0
        rd_free(out_base64_str);
5253
0
        return uuid->base64str;
5254
0
}
5255
5256
0
unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid) {
5257
0
        unsigned char bytes[16];
5258
0
        memcpy(bytes, &uuid->most_significant_bits, 8);
5259
0
        memcpy(&bytes[8], &uuid->least_significant_bits, 8);
5260
0
        return rd_bytes_hash(bytes, 16);
5261
0
}
5262
5263
0
unsigned int rd_kafka_Uuid_map_hash(const void *key) {
5264
0
        return rd_kafka_Uuid_hash(key);
5265
0
}
5266
5267
0
int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {
5268
0
        return uuid->least_significant_bits;
5269
0
}
5270
5271
5272
0
int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) {
5273
0
        return uuid->most_significant_bits;
5274
0
}