Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/src/flb_kafka.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2019-2021 The Fluent Bit Authors
6
 *  Copyright (C) 2015-2018 Treasure Data Inc.
7
 *
8
 *  Licensed under the Apache License, Version 2.0 (the "License");
9
 *  you may not use this file except in compliance with the License.
10
 *  You may obtain a copy of the License at
11
 *
12
 *      http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 *  Unless required by applicable law or agreed to in writing, software
15
 *  distributed under the License is distributed on an "AS IS" BASIS,
16
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 *  See the License for the specific language governing permissions and
18
 *  limitations under the License.
19
 */
20
21
#include "fluent-bit/flb_config.h"
22
#include "fluent-bit/flb_mem.h"
23
#include "fluent-bit/flb_str.h"
24
#include "fluent-bit/flb_utils.h"
25
#include "monkey/mk_core/mk_list.h"
26
#include <fluent-bit/flb_kafka.h>
27
#include <fluent-bit/flb_kv.h>
28
29
#include <rdkafka.h>
30
31
rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka,
32
                                       struct mk_list *properties,
33
                                       int with_group_id)
34
0
{
35
0
    struct mk_list *head;
36
0
    struct flb_kv *kv;
37
0
    const char *conf;
38
0
    rd_kafka_conf_t *kafka_cfg;
39
0
    char errstr[512];
40
41
0
    kafka_cfg = rd_kafka_conf_new();
42
0
    if (!kafka_cfg) {
43
0
        flb_error("[flb_kafka] Could not initialize kafka config object");
44
0
        goto err;
45
0
    }
46
47
0
    conf = flb_config_prop_get("client_id", properties);
48
0
    if (!conf) {
49
0
        conf = "fluent-bit";
50
0
    }
51
0
    if (rd_kafka_conf_set(kafka_cfg, "client.id", conf,
52
0
                errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
53
0
        flb_error("[flb_kafka] cannot configure client id: %s", errstr);
54
0
    }
55
56
0
    if (with_group_id) {
57
0
        conf = flb_config_prop_get("group_id", properties);
58
0
        if (!conf) {
59
0
            conf = "fluent-bit";
60
0
        }
61
0
        if (rd_kafka_conf_set(kafka_cfg, "group.id", conf,
62
0
                    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
63
0
            flb_error("[flb_kafka] cannot configure group id: %s", errstr);
64
0
        }
65
0
    }
66
67
0
    conf = flb_config_prop_get("brokers", properties);
68
0
    if (conf) {
69
0
        if (rd_kafka_conf_set(kafka_cfg, "bootstrap.servers", conf,
70
0
                errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
71
0
            flb_error("[flb_kafka] failed to configure brokers: %s", errstr);
72
0
            goto err;
73
0
        }
74
0
        kafka->brokers = flb_strdup(conf);
75
0
    }
76
0
    else {
77
0
        flb_error("config: no brokers defined");
78
0
        goto err;
79
0
    }
80
81
    /* Iterate custom rdkafka properties */
82
0
    mk_list_foreach(head, properties) {
83
0
        kv = mk_list_entry(head, struct flb_kv, _head);
84
0
        if (strncasecmp(kv->key, "rdkafka.", 8) == 0 &&
85
0
            flb_sds_len(kv->key) > 8) {
86
0
            if (rd_kafka_conf_set(kafka_cfg, kv->key + 8, kv->val,
87
0
                        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
88
0
                flb_error("[flb_kafka] cannot configure '%s' property with error: '%s'",
89
0
                          kv->key + 8, errstr);
90
0
            }
91
0
        }
92
0
    }
93
94
0
    return kafka_cfg;
95
96
0
err:
97
0
    if (kafka_cfg) {
98
0
        flb_free(kafka_cfg);
99
0
    }
100
0
    return NULL;
101
0
}
102
103
static int add_topic_partitions(rd_kafka_topic_partition_list_t *list,
104
                                const char *topic_str,
105
                                const char *partitions_str)
106
0
{
107
0
    int ret = -1;
108
0
    struct mk_list *split;
109
0
    char *str, *end;
110
0
    int start, stop;
111
0
    size_t len;
112
0
    split = flb_utils_split(partitions_str, '-', -1);
113
0
    if (!split) {
114
0
        flb_error("[flb_kafka] Failed to split partitions string");
115
0
        goto end;
116
0
    }
117
118
0
    len = mk_list_size(split);
119
0
    if (len == 1) {
120
0
        str = mk_list_entry(split->next, struct flb_split_entry, _head)->value;
121
0
        start = strtol(str, &end, 10);
122
0
        if (end == str || *end != '\0') {
123
0
            flb_error("[flb_kafka] invalid partition \"%s\"", str);
124
0
            goto end;
125
0
        }
126
        // single partition
127
0
        rd_kafka_topic_partition_list_add(list, topic_str, start);
128
0
    } else if (len == 2) {
129
0
        str = mk_list_entry(split->next, struct flb_split_entry, _head)->value;
130
0
        start = strtol(str, &end, 10);
131
0
        if (end == str || *end != '\0') {
132
0
            flb_error("[flb_kafka] invalid partition \"%s\"", str);
133
0
            goto end;
134
0
        }
135
0
        str = mk_list_entry(split->next->next, struct flb_split_entry, _head)->value;
136
0
        stop = strtol(str, &end, 10);
137
0
        if (end == str || *end != '\0') {
138
0
            flb_error("[flb_kafka] invalid partition \"%s\"", str);
139
0
            goto end;
140
0
        }
141
0
        rd_kafka_topic_partition_list_add_range(list, topic_str, start, stop);
142
0
    } else {
143
0
        flb_error("[flb_kafka] invalid partition range string \"%s\"", partitions_str);
144
0
        goto end;
145
0
    }
146
147
0
    ret = 0;
148
149
0
end:
150
0
    if (split) {
151
0
        flb_utils_split_free(split);
152
0
    }
153
0
    return ret;
154
0
}
155
156
rd_kafka_topic_partition_list_t *flb_kafka_parse_topics(const char *topics_str)
157
0
{
158
0
    rd_kafka_topic_partition_list_t *ret;
159
0
    struct mk_list *split = NULL;
160
0
    struct mk_list *partitions = NULL;
161
0
    struct mk_list *curr;
162
0
    struct flb_split_entry *entry;
163
0
    struct flb_split_entry *topic_entry;
164
0
    struct flb_split_entry *partitions_entry;
165
0
    size_t len;
166
167
0
    ret = rd_kafka_topic_partition_list_new(1);
168
0
    if (!ret) {
169
0
        flb_error("[flb_kafka] Failed to allocate topic list");
170
0
        goto err;
171
0
    }
172
173
0
    split = flb_utils_split(topics_str, ',', -1);
174
0
    if (!split) {
175
0
        flb_error("[flb_kafka] Failed to split topics string");
176
0
        goto err;
177
0
    }
178
179
0
    mk_list_foreach(curr, split) {
180
0
        entry = mk_list_entry(curr, struct flb_split_entry, _head);
181
0
        partitions = flb_utils_split(entry->value, ':', -1);
182
0
        if (!partitions) {
183
0
            flb_error("[flb_kafka] Failed to split topic string");
184
0
            goto err;
185
0
        }
186
0
        len = mk_list_size(partitions);
187
0
        if (len == 1) {
188
0
            rd_kafka_topic_partition_list_add(ret, entry->value, 0);
189
0
        } else if (len == 2) {
190
0
            topic_entry = mk_list_entry(
191
0
                    partitions->next, struct flb_split_entry, _head);
192
0
            partitions_entry = mk_list_entry(
193
0
                    partitions->next->next, struct flb_split_entry, _head);
194
0
            if (add_topic_partitions(ret, topic_entry->value, partitions_entry->value)) {
195
0
                goto err;
196
0
            }
197
0
        } else {
198
0
            flb_error("[flb_kafka] Failed to parse topic/partition string");
199
0
            goto err;
200
0
        }
201
0
        flb_utils_split_free(partitions);
202
0
    }
203
0
    flb_utils_split_free(split);
204
0
    return ret;
205
206
0
err:
207
0
    if (ret) {
208
0
        rd_kafka_topic_partition_list_destroy(ret);
209
0
    }
210
0
    if (split) {
211
0
        flb_utils_split_free(split);
212
0
    }
213
0
    if (partitions) {
214
0
        flb_utils_split_free(partitions);
215
0
    }
216
0
    return NULL;
217
0
}
218
219
struct flb_kafka_opaque *flb_kafka_opaque_create()
220
0
{
221
0
    struct flb_kafka_opaque *opaque;
222
223
0
    opaque = flb_calloc(1, sizeof(struct flb_kafka_opaque));
224
0
    if (!opaque) {
225
0
        flb_error("[flb_kafka] Failed to allocate opaque object");
226
0
        return NULL;
227
0
    }
228
229
0
    return opaque;
230
0
}
231
232
/* note: opaque_destroy only destroy the context, not it */
233
void flb_kafka_opaque_destroy(struct flb_kafka_opaque *opaque)
234
0
{
235
0
    if (!opaque) {
236
0
        return;
237
0
    }
238
239
0
    flb_free(opaque);
240
0
}
241
242
void flb_kafka_opaque_set(struct flb_kafka_opaque *opaque, void *ptr, void *msk_iam_ctx)
243
0
{
244
0
    if (!opaque) {
245
0
        return;
246
0
    }
247
248
    /* only set ptr and msk_iam_ctx if they come with non valid values */
249
0
    if (ptr) {
250
0
        opaque->ptr = ptr;
251
0
    }
252
253
0
    if (msk_iam_ctx) {
254
0
        opaque->msk_iam_ctx = msk_iam_ctx;
255
0
    }
256
0
}