/src/fluent-bit/plugins/in_lib/in_lib.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <stdio.h> |
21 | | #include <stdlib.h> |
22 | | #include <sys/types.h> |
23 | | #include <sys/stat.h> |
24 | | #include <fcntl.h> |
25 | | |
26 | | #include <fluent-bit/flb_info.h> |
27 | | #include <fluent-bit/flb_input.h> |
28 | | #include <fluent-bit/flb_input_plugin.h> |
29 | | #include <fluent-bit/flb_config.h> |
30 | | #include <fluent-bit/flb_pack.h> |
31 | | #include <fluent-bit/flb_error.h> |
32 | | #include <fluent-bit/flb_log_event_decoder.h> |
33 | | #include "in_lib.h" |
34 | | |
35 | | static int in_lib_collect(struct flb_input_instance *ins, |
36 | | struct flb_config *config, void *in_context) |
37 | 7.32M | { |
38 | 7.32M | int ret; |
39 | 7.32M | int dec_ret; |
40 | 7.32M | int enc_ret; |
41 | 7.32M | int bytes; |
42 | 7.32M | int out_size; |
43 | 7.32M | int capacity; |
44 | 7.32M | int size; |
45 | 7.32M | char *ptr; |
46 | 7.32M | char *pack; |
47 | 7.32M | struct flb_log_event record; |
48 | 7.32M | struct flb_log_event_decoder decoder; |
49 | 7.32M | struct flb_in_lib_config *ctx = in_context; |
50 | | |
51 | 7.32M | capacity = (ctx->buf_size - ctx->buf_len); |
52 | | |
53 | | /* Allocate memory as required (FIXME: this will be limited in later) */ |
54 | 7.32M | if (capacity == 0) { |
55 | 374 | size = ctx->buf_size + LIB_BUF_CHUNK; |
56 | 374 | ptr = flb_realloc(ctx->buf_data, size); |
57 | 374 | if (!ptr) { |
58 | 0 | flb_errno(); |
59 | 0 | return -1; |
60 | 0 | } |
61 | 374 | ctx->buf_data = ptr; |
62 | 374 | ctx->buf_size = size; |
63 | 374 | capacity = LIB_BUF_CHUNK; |
64 | 374 | } |
65 | | |
66 | 7.32M | bytes = flb_pipe_r(ctx->fd, |
67 | 7.32M | ctx->buf_data + ctx->buf_len, |
68 | 7.32M | capacity); |
69 | 7.32M | flb_plg_trace(ctx->ins, "in_lib read() = %i", bytes); |
70 | 7.32M | if (bytes == -1) { |
71 | 0 | perror("read"); |
72 | 0 | if (errno == -EPIPE) { |
73 | 0 | return -1; |
74 | 0 | } |
75 | 0 | return 0; |
76 | 0 | } |
77 | 7.32M | ctx->buf_len += bytes; |
78 | | |
79 | | /* initially we should support json input */ |
80 | 7.32M | ret = flb_pack_json_state(ctx->buf_data, ctx->buf_len, |
81 | 7.32M | &pack, &out_size, &ctx->state); |
82 | 7.32M | if (ret == FLB_ERR_JSON_PART) { |
83 | 4.87M | flb_plg_warn(ctx->ins, "lib data incomplete, waiting for more data..."); |
84 | 4.87M | return 0; |
85 | 4.87M | } |
86 | 2.45M | else if (ret == FLB_ERR_JSON_INVAL) { |
87 | 2.38M | flb_plg_warn(ctx->ins, "lib data invalid"); |
88 | 2.38M | flb_pack_state_reset(&ctx->state); |
89 | 2.38M | flb_pack_state_init(&ctx->state); |
90 | 2.38M | return -1; |
91 | 2.38M | } |
92 | 67.4k | ctx->buf_len = 0; |
93 | | |
94 | 67.4k | dec_ret = flb_log_event_decoder_init(&decoder, pack, out_size); |
95 | 67.4k | if (dec_ret != FLB_EVENT_DECODER_SUCCESS) { |
96 | 0 | flb_plg_error(ctx->ins, |
97 | 0 | "Log event decoder initialization error : %s", |
98 | 0 | flb_log_event_decoder_get_error_description(dec_ret)); |
99 | 0 | flb_free(pack); |
100 | 0 | flb_pack_state_reset(&ctx->state); |
101 | 0 | flb_pack_state_init(&ctx->state); |
102 | 0 | return -1; |
103 | 0 | } |
104 | | |
105 | 67.5k | while ((dec_ret = flb_log_event_decoder_next( |
106 | 67.5k | &decoder, |
107 | 67.5k | &record)) == FLB_EVENT_DECODER_SUCCESS) { |
108 | 141 | enc_ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); |
109 | 141 | if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { |
110 | 0 | flb_plg_error(ctx->ins, |
111 | 0 | "flb_log_event_encoder_begin_record error : %s", |
112 | 0 | flb_log_event_encoder_get_error_description(enc_ret)); |
113 | 0 | flb_log_event_encoder_rollback_record(&ctx->log_encoder); |
114 | 0 | continue; |
115 | 0 | } |
116 | | |
117 | 141 | enc_ret = flb_log_event_encoder_set_timestamp( |
118 | 141 | &ctx->log_encoder, |
119 | 141 | &record.timestamp); |
120 | 141 | if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { |
121 | 0 | flb_plg_error(ctx->ins, |
122 | 0 | "flb_log_event_encoder_set_timestamp error : %s", |
123 | 0 | flb_log_event_encoder_get_error_description(enc_ret)); |
124 | 0 | flb_log_event_encoder_rollback_record(&ctx->log_encoder); |
125 | 0 | continue; |
126 | 0 | } |
127 | | |
128 | 141 | enc_ret = flb_log_event_encoder_set_metadata_from_msgpack_object( |
129 | 141 | &ctx->log_encoder, |
130 | 141 | record.metadata); |
131 | 141 | if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { |
132 | 0 | flb_plg_error(ctx->ins, |
133 | 0 | "flb_log_event_encoder_set_metadata_from_msgpack_object error : %s", |
134 | 0 | flb_log_event_encoder_get_error_description(enc_ret)); |
135 | 0 | flb_log_event_encoder_rollback_record(&ctx->log_encoder); |
136 | 0 | continue; |
137 | 0 | } |
138 | | |
139 | 141 | enc_ret = flb_log_event_encoder_set_body_from_msgpack_object( |
140 | 141 | &ctx->log_encoder, |
141 | 141 | record.body); |
142 | 141 | if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { |
143 | 0 | flb_plg_error(ctx->ins, |
144 | 0 | "flb_log_event_encoder_set_body_from_msgpack_object error : %s", |
145 | 0 | flb_log_event_encoder_get_error_description(enc_ret)); |
146 | 0 | flb_log_event_encoder_rollback_record(&ctx->log_encoder); |
147 | 0 | continue; |
148 | 0 | } |
149 | | |
150 | 141 | enc_ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); |
151 | 141 | if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) { |
152 | 0 | flb_plg_error(ctx->ins, |
153 | 0 | "flb_log_event_encoder_commit_record error : %s", |
154 | 0 | flb_log_event_encoder_get_error_description(enc_ret)); |
155 | 0 | flb_log_event_encoder_rollback_record(&ctx->log_encoder); |
156 | 0 | continue; |
157 | 0 | } |
158 | 141 | } |
159 | | |
160 | 67.4k | dec_ret = flb_log_event_decoder_get_last_result(&decoder); |
161 | 67.4k | if (dec_ret == FLB_EVENT_DECODER_SUCCESS) { |
162 | 237 | flb_input_log_append(ctx->ins, NULL, 0, |
163 | 237 | ctx->log_encoder.output_buffer, |
164 | 237 | ctx->log_encoder.output_length); |
165 | | |
166 | 237 | ret = 0; |
167 | 237 | } |
168 | 67.1k | else { |
169 | 67.1k | flb_plg_error(ctx->ins, |
170 | 67.1k | "flb_log_event_decoder_get_last_result error : %s", |
171 | 67.1k | flb_log_event_decoder_get_error_description(dec_ret)); |
172 | 67.1k | ret = -1; |
173 | 67.1k | } |
174 | | |
175 | 67.4k | flb_log_event_encoder_reset(&ctx->log_encoder); |
176 | 67.4k | flb_log_event_decoder_destroy(&decoder); |
177 | | |
178 | | /* Reset the state */ |
179 | 67.4k | flb_free(pack); |
180 | | |
181 | 67.4k | flb_pack_state_reset(&ctx->state); |
182 | 67.4k | flb_pack_state_init(&ctx->state); |
183 | | |
184 | 67.4k | return ret; |
185 | 67.4k | } |
186 | | |
187 | | /* Initialize plugin */ |
188 | | static int in_lib_init(struct flb_input_instance *in, |
189 | | struct flb_config *config, void *data) |
190 | 2.74k | { |
191 | 2.74k | int ret; |
192 | 2.74k | struct flb_in_lib_config *ctx; |
193 | 2.74k | (void) data; |
194 | | |
195 | | /* Allocate space for the configuration */ |
196 | 2.74k | ctx = flb_malloc(sizeof(struct flb_in_lib_config)); |
197 | 2.74k | if (!ctx) { |
198 | 0 | return -1; |
199 | 0 | } |
200 | 2.74k | ctx->ins = in; |
201 | | |
202 | | /* Buffer for incoming data */ |
203 | 2.74k | ctx->buf_size = LIB_BUF_CHUNK; |
204 | 2.74k | ctx->buf_data = flb_calloc(1, LIB_BUF_CHUNK); |
205 | 2.74k | ctx->buf_len = 0; |
206 | | |
207 | 2.74k | if (!ctx->buf_data) { |
208 | 0 | flb_errno(); |
209 | 0 | flb_plg_error(ctx->ins, "Could not allocate initial buf memory buffer"); |
210 | 0 | flb_free(ctx); |
211 | 0 | return -1; |
212 | 0 | } |
213 | | |
214 | | /* Init communication channel */ |
215 | 2.74k | flb_input_channel_init(in); |
216 | 2.74k | ctx->fd = in->channel[0]; |
217 | | |
218 | | /* Set the context */ |
219 | 2.74k | flb_input_set_context(in, ctx); |
220 | | |
221 | | /* Collect upon data available on the standard input */ |
222 | 2.74k | ret = flb_input_set_collector_event(in, |
223 | 2.74k | in_lib_collect, |
224 | 2.74k | ctx->fd, |
225 | 2.74k | config); |
226 | 2.74k | if (ret == -1) { |
227 | 0 | flb_plg_error(ctx->ins, "Could not set collector for LIB input plugin"); |
228 | 0 | flb_free(ctx->buf_data); |
229 | 0 | flb_free(ctx); |
230 | 0 | return -1; |
231 | 0 | } |
232 | | |
233 | 2.74k | ret = flb_log_event_encoder_init(&ctx->log_encoder, |
234 | 2.74k | FLB_LOG_EVENT_FORMAT_DEFAULT); |
235 | | |
236 | 2.74k | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
237 | 0 | flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret); |
238 | |
|
239 | 0 | flb_free(ctx->buf_data); |
240 | 0 | flb_free(ctx); |
241 | |
|
242 | 0 | return -1; |
243 | 0 | } |
244 | | |
245 | 2.74k | flb_pack_state_init(&ctx->state); |
246 | | |
247 | 2.74k | return 0; |
248 | 2.74k | } |
249 | | |
250 | | static int in_lib_exit(void *data, struct flb_config *config) |
251 | 2.74k | { |
252 | 2.74k | struct flb_in_lib_config *ctx = data; |
253 | 2.74k | struct flb_pack_state *s; |
254 | | |
255 | 2.74k | (void) config; |
256 | | |
257 | 2.74k | flb_log_event_encoder_destroy(&ctx->log_encoder); |
258 | | |
259 | 2.74k | if (ctx->buf_data) { |
260 | 2.74k | flb_free(ctx->buf_data); |
261 | 2.74k | } |
262 | | |
263 | 2.74k | s = &ctx->state; |
264 | 2.74k | flb_pack_state_reset(s); |
265 | 2.74k | flb_free(ctx); |
266 | 2.74k | return 0; |
267 | 2.74k | } |
268 | | |
269 | | /* Plugin reference */ |
270 | | struct flb_input_plugin in_lib_plugin = { |
271 | | .name = "lib", |
272 | | .description = "Library mode Input", |
273 | | .cb_init = in_lib_init, |
274 | | .cb_pre_run = NULL, |
275 | | .cb_collect = NULL, |
276 | | .cb_ingest = NULL, |
277 | | .cb_flush_buf = NULL, |
278 | | .cb_exit = in_lib_exit |
279 | | }; |