Coverage Report

Created: 2025-11-07 08:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_lib/out_lib.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <stdio.h>
21
22
#include <fluent-bit/flb_output_plugin.h>
23
#include <fluent-bit/flb_utils.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_time.h>
26
#include <fluent-bit/flb_lib.h>
27
#include <msgpack.h>
28
29
#include "out_lib.h"
30
31
#define PLUGIN_NAME "out_lib"
32
33
static int configure(struct flb_out_lib_config *ctx,
34
                     struct flb_output_instance *ins)
35
0
{
36
0
    const char *tmp;
37
38
0
    tmp = flb_output_get_property("format", ins);
39
0
    if (!tmp) {
40
0
        ctx->format = FLB_OUT_LIB_FMT_MSGPACK;
41
0
    }
42
0
    else {
43
0
        if (strcasecmp(tmp, FLB_FMT_STR_MSGPACK) == 0) {
44
0
            ctx->format = FLB_OUT_LIB_FMT_MSGPACK;
45
0
        }
46
0
        else if (strcasecmp(tmp, FLB_FMT_STR_JSON) == 0) {
47
0
            ctx->format = FLB_OUT_LIB_FMT_JSON;
48
0
        }
49
0
    }
50
51
0
    if (strcasecmp(ctx->data_mode_str, "single_record") == 0) {
52
0
        ctx->data_mode = FLB_DATA_MODE_SINGLE_RECORD;
53
0
    }
54
0
    else if (strcasecmp(ctx->data_mode_str, "chunk") == 0) {
55
0
        ctx->data_mode = FLB_DATA_MODE_CHUNK;
56
0
    }
57
0
    else {
58
0
        flb_plg_error(ctx->ins, "Invalid data_mode: %s", ctx->data_mode_str);
59
0
        return -1;
60
0
    }
61
62
0
    return 0;
63
0
}
64
65
66
/**
67
 * User callback is passed from flb_output(ctx, output, callback)
68
 *
69
 *  The prototype of callback should be
70
 *   int (*callback)(void* data, size_t size );
71
 *    @param   data  The data which comes from input plugin.
72
 *    @param   size  The size of data.
73
 *    @return  success ? 0 : negative value
74
 *
75
 */
76
static int out_lib_init(struct flb_output_instance *ins,
77
                        struct flb_config *config,
78
                        void *data)
79
0
{
80
0
    struct flb_out_lib_config *ctx = NULL;
81
0
    struct flb_lib_out_cb *cb_data = data;
82
0
    (void) config;
83
84
0
    ctx = flb_calloc(1, sizeof(struct flb_out_lib_config));
85
0
    if (ctx == NULL) {
86
0
        flb_errno();
87
0
        return -1;
88
0
    }
89
0
    ctx->ins = ins;
90
91
0
    flb_output_config_map_set(ins, (void *) ctx);
92
93
0
    if (cb_data) {
94
        /* Set user callback and data */
95
0
        ctx->cb_func = cb_data->cb;
96
0
        ctx->cb_data = cb_data->data;
97
0
    }
98
0
    else {
99
0
        flb_plg_error(ctx->ins, "Callback is not set");
100
0
        flb_free(ctx);
101
0
        return -1;
102
0
    }
103
104
0
    configure(ctx, ins);
105
0
    flb_output_set_context(ins, ctx);
106
107
0
    return 0;
108
0
}
109
110
static void out_lib_flush(struct flb_event_chunk *event_chunk,
111
                          struct flb_output_flush *out_flush,
112
                          struct flb_input_instance *i_ins,
113
                          void *out_context,
114
                          struct flb_config *config)
115
0
{
116
0
    int len;
117
0
    int count = 0;
118
0
    size_t off = 0;
119
0
    size_t last_off = 0;
120
0
    size_t data_size = 0;
121
0
    size_t alloc_size = 0;
122
0
    size_t out_size = 0;
123
0
    char *buf = NULL;
124
0
    char *out_buf = NULL;
125
0
    char *data_for_user = NULL;
126
0
    msgpack_object *obj;
127
0
    msgpack_unpacked result;
128
0
    struct flb_time tm;
129
0
    struct flb_out_lib_config *ctx = out_context;
130
0
    (void) i_ins;
131
0
    (void) config;
132
133
    /*
134
     * if the plugin is configured with data_mode = 'chunk', we pass the chunk
135
     * as a reference to the callback function.
136
     */
137
0
    if (ctx->data_mode == FLB_DATA_MODE_CHUNK) {
138
0
        ctx->cb_func(event_chunk->data, event_chunk->size, ctx->cb_data);
139
0
        FLB_OUTPUT_RETURN(FLB_OK);
140
0
    }
141
142
    /* Everything else here is for data_mode = 'single_record' */
143
0
    msgpack_unpacked_init(&result);
144
0
    while (msgpack_unpack_next(&result,
145
0
                               event_chunk->data,
146
0
                               event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) {
147
0
        if (ctx->max_records > 0 && count >= ctx->max_records) {
148
0
            break;
149
0
        }
150
0
        switch(ctx->format) {
151
0
        case FLB_OUT_LIB_FMT_MSGPACK:
152
0
            alloc_size = (off - last_off);
153
154
            /* copy raw bytes */
155
0
            data_for_user = flb_malloc(alloc_size);
156
0
            if (!data_for_user) {
157
0
                flb_errno();
158
0
                msgpack_unpacked_destroy(&result);
159
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
160
0
            }
161
162
0
            memcpy(data_for_user,
163
0
                   (char *) event_chunk->data + last_off, alloc_size);
164
0
            data_size = alloc_size;
165
0
            break;
166
0
        case FLB_OUT_LIB_FMT_JSON:
167
0
#ifdef FLB_HAVE_METRICS
168
0
            if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
169
0
                alloc_size = (off - last_off) + 4096;
170
0
                buf = flb_msgpack_to_json_str(alloc_size, &result.data,
171
0
                                              config->json_escape_unicode);
172
0
                if (buf == NULL) {
173
0
                    msgpack_unpacked_destroy(&result);
174
0
                    FLB_OUTPUT_RETURN(FLB_ERROR);
175
0
                }
176
0
                data_size = strlen(buf);
177
0
                data_for_user = buf;
178
0
            }
179
0
            else {
180
0
#endif
181
            /* JSON is larger than msgpack */
182
0
            alloc_size = (off - last_off) + 128;
183
184
0
            flb_time_pop_from_msgpack(&tm, &result, &obj);
185
0
            buf = flb_msgpack_to_json_str(alloc_size, obj,
186
0
                                          config->json_escape_unicode);
187
0
            if (!buf) {
188
0
                msgpack_unpacked_destroy(&result);
189
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
190
0
            }
191
192
0
            len = strlen(buf);
193
0
            out_size = len + 32;
194
0
            out_buf = flb_malloc(out_size);
195
0
            if (!out_buf) {
196
0
                flb_errno();
197
0
                msgpack_unpacked_destroy(&result);
198
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
199
0
            }
200
201
0
            len = snprintf(out_buf, out_size, "[%f,%s]",
202
0
                           flb_time_to_double(&tm),
203
0
                           buf);
204
0
            flb_free(buf);
205
0
            data_for_user = out_buf;
206
0
            data_size = len;
207
208
0
#ifdef FLB_HAVE_METRICS
209
0
            }
210
0
#endif
211
0
            break;
212
0
        }
213
214
        /* Invoke user callback */
215
0
        ctx->cb_func(data_for_user, data_size, ctx->cb_data);
216
0
        last_off = off;
217
0
        count++;
218
0
    }
219
220
0
    msgpack_unpacked_destroy(&result);
221
0
    FLB_OUTPUT_RETURN(FLB_OK);
222
0
}
223
224
static int out_lib_exit(void *data, struct flb_config *config)
225
0
{
226
0
    struct flb_out_lib_config *ctx = data;
227
228
0
    flb_free(ctx);
229
0
    return 0;
230
0
}
231
232
/* Configuration properties map */
233
static struct flb_config_map config_map[] = {
234
    {
235
     FLB_CONFIG_MAP_STR, "format", NULL,
236
     0, FLB_FALSE, 0,
237
     "Specifies the data format to be printed. Supported formats are "
238
     "'msgpack' or 'json', json_lines and json_stream."
239
    },
240
241
    {
242
     FLB_CONFIG_MAP_INT, "max_records", NULL,
243
     0, FLB_TRUE, offsetof(struct flb_out_lib_config, max_records),
244
     "Specifies the maximum number of log records to be printed."
245
    },
246
247
    {
248
     FLB_CONFIG_MAP_STR, "data_mode", "single_record",
249
     0, FLB_TRUE, offsetof(struct flb_out_lib_config, data_mode_str),
250
    },
251
252
    /* EOF */
253
    {0}
254
};
255
256
struct flb_output_plugin out_lib_plugin = {
257
    .name         = "lib",
258
    .description  = "Library mode Output",
259
    .cb_init      = out_lib_init,
260
    .cb_flush     = out_lib_flush,
261
    .cb_exit      = out_lib_exit,
262
    .event_type   = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,
263
    .flags        = 0,
264
    .config_map   = config_map
265
};