/src/fluent-bit/src/flb_kafka.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2019-2021 The Fluent Bit Authors |
6 | | * Copyright (C) 2015-2018 Treasure Data Inc. |
7 | | * |
8 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
9 | | * you may not use this file except in compliance with the License. |
10 | | * You may obtain a copy of the License at |
11 | | * |
12 | | * http://www.apache.org/licenses/LICENSE-2.0 |
13 | | * |
14 | | * Unless required by applicable law or agreed to in writing, software |
15 | | * distributed under the License is distributed on an "AS IS" BASIS, |
16 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
17 | | * See the License for the specific language governing permissions and |
18 | | * limitations under the License. |
19 | | */ |
20 | | |
21 | | #include "fluent-bit/flb_config.h" |
22 | | #include "fluent-bit/flb_mem.h" |
23 | | #include "fluent-bit/flb_str.h" |
24 | | #include "fluent-bit/flb_utils.h" |
25 | | #include "monkey/mk_core/mk_list.h" |
26 | | #include <fluent-bit/flb_kafka.h> |
27 | | #include <fluent-bit/flb_kv.h> |
28 | | |
29 | | #include <rdkafka.h> |
30 | | |
31 | | rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka, |
32 | | struct mk_list *properties, |
33 | | int with_group_id) |
34 | 0 | { |
35 | 0 | struct mk_list *head; |
36 | 0 | struct flb_kv *kv; |
37 | 0 | const char *conf; |
38 | 0 | rd_kafka_conf_t *kafka_cfg; |
39 | 0 | char errstr[512]; |
40 | |
|
41 | 0 | kafka_cfg = rd_kafka_conf_new(); |
42 | 0 | if (!kafka_cfg) { |
43 | 0 | flb_error("[flb_kafka] Could not initialize kafka config object"); |
44 | 0 | goto err; |
45 | 0 | } |
46 | | |
47 | 0 | conf = flb_config_prop_get("client_id", properties); |
48 | 0 | if (!conf) { |
49 | 0 | conf = "fluent-bit"; |
50 | 0 | } |
51 | 0 | if (rd_kafka_conf_set(kafka_cfg, "client.id", conf, |
52 | 0 | errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { |
53 | 0 | flb_error("[flb_kafka] cannot configure client id: %s", errstr); |
54 | 0 | } |
55 | |
|
56 | 0 | if (with_group_id) { |
57 | 0 | conf = flb_config_prop_get("group_id", properties); |
58 | 0 | if (!conf) { |
59 | 0 | conf = "fluent-bit"; |
60 | 0 | } |
61 | 0 | if (rd_kafka_conf_set(kafka_cfg, "group.id", conf, |
62 | 0 | errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { |
63 | 0 | flb_error("[flb_kafka] cannot configure group id: %s", errstr); |
64 | 0 | } |
65 | 0 | } |
66 | |
|
67 | 0 | conf = flb_config_prop_get("brokers", properties); |
68 | 0 | if (conf) { |
69 | 0 | if (rd_kafka_conf_set(kafka_cfg, "bootstrap.servers", conf, |
70 | 0 | errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { |
71 | 0 | flb_error("[flb_kafka] failed to configure brokers: %s", errstr); |
72 | 0 | goto err; |
73 | 0 | } |
74 | 0 | kafka->brokers = flb_strdup(conf); |
75 | 0 | } |
76 | 0 | else { |
77 | 0 | flb_error("config: no brokers defined"); |
78 | 0 | goto err; |
79 | 0 | } |
80 | | |
81 | | /* Iterate custom rdkafka properties */ |
82 | 0 | mk_list_foreach(head, properties) { |
83 | 0 | kv = mk_list_entry(head, struct flb_kv, _head); |
84 | 0 | if (strncasecmp(kv->key, "rdkafka.", 8) == 0 && |
85 | 0 | flb_sds_len(kv->key) > 8) { |
86 | 0 | if (rd_kafka_conf_set(kafka_cfg, kv->key + 8, kv->val, |
87 | 0 | errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { |
88 | 0 | flb_error("[flb_kafka] cannot configure '%s' property with error: '%s'", |
89 | 0 | kv->key + 8, errstr); |
90 | 0 | } |
91 | 0 | } |
92 | 0 | } |
93 | |
|
94 | 0 | return kafka_cfg; |
95 | | |
96 | 0 | err: |
97 | 0 | if (kafka_cfg) { |
98 | 0 | flb_free(kafka_cfg); |
99 | 0 | } |
100 | 0 | return NULL; |
101 | 0 | } |
102 | | |
103 | | static int add_topic_partitions(rd_kafka_topic_partition_list_t *list, |
104 | | const char *topic_str, |
105 | | const char *partitions_str) |
106 | 0 | { |
107 | 0 | int ret = -1; |
108 | 0 | struct mk_list *split; |
109 | 0 | char *str, *end; |
110 | 0 | int start, stop; |
111 | 0 | size_t len; |
112 | 0 | split = flb_utils_split(partitions_str, '-', -1); |
113 | 0 | if (!split) { |
114 | 0 | flb_error("[flb_kafka] Failed to split partitions string"); |
115 | 0 | goto end; |
116 | 0 | } |
117 | | |
118 | 0 | len = mk_list_size(split); |
119 | 0 | if (len == 1) { |
120 | 0 | str = mk_list_entry(split->next, struct flb_split_entry, _head)->value; |
121 | 0 | start = strtol(str, &end, 10); |
122 | 0 | if (end == str || *end != '\0') { |
123 | 0 | flb_error("[flb_kafka] invalid partition \"%s\"", str); |
124 | 0 | goto end; |
125 | 0 | } |
126 | | // single partition |
127 | 0 | rd_kafka_topic_partition_list_add(list, topic_str, start); |
128 | 0 | } else if (len == 2) { |
129 | 0 | str = mk_list_entry(split->next, struct flb_split_entry, _head)->value; |
130 | 0 | start = strtol(str, &end, 10); |
131 | 0 | if (end == str || *end != '\0') { |
132 | 0 | flb_error("[flb_kafka] invalid partition \"%s\"", str); |
133 | 0 | goto end; |
134 | 0 | } |
135 | 0 | str = mk_list_entry(split->next->next, struct flb_split_entry, _head)->value; |
136 | 0 | stop = strtol(str, &end, 10); |
137 | 0 | if (end == str || *end != '\0') { |
138 | 0 | flb_error("[flb_kafka] invalid partition \"%s\"", str); |
139 | 0 | goto end; |
140 | 0 | } |
141 | 0 | rd_kafka_topic_partition_list_add_range(list, topic_str, start, stop); |
142 | 0 | } else { |
143 | 0 | flb_error("[flb_kafka] invalid partition range string \"%s\"", partitions_str); |
144 | 0 | goto end; |
145 | 0 | } |
146 | | |
147 | 0 | ret = 0; |
148 | |
|
149 | 0 | end: |
150 | 0 | if (split) { |
151 | 0 | flb_utils_split_free(split); |
152 | 0 | } |
153 | 0 | return ret; |
154 | 0 | } |
155 | | |
156 | | rd_kafka_topic_partition_list_t *flb_kafka_parse_topics(const char *topics_str) |
157 | 0 | { |
158 | 0 | rd_kafka_topic_partition_list_t *ret; |
159 | 0 | struct mk_list *split = NULL; |
160 | 0 | struct mk_list *partitions = NULL; |
161 | 0 | struct mk_list *curr; |
162 | 0 | struct flb_split_entry *entry; |
163 | 0 | struct flb_split_entry *topic_entry; |
164 | 0 | struct flb_split_entry *partitions_entry; |
165 | 0 | size_t len; |
166 | |
|
167 | 0 | ret = rd_kafka_topic_partition_list_new(1); |
168 | 0 | if (!ret) { |
169 | 0 | flb_error("[flb_kafka] Failed to allocate topic list"); |
170 | 0 | goto err; |
171 | 0 | } |
172 | | |
173 | 0 | split = flb_utils_split(topics_str, ',', -1); |
174 | 0 | if (!split) { |
175 | 0 | flb_error("[flb_kafka] Failed to split topics string"); |
176 | 0 | goto err; |
177 | 0 | } |
178 | | |
179 | 0 | mk_list_foreach(curr, split) { |
180 | 0 | entry = mk_list_entry(curr, struct flb_split_entry, _head); |
181 | 0 | partitions = flb_utils_split(entry->value, ':', -1); |
182 | 0 | if (!partitions) { |
183 | 0 | flb_error("[flb_kafka] Failed to split topic string"); |
184 | 0 | goto err; |
185 | 0 | } |
186 | 0 | len = mk_list_size(partitions); |
187 | 0 | if (len == 1) { |
188 | 0 | rd_kafka_topic_partition_list_add(ret, entry->value, 0); |
189 | 0 | } else if (len == 2) { |
190 | 0 | topic_entry = mk_list_entry( |
191 | 0 | partitions->next, struct flb_split_entry, _head); |
192 | 0 | partitions_entry = mk_list_entry( |
193 | 0 | partitions->next->next, struct flb_split_entry, _head); |
194 | 0 | if (add_topic_partitions(ret, topic_entry->value, partitions_entry->value)) { |
195 | 0 | goto err; |
196 | 0 | } |
197 | 0 | } else { |
198 | 0 | flb_error("[flb_kafka] Failed to parse topic/partition string"); |
199 | 0 | goto err; |
200 | 0 | } |
201 | 0 | flb_utils_split_free(partitions); |
202 | 0 | } |
203 | 0 | flb_utils_split_free(split); |
204 | 0 | return ret; |
205 | | |
206 | 0 | err: |
207 | 0 | if (ret) { |
208 | 0 | rd_kafka_topic_partition_list_destroy(ret); |
209 | 0 | } |
210 | 0 | if (split) { |
211 | 0 | flb_utils_split_free(split); |
212 | 0 | } |
213 | 0 | if (partitions) { |
214 | 0 | flb_utils_split_free(partitions); |
215 | 0 | } |
216 | 0 | return NULL; |
217 | 0 | } |
218 | | |
219 | | struct flb_kafka_opaque *flb_kafka_opaque_create() |
220 | 0 | { |
221 | 0 | struct flb_kafka_opaque *opaque; |
222 | |
|
223 | 0 | opaque = flb_calloc(1, sizeof(struct flb_kafka_opaque)); |
224 | 0 | if (!opaque) { |
225 | 0 | flb_error("[flb_kafka] Failed to allocate opaque object"); |
226 | 0 | return NULL; |
227 | 0 | } |
228 | | |
229 | 0 | return opaque; |
230 | 0 | } |
231 | | |
232 | | /* note: opaque_destroy only destroy the context, not it */ |
233 | | void flb_kafka_opaque_destroy(struct flb_kafka_opaque *opaque) |
234 | 0 | { |
235 | 0 | if (!opaque) { |
236 | 0 | return; |
237 | 0 | } |
238 | | |
239 | 0 | flb_free(opaque); |
240 | 0 | } |
241 | | |
242 | | void flb_kafka_opaque_set(struct flb_kafka_opaque *opaque, void *ptr, void *msk_iam_ctx) |
243 | 0 | { |
244 | 0 | if (!opaque) { |
245 | 0 | return; |
246 | 0 | } |
247 | | |
248 | | /* only set ptr and msk_iam_ctx if they come with non valid values */ |
249 | 0 | if (ptr) { |
250 | 0 | opaque->ptr = ptr; |
251 | 0 | } |
252 | |
|
253 | 0 | if (msk_iam_ctx) { |
254 | 0 | opaque->msk_iam_ctx = msk_iam_ctx; |
255 | 0 | } |
256 | 0 | } |