/src/fluent-bit/src/flb_storage.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_info.h> |
21 | | #include <fluent-bit/flb_input.h> |
22 | | #include <fluent-bit/flb_log.h> |
23 | | #include <fluent-bit/flb_storage.h> |
24 | | #include <fluent-bit/flb_scheduler.h> |
25 | | #include <fluent-bit/flb_utils.h> |
26 | | #include <fluent-bit/flb_http_server.h> |
27 | | |
28 | | static struct cmt *metrics_context_create(struct flb_storage_metrics *sm) |
29 | 2.76k | { |
30 | 2.76k | struct cmt *cmt; |
31 | | |
32 | 2.76k | cmt = cmt_create(); |
33 | 2.76k | if (!cmt) { |
34 | 0 | return NULL; |
35 | 0 | } |
36 | | |
37 | 2.76k | sm->cmt_chunks = cmt_gauge_create(cmt, |
38 | 2.76k | "fluentbit", "storage", "chunks", |
39 | 2.76k | "Total number of chunks in the storage layer.", |
40 | 2.76k | 0, (char *[]) { NULL }); |
41 | | |
42 | 2.76k | sm->cmt_mem_chunks = cmt_gauge_create(cmt, |
43 | 2.76k | "fluentbit", "storage", "mem_chunks", |
44 | 2.76k | "Total number of memory chunks.", |
45 | 2.76k | 0, (char *[]) { NULL }); |
46 | | |
47 | 2.76k | sm->cmt_fs_chunks = cmt_gauge_create(cmt, |
48 | 2.76k | "fluentbit", "storage", "fs_chunks", |
49 | 2.76k | "Total number of filesystem chunks.", |
50 | 2.76k | 0, (char *[]) { NULL }); |
51 | | |
52 | 2.76k | sm->cmt_fs_chunks_up = cmt_gauge_create(cmt, |
53 | 2.76k | "fluentbit", "storage", "fs_chunks_up", |
54 | 2.76k | "Total number of filesystem chunks up in memory.", |
55 | 2.76k | 0, (char *[]) { NULL }); |
56 | | |
57 | 2.76k | sm->cmt_fs_chunks_down = cmt_gauge_create(cmt, |
58 | 2.76k | "fluentbit", "storage", "fs_chunks_down", |
59 | 2.76k | "Total number of filesystem chunks down.", |
60 | 2.76k | 0, (char *[]) { NULL }); |
61 | | |
62 | 2.76k | return cmt; |
63 | 2.76k | } |
64 | | |
65 | | |
66 | | /* This function collect the 'global' metrics of the storage layer (cmetrics) */ |
67 | | int flb_storage_metrics_update(struct flb_config *ctx, struct flb_storage_metrics *sm) |
68 | 0 | { |
69 | 0 | uint64_t ts; |
70 | 0 | struct cio_stats st; |
71 | | |
72 | | /* Retrieve general stats from the storage layer */ |
73 | 0 | cio_stats_get(ctx->cio, &st); |
74 | |
|
75 | 0 | ts = cfl_time_now(); |
76 | |
|
77 | 0 | cmt_gauge_set(sm->cmt_chunks, ts, st.chunks_total, 0, NULL); |
78 | 0 | cmt_gauge_set(sm->cmt_mem_chunks, ts, st.chunks_mem, 0, NULL); |
79 | 0 | cmt_gauge_set(sm->cmt_fs_chunks, ts, st.chunks_fs, 0, NULL); |
80 | 0 | cmt_gauge_set(sm->cmt_fs_chunks_up, ts, st.chunks_fs_up, 0, NULL); |
81 | 0 | cmt_gauge_set(sm->cmt_fs_chunks_down, ts, st.chunks_fs_down, 0, NULL); |
82 | |
|
83 | 0 | return 0; |
84 | 0 | } |
85 | | |
86 | | static void metrics_append_general(msgpack_packer *mp_pck, |
87 | | struct flb_config *ctx, |
88 | | struct flb_storage_metrics *sm) |
89 | 3 | { |
90 | 3 | struct cio_stats storage_st; |
91 | | |
92 | | /* Retrieve general stats from the storage layer */ |
93 | 3 | cio_stats_get(ctx->cio, &storage_st); |
94 | | |
95 | 3 | msgpack_pack_str(mp_pck, 13); |
96 | 3 | msgpack_pack_str_body(mp_pck, "storage_layer", 13); |
97 | 3 | msgpack_pack_map(mp_pck, 1); |
98 | | |
99 | | /* Chunks */ |
100 | 3 | msgpack_pack_str(mp_pck, 6); |
101 | 3 | msgpack_pack_str_body(mp_pck, "chunks", 6); |
102 | 3 | msgpack_pack_map(mp_pck, 5); |
103 | | |
104 | | /* chunks['total_chunks'] */ |
105 | 3 | msgpack_pack_str(mp_pck, 12); |
106 | 3 | msgpack_pack_str_body(mp_pck, "total_chunks", 12); |
107 | 3 | msgpack_pack_uint64(mp_pck, storage_st.chunks_total); |
108 | | |
109 | | /* chunks['mem_chunks'] */ |
110 | 3 | msgpack_pack_str(mp_pck, 10); |
111 | 3 | msgpack_pack_str_body(mp_pck, "mem_chunks", 10); |
112 | 3 | msgpack_pack_uint64(mp_pck, storage_st.chunks_mem); |
113 | | |
114 | | /* chunks['fs_chunks'] */ |
115 | 3 | msgpack_pack_str(mp_pck, 9); |
116 | 3 | msgpack_pack_str_body(mp_pck, "fs_chunks", 9); |
117 | 3 | msgpack_pack_uint64(mp_pck, storage_st.chunks_fs); |
118 | | |
119 | | /* chunks['fs_up_chunks'] */ |
120 | 3 | msgpack_pack_str(mp_pck, 12); |
121 | 3 | msgpack_pack_str_body(mp_pck, "fs_chunks_up", 12); |
122 | 3 | msgpack_pack_uint64(mp_pck, storage_st.chunks_fs_up); |
123 | | |
124 | | /* chunks['fs_down_chunks'] */ |
125 | 3 | msgpack_pack_str(mp_pck, 14); |
126 | 3 | msgpack_pack_str_body(mp_pck, "fs_chunks_down", 14); |
127 | 3 | msgpack_pack_uint64(mp_pck, storage_st.chunks_fs_down); |
128 | 3 | } |
129 | | |
130 | | static void metrics_append_input(msgpack_packer *mp_pck, |
131 | | struct flb_config *ctx, |
132 | | struct flb_storage_metrics *sm) |
133 | 3 | { |
134 | 3 | int len; |
135 | 3 | int ret; |
136 | 3 | uint64_t ts; |
137 | 3 | const char *tmp; |
138 | 3 | char buf[32]; |
139 | 3 | ssize_t size; |
140 | 3 | size_t total_chunks; |
141 | | |
142 | | /* chunks */ |
143 | 3 | int up; |
144 | 3 | int down; |
145 | 3 | int busy; |
146 | 3 | char *name; |
147 | 3 | ssize_t busy_size; |
148 | 3 | struct mk_list *head; |
149 | 3 | struct mk_list *h_chunks; |
150 | 3 | struct flb_input_instance *i; |
151 | 3 | struct flb_input_chunk *ic; |
152 | | |
153 | | /* |
154 | | * DISCLAIMER: This interface will be deprecated once we extend Chunk I/O |
155 | | * stats per stream. |
156 | | * |
157 | | * For now and to avoid duplication of iterating chunks we are adding the |
158 | | * metrics counting for CMetrics inside the same logic for the old code. |
159 | | */ |
160 | | |
161 | 3 | msgpack_pack_str(mp_pck, 12); |
162 | 3 | msgpack_pack_str_body(mp_pck, "input_chunks", 12); |
163 | 3 | msgpack_pack_map(mp_pck, mk_list_size(&ctx->inputs)); |
164 | | |
165 | | /* current time */ |
166 | 3 | ts = cfl_time_now(); |
167 | | |
168 | | /* Input Plugins Ingestion */ |
169 | 3 | mk_list_foreach(head, &ctx->inputs) { |
170 | 3 | i = mk_list_entry(head, struct flb_input_instance, _head); |
171 | | |
172 | 3 | name = (char *) flb_input_name(i); |
173 | 3 | total_chunks = mk_list_size(&i->chunks); |
174 | | |
175 | 3 | tmp = flb_input_name(i); |
176 | 3 | len = strlen(tmp); |
177 | | |
178 | 3 | msgpack_pack_str(mp_pck, len); |
179 | 3 | msgpack_pack_str_body(mp_pck, tmp, len); |
180 | | |
181 | | /* Map for 'status' and 'chunks' */ |
182 | 3 | msgpack_pack_map(mp_pck, 2); |
183 | | |
184 | | /* |
185 | | * Status |
186 | | * ====== |
187 | | */ |
188 | 3 | msgpack_pack_str(mp_pck, 6); |
189 | 3 | msgpack_pack_str_body(mp_pck, "status", 6); |
190 | | |
191 | | /* 'status' map has 2 keys: overlimit and chunks */ |
192 | 3 | msgpack_pack_map(mp_pck, 3); |
193 | | |
194 | | /* status['overlimit'] */ |
195 | 3 | msgpack_pack_str(mp_pck, 9); |
196 | 3 | msgpack_pack_str_body(mp_pck, "overlimit", 9); |
197 | | |
198 | | |
199 | | /* CMetrics */ |
200 | 3 | ret = FLB_FALSE; |
201 | 3 | if (i->mem_buf_limit > 0) { |
202 | 0 | if (i->mem_chunks_size >= i->mem_buf_limit) { |
203 | 0 | ret = FLB_TRUE; |
204 | 0 | } |
205 | 0 | } |
206 | 3 | if (ret == FLB_TRUE) { |
207 | | /* cmetrics */ |
208 | 0 | cmt_gauge_set(i->cmt_storage_overlimit, ts, 1, |
209 | 0 | 1, (char *[]) {name}); |
210 | | |
211 | | /* old code */ |
212 | 0 | msgpack_pack_true(mp_pck); |
213 | 0 | } |
214 | 3 | else { |
215 | | /* cmetrics */ |
216 | 3 | cmt_gauge_set(i->cmt_storage_overlimit, ts, 0, |
217 | 3 | 1, (char *[]) {name}); |
218 | | |
219 | | /* old code */ |
220 | 3 | msgpack_pack_false(mp_pck); |
221 | 3 | } |
222 | | |
223 | | /* fluentbit_storage_memory_bytes */ |
224 | 3 | cmt_gauge_set(i->cmt_storage_memory_bytes, ts, i->mem_chunks_size, |
225 | 3 | 1, (char *[]) {name}); |
226 | | |
227 | | /* status['mem_size'] */ |
228 | 3 | msgpack_pack_str(mp_pck, 8); |
229 | 3 | msgpack_pack_str_body(mp_pck, "mem_size", 8); |
230 | | |
231 | | /* Current memory size used based on last ingestion */ |
232 | 3 | flb_utils_bytes_to_human_readable_size(i->mem_chunks_size, |
233 | 3 | buf, sizeof(buf) - 1); |
234 | 3 | len = strlen(buf); |
235 | 3 | msgpack_pack_str(mp_pck, len); |
236 | 3 | msgpack_pack_str_body(mp_pck, buf, len); |
237 | | |
238 | | /* status['mem_limit'] */ |
239 | 3 | msgpack_pack_str(mp_pck, 9); |
240 | 3 | msgpack_pack_str_body(mp_pck, "mem_limit", 9); |
241 | | |
242 | 3 | flb_utils_bytes_to_human_readable_size(i->mem_buf_limit, |
243 | 3 | buf, sizeof(buf) - 1); |
244 | 3 | len = strlen(buf); |
245 | 3 | msgpack_pack_str(mp_pck, len); |
246 | 3 | msgpack_pack_str_body(mp_pck, buf, len); |
247 | | |
248 | | /* |
249 | | * Chunks |
250 | | * ====== |
251 | | */ |
252 | | |
253 | | /* cmetrics */ |
254 | 3 | cmt_gauge_set(i->cmt_storage_chunks, ts, total_chunks, |
255 | 3 | 1, (char *[]) {name}); |
256 | | |
257 | | |
258 | | /* old code */ |
259 | 3 | msgpack_pack_str(mp_pck, 6); |
260 | 3 | msgpack_pack_str_body(mp_pck, "chunks", 6); |
261 | | |
262 | | /* 'chunks' has 3 keys: total, up, down, busy and busy_size */ |
263 | 3 | msgpack_pack_map(mp_pck, 5); |
264 | | |
265 | | /* chunks['total_chunks'] */ |
266 | 3 | msgpack_pack_str(mp_pck, 5); |
267 | 3 | msgpack_pack_str_body(mp_pck, "total", 5); |
268 | 3 | msgpack_pack_uint64(mp_pck, total_chunks); |
269 | | |
270 | | /* |
271 | | * chunks Details: chunks marked as 'busy' are 'locked' since they are in |
272 | | * a 'flush' state. No more data can be appended to a busy chunk. |
273 | | */ |
274 | 3 | busy = 0; |
275 | 3 | busy_size = 0; |
276 | | |
277 | | /* up/down */ |
278 | 3 | up = 0; |
279 | 3 | down = 0; |
280 | | |
281 | | /* Iterate chunks for the input instance in question */ |
282 | 3 | mk_list_foreach(h_chunks, &i->chunks) { |
283 | 0 | ic = mk_list_entry(h_chunks, struct flb_input_chunk, _head); |
284 | 0 | if (ic->busy == FLB_TRUE) { |
285 | 0 | busy++; |
286 | 0 | size = cio_chunk_get_content_size(ic->chunk); |
287 | 0 | if (size >= 0) { |
288 | 0 | busy_size += size; |
289 | 0 | } |
290 | 0 | } |
291 | |
|
292 | 0 | if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) { |
293 | 0 | up++; |
294 | 0 | } |
295 | 0 | else { |
296 | 0 | down++; |
297 | 0 | } |
298 | |
|
299 | 0 | } |
300 | | |
301 | | /* fluentbit_storage_chunks_up */ |
302 | 3 | cmt_gauge_set(i->cmt_storage_chunks_up, ts, up, |
303 | 3 | 1, (char *[]) {name}); |
304 | | |
305 | | /* chunks['up'] */ |
306 | 3 | msgpack_pack_str(mp_pck, 2); |
307 | 3 | msgpack_pack_str_body(mp_pck, "up", 2); |
308 | 3 | msgpack_pack_uint64(mp_pck, up); |
309 | | |
310 | | /* fluentbit_storage_chunks_down */ |
311 | 3 | cmt_gauge_set(i->cmt_storage_chunks_down, ts, down, |
312 | 3 | 1, (char *[]) {name}); |
313 | | |
314 | | /* chunks['down'] */ |
315 | 3 | msgpack_pack_str(mp_pck, 4); |
316 | 3 | msgpack_pack_str_body(mp_pck, "down", 4); |
317 | 3 | msgpack_pack_uint64(mp_pck, down); |
318 | | |
319 | | /* fluentbit_storage_chunks_busy */ |
320 | 3 | cmt_gauge_set(i->cmt_storage_chunks_busy, ts, busy, |
321 | 3 | 1, (char *[]) {name}); |
322 | | |
323 | | /* chunks['busy'] */ |
324 | 3 | msgpack_pack_str(mp_pck, 4); |
325 | 3 | msgpack_pack_str_body(mp_pck, "busy", 4); |
326 | 3 | msgpack_pack_uint64(mp_pck, busy); |
327 | | |
328 | | /* fluentbit_storage_chunks_busy_size */ |
329 | 3 | cmt_gauge_set(i->cmt_storage_chunks_busy_bytes, ts, busy_size, |
330 | 3 | 1, (char *[]) {name}); |
331 | | |
332 | | /* chunks['busy_size'] */ |
333 | 3 | msgpack_pack_str(mp_pck, 9); |
334 | 3 | msgpack_pack_str_body(mp_pck, "busy_size", 9); |
335 | | |
336 | 3 | flb_utils_bytes_to_human_readable_size(busy_size, buf, sizeof(buf) - 1); |
337 | 3 | len = strlen(buf); |
338 | 3 | msgpack_pack_str(mp_pck, len); |
339 | 3 | msgpack_pack_str_body(mp_pck, buf, len); |
340 | 3 | } |
341 | 3 | } |
342 | | |
343 | | static void cb_storage_metrics_collect(struct flb_config *ctx, void *data) |
344 | 3 | { |
345 | 3 | msgpack_sbuffer mp_sbuf; |
346 | 3 | msgpack_packer mp_pck; |
347 | | |
348 | | /* Prepare new outgoing buffer */ |
349 | 3 | msgpack_sbuffer_init(&mp_sbuf); |
350 | 3 | msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); |
351 | | |
352 | | /* Pack main map and append relevant data */ |
353 | 3 | msgpack_pack_map(&mp_pck, 2); |
354 | 3 | metrics_append_general(&mp_pck, ctx, data); |
355 | 3 | metrics_append_input(&mp_pck, ctx, data); |
356 | | |
357 | 3 | #ifdef FLB_HAVE_HTTP_SERVER |
358 | 3 | if (ctx->http_server == FLB_TRUE && ctx->storage_metrics == FLB_TRUE) { |
359 | 0 | flb_hs_push_storage_metrics(ctx->http_ctx, mp_sbuf.data, mp_sbuf.size); |
360 | 0 | } |
361 | 3 | #endif |
362 | 3 | msgpack_sbuffer_destroy(&mp_sbuf); |
363 | 3 | } |
364 | | |
365 | | struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx) |
366 | 2.76k | { |
367 | 2.76k | int ret; |
368 | 2.76k | struct flb_storage_metrics *sm; |
369 | | |
370 | 2.76k | sm = flb_calloc(1, sizeof(struct flb_storage_metrics)); |
371 | 2.76k | if (!sm) { |
372 | 0 | flb_errno(); |
373 | 0 | return NULL; |
374 | 0 | } |
375 | 2.76k | sm->cmt = metrics_context_create(sm); |
376 | 2.76k | if(!sm->cmt) { |
377 | 0 | flb_free(sm); |
378 | 0 | return NULL; |
379 | 0 | } |
380 | | |
381 | 2.76k | ret = flb_sched_timer_cb_create(ctx->sched, FLB_SCHED_TIMER_CB_PERM, 5000, |
382 | 2.76k | cb_storage_metrics_collect, |
383 | 2.76k | ctx->storage_metrics_ctx, NULL); |
384 | 2.76k | if (ret == -1) { |
385 | 0 | flb_error("[storage metrics] cannot create timer to collect metrics"); |
386 | 0 | flb_free(sm); |
387 | 0 | return NULL; |
388 | 0 | } |
389 | | |
390 | 2.76k | return sm; |
391 | 2.76k | } |
392 | | |
393 | | static int sort_chunk_cmp(const void *a_arg, const void *b_arg) |
394 | 0 | { |
395 | 0 | char *p; |
396 | 0 | struct cio_chunk *chunk_a = *(struct cio_chunk **) a_arg; |
397 | 0 | struct cio_chunk *chunk_b = *(struct cio_chunk **) b_arg; |
398 | 0 | struct timespec tm_a; |
399 | 0 | struct timespec tm_b; |
400 | | |
401 | | /* Scan Chunk A */ |
402 | 0 | p = strchr(chunk_a->name, '-'); |
403 | 0 | if (!p) { |
404 | 0 | return -1; |
405 | 0 | } |
406 | 0 | p++; |
407 | |
|
408 | 0 | sscanf(p, "%lu.%lu.flb", &tm_a.tv_sec, &tm_a.tv_nsec); |
409 | | |
410 | | /* Scan Chunk B */ |
411 | 0 | p = strchr(chunk_b->name, '-'); |
412 | 0 | if (!p) { |
413 | 0 | return -1; |
414 | 0 | } |
415 | 0 | p++; |
416 | 0 | sscanf(p, "%lu.%lu.flb", &tm_b.tv_sec, &tm_b.tv_nsec); |
417 | | |
418 | | /* Compare */ |
419 | 0 | if (tm_a.tv_sec != tm_b.tv_sec) { |
420 | 0 | if (tm_a.tv_sec > tm_b.tv_sec) { |
421 | 0 | return 1; |
422 | 0 | } |
423 | 0 | else { |
424 | 0 | return -1; |
425 | 0 | } |
426 | 0 | } |
427 | 0 | else { |
428 | 0 | if (tm_a.tv_nsec > tm_b.tv_nsec) { |
429 | 0 | return 1; |
430 | 0 | } |
431 | 0 | else if (tm_a.tv_nsec < tm_b.tv_nsec) { |
432 | 0 | return -1; |
433 | 0 | } |
434 | 0 | } |
435 | | |
436 | 0 | return 0; |
437 | 0 | } |
438 | | |
439 | | static void print_storage_info(struct flb_config *ctx, struct cio_ctx *cio) |
440 | 2.76k | { |
441 | 2.76k | char *type; |
442 | 2.76k | char *sync; |
443 | 2.76k | char *checksum; |
444 | 2.76k | struct flb_input_instance *in; |
445 | | |
446 | 2.76k | if (cio->options.root_path) { |
447 | 1.01k | type = "memory+filesystem"; |
448 | 1.01k | } |
449 | 1.75k | else { |
450 | 1.75k | type = "memory"; |
451 | 1.75k | } |
452 | | |
453 | 2.76k | if (cio->options.flags & CIO_FULL_SYNC) { |
454 | 0 | sync = "full"; |
455 | 0 | } |
456 | 2.76k | else { |
457 | 2.76k | sync = "normal"; |
458 | 2.76k | } |
459 | | |
460 | 2.76k | if (cio->options.flags & CIO_CHECKSUM) { |
461 | 0 | checksum = "on"; |
462 | 0 | } |
463 | 2.76k | else { |
464 | 2.76k | checksum = "off"; |
465 | 2.76k | } |
466 | | |
467 | 2.76k | flb_info("[storage] ver=%s, type=%s, sync=%s, checksum=%s, max_chunks_up=%i", |
468 | 2.76k | cio_version(), type, sync, checksum, ctx->storage_max_chunks_up); |
469 | | |
470 | | /* Storage input plugin */ |
471 | 2.76k | if (ctx->storage_input_plugin) { |
472 | 1.01k | in = (struct flb_input_instance *) ctx->storage_input_plugin; |
473 | 1.01k | flb_info("[storage] backlog input plugin: %s", in->name); |
474 | 1.01k | } |
475 | 2.76k | } |
476 | | |
477 | | static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line, |
478 | | char *str) |
479 | 1 | { |
480 | 1 | if (level == CIO_LOG_ERROR) { |
481 | 0 | flb_error("[storage] %s", str); |
482 | 0 | } |
483 | 1 | else if (level == CIO_LOG_WARN) { |
484 | 0 | flb_warn("[storage] %s", str); |
485 | 0 | } |
486 | 1 | else if (level == CIO_LOG_INFO) { |
487 | 1 | flb_info("[storage] %s", str); |
488 | 1 | } |
489 | 0 | else if (level == CIO_LOG_DEBUG) { |
490 | 0 | flb_debug("[storage] %s", str); |
491 | 0 | } |
492 | | |
493 | 1 | return 0; |
494 | 1 | } |
495 | | |
496 | | int flb_storage_input_create(struct cio_ctx *cio, |
497 | | struct flb_input_instance *in) |
498 | 3.78k | { |
499 | 3.78k | int cio_storage_type; |
500 | 3.78k | struct flb_storage_input *si; |
501 | 3.78k | struct cio_stream *stream; |
502 | | |
503 | | /* storage config: get stream type */ |
504 | 3.78k | if (in->storage_type == -1) { |
505 | 2.76k | in->storage_type = FLB_STORAGE_MEM; |
506 | 2.76k | } |
507 | | |
508 | 3.78k | if (in->storage_type == FLB_STORAGE_FS && cio->options.root_path == NULL) { |
509 | 0 | flb_error("[storage] instance '%s' requested filesystem storage " |
510 | 0 | "but no filesystem path was defined.", |
511 | 0 | flb_input_name(in)); |
512 | 0 | return -1; |
513 | 0 | } |
514 | | |
515 | | /* |
516 | | * The input instance can define it owns storage type which is based on some |
517 | | * specific Chunk I/O storage type. We handle the proper initialization here. |
518 | | */ |
519 | 3.78k | cio_storage_type = in->storage_type; |
520 | 3.78k | if (in->storage_type == FLB_STORAGE_MEMRB) { |
521 | 0 | cio_storage_type = FLB_STORAGE_MEM; |
522 | 0 | } |
523 | | |
524 | | /* Check for duplicates */ |
525 | 3.78k | stream = cio_stream_get(cio, in->name); |
526 | 3.78k | if (!stream) { |
527 | | /* create stream for input instance */ |
528 | 2.76k | stream = cio_stream_create(cio, in->name, cio_storage_type); |
529 | 2.76k | if (!stream) { |
530 | 0 | flb_error("[storage] cannot create stream for instance %s", |
531 | 0 | in->name); |
532 | 0 | return -1; |
533 | 0 | } |
534 | 2.76k | } |
535 | 1.01k | else if (stream->type != cio_storage_type) { |
536 | 0 | flb_debug("[storage] storage type mismatch. input type=%s", |
537 | 0 | flb_storage_get_type(in->storage_type)); |
538 | 0 | if (stream->type == FLB_STORAGE_FS) { |
539 | 0 | flb_warn("[storage] Need to remove '%s/%s' if it is empty", cio->options.root_path, in->name); |
540 | 0 | } |
541 | |
|
542 | 0 | cio_stream_destroy(stream); |
543 | 0 | stream = cio_stream_create(cio, in->name, cio_storage_type); |
544 | 0 | if (!stream) { |
545 | 0 | flb_error("[storage] cannot create stream for instance %s", |
546 | 0 | in->name); |
547 | 0 | return -1; |
548 | 0 | } |
549 | 0 | flb_info("[storage] re-create stream type=%s", flb_storage_get_type(in->storage_type)); |
550 | 0 | } |
551 | | |
552 | | /* allocate storage context for the input instance */ |
553 | 3.78k | si = flb_malloc(sizeof(struct flb_storage_input)); |
554 | 3.78k | if (!si) { |
555 | 0 | flb_errno(); |
556 | 0 | return -1; |
557 | 0 | } |
558 | | |
559 | 3.78k | si->stream = stream; |
560 | 3.78k | si->cio = cio; |
561 | 3.78k | si->type = in->storage_type; |
562 | 3.78k | in->storage = si; |
563 | | |
564 | 3.78k | return 0; |
565 | 3.78k | } |
566 | | |
567 | | void flb_storage_input_destroy(struct flb_input_instance *in) |
568 | 7.56k | { |
569 | 7.56k | struct mk_list *tmp; |
570 | 7.56k | struct mk_list *head; |
571 | 7.56k | struct flb_input_chunk *ic; |
572 | | |
573 | | /* Save current temporary data and destroy chunk references */ |
574 | 7.56k | mk_list_foreach_safe(head, tmp, &in->chunks) { |
575 | 0 | ic = mk_list_entry(head, struct flb_input_chunk, _head); |
576 | 0 | flb_input_chunk_destroy(ic, FLB_FALSE); |
577 | 0 | } |
578 | | |
579 | 7.56k | flb_free(in->storage); |
580 | 7.56k | in->storage = NULL; |
581 | 7.56k | } |
582 | | |
583 | | static int storage_contexts_create(struct flb_config *config) |
584 | 2.76k | { |
585 | 2.76k | int c = 0; |
586 | 2.76k | int ret; |
587 | 2.76k | struct mk_list *head; |
588 | 2.76k | struct flb_input_instance *in; |
589 | | |
590 | | /* Iterate each input instance and create a stream for it */ |
591 | 3.78k | mk_list_foreach(head, &config->inputs) { |
592 | 3.78k | in = mk_list_entry(head, struct flb_input_instance, _head); |
593 | 3.78k | ret = flb_storage_input_create(config->cio, in); |
594 | 3.78k | if (ret == -1) { |
595 | 0 | flb_error("[storage] could not create storage for instance: %s", |
596 | 0 | in->name); |
597 | 0 | return -1; |
598 | 0 | } |
599 | 3.78k | c++; |
600 | 3.78k | } |
601 | | |
602 | 2.76k | return c; |
603 | 2.76k | } |
604 | | |
605 | | int flb_storage_create(struct flb_config *ctx) |
606 | 2.76k | { |
607 | 2.76k | int ret; |
608 | 2.76k | int flags; |
609 | 2.76k | struct flb_input_instance *in = NULL; |
610 | 2.76k | struct cio_ctx *cio; |
611 | 2.76k | struct cio_options opts = {0}; |
612 | | |
613 | | /* always use read/write mode */ |
614 | 2.76k | flags = CIO_OPEN; |
615 | | |
616 | | /* if explicitly stated any irrecoverably corrupted |
617 | | * chunks will be deleted */ |
618 | 2.76k | if (ctx->storage_del_bad_chunks) { |
619 | 0 | flags |= CIO_DELETE_IRRECOVERABLE; |
620 | 0 | } |
621 | | |
622 | | /* synchronization mode */ |
623 | 2.76k | if (ctx->storage_sync) { |
624 | 0 | if (strcasecmp(ctx->storage_sync, "normal") == 0) { |
625 | | /* do nothing, keep the default */ |
626 | 0 | } |
627 | 0 | else if (strcasecmp(ctx->storage_sync, "full") == 0) { |
628 | 0 | flags |= CIO_FULL_SYNC; |
629 | 0 | } |
630 | 0 | else { |
631 | 0 | flb_error("[storage] invalid synchronization mode"); |
632 | 0 | return -1; |
633 | 0 | } |
634 | 0 | } |
635 | | |
636 | | /* checksum */ |
637 | 2.76k | if (ctx->storage_checksum == FLB_TRUE) { |
638 | 0 | flags |= CIO_CHECKSUM; |
639 | 0 | } |
640 | | |
641 | | /* file trimming */ |
642 | 2.76k | if (ctx->storage_trim_files == FLB_TRUE) { |
643 | 0 | flags |= CIO_TRIM_FILES; |
644 | 0 | } |
645 | | |
646 | | /* chunkio options */ |
647 | 2.76k | cio_options_init(&opts); |
648 | | |
649 | 2.76k | opts.root_path = ctx->storage_path; |
650 | 2.76k | opts.flags = flags; |
651 | 2.76k | opts.log_cb = log_cb; |
652 | 2.76k | opts.log_level = CIO_LOG_INFO; |
653 | | |
654 | | /* Create chunkio context */ |
655 | 2.76k | cio = cio_create(&opts); |
656 | 2.76k | if (!cio) { |
657 | 0 | flb_error("[storage] error initializing storage engine"); |
658 | 0 | return -1; |
659 | 0 | } |
660 | 2.76k | ctx->cio = cio; |
661 | | |
662 | | /* Set Chunk I/O maximum number of chunks up */ |
663 | 2.76k | if (ctx->storage_max_chunks_up == 0) { |
664 | 2.76k | ctx->storage_max_chunks_up = FLB_STORAGE_MAX_CHUNKS_UP; |
665 | 2.76k | } |
666 | 2.76k | cio_set_max_chunks_up(ctx->cio, ctx->storage_max_chunks_up); |
667 | | |
668 | | /* Load content from the file system if any */ |
669 | 2.76k | ret = cio_load(ctx->cio, NULL); |
670 | 2.76k | if (ret == -1) { |
671 | 0 | flb_error("[storage] error scanning root path content: %s", |
672 | 0 | ctx->storage_path); |
673 | 0 | cio_destroy(ctx->cio); |
674 | 0 | return -1; |
675 | 0 | } |
676 | | |
677 | | /* Sort chunks */ |
678 | 2.76k | cio_qsort(ctx->cio, sort_chunk_cmp); |
679 | | |
680 | | /* |
681 | | * If we have a filesystem storage path, create an instance of the |
682 | | * storage_backlog input plugin to consume any possible pending |
683 | | * chunks. |
684 | | */ |
685 | 2.76k | if (ctx->storage_path) { |
686 | 1.01k | in = flb_input_new(ctx, "storage_backlog", cio, FLB_FALSE); |
687 | 1.01k | if (!in) { |
688 | 0 | flb_error("[storage] cannot init storage backlog input plugin"); |
689 | 0 | cio_destroy(cio); |
690 | 0 | ctx->cio = NULL; |
691 | 0 | return -1; |
692 | 0 | } |
693 | 1.01k | ctx->storage_input_plugin = in; |
694 | | |
695 | | /* Set a queue memory limit */ |
696 | 1.01k | if (!ctx->storage_bl_mem_limit) { |
697 | 1.01k | ctx->storage_bl_mem_limit = flb_strdup(FLB_STORAGE_BL_MEM_LIMIT); |
698 | 1.01k | } |
699 | 1.01k | } |
700 | | |
701 | | /* Create streams for input instances */ |
702 | 2.76k | ret = storage_contexts_create(ctx); |
703 | 2.76k | if (ret == -1) { |
704 | 0 | return -1; |
705 | 0 | } |
706 | | |
707 | | /* print storage info */ |
708 | 2.76k | print_storage_info(ctx, cio); |
709 | | |
710 | 2.76k | return 0; |
711 | 2.76k | } |
712 | | |
713 | | void flb_storage_destroy(struct flb_config *ctx) |
714 | 2.76k | { |
715 | 2.76k | struct cio_ctx *cio; |
716 | 2.76k | struct flb_storage_metrics *sm; |
717 | | |
718 | | /* Destroy Chunk I/O context */ |
719 | 2.76k | cio = (struct cio_ctx *) ctx->cio; |
720 | | |
721 | 2.76k | if (!cio) { |
722 | 0 | return; |
723 | 0 | } |
724 | | |
725 | 2.76k | sm = ctx->storage_metrics_ctx; |
726 | 2.76k | if (ctx->storage_metrics == FLB_TRUE && sm != NULL) { |
727 | 2.76k | cmt_destroy(sm->cmt); |
728 | 2.76k | flb_free(sm); |
729 | 2.76k | ctx->storage_metrics_ctx = NULL; |
730 | 2.76k | } |
731 | | |
732 | 2.76k | cio_destroy(cio); |
733 | 2.76k | ctx->cio = NULL; |
734 | 2.76k | } |