/src/fluent-bit/plugins/in_statsd/statsd.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_log_event_encoder.h> |
21 | | #include <fluent-bit/flb_input_plugin.h> |
22 | | #include <fluent-bit/flb_utils.h> |
23 | | #include <fluent-bit/flb_socket.h> |
24 | | #include <fluent-bit/flb_pack.h> |
25 | | |
/* Size in bytes of the receive buffer used for a single UDP datagram */
#define MAX_PACKET_SIZE 65536
/* Address and port used when the instance does not configure its own */
#define DEFAULT_LISTEN "0.0.0.0"
#define DEFAULT_PORT 8125

/* Metric kinds stored in statsd_message.type */
enum {
    STATSD_TYPE_COUNTER = 1,
    STATSD_TYPE_GAUGE   = 2,
    STATSD_TYPE_TIMER   = 3,
    STATSD_TYPE_SET     = 4
};
34 | | |
35 | | struct flb_statsd { |
36 | | char *buf; /* buffer */ |
37 | | char listen[256]; /* listening address (RFC-2181) */ |
38 | | char port[6]; /* listening port (RFC-793) */ |
39 | | int metrics; /* Import as metrics */ |
40 | | flb_sockfd_t server_fd; /* server socket */ |
41 | | flb_pipefd_t coll_fd; /* server handler */ |
42 | | struct flb_input_instance *ins; /* input instance */ |
43 | | struct flb_log_event_encoder *log_encoder; |
44 | | }; |
45 | | |
/*
 * A "statsd_message" describes one line of a UDP packet
 * ("bucket:value|type|@sample_rate").
 *
 * It owns nothing: 'bucket' and 'value' point into the ephemeral line
 * buffer and are delimited by their length fields, not by a NUL byte.
 */
struct statsd_message {
    /* start of the bucket name and its length in bytes */
    char *bucket;
    int bucket_len;

    /* start of the value text and its length in bytes */
    char *value;
    int value_len;

    /* one of the STATSD_TYPE_* constants */
    int type;

    /* sampling rate taken from the "|@rate" suffix */
    double sample_rate;
};
58 | | |
59 | | static int get_statsd_type(char *str) |
60 | 0 | { |
61 | 0 | switch (*str) { |
62 | 0 | case 'g': |
63 | 0 | return STATSD_TYPE_GAUGE; |
64 | 0 | case 's': |
65 | 0 | return STATSD_TYPE_SET; |
66 | 0 | case 'c': |
67 | 0 | return STATSD_TYPE_COUNTER; |
68 | 0 | case 'm': |
69 | 0 | if (*(str + 1) == 's') { |
70 | 0 | return STATSD_TYPE_TIMER; |
71 | 0 | } |
72 | 0 | } |
73 | 0 | return STATSD_TYPE_COUNTER; |
74 | 0 | } |
75 | | |
/*
 * Tell whether a value carries an explicit sign.
 *
 * Returns 1 when the first character of 'str' is '+' or '-', 0 otherwise.
 */
static int is_incremental(char *str)
{
    switch (str[0]) {
    case '+':
    case '-':
        return 1;
    default:
        return 0;
    }
}
80 | | |
81 | | static int statsd_process_message(struct flb_statsd *ctx, |
82 | | struct statsd_message *m) |
83 | 0 | { |
84 | 0 | int ret; |
85 | |
|
86 | 0 | ret = flb_log_event_encoder_begin_record(ctx->log_encoder); |
87 | |
|
88 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
89 | 0 | ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder); |
90 | 0 | } |
91 | |
|
92 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
93 | 0 | switch (m->type) { |
94 | 0 | case STATSD_TYPE_COUNTER: |
95 | 0 | ret = flb_log_event_encoder_append_body_values( |
96 | 0 | ctx->log_encoder, |
97 | |
|
98 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("type"), |
99 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("counter"), |
100 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("bucket"), |
101 | 0 | FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), |
102 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("value"), |
103 | 0 | FLB_LOG_EVENT_DOUBLE_VALUE(strtod(m->value, NULL)), |
104 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("sample_rate"), |
105 | 0 | FLB_LOG_EVENT_DOUBLE_VALUE(m->sample_rate)); |
106 | |
|
107 | 0 | break; |
108 | 0 | case STATSD_TYPE_GAUGE: |
109 | 0 | ret = flb_log_event_encoder_append_body_values( |
110 | 0 | ctx->log_encoder, |
111 | |
|
112 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("type"), |
113 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("gauge"), |
114 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("bucket"), |
115 | 0 | FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), |
116 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("value"), |
117 | 0 | FLB_LOG_EVENT_DOUBLE_VALUE(strtod(m->value, NULL)), |
118 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("incremental"), |
119 | 0 | FLB_LOG_EVENT_INT64_VALUE(is_incremental(m->value))); |
120 | 0 | break; |
121 | 0 | case STATSD_TYPE_TIMER: |
122 | 0 | ret = flb_log_event_encoder_append_body_values( |
123 | 0 | ctx->log_encoder, |
124 | |
|
125 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("type"), |
126 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("timer"), |
127 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("bucket"), |
128 | 0 | FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), |
129 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("value"), |
130 | 0 | FLB_LOG_EVENT_DOUBLE_VALUE(strtod(m->value, NULL)), |
131 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("sample_rate"), |
132 | 0 | FLB_LOG_EVENT_DOUBLE_VALUE(m->sample_rate)); |
133 | |
|
134 | 0 | case STATSD_TYPE_SET: |
135 | 0 | ret = flb_log_event_encoder_append_body_values( |
136 | 0 | ctx->log_encoder, |
137 | |
|
138 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("type"), |
139 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("set"), |
140 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("bucket"), |
141 | 0 | FLB_LOG_EVENT_STRING_VALUE(m->bucket, m->bucket_len), |
142 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("value"), |
143 | 0 | FLB_LOG_EVENT_STRING_VALUE(m->value, m->value_len)); |
144 | 0 | break; |
145 | 0 | } |
146 | 0 | } |
147 | | |
148 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
149 | 0 | ret = flb_log_event_encoder_commit_record(ctx->log_encoder); |
150 | 0 | } |
151 | |
|
152 | 0 | return ret; |
153 | 0 | } |
154 | | |
155 | | static int statsd_process_line(struct flb_statsd *ctx, char *line) |
156 | 0 | { |
157 | 0 | char *colon, *bar, *atmark; |
158 | 0 | struct statsd_message m; |
159 | | |
160 | | /* |
161 | | * bucket:value|type|@sample_rate |
162 | | * ------ |
163 | | */ |
164 | 0 | colon = strchr(line, ':'); |
165 | 0 | if (colon == NULL) { |
166 | 0 | flb_plg_error(ctx->ins, "no bucket name found"); |
167 | 0 | return -1; |
168 | 0 | } |
169 | 0 | m.bucket = line; |
170 | 0 | m.bucket_len = (colon - line); |
171 | | |
172 | | /* |
173 | | * bucket:value|type|@sample_rate |
174 | | * ---- |
175 | | */ |
176 | 0 | bar = strchr(colon + 1, '|'); |
177 | 0 | if (bar == NULL) { |
178 | 0 | flb_plg_error(ctx->ins, "no metric type found"); |
179 | 0 | return -1; |
180 | 0 | } |
181 | 0 | m.type = get_statsd_type(bar + 1); |
182 | | |
183 | | /* |
184 | | * bucket:value|type|@sample_rate |
185 | | * ----- |
186 | | */ |
187 | 0 | m.value = colon + 1; |
188 | 0 | m.value_len = (bar - colon - 1); |
189 | | |
190 | | /* |
191 | | * bucket:value|type|@sample_rate |
192 | | * ------------ |
193 | | */ |
194 | 0 | atmark = strstr(bar + 1, "|@"); |
195 | 0 | if (atmark == NULL || atof(atmark + 2) == 0) { |
196 | 0 | m.sample_rate = 1.0; |
197 | 0 | } |
198 | 0 | else { |
199 | 0 | m.sample_rate = atof(atmark + 2); |
200 | 0 | } |
201 | |
|
202 | 0 | return statsd_process_message(ctx, &m); |
203 | 0 | } |
204 | | |
205 | | |
206 | | static int cb_statsd_receive(struct flb_input_instance *ins, |
207 | | struct flb_config *config, void *data) |
208 | 0 | { |
209 | 0 | int ret; |
210 | 0 | int len; |
211 | 0 | struct flb_statsd *ctx = data; |
212 | 0 | struct cfl_list *head = NULL; |
213 | 0 | struct cfl_list *kvs = NULL; |
214 | 0 | struct cfl_split_entry *cur = NULL; |
215 | 0 | #ifdef FLB_HAVE_METRICS |
216 | 0 | struct cmt *cmt = NULL; |
217 | 0 | int cmt_flags = 0; |
218 | 0 | #endif |
219 | | |
220 | | /* Receive a UDP datagram */ |
221 | 0 | len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0); |
222 | 0 | if (len < 0) { |
223 | 0 | flb_errno(); |
224 | 0 | return -1; |
225 | 0 | } |
226 | 0 | ctx->buf[len] = '\0'; |
227 | |
|
228 | 0 | #ifdef FLB_HAVE_METRICS |
229 | 0 | if (ctx->metrics == FLB_TRUE) { |
230 | 0 | cmt_flags |= CMT_DECODE_STATSD_GAUGE_OBSERVER; |
231 | 0 | flb_plg_trace(ctx->ins, "received a buf: '%s'", ctx->buf); |
232 | 0 | ret = cmt_decode_statsd_create(&cmt, ctx->buf, len, cmt_flags); |
233 | 0 | if (ret != CMT_DECODE_STATSD_SUCCESS) { |
234 | 0 | flb_plg_error(ctx->ins, "failed to process buf: '%s'", ctx->buf); |
235 | 0 | return -1; |
236 | 0 | } |
237 | | |
238 | | /* Append the updated metrics */ |
239 | 0 | ret = flb_input_metrics_append(ins, NULL, 0, cmt); |
240 | 0 | if (ret != 0) { |
241 | 0 | flb_plg_error(ins, "could not append metrics"); |
242 | 0 | } |
243 | |
|
244 | 0 | cmt_destroy(cmt); |
245 | 0 | } |
246 | 0 | else { |
247 | 0 | #endif |
248 | 0 | ret = FLB_EVENT_ENCODER_SUCCESS; |
249 | 0 | kvs = cfl_utils_split(ctx->buf, '\n', -1 ); |
250 | 0 | if (kvs == NULL) { |
251 | 0 | goto split_error; |
252 | 0 | } |
253 | | |
254 | 0 | cfl_list_foreach(head, kvs) { |
255 | 0 | cur = cfl_list_entry(head, struct cfl_split_entry, _head); |
256 | 0 | flb_plg_trace(ctx->ins, "received a line: '%s'", cur->value); |
257 | |
|
258 | 0 | ret = statsd_process_line(ctx, cur->value); |
259 | |
|
260 | 0 | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
261 | 0 | flb_plg_error(ctx->ins, "failed to process line: '%s'", cur->value); |
262 | |
|
263 | 0 | break; |
264 | 0 | } |
265 | 0 | } |
266 | |
|
267 | 0 | if (kvs != NULL) { |
268 | 0 | cfl_utils_split_free(kvs); |
269 | 0 | } |
270 | |
|
271 | 0 | if (ctx->log_encoder->output_length > 0) { |
272 | 0 | flb_input_log_append(ctx->ins, NULL, 0, |
273 | 0 | ctx->log_encoder->output_buffer, |
274 | 0 | ctx->log_encoder->output_length); |
275 | 0 | } |
276 | 0 | else { |
277 | 0 | flb_plg_error(ctx->ins, "log event encoding error : %d", ret); |
278 | 0 | } |
279 | |
|
280 | 0 | flb_log_event_encoder_reset(ctx->log_encoder); |
281 | 0 | #ifdef FLB_HAVE_METRICS |
282 | 0 | } |
283 | 0 | #endif |
284 | | |
285 | 0 | return 0; |
286 | | |
287 | 0 | split_error: |
288 | 0 | return -1; |
289 | 0 | } |
290 | | |
291 | | static int cb_statsd_init(struct flb_input_instance *ins, |
292 | | struct flb_config *config, void *data) |
293 | 0 | { |
294 | 0 | struct flb_statsd *ctx; |
295 | 0 | char *listen; |
296 | 0 | int port; |
297 | 0 | int ret; |
298 | |
|
299 | 0 | ctx = flb_calloc(1, sizeof(struct flb_statsd)); |
300 | 0 | if (!ctx) { |
301 | 0 | flb_errno(); |
302 | 0 | return -1; |
303 | 0 | } |
304 | 0 | ctx->ins = ins; |
305 | |
|
306 | 0 | ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); |
307 | |
|
308 | 0 | if (ctx->log_encoder == NULL) { |
309 | 0 | flb_plg_error(ins, "could not initialize event encoder"); |
310 | 0 | flb_free(ctx); |
311 | |
|
312 | 0 | return -1; |
313 | 0 | } |
314 | | |
315 | 0 | ctx->buf = flb_malloc(MAX_PACKET_SIZE); |
316 | 0 | if (!ctx->buf) { |
317 | 0 | flb_errno(); |
318 | 0 | flb_log_event_encoder_destroy(ctx->log_encoder); |
319 | 0 | flb_free(ctx); |
320 | 0 | return -1; |
321 | 0 | } |
322 | | |
323 | | /* Load the config map */ |
324 | 0 | ret = flb_input_config_map_set(ins, (void *)ctx); |
325 | 0 | if (ret == -1) { |
326 | 0 | flb_plg_error(ins, "unable to load configuration"); |
327 | 0 | flb_log_event_encoder_destroy(ctx->log_encoder); |
328 | 0 | flb_free(ctx); |
329 | 0 | return -1; |
330 | 0 | } |
331 | | |
332 | | /* Listening address */ |
333 | 0 | if (ins->host.listen) { |
334 | 0 | listen = ins->host.listen; |
335 | 0 | } |
336 | 0 | else { |
337 | 0 | listen = DEFAULT_LISTEN; |
338 | 0 | } |
339 | 0 | strncpy(ctx->listen, listen, sizeof(ctx->listen) - 1); |
340 | | |
341 | | /* Listening port */ |
342 | 0 | if (ins->host.port) { |
343 | 0 | port = ins->host.port; |
344 | 0 | } |
345 | 0 | else { |
346 | 0 | port = DEFAULT_PORT; |
347 | 0 | } |
348 | 0 | snprintf(ctx->port, sizeof(ctx->port), "%hu", (unsigned short) port); |
349 | | |
350 | | /* Export plugin context */ |
351 | 0 | flb_input_set_context(ins, ctx); |
352 | | |
353 | | /* Accepts metrics from UDP connections. */ |
354 | 0 | ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen, ins->net_setup.share_port); |
355 | 0 | if (ctx->server_fd == -1) { |
356 | 0 | flb_plg_error(ctx->ins, "can't bind to %s:%s", ctx->listen, ctx->port); |
357 | 0 | flb_log_event_encoder_destroy(ctx->log_encoder); |
358 | 0 | flb_free(ctx->buf); |
359 | 0 | flb_free(ctx); |
360 | 0 | return -1; |
361 | 0 | } |
362 | | |
363 | | /* Set up the UDP connection callback */ |
364 | 0 | ctx->coll_fd = flb_input_set_collector_socket(ins, cb_statsd_receive, |
365 | 0 | ctx->server_fd, config); |
366 | 0 | if (ctx->coll_fd == -1) { |
367 | 0 | flb_plg_error(ctx->ins, "cannot set up connection callback "); |
368 | 0 | flb_log_event_encoder_destroy(ctx->log_encoder); |
369 | 0 | flb_socket_close(ctx->server_fd); |
370 | 0 | flb_free(ctx->buf); |
371 | 0 | flb_free(ctx); |
372 | 0 | return -1; |
373 | 0 | } |
374 | | |
375 | 0 | flb_plg_info(ctx->ins, "start UDP server on %s:%s", ctx->listen, ctx->port); |
376 | 0 | return 0; |
377 | 0 | } |
378 | | |
379 | | static void cb_statsd_pause(void *data, struct flb_config *config) |
380 | 0 | { |
381 | 0 | struct flb_statsd *ctx = data; |
382 | 0 | flb_input_collector_pause(ctx->coll_fd, ctx->ins); |
383 | 0 | } |
384 | | |
385 | | static void cb_statsd_resume(void *data, struct flb_config *config) |
386 | 0 | { |
387 | 0 | struct flb_statsd *ctx = data; |
388 | 0 | flb_input_collector_resume(ctx->coll_fd, ctx->ins); |
389 | 0 | } |
390 | | |
391 | | static int cb_statsd_exit(void *data, struct flb_config *config) |
392 | 0 | { |
393 | 0 | struct flb_statsd *ctx = data; |
394 | |
|
395 | 0 | if (ctx->log_encoder != NULL) { |
396 | 0 | flb_log_event_encoder_destroy(ctx->log_encoder); |
397 | 0 | } |
398 | |
|
399 | 0 | flb_socket_close(ctx->server_fd); |
400 | 0 | flb_free(ctx->buf); |
401 | 0 | flb_free(ctx); |
402 | |
|
403 | 0 | return 0; |
404 | 0 | } |
405 | | |
406 | | static struct flb_config_map config_map[] = { |
407 | | { |
408 | | FLB_CONFIG_MAP_BOOL, "metrics", "off", |
409 | | 0, FLB_TRUE, offsetof(struct flb_statsd, metrics), |
410 | | "Ingest as metrics type of events." |
411 | | }, |
412 | | /* EOF */ |
413 | | {0} |
414 | | }; |
415 | | |
416 | | /* Plugin reference */ |
417 | | struct flb_input_plugin in_statsd_plugin = { |
418 | | .name = "statsd", |
419 | | .description = "StatsD input plugin", |
420 | | .cb_init = cb_statsd_init, |
421 | | .cb_pre_run = NULL, |
422 | | .cb_collect = NULL, |
423 | | .cb_ingest = NULL, |
424 | | .cb_flush_buf = NULL, |
425 | | .cb_pause = cb_statsd_pause, |
426 | | .cb_resume = cb_statsd_resume, |
427 | | .cb_exit = cb_statsd_exit, |
428 | | .config_map = config_map, |
429 | | .flags = FLB_INPUT_NET_SERVER, |
430 | | }; |