Coverage Report

Created: 2024-09-19 07:08

/src/fluent-bit/plugins/in_lib/in_lib.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <stdio.h>
21
#include <stdlib.h>
22
#include <sys/types.h>
23
#include <sys/stat.h>
24
#include <fcntl.h>
25
26
#include <fluent-bit/flb_info.h>
27
#include <fluent-bit/flb_input.h>
28
#include <fluent-bit/flb_input_plugin.h>
29
#include <fluent-bit/flb_config.h>
30
#include <fluent-bit/flb_pack.h>
31
#include <fluent-bit/flb_error.h>
32
#include <fluent-bit/flb_log_event_decoder.h>
33
#include "in_lib.h"
34
35
static int in_lib_collect(struct flb_input_instance *ins,
36
                          struct flb_config *config, void *in_context)
37
7.32M
{
38
7.32M
    int ret;
39
7.32M
    int dec_ret;
40
7.32M
    int enc_ret;
41
7.32M
    int bytes;
42
7.32M
    int out_size;
43
7.32M
    int capacity;
44
7.32M
    int size;
45
7.32M
    char *ptr;
46
7.32M
    char *pack;
47
7.32M
    struct flb_log_event record;
48
7.32M
    struct flb_log_event_decoder decoder;
49
7.32M
    struct flb_in_lib_config *ctx = in_context;
50
51
7.32M
    capacity = (ctx->buf_size - ctx->buf_len);
52
53
    /* Allocate memory as required (FIXME: this will be limited in later) */
54
7.32M
    if (capacity == 0) {
55
374
        size = ctx->buf_size + LIB_BUF_CHUNK;
56
374
        ptr = flb_realloc(ctx->buf_data, size);
57
374
        if (!ptr) {
58
0
            flb_errno();
59
0
            return -1;
60
0
        }
61
374
        ctx->buf_data = ptr;
62
374
        ctx->buf_size = size;
63
374
        capacity = LIB_BUF_CHUNK;
64
374
    }
65
66
7.32M
    bytes = flb_pipe_r(ctx->fd,
67
7.32M
                       ctx->buf_data + ctx->buf_len,
68
7.32M
                       capacity);
69
7.32M
    flb_plg_trace(ctx->ins, "in_lib read() = %i", bytes);
70
7.32M
    if (bytes == -1) {
71
0
        perror("read");
72
0
        if (errno == -EPIPE) {
73
0
            return -1;
74
0
        }
75
0
        return 0;
76
0
    }
77
7.32M
    ctx->buf_len += bytes;
78
79
    /* initially we should support json input */
80
7.32M
    ret = flb_pack_json_state(ctx->buf_data, ctx->buf_len,
81
7.32M
                              &pack, &out_size, &ctx->state);
82
7.32M
    if (ret == FLB_ERR_JSON_PART) {
83
4.87M
        flb_plg_warn(ctx->ins, "lib data incomplete, waiting for more data...");
84
4.87M
        return 0;
85
4.87M
    }
86
2.45M
    else if (ret == FLB_ERR_JSON_INVAL) {
87
2.38M
        flb_plg_warn(ctx->ins, "lib data invalid");
88
2.38M
        flb_pack_state_reset(&ctx->state);
89
2.38M
        flb_pack_state_init(&ctx->state);
90
2.38M
        return -1;
91
2.38M
    }
92
67.4k
    ctx->buf_len = 0;
93
94
67.4k
    dec_ret = flb_log_event_decoder_init(&decoder, pack, out_size);
95
67.4k
    if (dec_ret != FLB_EVENT_DECODER_SUCCESS) {
96
0
        flb_plg_error(ctx->ins,
97
0
                      "Log event decoder initialization error : %s",
98
0
                      flb_log_event_decoder_get_error_description(dec_ret));
99
0
        flb_free(pack);
100
0
        flb_pack_state_reset(&ctx->state);
101
0
        flb_pack_state_init(&ctx->state);
102
0
        return -1;
103
0
    }
104
105
67.5k
    while ((dec_ret = flb_log_event_decoder_next(
106
67.5k
                      &decoder,
107
67.5k
                      &record)) == FLB_EVENT_DECODER_SUCCESS) {
108
141
        enc_ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
109
141
        if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
110
0
            flb_plg_error(ctx->ins,
111
0
                          "flb_log_event_encoder_begin_record error : %s",
112
0
                          flb_log_event_encoder_get_error_description(enc_ret));
113
0
            flb_log_event_encoder_rollback_record(&ctx->log_encoder);
114
0
            continue;
115
0
        }
116
117
141
        enc_ret = flb_log_event_encoder_set_timestamp(
118
141
                  &ctx->log_encoder,
119
141
                  &record.timestamp);
120
141
        if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
121
0
            flb_plg_error(ctx->ins,
122
0
                          "flb_log_event_encoder_set_timestamp error : %s",
123
0
                          flb_log_event_encoder_get_error_description(enc_ret));
124
0
            flb_log_event_encoder_rollback_record(&ctx->log_encoder);
125
0
            continue;
126
0
        }
127
128
141
        enc_ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
129
141
                  &ctx->log_encoder,
130
141
                  record.metadata);
131
141
        if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
132
0
            flb_plg_error(ctx->ins,
133
0
                          "flb_log_event_encoder_set_metadata_from_msgpack_object error : %s",
134
0
                          flb_log_event_encoder_get_error_description(enc_ret));
135
0
            flb_log_event_encoder_rollback_record(&ctx->log_encoder);
136
0
            continue;
137
0
        }
138
139
141
        enc_ret = flb_log_event_encoder_set_body_from_msgpack_object(
140
141
                  &ctx->log_encoder,
141
141
                  record.body);
142
141
        if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
143
0
            flb_plg_error(ctx->ins,
144
0
                          "flb_log_event_encoder_set_body_from_msgpack_object error : %s",
145
0
                          flb_log_event_encoder_get_error_description(enc_ret));
146
0
            flb_log_event_encoder_rollback_record(&ctx->log_encoder);
147
0
            continue;
148
0
        }
149
150
141
        enc_ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
151
141
        if (enc_ret != FLB_EVENT_ENCODER_SUCCESS) {
152
0
            flb_plg_error(ctx->ins,
153
0
                          "flb_log_event_encoder_commit_record error : %s",
154
0
                          flb_log_event_encoder_get_error_description(enc_ret));
155
0
            flb_log_event_encoder_rollback_record(&ctx->log_encoder);
156
0
            continue;
157
0
        }
158
141
    }
159
160
67.4k
    dec_ret = flb_log_event_decoder_get_last_result(&decoder);
161
67.4k
    if (dec_ret == FLB_EVENT_DECODER_SUCCESS) {
162
237
        flb_input_log_append(ctx->ins, NULL, 0,
163
237
                             ctx->log_encoder.output_buffer,
164
237
                             ctx->log_encoder.output_length);
165
166
237
        ret = 0;
167
237
    }
168
67.1k
    else {
169
67.1k
        flb_plg_error(ctx->ins,
170
67.1k
                      "flb_log_event_decoder_get_last_result error : %s",
171
67.1k
                      flb_log_event_decoder_get_error_description(dec_ret));
172
67.1k
        ret = -1;
173
67.1k
    }
174
175
67.4k
    flb_log_event_encoder_reset(&ctx->log_encoder);
176
67.4k
    flb_log_event_decoder_destroy(&decoder);
177
178
    /* Reset the state */
179
67.4k
    flb_free(pack);
180
181
67.4k
    flb_pack_state_reset(&ctx->state);
182
67.4k
    flb_pack_state_init(&ctx->state);
183
184
67.4k
    return ret;
185
67.4k
}
186
187
/* Initialize plugin */
188
static int in_lib_init(struct flb_input_instance *in,
189
                       struct flb_config *config, void *data)
190
2.74k
{
191
2.74k
    int ret;
192
2.74k
    struct flb_in_lib_config *ctx;
193
2.74k
    (void) data;
194
195
    /* Allocate space for the configuration */
196
2.74k
    ctx = flb_malloc(sizeof(struct flb_in_lib_config));
197
2.74k
    if (!ctx) {
198
0
        return -1;
199
0
    }
200
2.74k
    ctx->ins = in;
201
202
    /* Buffer for incoming data */
203
2.74k
    ctx->buf_size = LIB_BUF_CHUNK;
204
2.74k
    ctx->buf_data = flb_calloc(1, LIB_BUF_CHUNK);
205
2.74k
    ctx->buf_len = 0;
206
207
2.74k
    if (!ctx->buf_data) {
208
0
        flb_errno();
209
0
        flb_plg_error(ctx->ins, "Could not allocate initial buf memory buffer");
210
0
        flb_free(ctx);
211
0
        return -1;
212
0
    }
213
214
    /* Init communication channel */
215
2.74k
    flb_input_channel_init(in);
216
2.74k
    ctx->fd = in->channel[0];
217
218
    /* Set the context */
219
2.74k
    flb_input_set_context(in, ctx);
220
221
    /* Collect upon data available on the standard input */
222
2.74k
    ret = flb_input_set_collector_event(in,
223
2.74k
                                        in_lib_collect,
224
2.74k
                                        ctx->fd,
225
2.74k
                                        config);
226
2.74k
    if (ret == -1) {
227
0
        flb_plg_error(ctx->ins, "Could not set collector for LIB input plugin");
228
0
        flb_free(ctx->buf_data);
229
0
        flb_free(ctx);
230
0
        return -1;
231
0
    }
232
233
2.74k
    ret = flb_log_event_encoder_init(&ctx->log_encoder,
234
2.74k
                                     FLB_LOG_EVENT_FORMAT_DEFAULT);
235
236
2.74k
    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
237
0
        flb_plg_error(ctx->ins, "error initializing event encoder : %d", ret);
238
239
0
        flb_free(ctx->buf_data);
240
0
        flb_free(ctx);
241
242
0
        return -1;
243
0
    }
244
245
2.74k
    flb_pack_state_init(&ctx->state);
246
247
2.74k
    return 0;
248
2.74k
}
249
250
static int in_lib_exit(void *data, struct flb_config *config)
251
2.74k
{
252
2.74k
    struct flb_in_lib_config *ctx = data;
253
2.74k
    struct flb_pack_state *s;
254
255
2.74k
    (void) config;
256
257
2.74k
    flb_log_event_encoder_destroy(&ctx->log_encoder);
258
259
2.74k
    if (ctx->buf_data) {
260
2.74k
        flb_free(ctx->buf_data);
261
2.74k
    }
262
263
2.74k
    s = &ctx->state;
264
2.74k
    flb_pack_state_reset(s);
265
2.74k
    flb_free(ctx);
266
2.74k
    return 0;
267
2.74k
}
268
269
/* Plugin reference */
270
struct flb_input_plugin in_lib_plugin = {
271
    .name         = "lib",
272
    .description  = "Library mode Input",
273
    .cb_init      = in_lib_init,
274
    .cb_pre_run   = NULL,
275
    .cb_collect   = NULL,
276
    .cb_ingest    = NULL,
277
    .cb_flush_buf = NULL,
278
    .cb_exit      = in_lib_exit
279
};