Coverage Report

Created: 2026-05-16 07:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/in_kafka/in_kafka.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2019-2021 The Fluent Bit Authors
6
 *  Copyright (C) 2015-2018 Treasure Data Inc.
7
 *
8
 *  Licensed under the Apache License, Version 2.0 (the "License");
9
 *  you may not use this file except in compliance with the License.
10
 *  You may obtain a copy of the License at
11
 *
12
 *      http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 *  Unless required by applicable law or agreed to in writing, software
15
 *  distributed under the License is distributed on an "AS IS" BASIS,
16
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 *  See the License for the specific language governing permissions and
18
 *  limitations under the License.
19
 */
20
21
#include <fluent-bit/flb_input_plugin.h>
22
#include <fluent-bit/flb_pack.h>
23
#include <fluent-bit/flb_engine.h>
24
#include <fluent-bit/flb_time.h>
25
#include <fluent-bit/flb_parser.h>
26
#include <fluent-bit/flb_error.h>
27
#include <fluent-bit/flb_utils.h>
28
#include <fluent-bit/aws/flb_aws_msk_iam.h>
29
30
#include <mpack/mpack.h>
31
#include <stddef.h>
32
#include <stdio.h>
33
34
#include "fluent-bit/flb_input.h"
35
#include "fluent-bit/flb_kafka.h"
36
#include "fluent-bit/flb_mem.h"
37
#include "in_kafka.h"
38
#include "rdkafka.h"
39
40
static int try_json(struct flb_log_event_encoder *log_encoder,
41
                    rd_kafka_message_t *rkm)
42
0
{
43
0
    int root_type;
44
0
    char *buf = NULL;
45
0
    size_t bufsize;
46
0
    int ret;
47
48
0
    ret = flb_pack_json(rkm->payload, rkm->len, &buf, &bufsize, &root_type, NULL);
49
0
    if (ret) {
50
0
        if (buf) {
51
0
            flb_free(buf);
52
0
        }
53
0
        return ret;
54
0
    }
55
0
    flb_log_event_encoder_append_body_raw_msgpack(log_encoder, buf, bufsize);
56
0
    flb_free(buf);
57
0
    return 0;
58
0
}
59
60
static int process_message(struct flb_in_kafka_config *ctx,
61
                           rd_kafka_message_t *rkm)
62
0
{
63
0
    struct flb_log_event_encoder *log_encoder = ctx->log_encoder;
64
0
    int ret;
65
66
0
    ret = flb_log_event_encoder_begin_record(log_encoder);
67
68
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
69
0
        ret = flb_log_event_encoder_set_current_timestamp(log_encoder);
70
0
    }
71
72
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
73
0
        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "topic");
74
0
    }
75
76
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
77
0
        if (rkm->rkt) {
78
0
            ret = flb_log_event_encoder_append_body_cstring(log_encoder,
79
0
                                                            (char *) rd_kafka_topic_name(rkm->rkt));
80
0
        }
81
0
        else {
82
0
            ret = flb_log_event_encoder_append_body_null(log_encoder);
83
0
        }
84
0
    }
85
86
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
87
0
        ret = flb_log_event_encoder_append_body_values(log_encoder,
88
0
                                                       FLB_LOG_EVENT_CSTRING_VALUE("partition"),
89
0
                                                       FLB_LOG_EVENT_INT32_VALUE(rkm->partition));
90
0
    }
91
92
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
93
0
        ret = flb_log_event_encoder_append_body_values(log_encoder,
94
0
                                                       FLB_LOG_EVENT_CSTRING_VALUE("offset"),
95
0
                                                       FLB_LOG_EVENT_INT64_VALUE(rkm->offset));
96
0
    }
97
98
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
99
0
        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "error");
100
0
    }
101
102
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
103
0
        if (rkm->err) {
104
0
            ret = flb_log_event_encoder_append_body_cstring(log_encoder,
105
0
                                                            (char *) rd_kafka_message_errstr(rkm));
106
0
        }
107
0
        else {
108
0
            ret = flb_log_event_encoder_append_body_null(log_encoder);
109
0
        }
110
0
    }
111
112
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
113
0
        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "key");
114
0
    }
115
116
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
117
0
        if (rkm->key) {
118
0
            ret = flb_log_event_encoder_append_body_string(log_encoder,
119
0
                                                           rkm->key,
120
0
                                                           rkm->key_len);
121
0
        }
122
0
        else {
123
0
            ret = flb_log_event_encoder_append_body_null(log_encoder);
124
0
        }
125
0
    }
126
127
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
128
0
        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "payload");
129
0
    }
130
131
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
132
0
        if (rkm->payload) {
133
0
            if (ctx->format != FLB_IN_KAFKA_FORMAT_JSON ||
134
0
                    try_json(log_encoder, rkm)) {
135
0
                ret = flb_log_event_encoder_append_body_string(log_encoder,
136
0
                                                               rkm->payload,
137
0
                                                               rkm->len);
138
0
            }
139
0
        }
140
0
        else {
141
0
            ret = flb_log_event_encoder_append_body_null(log_encoder);
142
0
        }
143
0
    }
144
145
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
146
0
        ret = flb_log_event_encoder_commit_record(log_encoder);
147
0
    }
148
149
0
    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
150
0
        flb_log_event_encoder_rollback_record(log_encoder);
151
0
    }
152
153
0
    return ret;
154
0
}
155
156
static int in_kafka_collect(struct flb_input_instance *ins,
157
                            struct flb_config *config, void *in_context)
158
0
{
159
0
    int ret;
160
0
    int append_ret;
161
0
    struct flb_in_kafka_config *ctx = in_context;
162
0
    rd_kafka_message_t *rkm;
163
164
0
    ret = FLB_EVENT_ENCODER_SUCCESS;
165
166
0
    while (ret == FLB_EVENT_ENCODER_SUCCESS) {
167
        /* Set the Kafka poll timeout based on execution mode:
168
         *
169
         * a) Running in the main event loop (non-threaded):
170
         *    - Use a minimal timeout to avoid blocking other inputs.
171
         *
172
         * b) Running in a dedicated thread:
173
         *    - Optimize for throughput by allowing Kafka's internal batching.
174
         *    - Align with 'fetch.wait.max.ms' (default: 500ms) to maximize batch efficiency.
175
         *    - Set timeout slightly higher than 'fetch.wait.max.ms' (e.g., 1.5x - 2x) to
176
         *      ensure it does not interfere with Kafka's fetch behavior, while still
177
         *      keeping the consumer responsive.
178
         */
179
0
        if (ctx->ins->flags & FLB_INPUT_THREADED) {
180
            /* Threaded mode: Optimize for batch processing and efficiency */
181
0
            rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeout_ms);
182
0
        } else {
183
            /* Main event loop: Minimize delay for non-blocking execution */
184
0
            rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
185
0
        }
186
187
0
        if (!rkm) {
188
0
            break;
189
0
        }
190
191
0
        if (rkm->err) {
192
0
            flb_plg_warn(ins, "consumer error: %s\n",
193
0
                         rd_kafka_message_errstr(rkm));
194
0
            rd_kafka_message_destroy(rkm);
195
0
            continue;
196
0
        }
197
198
0
        flb_plg_debug(ins, "kafka message received");
199
200
0
        ret = process_message(ctx, rkm);
201
202
0
        rd_kafka_message_destroy(rkm);
203
204
        /* Break from the loop when reaching the limit of polling if available */
205
0
        if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
206
0
            ctx->log_encoder->output_length > ctx->polling_threshold + 512) {
207
0
            break;
208
0
        }
209
0
    }
210
211
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
212
0
        if (ctx->log_encoder->output_length > 0) {
213
0
            append_ret = flb_input_log_append(ins, NULL, 0,
214
0
                                              ctx->log_encoder->output_buffer,
215
0
                                              ctx->log_encoder->output_length);
216
217
0
            if (append_ret == 0) {
218
0
                if (!ctx->enable_auto_commit) {
219
0
                    rd_kafka_commit(ctx->kafka.rk, NULL, 0);
220
0
                }
221
0
                ret = 0;
222
0
            }
223
0
            else {
224
0
                flb_plg_error(ins, "failed to append records");
225
0
                ret = -1;
226
0
            }
227
0
        }
228
0
        else {
229
0
            ret = 0;
230
0
        }
231
0
    }
232
0
    else {
233
0
        flb_plg_error(ins, "Error encoding record : %d", ret);
234
0
        ret = -1;
235
0
    }
236
237
0
    flb_log_event_encoder_reset(ctx->log_encoder);
238
239
0
    return ret;
240
0
}
241
242
/* Initialize plugin */
243
static int in_kafka_init(struct flb_input_instance *ins,
244
                         struct flb_config *config, void *data)
245
0
{
246
0
    int ret;
247
0
    const char *conf;
248
0
    struct flb_in_kafka_config *ctx;
249
0
    rd_kafka_conf_t *kafka_conf = NULL;
250
0
    rd_kafka_topic_partition_list_t *kafka_topics = NULL;
251
0
    rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
252
0
    rd_kafka_conf_res_t res;
253
0
    char errstr[512];
254
0
    (void) data;
255
0
    char conf_val[16];
256
257
    /* Allocate space for the configuration context */
258
0
    ctx = flb_calloc(1, sizeof(struct flb_in_kafka_config));
259
0
    if (!ctx) {
260
0
        return -1;
261
0
    }
262
0
    ctx->ins = ins;
263
264
0
    ret = flb_input_config_map_set(ins, (void*) ctx);
265
0
    if (ret == -1) {
266
0
        flb_plg_error(ins, "unable to load configuration.");
267
0
        flb_free(ctx);
268
0
        return -1;
269
0
    }
270
271
#ifdef FLB_HAVE_AWS_MSK_IAM
272
    /*
273
     * When MSK IAM auth is enabled, default the required
274
     * security settings so users don't need to specify them.
275
     */
276
    if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) {
277
        conf = flb_input_get_property("rdkafka.security.protocol", ins);
278
        if (!conf) {
279
            flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
280
        }
281
282
        conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
283
        if (!conf) {
284
            flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
285
            ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER");
286
        }
287
        else {
288
            ctx->sasl_mechanism = flb_sds_create(conf);
289
            flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
290
        }
291
    }
292
    else {
293
#endif
294
295
        /* Retrieve SASL mechanism if configured */
296
0
        conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
297
0
        if (conf) {
298
0
            ctx->sasl_mechanism = flb_sds_create(conf);
299
0
            flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
300
0
        }
301
302
#ifdef FLB_HAVE_AWS_MSK_IAM
303
    }
304
#endif
305
306
0
    kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
307
0
    if (!kafka_conf) {
308
0
        flb_plg_error(ins, "Could not initialize kafka config object");
309
0
        goto init_error;
310
0
    }
311
312
    /* Set enable.auto.commit based on plugin's enable_auto_commit setting */
313
0
    res = rd_kafka_conf_set(kafka_conf, "enable.auto.commit",
314
0
                           ctx->enable_auto_commit ? "true" : "false",
315
0
                           errstr, sizeof(errstr));
316
0
    if (res != RD_KAFKA_CONF_OK) {
317
0
        flb_plg_error(ins, "Failed to set enable.auto.commit: %s", errstr);
318
0
        goto init_error;
319
0
    }
320
321
0
    if (ctx->buffer_max_size > 0) {
322
0
        ctx->polling_threshold = ctx->buffer_max_size;
323
324
0
        snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold - 512);
325
0
        res = rd_kafka_conf_set(kafka_conf, "fetch.max.bytes", conf_val,
326
0
                                errstr, sizeof(errstr));
327
0
        if (res != RD_KAFKA_CONF_OK) {
328
0
            flb_plg_error(ins, "Failed to set up fetch.max.bytes: %s, val = %s",
329
0
                          rd_kafka_err2str(err), conf_val);
330
0
            goto init_error;
331
0
        }
332
333
0
        snprintf(conf_val, sizeof(conf_val), "%zu", ctx->polling_threshold);
334
0
        res = rd_kafka_conf_set(kafka_conf, "receive.message.max.bytes", conf_val,
335
0
                                errstr, sizeof(errstr));
336
0
        if (res != RD_KAFKA_CONF_OK) {
337
0
            flb_plg_error(ins, "Failed to set up receive.message.max.bytes: %s, val = %s",
338
0
                          rd_kafka_err2str(err), conf_val);
339
0
            goto init_error;
340
0
        }
341
0
    }
342
0
    else {
343
0
        ctx->polling_threshold = FLB_IN_KAFKA_UNLIMITED;
344
0
    }
345
346
0
    ctx->opaque = flb_kafka_opaque_create();
347
0
    if (!ctx->opaque) {
348
0
        flb_plg_error(ins, "failed to create kafka opaque context");
349
0
        goto init_error;
350
0
    }
351
0
    flb_kafka_opaque_set(ctx->opaque, ctx, NULL);
352
0
    rd_kafka_conf_set_opaque(kafka_conf, ctx->opaque);
353
354
#ifdef FLB_HAVE_AWS_MSK_IAM
355
    if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism &&
356
        strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
357
        flb_plg_info(ins, "registering MSK IAM authentication with cluster ARN: %s",
358
                     ctx->aws_msk_iam_cluster_arn);
359
        ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
360
                                                         kafka_conf,
361
                                                         ctx->aws_msk_iam_cluster_arn,
362
                                                         ctx->opaque);
363
        if (!ctx->msk_iam) {
364
            flb_plg_error(ins, "failed to setup MSK IAM authentication");
365
        }
366
        else {
367
            res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
368
                                    "principal=admin", errstr, sizeof(errstr));
369
            if (res != RD_KAFKA_CONF_OK) {
370
                flb_plg_error(ins,
371
                             "failed to set sasl.oauthbearer.config: %s",
372
                             errstr);
373
            }
374
        }
375
    }
376
#endif
377
378
0
    ctx->kafka.rk = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_conf, errstr, sizeof(errstr));
379
380
    /* Create Kafka consumer handle */
381
0
    if (!ctx->kafka.rk) {
382
0
        flb_plg_error(ins, "Failed to create new consumer: %s", errstr);
383
0
        goto init_error;
384
0
    }
385
386
    /* Trigger initial token refresh for OAUTHBEARER */
387
0
    rd_kafka_poll(ctx->kafka.rk, 0);
388
389
0
    conf = flb_input_get_property("topics", ins);
390
0
    if (!conf) {
391
0
        flb_plg_error(ins, "config: no topics specified");
392
0
        goto init_error;
393
0
    }
394
395
0
    kafka_topics = flb_kafka_parse_topics(conf);
396
0
    if (!kafka_topics) {
397
0
        flb_plg_error(ins, "Failed to parse topic list");
398
0
        goto init_error;
399
0
    }
400
401
0
    if (strcasecmp(ctx->format_str, "none") == 0) {
402
0
        ctx->format = FLB_IN_KAFKA_FORMAT_NONE;
403
0
    }
404
0
    else if (strcasecmp(ctx->format_str, "json") == 0) {
405
0
        ctx->format = FLB_IN_KAFKA_FORMAT_JSON;
406
0
    }
407
0
    else {
408
0
        flb_plg_error(ins, "config: invalid format \"%s\"", ctx->format_str);
409
0
        goto init_error;
410
0
    }
411
412
0
    if ((err = rd_kafka_subscribe(ctx->kafka.rk, kafka_topics))) {
413
0
        flb_plg_error(ins, "Failed to start consuming topics: %s", rd_kafka_err2str(err));
414
0
        goto init_error;
415
0
    }
416
0
    rd_kafka_topic_partition_list_destroy(kafka_topics);
417
0
    kafka_topics = NULL;
418
419
    /* Set the context */
420
0
    flb_input_set_context(ins, ctx);
421
    /* Collect upon data available on the pipe read fd */
422
423
0
    int poll_seconds = ctx->poll_ms / 1000;
424
0
    int poll_milliseconds = ctx->poll_ms % 1000;
425
426
0
    ret = flb_input_set_collector_time(ins,
427
0
                                       in_kafka_collect,
428
0
                                       poll_seconds, poll_milliseconds * 1e6,
429
0
                                       config);
430
0
    if (ret) {
431
0
        flb_plg_error(ctx->ins, "could not set collector for kafka input plugin");
432
0
        goto init_error;
433
0
    }
434
435
0
    ctx->coll_fd = ret;
436
437
0
    ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
438
439
0
    if (ctx->log_encoder == NULL) {
440
0
        flb_plg_error(ins, "could not initialize log encoder");
441
0
        goto init_error;
442
0
    }
443
444
0
    return 0;
445
446
0
init_error:
447
0
    if (kafka_topics) {
448
0
        rd_kafka_topic_partition_list_destroy(kafka_topics);
449
0
    }
450
0
    if (ctx->kafka.rk) {
451
0
        rd_kafka_consumer_close(ctx->kafka.rk);
452
0
        rd_kafka_destroy(ctx->kafka.rk);
453
0
    }
454
0
    if (ctx->opaque) {
455
0
        flb_kafka_opaque_destroy(ctx->opaque);
456
0
    }
457
0
    else if (kafka_conf) {
458
        /* conf is already destroyed when rd_kafka is initialized */
459
0
        rd_kafka_conf_destroy(kafka_conf);
460
0
    }
461
0
    flb_sds_destroy(ctx->sasl_mechanism);
462
0
    flb_free(ctx);
463
464
0
    return -1;
465
0
}
466
467
static void in_kafka_pause(void *data, struct flb_config *config)
468
0
{
469
0
    struct flb_in_kafka_config *ctx = data;
470
471
0
    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
472
0
}
473
474
static void in_kafka_resume(void *data, struct flb_config *config)
475
0
{
476
0
    struct flb_in_kafka_config *ctx = data;
477
478
0
    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
479
0
}
480
481
/* Cleanup serial input */
482
static int in_kafka_exit(void *in_context, struct flb_config *config)
483
0
{
484
0
    struct flb_in_kafka_config *ctx;
485
486
0
    if (!in_context) {
487
0
        return 0;
488
0
    }
489
490
0
    ctx = in_context;
491
0
    if (ctx->kafka.rk) {
492
0
        rd_kafka_consumer_close(ctx->kafka.rk);
493
0
        rd_kafka_destroy(ctx->kafka.rk);
494
0
    }
495
0
    flb_free(ctx->kafka.brokers);
496
497
0
    if (ctx->log_encoder){
498
0
        flb_log_event_encoder_destroy(ctx->log_encoder);
499
0
    }
500
501
#ifdef FLB_HAVE_AWS_MSK_IAM
502
    if (ctx->msk_iam) {
503
        flb_aws_msk_iam_destroy(ctx->msk_iam);
504
    }
505
#endif
506
507
0
    if (ctx->opaque) {
508
0
        flb_kafka_opaque_destroy(ctx->opaque);
509
0
    }
510
511
0
    flb_sds_destroy(ctx->sasl_mechanism);
512
513
0
    flb_free(ctx);
514
515
0
    return 0;
516
0
}
517
518
static struct flb_config_map config_map[] = {
519
   {
520
    FLB_CONFIG_MAP_INT, "poll_ms", FLB_IN_KAFKA_DEFAULT_POLL_MS,
521
    0, FLB_TRUE, offsetof(struct flb_in_kafka_config, poll_ms),
522
    "Interval in milliseconds to check for new messages."
523
   },
524
   {
525
    FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
526
    0, FLB_FALSE, 0,
527
    "Set the kafka topics, delimited by commas."
528
   },
529
   {
530
    FLB_CONFIG_MAP_STR, "format", FLB_IN_KAFKA_DEFAULT_FORMAT,
531
    0, FLB_TRUE, offsetof(struct flb_in_kafka_config, format_str),
532
    "Set the data format which will be used for parsing records."
533
   },
534
   {
535
    FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
536
    0, FLB_FALSE, 0,
537
    "Set the kafka brokers, delimited by commas."
538
   },
539
   {
540
    FLB_CONFIG_MAP_STR, "client_id", (char *)NULL,
541
    0, FLB_FALSE, 0,
542
    "Set the kafka client_id."
543
   },
544
   {
545
    FLB_CONFIG_MAP_STR, "group_id", (char *)NULL,
546
    0, FLB_FALSE, 0,
547
    "Set the kafka group_id."
548
   },
549
   {
550
    FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL,
551
    /* FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_in_kafka_config, rdkafka_opts), */
552
    0,  FLB_FALSE, 0,
553
    "Set the librdkafka options"
554
   },
555
   {
556
    FLB_CONFIG_MAP_SIZE, "buffer_max_size", FLB_IN_KAFKA_BUFFER_MAX_SIZE,
557
    0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
558
    "Set the maximum size of chunk"
559
   },
560
   {
561
    FLB_CONFIG_MAP_INT, "poll_timeout_ms", "1",
562
    0, FLB_TRUE, offsetof(struct flb_in_kafka_config, poll_timeout_ms),
563
    "Set the timeout in milliseconds for Kafka consumer poll operations. "
564
    "This option only takes effect when running in a dedicated thread (i.e., when 'threaded' is enabled). "
565
    "Using a higher timeout (e.g., 1.5x - 2x 'rdkafka.fetch.wait.max.ms') "
566
    "can improve efficiency by leveraging Kafka's batching mechanism."
567
  },
568
  {
569
    FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT,
570
    0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit),
571
    "Rely on kafka auto-commit and commit messages in batches"
572
  },
573
574
#ifdef FLB_HAVE_AWS_MSK_IAM
575
  {
576
   FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", (char *)NULL,
577
   0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam_cluster_arn),
578
   "ARN of the MSK cluster when using AWS IAM authentication"
579
  },
580
  {
581
    FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
582
    0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam),
583
    "Enable AWS MSK IAM authentication"
584
  },
585
#endif
586
587
  /* EOF */
588
  {0}
589
};
590
591
/* Plugin reference */
592
struct flb_input_plugin in_kafka_plugin = {
593
    .name         = "kafka",
594
    .description  = "Kafka consumer input plugin",
595
    .cb_init      = in_kafka_init,
596
    .cb_pre_run   = NULL,
597
    .cb_collect   = in_kafka_collect,
598
    .cb_flush_buf = NULL,
599
    .cb_pause     = in_kafka_pause,
600
    .cb_resume    = in_kafka_resume,
601
    .cb_exit      = in_kafka_exit,
602
    .config_map   = config_map
603
};