/src/fluent-bit/plugins/out_vivo_exporter/vivo.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_kv.h> |
22 | | #include <fluent-bit/flb_pack.h> |
23 | | #include <fluent-bit/flb_log_event_decoder.h> |
24 | | #include <fluent-bit/flb_log_event_encoder.h> |
25 | | |
26 | | #include "vivo.h" |
27 | | #include "vivo_http.h" |
28 | | #include "vivo_stream.h" |
29 | | |
30 | | static flb_sds_t format_logs(struct flb_event_chunk *event_chunk) |
31 | 0 | { |
32 | 0 | struct flb_log_event_decoder log_decoder; |
33 | 0 | struct flb_log_event log_event; |
34 | 0 | int result; |
35 | 0 | int i; |
36 | 0 | flb_sds_t out_js; |
37 | 0 | flb_sds_t out_buf = NULL; |
38 | 0 | msgpack_sbuffer tmp_sbuf; |
39 | 0 | msgpack_packer tmp_pck; |
40 | |
|
41 | 0 | result = flb_log_event_decoder_init(&log_decoder, |
42 | 0 | (char *) event_chunk->data, |
43 | 0 | event_chunk->size); |
44 | |
|
45 | 0 | if (result != FLB_EVENT_DECODER_SUCCESS) { |
46 | 0 | return NULL; |
47 | 0 | } |
48 | | |
49 | 0 | out_buf = flb_sds_create_size((event_chunk->size * 2) / 4); |
50 | 0 | if (!out_buf) { |
51 | 0 | flb_errno(); |
52 | 0 | return NULL; |
53 | 0 | } |
54 | | |
55 | | /* Create temporary msgpack buffer */ |
56 | 0 | msgpack_sbuffer_init(&tmp_sbuf); |
57 | 0 | msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); |
58 | |
|
59 | 0 | while ((result = flb_log_event_decoder_next( |
60 | 0 | &log_decoder, |
61 | 0 | &log_event)) == FLB_EVENT_DECODER_SUCCESS) { |
62 | | /* |
63 | | * If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data |
64 | | * by using the following structure: |
65 | | * |
66 | | * [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}] |
67 | | */ |
68 | 0 | msgpack_pack_array(&tmp_pck, 2); |
69 | 0 | msgpack_pack_array(&tmp_pck, 2); |
70 | 0 | msgpack_pack_uint64(&tmp_pck, flb_time_to_nanosec(&log_event.timestamp)); |
71 | | |
72 | | /* add tag only */ |
73 | 0 | msgpack_pack_map(&tmp_pck, 1 + log_event.metadata->via.map.size); |
74 | |
|
75 | 0 | msgpack_pack_str(&tmp_pck, 4); |
76 | 0 | msgpack_pack_str_body(&tmp_pck, "_tag", 4); |
77 | |
|
78 | 0 | msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag)); |
79 | 0 | msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag)); |
80 | | |
81 | | /* Append remaining keys/values */ |
82 | 0 | for (i = 0; |
83 | 0 | i < log_event.metadata->via.map.size; |
84 | 0 | i++) { |
85 | 0 | msgpack_pack_object(&tmp_pck, |
86 | 0 | log_event.metadata->via.map.ptr[i].key); |
87 | 0 | msgpack_pack_object(&tmp_pck, |
88 | 0 | log_event.metadata->via.map.ptr[i].val); |
89 | 0 | } |
90 | | |
91 | | /* pack the remaining content */ |
92 | 0 | msgpack_pack_map(&tmp_pck, log_event.body->via.map.size); |
93 | | |
94 | | /* Append remaining keys/values */ |
95 | 0 | for (i = 0; |
96 | 0 | i < log_event.body->via.map.size; |
97 | 0 | i++) { |
98 | 0 | msgpack_pack_object(&tmp_pck, |
99 | 0 | log_event.body->via.map.ptr[i].key); |
100 | 0 | msgpack_pack_object(&tmp_pck, |
101 | 0 | log_event.body->via.map.ptr[i].val); |
102 | 0 | } |
103 | | |
104 | | /* Concatenate by using break lines */ |
105 | 0 | out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); |
106 | 0 | if (!out_js) { |
107 | 0 | flb_sds_destroy(out_buf); |
108 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
109 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
110 | 0 | return NULL; |
111 | 0 | } |
112 | | |
113 | | /* |
114 | | * One map record has been converted, now append it to the |
115 | | * outgoing out_buf sds variable. |
116 | | */ |
117 | 0 | flb_sds_cat_safe(&out_buf, out_js, flb_sds_len(out_js)); |
118 | 0 | flb_sds_cat_safe(&out_buf, "\n", 1); |
119 | |
|
120 | 0 | flb_sds_destroy(out_js); |
121 | 0 | msgpack_sbuffer_clear(&tmp_sbuf); |
122 | 0 | } |
123 | | |
124 | | /* Release the unpacker */ |
125 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
126 | |
|
127 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
128 | |
|
129 | 0 | return out_buf; |
130 | 0 | } |
131 | | |
132 | | static int logs_event_chunk_append(struct vivo_exporter *ctx, |
133 | | struct flb_event_chunk *event_chunk) |
134 | 0 | { |
135 | 0 | size_t len; |
136 | 0 | flb_sds_t json; |
137 | 0 | struct vivo_stream_entry *entry; |
138 | | |
139 | |
|
140 | 0 | json = format_logs(event_chunk); |
141 | 0 | if (!json) { |
142 | 0 | flb_plg_error(ctx->ins, "cannot convert logs chunk to JSON"); |
143 | 0 | return -1; |
144 | 0 | } |
145 | | |
146 | | /* append content to the stream */ |
147 | 0 | len = flb_sds_len(json); |
148 | 0 | entry = vivo_stream_append(ctx->stream_logs, json, len); |
149 | |
|
150 | 0 | flb_sds_destroy(json); |
151 | |
|
152 | 0 | if (!entry) { |
153 | 0 | flb_plg_error(ctx->ins, "cannot append JSON log to stream"); |
154 | 0 | return -1; |
155 | 0 | } |
156 | | |
157 | 0 | return 0; |
158 | 0 | } |
159 | | |
160 | | static int metrics_traces_event_chunk_append(struct vivo_exporter *ctx, |
161 | | struct vivo_stream *vs, |
162 | | struct flb_event_chunk *event_chunk) |
163 | 0 | { |
164 | 0 | size_t len; |
165 | 0 | flb_sds_t json; |
166 | 0 | struct vivo_stream_entry *entry; |
167 | | |
168 | | /* Convert msgpack to readable JSON format */ |
169 | 0 | json = flb_msgpack_raw_to_json_sds(event_chunk->data, event_chunk->size); |
170 | 0 | if (!json) { |
171 | 0 | flb_plg_error(ctx->ins, "cannot convert metrics chunk to JSON"); |
172 | 0 | return -1; |
173 | 0 | } |
174 | | |
175 | 0 | flb_sds_cat_safe(&json, "\n", 1); |
176 | | |
177 | | /* append content to the stream */ |
178 | 0 | len = flb_sds_len(json); |
179 | 0 | entry = vivo_stream_append(vs, json, len); |
180 | |
|
181 | 0 | flb_sds_destroy(json); |
182 | |
|
183 | 0 | if (!entry) { |
184 | 0 | flb_plg_error(ctx->ins, "cannot append JSON log to stream"); |
185 | 0 | return -1; |
186 | 0 | } |
187 | | |
188 | 0 | return 0; |
189 | 0 | } |
190 | | |
191 | | static int cb_vivo_init(struct flb_output_instance *ins, |
192 | | struct flb_config *config, |
193 | | void *data) |
194 | 0 | { |
195 | 0 | int ret; |
196 | 0 | struct vivo_exporter *ctx; |
197 | |
|
198 | 0 | flb_output_net_default("0.0.0.0", 2025 , ins); |
199 | |
|
200 | 0 | ctx = flb_calloc(1, sizeof(struct vivo_exporter)); |
201 | 0 | if (!ctx) { |
202 | 0 | flb_errno(); |
203 | 0 | return -1; |
204 | 0 | } |
205 | 0 | ctx->ins = ins; |
206 | |
|
207 | 0 | ret = flb_output_config_map_set(ins, (void *) ctx); |
208 | 0 | if (ret == -1) { |
209 | 0 | flb_free(ctx); |
210 | 0 | return -1; |
211 | 0 | } |
212 | | |
213 | 0 | flb_output_set_context(ins, ctx); |
214 | | |
215 | | /* Load config map */ |
216 | 0 | ret = flb_output_config_map_set(ins, (void *) ctx); |
217 | 0 | if (ret == -1) { |
218 | 0 | return -1; |
219 | 0 | } |
220 | | |
221 | | /* Create Streams */ |
222 | 0 | ctx->stream_logs = vivo_stream_create(ctx); |
223 | 0 | if (!ctx->stream_logs) { |
224 | 0 | return -1; |
225 | 0 | } |
226 | | |
227 | 0 | ctx->stream_metrics = vivo_stream_create(ctx); |
228 | 0 | if (!ctx->stream_metrics) { |
229 | 0 | return -1; |
230 | 0 | } |
231 | | |
232 | 0 | ctx->stream_traces = vivo_stream_create(ctx); |
233 | 0 | if (!ctx->stream_traces) { |
234 | 0 | return -1; |
235 | 0 | } |
236 | | |
237 | | /* HTTP Server context */ |
238 | 0 | ctx->http = vivo_http_server_create(ctx, |
239 | 0 | ins->host.name, ins->host.port, config); |
240 | 0 | if (!ctx->http) { |
241 | 0 | flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting"); |
242 | 0 | return -1; |
243 | 0 | } |
244 | | |
245 | | /* Start HTTP Server */ |
246 | 0 | ret = vivo_http_server_start(ctx->http); |
247 | 0 | if (ret == -1) { |
248 | 0 | return -1; |
249 | 0 | } |
250 | | |
251 | 0 | flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d", |
252 | 0 | ins->host.name, ins->host.port); |
253 | |
|
254 | 0 | return 0; |
255 | 0 | } |
256 | | |
257 | | static void cb_vivo_flush(struct flb_event_chunk *event_chunk, |
258 | | struct flb_output_flush *out_flush, |
259 | | struct flb_input_instance *ins, void *out_context, |
260 | | struct flb_config *config) |
261 | 0 | { |
262 | 0 | int ret = -1; |
263 | 0 | struct vivo_exporter *ctx = out_context; |
264 | |
|
265 | 0 | #ifdef FLB_HAVE_METRICS |
266 | 0 | if (event_chunk->type == FLB_EVENT_TYPE_METRICS) { |
267 | 0 | ret = metrics_traces_event_chunk_append(ctx, ctx->stream_metrics, event_chunk); |
268 | 0 | } |
269 | 0 | #endif |
270 | 0 | if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { |
271 | 0 | ret = logs_event_chunk_append(ctx, event_chunk); |
272 | 0 | } |
273 | 0 | else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) { |
274 | 0 | ret = metrics_traces_event_chunk_append(ctx, ctx->stream_traces, event_chunk); |
275 | 0 | } |
276 | |
|
277 | 0 | if (ret == 0) { |
278 | 0 | FLB_OUTPUT_RETURN(FLB_OK); |
279 | 0 | } |
280 | | |
281 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
282 | 0 | } |
283 | | |
284 | | static int cb_vivo_exit(void *data, struct flb_config *config) |
285 | 0 | { |
286 | 0 | struct vivo_exporter *ctx = data; |
287 | |
|
288 | 0 | if (!ctx) { |
289 | 0 | return 0; |
290 | 0 | } |
291 | | |
292 | 0 | if (ctx->http) { |
293 | 0 | vivo_http_server_stop(ctx->http); |
294 | 0 | vivo_http_server_destroy(ctx->http); |
295 | 0 | } |
296 | |
|
297 | 0 | vivo_stream_destroy(ctx->stream_logs); |
298 | 0 | vivo_stream_destroy(ctx->stream_metrics); |
299 | 0 | vivo_stream_destroy(ctx->stream_traces); |
300 | |
|
301 | 0 | flb_free(ctx); |
302 | |
|
303 | 0 | return 0; |
304 | 0 | } |
305 | | |
306 | | /* Configuration properties map */ |
307 | | static struct flb_config_map config_map[] = { |
308 | | { |
309 | | FLB_CONFIG_MAP_BOOL, "empty_stream_on_read", "off", |
310 | | 0, FLB_TRUE, offsetof(struct vivo_exporter, empty_stream_on_read), |
311 | | "If enabled, when an HTTP client consumes the data from a stream, the queue " |
312 | | "content will be removed" |
313 | | }, |
314 | | |
315 | | { |
316 | | FLB_CONFIG_MAP_SIZE, "stream_queue_size", "20M", |
317 | | 0, FLB_TRUE, offsetof(struct vivo_exporter, stream_queue_size), |
318 | | "Specify the maximum queue size per stream. Each specific stream for logs, metrics " |
319 | | "and traces can hold up to 'stream_queue_size' bytes." |
320 | | }, |
321 | | |
322 | | { |
323 | | FLB_CONFIG_MAP_STR, "http_cors_allow_origin", NULL, |
324 | | 0, FLB_TRUE, offsetof(struct vivo_exporter, http_cors_allow_origin), |
325 | | "Specify the value for the HTTP Access-Control-Allow-Origin header (CORS)" |
326 | | }, |
327 | | |
328 | | /* EOF */ |
329 | | {0} |
330 | | }; |
331 | | |
332 | | /* Plugin reference */ |
333 | | struct flb_output_plugin out_vivo_exporter_plugin = { |
334 | | .name = "vivo_exporter", |
335 | | .description = "Vivo Exporter", |
336 | | .cb_init = cb_vivo_init, |
337 | | .cb_flush = cb_vivo_flush, |
338 | | .cb_exit = cb_vivo_exit, |
339 | | .flags = FLB_OUTPUT_NET, |
340 | | .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES, |
341 | | .config_map = config_map, |
342 | | .workers = 1, |
343 | | }; |