/src/fluent-bit/src/flb_output.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <stdio.h> |
21 | | #include <stdlib.h> |
22 | | #include <string.h> |
23 | | |
24 | | #include <fluent-bit/flb_info.h> |
25 | | #include <fluent-bit/flb_mem.h> |
26 | | #include <fluent-bit/flb_str.h> |
27 | | #include <fluent-bit/flb_env.h> |
28 | | #include <fluent-bit/flb_coro.h> |
29 | | #include <fluent-bit/flb_output.h> |
30 | | #include <fluent-bit/flb_kv.h> |
31 | | #include <fluent-bit/flb_io.h> |
32 | | #include <fluent-bit/flb_uri.h> |
33 | | #include <fluent-bit/flb_config.h> |
34 | | #include <fluent-bit/flb_macros.h> |
35 | | #include <fluent-bit/flb_utils.h> |
36 | | #include <fluent-bit/flb_plugin.h> |
37 | | #include <fluent-bit/flb_plugin_proxy.h> |
38 | | #include <fluent-bit/flb_http_client_debug.h> |
39 | | #include <fluent-bit/flb_output_thread.h> |
40 | | #include <fluent-bit/flb_mp.h> |
41 | | #include <fluent-bit/flb_pack.h> |
42 | | |
43 | | FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params); |
44 | | |
45 | | void flb_output_prepare() |
46 | 0 | { |
47 | 0 | FLB_TLS_INIT(out_flush_params); |
48 | 0 | } |
49 | | |
/*
 * Validate the output address protocol.
 *
 * 'output' is either a bare name ("http") or an address in the form
 * "name://...". The text that precedes "://" is compared, ignoring case,
 * against 'prot'. When there is no separator, or the separator is the very
 * first thing in the string, the whole 'output' string is used instead.
 *
 * Returns 1 when the names match with the same length, 0 otherwise.
 */
static int check_protocol(const char *prot, const char *output)
{
    int name_len;
    char *sep;

    sep = strstr(output, "://");
    name_len = (sep != NULL && sep != output) ? (int) (sep - output)
                                              : (int) strlen(output);

    /* Equal length is required so that 'prot' is not accepted as a prefix */
    if (strlen(prot) == name_len &&
        strncasecmp(prot, output, name_len) == 0) {
        return 1;
    }

    return 0;
}
75 | | |
76 | | |
77 | | /* Invoke pre-run call for the output plugin */ |
78 | | void flb_output_pre_run(struct flb_config *config) |
79 | 0 | { |
80 | 0 | struct mk_list *head; |
81 | 0 | struct flb_output_instance *ins; |
82 | 0 | struct flb_output_plugin *p; |
83 | |
|
84 | 0 | mk_list_foreach(head, &config->outputs) { |
85 | 0 | ins = mk_list_entry(head, struct flb_output_instance, _head); |
86 | 0 | p = ins->p; |
87 | 0 | if (p->cb_pre_run) { |
88 | 0 | p->cb_pre_run(ins->context, config); |
89 | 0 | } |
90 | 0 | } |
91 | 0 | } |
92 | | |
93 | | static void flb_output_free_properties(struct flb_output_instance *ins) |
94 | 0 | { |
95 | |
|
96 | 0 | flb_kv_release(&ins->properties); |
97 | 0 | flb_kv_release(&ins->net_properties); |
98 | |
|
99 | 0 | #ifdef FLB_HAVE_TLS |
100 | 0 | if (ins->tls_vhost) { |
101 | 0 | flb_sds_destroy(ins->tls_vhost); |
102 | 0 | } |
103 | 0 | if (ins->tls_ca_path) { |
104 | 0 | flb_sds_destroy(ins->tls_ca_path); |
105 | 0 | } |
106 | 0 | if (ins->tls_ca_file) { |
107 | 0 | flb_sds_destroy(ins->tls_ca_file); |
108 | 0 | } |
109 | 0 | if (ins->tls_crt_file) { |
110 | 0 | flb_sds_destroy(ins->tls_crt_file); |
111 | 0 | } |
112 | 0 | if (ins->tls_key_file) { |
113 | 0 | flb_sds_destroy(ins->tls_key_file); |
114 | 0 | } |
115 | 0 | if (ins->tls_key_passwd) { |
116 | 0 | flb_sds_destroy(ins->tls_key_passwd); |
117 | 0 | } |
118 | 0 | #endif |
119 | 0 | } |
120 | | |
121 | | void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush) |
122 | 0 | { |
123 | 0 | struct flb_output_instance *ins = out_flush->o_ins; |
124 | 0 | struct flb_out_thread_instance *th_ins; |
125 | | |
126 | | /* Move output coroutine context from active list to the destroy one */ |
127 | 0 | if (flb_output_is_threaded(ins) == FLB_TRUE) { |
128 | 0 | th_ins = flb_output_thread_instance_get(); |
129 | 0 | pthread_mutex_lock(&th_ins->flush_mutex); |
130 | 0 | mk_list_del(&out_flush->_head); |
131 | 0 | mk_list_add(&out_flush->_head, &th_ins->flush_list_destroy); |
132 | 0 | pthread_mutex_unlock(&th_ins->flush_mutex); |
133 | 0 | } |
134 | 0 | else { |
135 | 0 | mk_list_del(&out_flush->_head); |
136 | 0 | mk_list_add(&out_flush->_head, &ins->flush_list_destroy); |
137 | 0 | } |
138 | 0 | } |
139 | | |
140 | | int flb_output_flush_id_get(struct flb_output_instance *ins) |
141 | 0 | { |
142 | 0 | int id; |
143 | 0 | int max = (2 << 13) - 1; /* max for 14 bits */ |
144 | 0 | struct flb_out_thread_instance *th_ins; |
145 | |
|
146 | 0 | if (flb_output_is_threaded(ins) == FLB_TRUE) { |
147 | 0 | th_ins = flb_output_thread_instance_get(); |
148 | 0 | id = th_ins->flush_id; |
149 | 0 | th_ins->flush_id++; |
150 | | |
151 | | /* reset once it reach the maximum allowed */ |
152 | 0 | if (th_ins->flush_id > max) { |
153 | 0 | th_ins->flush_id = 0; |
154 | 0 | } |
155 | 0 | } |
156 | 0 | else { |
157 | 0 | id = ins->flush_id; |
158 | 0 | ins->flush_id++; |
159 | | |
160 | | /* reset once it reach the maximum allowed */ |
161 | 0 | if (ins->flush_id > max) { |
162 | 0 | ins->flush_id = 0; |
163 | 0 | } |
164 | 0 | } |
165 | |
|
166 | 0 | return id; |
167 | 0 | } |
168 | | |
169 | | void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro) |
170 | 0 | { |
171 | 0 | struct flb_output_flush *out_flush; |
172 | |
|
173 | 0 | out_flush = (struct flb_output_flush *) FLB_CORO_DATA(coro); |
174 | 0 | mk_list_add(&out_flush->_head, &ins->flush_list); |
175 | 0 | } |
176 | | |
177 | | /* |
178 | | * Queue a task to be flushed at a later time |
179 | | * Deletes retry context if enqueue fails |
180 | | */ |
181 | | static int flb_output_task_queue_enqueue(struct flb_task_queue *queue, |
182 | | struct flb_task_retry *retry, |
183 | | struct flb_task *task, |
184 | | struct flb_output_instance *out_ins, |
185 | | struct flb_config *config) |
186 | 0 | { |
187 | 0 | struct flb_task_enqueued *queued_task; |
188 | |
|
189 | 0 | queued_task = flb_malloc(sizeof(struct flb_task_enqueued)); |
190 | 0 | if (!queued_task) { |
191 | 0 | flb_errno(); |
192 | 0 | if (retry) { |
193 | 0 | flb_task_retry_destroy(retry); |
194 | 0 | } |
195 | 0 | return -1; |
196 | 0 | } |
197 | 0 | queued_task->retry = retry; |
198 | 0 | queued_task->out_instance = out_ins; |
199 | 0 | queued_task->task = task; |
200 | 0 | queued_task->config = config; |
201 | |
|
202 | 0 | mk_list_add(&queued_task->_head, &queue->pending); |
203 | 0 | return 0; |
204 | 0 | } |
205 | | |
206 | | /* |
207 | | * Pop task from pending queue and flush it |
208 | | * Will delete retry context if flush fails |
209 | | */ |
210 | | static int flb_output_task_queue_flush_one(struct flb_task_queue *queue) |
211 | 0 | { |
212 | 0 | struct flb_task_enqueued *queued_task; |
213 | 0 | int ret; |
214 | 0 | int is_empty; |
215 | |
|
216 | 0 | is_empty = mk_list_is_empty(&queue->pending) == 0; |
217 | 0 | if (is_empty) { |
218 | 0 | flb_error("Attempting to flush task from an empty in_progress queue"); |
219 | 0 | return -1; |
220 | 0 | } |
221 | | |
222 | 0 | queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head); |
223 | 0 | mk_list_del(&queued_task->_head); |
224 | 0 | mk_list_add(&queued_task->_head, &queue->in_progress); |
225 | | |
226 | | /* |
227 | | * Remove temporary user now that task is out of singleplex queue. |
228 | | * Flush will add back the user representing queued_task->out_instance if it succeeds. |
229 | | */ |
230 | 0 | flb_task_users_dec(queued_task->task, FLB_FALSE); |
231 | 0 | ret = flb_output_task_flush(queued_task->task, |
232 | 0 | queued_task->out_instance, |
233 | 0 | queued_task->config); |
234 | | |
235 | | /* Destroy retry context if needed */ |
236 | 0 | if (ret == -1) { |
237 | 0 | if (queued_task->retry) { |
238 | 0 | flb_task_retry_destroy(queued_task->retry); |
239 | 0 | } |
240 | | /* Flush the next task */ |
241 | 0 | flb_output_task_singleplex_flush_next(queue); |
242 | 0 | return -1; |
243 | 0 | } |
244 | | |
245 | 0 | return ret; |
246 | 0 | } |
247 | | |
248 | | /* |
249 | | * Will either run or queue running a single task |
250 | | * Deletes retry context if enqueue fails |
251 | | */ |
252 | | int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue, |
253 | | struct flb_task_retry *retry, |
254 | | struct flb_task *task, |
255 | | struct flb_output_instance *out_ins, |
256 | | struct flb_config *config) |
257 | 0 | { |
258 | 0 | int ret; |
259 | 0 | int is_empty; |
260 | | |
261 | | /* |
262 | | * Add temporary user to preserve task while in singleplex queue. |
263 | | * Temporary user will be removed when task is removed from queue. |
264 | | * |
265 | | * Note: if we fail to increment now, then the task may be prematurely |
266 | | * deleted if the task's users go to 0 while we are waiting in the |
267 | | * queue. |
268 | | */ |
269 | 0 | flb_task_users_inc(task); |
270 | | |
271 | | /* Enqueue task */ |
272 | 0 | ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config); |
273 | 0 | if (ret == -1) { |
274 | 0 | return -1; |
275 | 0 | } |
276 | | |
277 | | /* Launch task if nothing is running */ |
278 | 0 | is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0; |
279 | 0 | if (is_empty) { |
280 | 0 | return flb_output_task_queue_flush_one(out_ins->singleplex_queue); |
281 | 0 | } |
282 | | |
283 | 0 | return 0; |
284 | 0 | } |
285 | | |
286 | | /* |
287 | | * Clear in progress task and flush a single queued task if exists |
288 | | * Deletes retry context on next flush if flush fails |
289 | | */ |
290 | | int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue) |
291 | 0 | { |
292 | 0 | int is_empty; |
293 | 0 | struct flb_task_enqueued *ended_task; |
294 | | |
295 | | /* Remove in progress task */ |
296 | 0 | is_empty = mk_list_is_empty(&queue->in_progress) == 0; |
297 | 0 | if (!is_empty) { |
298 | 0 | ended_task = mk_list_entry_first(&queue->in_progress, |
299 | 0 | struct flb_task_enqueued, _head); |
300 | 0 | mk_list_del(&ended_task->_head); |
301 | 0 | flb_free(ended_task); |
302 | 0 | } |
303 | | |
304 | | /* Flush if there is a pending task queued */ |
305 | 0 | is_empty = mk_list_is_empty(&queue->pending) == 0; |
306 | 0 | if (!is_empty) { |
307 | 0 | return flb_output_task_queue_flush_one(queue); |
308 | 0 | } |
309 | 0 | return 0; |
310 | 0 | } |
311 | | |
312 | | /* |
313 | | * Flush a task through the output plugin, either using a worker thread + coroutine |
314 | | * or a simple co-routine in the current thread. |
315 | | */ |
316 | | int flb_output_task_flush(struct flb_task *task, |
317 | | struct flb_output_instance *out_ins, |
318 | | struct flb_config *config) |
319 | 0 | { |
320 | 0 | int ret; |
321 | 0 | struct flb_output_flush *out_flush; |
322 | |
|
323 | 0 | if (flb_output_is_threaded(out_ins) == FLB_TRUE) { |
324 | 0 | flb_task_users_inc(task); |
325 | | |
326 | | /* Dispatch the task to the thread pool */ |
327 | 0 | ret = flb_output_thread_pool_flush(task, out_ins, config); |
328 | 0 | if (ret == -1) { |
329 | 0 | flb_task_users_dec(task, FLB_FALSE); |
330 | | |
331 | | /* If we are in synchronous mode, flush one waiting task */ |
332 | 0 | if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) { |
333 | 0 | flb_output_task_singleplex_flush_next(out_ins->singleplex_queue); |
334 | 0 | } |
335 | 0 | } |
336 | 0 | } |
337 | 0 | else { |
338 | | /* Queue co-routine handling */ |
339 | 0 | out_flush = flb_output_flush_create(task, |
340 | 0 | task->i_ins, |
341 | 0 | out_ins, |
342 | 0 | config); |
343 | 0 | if (!out_flush) { |
344 | 0 | return -1; |
345 | 0 | } |
346 | | |
347 | 0 | flb_task_users_inc(task); |
348 | 0 | ret = flb_pipe_w(config->ch_self_events[1], &out_flush, |
349 | 0 | sizeof(struct flb_output_flush*)); |
350 | 0 | if (ret == -1) { |
351 | 0 | flb_errno(); |
352 | 0 | flb_output_flush_destroy(out_flush); |
353 | 0 | flb_task_users_dec(task, FLB_FALSE); |
354 | | |
355 | | /* If we are in synchronous mode, flush one waiting task */ |
356 | 0 | if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) { |
357 | 0 | flb_output_task_singleplex_flush_next(out_ins->singleplex_queue); |
358 | 0 | } |
359 | |
|
360 | 0 | return -1; |
361 | 0 | } |
362 | 0 | } |
363 | | |
364 | 0 | return 0; |
365 | 0 | } |
366 | | |
367 | | int flb_output_instance_destroy(struct flb_output_instance *ins) |
368 | 0 | { |
369 | 0 | if (ins->alias) { |
370 | 0 | flb_sds_destroy(ins->alias); |
371 | 0 | } |
372 | | |
373 | | /* Remove URI context */ |
374 | 0 | if (ins->host.uri) { |
375 | 0 | flb_uri_destroy(ins->host.uri); |
376 | 0 | } |
377 | |
|
378 | 0 | flb_sds_destroy(ins->host.name); |
379 | 0 | flb_sds_destroy(ins->host.address); |
380 | 0 | flb_sds_destroy(ins->host.listen); |
381 | 0 | flb_sds_destroy(ins->match); |
382 | |
|
383 | 0 | #ifdef FLB_HAVE_REGEX |
384 | 0 | if (ins->match_regex) { |
385 | 0 | flb_regex_destroy(ins->match_regex); |
386 | 0 | } |
387 | 0 | #endif |
388 | |
|
389 | 0 | #ifdef FLB_HAVE_TLS |
390 | 0 | if (ins->use_tls == FLB_TRUE) { |
391 | 0 | if (ins->tls) { |
392 | 0 | flb_tls_destroy(ins->tls); |
393 | 0 | } |
394 | 0 | } |
395 | |
|
396 | 0 | if (ins->tls_config_map) { |
397 | 0 | flb_config_map_destroy(ins->tls_config_map); |
398 | 0 | } |
399 | 0 | #endif |
400 | | |
401 | | /* Remove metrics */ |
402 | 0 | #ifdef FLB_HAVE_METRICS |
403 | 0 | if (ins->cmt) { |
404 | 0 | cmt_destroy(ins->cmt); |
405 | 0 | } |
406 | |
|
407 | 0 | if (ins->metrics) { |
408 | 0 | flb_metrics_destroy(ins->metrics); |
409 | 0 | } |
410 | 0 | #endif |
411 | | |
412 | | /* destroy callback context */ |
413 | 0 | if (ins->callback) { |
414 | 0 | flb_callback_destroy(ins->callback); |
415 | 0 | } |
416 | | |
417 | | /* destroy config map */ |
418 | 0 | if (ins->config_map) { |
419 | 0 | flb_config_map_destroy(ins->config_map); |
420 | 0 | } |
421 | |
|
422 | 0 | if (ins->net_config_map) { |
423 | 0 | flb_config_map_destroy(ins->net_config_map); |
424 | 0 | } |
425 | |
|
426 | 0 | if (ins->ch_events[0] > 0) { |
427 | 0 | mk_event_closesocket(ins->ch_events[0]); |
428 | 0 | } |
429 | |
|
430 | 0 | if (ins->ch_events[1] > 0) { |
431 | 0 | mk_event_closesocket(ins->ch_events[1]); |
432 | 0 | } |
433 | | |
434 | | /* release properties */ |
435 | 0 | flb_output_free_properties(ins); |
436 | | |
437 | | /* free singleplex queue */ |
438 | 0 | if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) { |
439 | 0 | flb_task_queue_destroy(ins->singleplex_queue); |
440 | 0 | } |
441 | |
|
442 | 0 | mk_list_del(&ins->_head); |
443 | | |
444 | | /* processor */ |
445 | 0 | if (ins->processor) { |
446 | 0 | flb_processor_destroy(ins->processor); |
447 | 0 | } |
448 | |
|
449 | 0 | flb_free(ins); |
450 | |
|
451 | 0 | return 0; |
452 | 0 | } |
453 | | |
454 | | /* Invoke exit call for the output plugin */ |
455 | | void flb_output_exit(struct flb_config *config) |
456 | 0 | { |
457 | 0 | struct mk_list *tmp; |
458 | 0 | struct mk_list *head; |
459 | 0 | struct flb_output_instance *ins; |
460 | 0 | struct flb_output_plugin *p; |
461 | 0 | void *params; |
462 | |
|
463 | 0 | mk_list_foreach_safe(head, tmp, &config->outputs) { |
464 | 0 | ins = mk_list_entry(head, struct flb_output_instance, _head); |
465 | 0 | p = ins->p; |
466 | |
|
467 | 0 | if (ins->is_threaded == FLB_FALSE) { |
468 | 0 | if (ins->p->cb_worker_exit) { |
469 | 0 | ins->p->cb_worker_exit(ins->context, ins->config); |
470 | 0 | } |
471 | 0 | } |
472 | | |
473 | | /* Stop any worker thread */ |
474 | 0 | if (flb_output_is_threaded(ins) == FLB_TRUE) { |
475 | 0 | flb_output_thread_pool_destroy(ins); |
476 | 0 | } |
477 | | |
478 | | /* Check a exit callback */ |
479 | 0 | if (p->cb_exit) { |
480 | 0 | p->cb_exit(ins->context, config); |
481 | 0 | } |
482 | 0 | flb_output_instance_destroy(ins); |
483 | 0 | } |
484 | |
|
485 | 0 | params = FLB_TLS_GET(out_flush_params); |
486 | 0 | if (params) { |
487 | 0 | flb_free(params); |
488 | 0 | FLB_TLS_SET(out_flush_params, NULL); |
489 | 0 | } |
490 | 0 | } |
491 | | |
492 | | static inline int instance_id(struct flb_config *config) |
493 | 0 | { |
494 | 0 | struct flb_output_instance *ins; |
495 | |
|
496 | 0 | if (mk_list_size(&config->outputs) == 0) { |
497 | 0 | return 0; |
498 | 0 | } |
499 | | |
500 | 0 | ins = mk_list_entry_last(&config->outputs, struct flb_output_instance, |
501 | 0 | _head); |
502 | 0 | return (ins->id + 1); |
503 | 0 | } |
504 | | |
505 | | struct flb_output_instance *flb_output_get_instance(struct flb_config *config, |
506 | | int out_id) |
507 | 0 | { |
508 | 0 | struct mk_list *head; |
509 | 0 | struct flb_output_instance *ins; |
510 | |
|
511 | 0 | mk_list_foreach(head, &config->outputs) { |
512 | 0 | ins = mk_list_entry(head, struct flb_output_instance, _head); |
513 | 0 | if (ins->id == out_id) { |
514 | 0 | break; |
515 | 0 | } |
516 | 0 | ins = NULL; |
517 | 0 | } |
518 | |
|
519 | 0 | if (!ins) { |
520 | 0 | return NULL; |
521 | 0 | } |
522 | | |
523 | 0 | return ins; |
524 | 0 | } |
525 | | |
526 | | /* |
527 | | * Invoked everytime a flush callback has finished (returned). This function |
528 | | * is called from the event loop. |
529 | | */ |
530 | | int flb_output_flush_finished(struct flb_config *config, int out_id) |
531 | 0 | { |
532 | 0 | struct mk_list *tmp; |
533 | 0 | struct mk_list *head; |
534 | 0 | struct mk_list *list; |
535 | 0 | struct flb_output_instance *ins; |
536 | 0 | struct flb_output_flush *out_flush; |
537 | 0 | struct flb_out_thread_instance *th_ins; |
538 | |
|
539 | 0 | ins = flb_output_get_instance(config, out_id); |
540 | 0 | if (!ins) { |
541 | 0 | return -1; |
542 | 0 | } |
543 | | |
544 | 0 | if (flb_output_is_threaded(ins) == FLB_TRUE) { |
545 | 0 | th_ins = flb_output_thread_instance_get(); |
546 | 0 | list = &th_ins->flush_list_destroy; |
547 | 0 | } |
548 | 0 | else { |
549 | 0 | list = &ins->flush_list_destroy; |
550 | 0 | } |
551 | | |
552 | | /* Look for output coroutines that needs to be destroyed */ |
553 | 0 | mk_list_foreach_safe(head, tmp, list) { |
554 | 0 | out_flush = mk_list_entry(head, struct flb_output_flush, _head); |
555 | 0 | flb_output_flush_destroy(out_flush); |
556 | 0 | } |
557 | |
|
558 | 0 | return 0; |
559 | 0 | } |
560 | | |
561 | | |
562 | | /* |
563 | | * It validate an output type given the string, it return the |
564 | | * proper type and if valid, populate the global config. |
565 | | */ |
566 | | struct flb_output_instance *flb_output_new(struct flb_config *config, |
567 | | const char *output, void *data, |
568 | | int public_only) |
569 | 0 | { |
570 | 0 | int ret = -1; |
571 | 0 | int flags = 0; |
572 | 0 | struct mk_list *head; |
573 | 0 | struct flb_output_plugin *plugin; |
574 | 0 | struct flb_output_instance *instance = NULL; |
575 | |
|
576 | 0 | if (!output) { |
577 | 0 | return NULL; |
578 | 0 | } |
579 | | |
580 | 0 | mk_list_foreach(head, &config->out_plugins) { |
581 | 0 | plugin = mk_list_entry(head, struct flb_output_plugin, _head); |
582 | 0 | if (!check_protocol(plugin->name, output)) { |
583 | 0 | plugin = NULL; |
584 | 0 | continue; |
585 | 0 | } |
586 | | |
587 | 0 | if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) { |
588 | 0 | return NULL; |
589 | 0 | } |
590 | 0 | break; |
591 | 0 | } |
592 | | |
593 | 0 | if (!plugin) { |
594 | 0 | return NULL; |
595 | 0 | } |
596 | | |
597 | | /* Create and load instance */ |
598 | 0 | instance = flb_calloc(1, sizeof(struct flb_output_instance)); |
599 | 0 | if (!instance) { |
600 | 0 | flb_errno(); |
601 | 0 | return NULL; |
602 | 0 | } |
603 | | |
604 | | /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */ |
605 | 0 | if (plugin->event_type == 0) { |
606 | 0 | instance->event_type = FLB_OUTPUT_LOGS; |
607 | 0 | } |
608 | 0 | else { |
609 | 0 | instance->event_type = plugin->event_type; |
610 | 0 | } |
611 | 0 | instance->config = config; |
612 | 0 | instance->log_level = -1; |
613 | 0 | instance->log_suppress_interval = -1; |
614 | 0 | instance->test_mode = FLB_FALSE; |
615 | 0 | instance->is_threaded = FLB_FALSE; |
616 | 0 | instance->tp_workers = plugin->workers; |
617 | | |
618 | | /* Retrieve an instance id for the output instance */ |
619 | 0 | instance->id = instance_id(config); |
620 | | |
621 | | /* format name (with instance id) */ |
622 | 0 | snprintf(instance->name, sizeof(instance->name) - 1, |
623 | 0 | "%s.%i", plugin->name, instance->id); |
624 | 0 | instance->p = plugin; |
625 | 0 | instance->callback = flb_callback_create(instance->name); |
626 | 0 | if (!instance->callback) { |
627 | 0 | if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { |
628 | 0 | flb_task_queue_destroy(instance->singleplex_queue); |
629 | 0 | } |
630 | 0 | flb_free(instance); |
631 | 0 | return NULL; |
632 | 0 | } |
633 | | |
634 | 0 | if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) { |
635 | 0 | instance->context = NULL; |
636 | 0 | } |
637 | 0 | else { |
638 | 0 | struct flb_plugin_proxy_context *ctx; |
639 | |
|
640 | 0 | ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context)); |
641 | 0 | if (!ctx) { |
642 | 0 | flb_errno(); |
643 | 0 | if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { |
644 | 0 | flb_task_queue_destroy(instance->singleplex_queue); |
645 | 0 | } |
646 | 0 | flb_free(instance); |
647 | 0 | return NULL; |
648 | 0 | } |
649 | | |
650 | 0 | ctx->proxy = plugin->proxy; |
651 | |
|
652 | 0 | instance->context = ctx; |
653 | 0 | } |
654 | | |
655 | 0 | instance->alias = NULL; |
656 | 0 | instance->flags = instance->p->flags; |
657 | 0 | instance->data = data; |
658 | 0 | instance->match = NULL; |
659 | 0 | #ifdef FLB_HAVE_REGEX |
660 | 0 | instance->match_regex = NULL; |
661 | 0 | #endif |
662 | 0 | instance->retry_limit = 1; |
663 | 0 | instance->host.name = NULL; |
664 | 0 | instance->host.address = NULL; |
665 | 0 | instance->net_config_map = NULL; |
666 | | |
667 | | /* Storage */ |
668 | 0 | instance->total_limit_size = -1; |
669 | | |
670 | | /* Parent plugin flags */ |
671 | 0 | flags = instance->flags; |
672 | 0 | if (flags & FLB_IO_TCP) { |
673 | 0 | instance->use_tls = FLB_FALSE; |
674 | 0 | } |
675 | 0 | else if (flags & FLB_IO_TLS) { |
676 | 0 | instance->use_tls = FLB_TRUE; |
677 | 0 | } |
678 | 0 | else if (flags & FLB_IO_OPT_TLS) { |
679 | | /* TLS must be enabled manually in the config */ |
680 | 0 | instance->use_tls = FLB_FALSE; |
681 | 0 | instance->flags |= FLB_IO_TLS; |
682 | 0 | } |
683 | |
|
684 | 0 | #ifdef FLB_HAVE_TLS |
685 | 0 | instance->tls = NULL; |
686 | 0 | instance->tls_debug = -1; |
687 | 0 | instance->tls_verify = FLB_TRUE; |
688 | 0 | instance->tls_verify_hostname = FLB_FALSE; |
689 | 0 | instance->tls_vhost = NULL; |
690 | 0 | instance->tls_ca_path = NULL; |
691 | 0 | instance->tls_ca_file = NULL; |
692 | 0 | instance->tls_crt_file = NULL; |
693 | 0 | instance->tls_key_file = NULL; |
694 | 0 | instance->tls_key_passwd = NULL; |
695 | 0 | #endif |
696 | |
|
697 | 0 | if (plugin->flags & FLB_OUTPUT_NET) { |
698 | 0 | ret = flb_net_host_set(plugin->name, &instance->host, output); |
699 | 0 | if (ret != 0) { |
700 | 0 | if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { |
701 | 0 | flb_task_queue_destroy(instance->singleplex_queue); |
702 | 0 | } |
703 | 0 | flb_free(instance); |
704 | 0 | return NULL; |
705 | 0 | } |
706 | 0 | } |
707 | | |
708 | | /* Create singleplex queue if SYNCHRONOUS mode is used */ |
709 | 0 | instance->singleplex_queue = NULL; |
710 | 0 | if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) { |
711 | 0 | instance->singleplex_queue = flb_task_queue_create(); |
712 | 0 | if (!instance->singleplex_queue) { |
713 | 0 | flb_free(instance); |
714 | 0 | flb_errno(); |
715 | 0 | return NULL; |
716 | 0 | } |
717 | 0 | } |
718 | | |
719 | 0 | flb_kv_init(&instance->properties); |
720 | 0 | flb_kv_init(&instance->net_properties); |
721 | 0 | mk_list_init(&instance->upstreams); |
722 | 0 | mk_list_init(&instance->flush_list); |
723 | 0 | mk_list_init(&instance->flush_list_destroy); |
724 | |
|
725 | 0 | mk_list_add(&instance->_head, &config->outputs); |
726 | | |
727 | | /* processor instance */ |
728 | 0 | instance->processor = flb_processor_create(config, instance->name, instance, FLB_PLUGIN_OUTPUT); |
729 | | |
730 | | /* Tests */ |
731 | 0 | instance->test_formatter.callback = plugin->test_formatter.callback; |
732 | 0 | instance->test_response.callback = plugin->test_response.callback; |
733 | | |
734 | |
|
735 | 0 | return instance; |
736 | 0 | } |
737 | | |
/*
 * Compare a known property name against a user supplied key.
 *
 * 'key' is the NUL-terminated known name; 'kv' is the user key, of which
 * 'k_len' bytes are considered. Returns 0 when both have the same length and
 * are equal ignoring case, -1 otherwise.
 */
static inline int prop_key_check(const char *key, const char *kv, int k_len)
{
    int key_len = strlen(key);

    /* a different length can never be a full match */
    if (key_len != k_len) {
        return -1;
    }

    return (strncasecmp(key, kv, k_len) == 0) ? 0 : -1;
}
749 | | |
750 | | /* Override a configuration property for the given input_instance plugin */ |
751 | | int flb_output_set_property(struct flb_output_instance *ins, |
752 | | const char *k, const char *v) |
753 | 0 | { |
754 | 0 | int len; |
755 | 0 | int ret; |
756 | 0 | ssize_t limit; |
757 | 0 | flb_sds_t tmp; |
758 | 0 | struct flb_kv *kv; |
759 | 0 | struct flb_config *config = ins->config; |
760 | |
|
761 | 0 | len = strlen(k); |
762 | 0 | tmp = flb_env_var_translate(config->env, v); |
763 | 0 | if (tmp) { |
764 | 0 | if (strlen(tmp) == 0) { |
765 | 0 | flb_sds_destroy(tmp); |
766 | 0 | tmp = NULL; |
767 | 0 | } |
768 | 0 | } |
769 | | |
770 | | /* Check if the key is a known/shared property */ |
771 | 0 | if (prop_key_check("match", k, len) == 0) { |
772 | 0 | flb_utils_set_plugin_string_property("match", &ins->match, tmp); |
773 | 0 | } |
774 | 0 | #ifdef FLB_HAVE_REGEX |
775 | 0 | else if (prop_key_check("match_regex", k, len) == 0 && tmp) { |
776 | 0 | ins->match_regex = flb_regex_create(tmp); |
777 | 0 | flb_sds_destroy(tmp); |
778 | 0 | } |
779 | 0 | #endif |
780 | 0 | else if (prop_key_check("alias", k, len) == 0 && tmp) { |
781 | 0 | flb_utils_set_plugin_string_property("alias", &ins->alias, tmp); |
782 | 0 | } |
783 | 0 | else if (prop_key_check("log_level", k, len) == 0 && tmp) { |
784 | 0 | ret = flb_log_get_level_str(tmp); |
785 | 0 | flb_sds_destroy(tmp); |
786 | 0 | if (ret == -1) { |
787 | 0 | return -1; |
788 | 0 | } |
789 | 0 | ins->log_level = ret; |
790 | 0 | } |
791 | 0 | else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) { |
792 | 0 | ret = flb_utils_time_to_seconds(tmp); |
793 | 0 | flb_sds_destroy(tmp); |
794 | 0 | if (ret == -1) { |
795 | 0 | return -1; |
796 | 0 | } |
797 | 0 | ins->log_suppress_interval = ret; |
798 | 0 | } |
799 | 0 | else if (prop_key_check("host", k, len) == 0) { |
800 | 0 | flb_utils_set_plugin_string_property("host", &ins->host.name, tmp); |
801 | 0 | } |
802 | 0 | else if (prop_key_check("port", k, len) == 0) { |
803 | 0 | if (tmp) { |
804 | 0 | ins->host.port = atoi(tmp); |
805 | 0 | flb_sds_destroy(tmp); |
806 | 0 | } |
807 | 0 | else { |
808 | 0 | ins->host.port = 0; |
809 | 0 | } |
810 | 0 | } |
811 | 0 | else if (prop_key_check("ipv6", k, len) == 0 && tmp) { |
812 | 0 | ins->host.ipv6 = flb_utils_bool(tmp); |
813 | 0 | flb_sds_destroy(tmp); |
814 | 0 | } |
815 | 0 | else if (prop_key_check("retry_limit", k, len) == 0) { |
816 | 0 | if (tmp) { |
817 | 0 | if (strcasecmp(tmp, "no_limits") == 0 || |
818 | 0 | strcasecmp(tmp, "false") == 0 || |
819 | 0 | strcasecmp(tmp, "off") == 0) { |
820 | | /* No limits for retries */ |
821 | 0 | ins->retry_limit = FLB_OUT_RETRY_UNLIMITED; |
822 | 0 | } |
823 | 0 | else if (strcasecmp(tmp, "no_retries") == 0) { |
824 | 0 | ins->retry_limit = FLB_OUT_RETRY_NONE; |
825 | 0 | } |
826 | 0 | else { |
827 | 0 | ins->retry_limit = atoi(tmp); |
828 | 0 | if (ins->retry_limit <= 0) { |
829 | 0 | flb_warn("[config] invalid retry_limit. set default."); |
830 | | /* set default when input is invalid number */ |
831 | 0 | ins->retry_limit = 1; |
832 | 0 | } |
833 | 0 | } |
834 | 0 | flb_sds_destroy(tmp); |
835 | 0 | } |
836 | 0 | else { |
837 | 0 | ins->retry_limit = 1; |
838 | 0 | } |
839 | 0 | } |
840 | 0 | else if (strncasecmp("net.", k, 4) == 0 && tmp) { |
841 | 0 | kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL); |
842 | 0 | if (!kv) { |
843 | 0 | if (tmp) { |
844 | 0 | flb_sds_destroy(tmp); |
845 | 0 | } |
846 | 0 | return -1; |
847 | 0 | } |
848 | 0 | kv->val = tmp; |
849 | 0 | } |
850 | | #ifdef FLB_HAVE_HTTP_CLIENT_DEBUG |
851 | | else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) { |
852 | | ret = flb_http_client_debug_property_is_valid((char *) k, tmp); |
853 | | if (ret == FLB_TRUE) { |
854 | | kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); |
855 | | if (!kv) { |
856 | | if (tmp) { |
857 | | flb_sds_destroy(tmp); |
858 | | } |
859 | | return -1; |
860 | | } |
861 | | kv->val = tmp; |
862 | | } |
863 | | else { |
864 | | flb_error("[config] invalid property '%s' on instance '%s'", |
865 | | k, flb_output_name(ins)); |
866 | | flb_sds_destroy(tmp); |
867 | | } |
868 | | } |
869 | | #endif |
870 | 0 | #ifdef FLB_HAVE_TLS |
871 | 0 | else if (prop_key_check("tls", k, len) == 0 && tmp) { |
872 | 0 | ins->use_tls = flb_utils_bool(tmp); |
873 | 0 | if (ins->use_tls == FLB_TRUE && ((ins->flags & FLB_IO_TLS) == 0)) { |
874 | 0 | flb_error("[config] %s does not support TLS", ins->name); |
875 | 0 | flb_sds_destroy(tmp); |
876 | 0 | return -1; |
877 | 0 | } |
878 | 0 | flb_sds_destroy(tmp); |
879 | 0 | } |
880 | 0 | else if (prop_key_check("tls.verify", k, len) == 0 && tmp) { |
881 | 0 | ins->tls_verify = flb_utils_bool(tmp); |
882 | 0 | flb_sds_destroy(tmp); |
883 | 0 | } |
884 | 0 | else if (prop_key_check("tls.verify_hostname", k, len) == 0 && tmp) { |
885 | 0 | ins->tls_verify_hostname = flb_utils_bool(tmp); |
886 | 0 | flb_sds_destroy(tmp); |
887 | 0 | } |
888 | 0 | else if (prop_key_check("tls.debug", k, len) == 0 && tmp) { |
889 | 0 | ins->tls_debug = atoi(tmp); |
890 | 0 | flb_sds_destroy(tmp); |
891 | 0 | } |
892 | 0 | else if (prop_key_check("tls.vhost", k, len) == 0) { |
893 | 0 | flb_utils_set_plugin_string_property("tls.vhost", &ins->tls_vhost, tmp); |
894 | 0 | } |
895 | 0 | else if (prop_key_check("tls.ca_path", k, len) == 0) { |
896 | 0 | flb_utils_set_plugin_string_property("tls.ca_path", &ins->tls_ca_path, tmp); |
897 | 0 | } |
898 | 0 | else if (prop_key_check("tls.ca_file", k, len) == 0) { |
899 | 0 | flb_utils_set_plugin_string_property("tls.ca_file", &ins->tls_ca_file, tmp); |
900 | 0 | } |
901 | 0 | else if (prop_key_check("tls.crt_file", k, len) == 0) { |
902 | 0 | flb_utils_set_plugin_string_property("tls.crt_file", &ins->tls_crt_file, tmp); |
903 | 0 | } |
904 | 0 | else if (prop_key_check("tls.key_file", k, len) == 0) { |
905 | 0 | flb_utils_set_plugin_string_property("tls.key_file", &ins->tls_key_file, tmp); |
906 | 0 | } |
907 | 0 | else if (prop_key_check("tls.key_passwd", k, len) == 0) { |
908 | 0 | flb_utils_set_plugin_string_property("tls.key_passwd", &ins->tls_key_passwd, tmp); |
909 | 0 | } |
910 | 0 | #endif |
911 | 0 | else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) { |
912 | 0 | if (strcasecmp(tmp, "off") == 0 || |
913 | 0 | flb_utils_bool(tmp) == FLB_FALSE) { |
914 | | /* no limit for filesystem storage */ |
915 | 0 | limit = -1; |
916 | 0 | flb_info("[config] unlimited filesystem buffer for %s plugin", |
917 | 0 | ins->name); |
918 | 0 | } |
919 | 0 | else { |
920 | 0 | limit = flb_utils_size_to_bytes(tmp); |
921 | 0 | if (limit == -1) { |
922 | 0 | flb_sds_destroy(tmp); |
923 | 0 | return -1; |
924 | 0 | } |
925 | | |
926 | 0 | if (limit == 0) { |
927 | 0 | limit = -1; |
928 | 0 | } |
929 | 0 | } |
930 | | |
931 | 0 | flb_sds_destroy(tmp); |
932 | 0 | ins->total_limit_size = (size_t) limit; |
933 | 0 | } |
934 | 0 | else if (prop_key_check("workers", k, len) == 0 && tmp) { |
935 | | /* Set the number of workers */ |
936 | 0 | ins->tp_workers = atoi(tmp); |
937 | 0 | flb_sds_destroy(tmp); |
938 | 0 | } |
939 | 0 | else { |
940 | | /* |
941 | | * Create the property, we don't pass the value since we will |
942 | | * map it directly to avoid an extra memory allocation. |
943 | | */ |
944 | 0 | kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); |
945 | 0 | if (!kv) { |
946 | 0 | if (tmp) { |
947 | 0 | flb_sds_destroy(tmp); |
948 | 0 | } |
949 | 0 | return -1; |
950 | 0 | } |
951 | 0 | kv->val = tmp; |
952 | 0 | } |
953 | | |
954 | 0 | return 0; |
955 | 0 | } |
956 | | |
957 | | /* Configure a default hostname and TCP port if they are not set */ |
958 | | void flb_output_net_default(const char *host, const int port, |
959 | | struct flb_output_instance *ins) |
960 | 0 | { |
961 | | /* Set default network configuration */ |
962 | 0 | if (!ins->host.name) { |
963 | 0 | ins->host.name = flb_sds_create(host); |
964 | 0 | } |
965 | 0 | if (ins->host.port == 0) { |
966 | 0 | ins->host.port = port; |
967 | 0 | } |
968 | 0 | } |
969 | | |
970 | | /* Add thread pool for output plugin if configured with workers */ |
971 | | int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config) |
972 | 0 | { |
973 | | /* Multi-threading enabled ? (through 'workers' property) */ |
974 | 0 | if (ins->tp_workers > 0) { |
975 | 0 | if(flb_output_thread_pool_create(config, ins) != 0) { |
976 | 0 | flb_output_instance_destroy(ins); |
977 | 0 | return -1; |
978 | 0 | } |
979 | 0 | flb_output_thread_pool_start(ins); |
980 | 0 | } |
981 | | |
982 | 0 | return 0; |
983 | 0 | } |
984 | | |
985 | | /* Return an instance name or alias */ |
986 | | const char *flb_output_name(struct flb_output_instance *ins) |
987 | 0 | { |
988 | 0 | if (ins->alias) { |
989 | 0 | return ins->alias; |
990 | 0 | } |
991 | | |
992 | 0 | return ins->name; |
993 | 0 | } |
994 | | |
995 | | const char *flb_output_get_property(const char *key, struct flb_output_instance *ins) |
996 | 0 | { |
997 | 0 | return flb_config_prop_get(key, &ins->properties); |
998 | 0 | } |
999 | | |
#ifdef FLB_HAVE_METRICS
/* Return the 'cmt' member of the output instance as an untyped pointer */
void *flb_output_get_cmt_instance(struct flb_output_instance *ins)
{
    void *cmt_ctx = (void *) ins->cmt;

    return cmt_ctx;
}
#endif
1006 | | |
1007 | | int flb_output_net_property_check(struct flb_output_instance *ins, |
1008 | | struct flb_config *config) |
1009 | 0 | { |
1010 | 0 | int ret = 0; |
1011 | | |
1012 | | /* Get Upstream net_setup configmap */ |
1013 | 0 | ins->net_config_map = flb_upstream_get_config_map(config); |
1014 | 0 | if (!ins->net_config_map) { |
1015 | 0 | flb_output_instance_destroy(ins); |
1016 | 0 | return -1; |
1017 | 0 | } |
1018 | | |
1019 | | /* |
1020 | | * Validate 'net.*' properties: if the plugin use the Upstream interface, |
1021 | | * it might receive some networking settings. |
1022 | | */ |
1023 | 0 | if (mk_list_size(&ins->net_properties) > 0) { |
1024 | 0 | ret = flb_config_map_properties_check(ins->p->name, |
1025 | 0 | &ins->net_properties, |
1026 | 0 | ins->net_config_map); |
1027 | 0 | if (ret == -1) { |
1028 | 0 | if (config->program_name) { |
1029 | 0 | flb_helper("try the command: %s -o %s -h\n", |
1030 | 0 | config->program_name, ins->p->name); |
1031 | 0 | } |
1032 | 0 | return -1; |
1033 | 0 | } |
1034 | 0 | } |
1035 | | |
1036 | 0 | return 0; |
1037 | 0 | } |
1038 | | |
1039 | | int flb_output_plugin_property_check(struct flb_output_instance *ins, |
1040 | | struct flb_config *config) |
1041 | 0 | { |
1042 | 0 | int ret = 0; |
1043 | 0 | struct mk_list *config_map; |
1044 | 0 | struct flb_output_plugin *p = ins->p; |
1045 | |
|
1046 | 0 | if (p->config_map) { |
1047 | | /* |
1048 | | * Create a dynamic version of the configmap that will be used by the specific |
1049 | | * instance in question. |
1050 | | */ |
1051 | 0 | config_map = flb_config_map_create(config, p->config_map); |
1052 | 0 | if (!config_map) { |
1053 | 0 | flb_error("[output] error loading config map for '%s' plugin", |
1054 | 0 | p->name); |
1055 | 0 | return -1; |
1056 | 0 | } |
1057 | 0 | ins->config_map = config_map; |
1058 | | |
1059 | | /* Validate incoming properties against config map */ |
1060 | 0 | ret = flb_config_map_properties_check(ins->p->name, |
1061 | 0 | &ins->properties, ins->config_map); |
1062 | 0 | if (ret == -1) { |
1063 | 0 | if (config->program_name) { |
1064 | 0 | flb_helper("try the command: %s -o %s -h\n", |
1065 | 0 | config->program_name, ins->p->name); |
1066 | 0 | } |
1067 | 0 | return -1; |
1068 | 0 | } |
1069 | 0 | } |
1070 | | |
1071 | 0 | return 0; |
1072 | 0 | } |
1073 | | |
1074 | | /* Trigger the output plugins setup callbacks to prepare them. */ |
1075 | | int flb_output_init_all(struct flb_config *config) |
1076 | 0 | { |
1077 | 0 | int ret; |
1078 | 0 | #ifdef FLB_HAVE_METRICS |
1079 | 0 | char *name; |
1080 | 0 | #endif |
1081 | 0 | struct mk_list *tmp; |
1082 | 0 | struct mk_list *head; |
1083 | 0 | struct flb_output_instance *ins; |
1084 | 0 | struct flb_output_plugin *p; |
1085 | 0 | uint64_t ts; |
1086 | | |
1087 | | /* Retrieve the plugin reference */ |
1088 | 0 | mk_list_foreach_safe(head, tmp, &config->outputs) { |
1089 | 0 | ins = mk_list_entry(head, struct flb_output_instance, _head); |
1090 | 0 | if (ins->log_level == -1) { |
1091 | 0 | ins->log_level = config->log->level; |
1092 | 0 | } |
1093 | 0 | p = ins->p; |
1094 | | |
1095 | | /* Output Events Channel */ |
1096 | 0 | ret = mk_event_channel_create(config->evl, |
1097 | 0 | &ins->ch_events[0], |
1098 | 0 | &ins->ch_events[1], |
1099 | 0 | ins); |
1100 | 0 | if (ret != 0) { |
1101 | 0 | flb_error("could not create events channels for '%s'", |
1102 | 0 | flb_output_name(ins)); |
1103 | 0 | flb_output_instance_destroy(ins); |
1104 | 0 | return -1; |
1105 | 0 | } |
1106 | 0 | flb_debug("[%s:%s] created event channels: read=%i write=%i", |
1107 | 0 | ins->p->name, flb_output_name(ins), |
1108 | 0 | ins->ch_events[0], ins->ch_events[1]); |
1109 | | |
1110 | | /* |
1111 | | * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by |
1112 | | * default, we need to overwrite this value so we can do a clean check |
1113 | | * into the Engine when the event is triggered. |
1114 | | */ |
1115 | 0 | ins->event.type = FLB_ENGINE_EV_OUTPUT; |
1116 | | |
1117 | | /* Metrics */ |
1118 | 0 | #ifdef FLB_HAVE_METRICS |
1119 | | /* Get name or alias for the instance */ |
1120 | 0 | name = (char *) flb_output_name(ins); |
1121 | | |
1122 | | /* get timestamp */ |
1123 | 0 | ts = cfl_time_now(); |
1124 | | |
1125 | | /* CMetrics */ |
1126 | 0 | ins->cmt = cmt_create(); |
1127 | 0 | if (!ins->cmt) { |
1128 | 0 | flb_error("[output] could not create cmetrics context"); |
1129 | 0 | return -1; |
1130 | 0 | } |
1131 | | |
1132 | | /* |
1133 | | * Register generic output plugin metrics |
1134 | | */ |
1135 | | |
1136 | | /* fluentbit_output_proc_records_total */ |
1137 | 0 | ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit", |
1138 | 0 | "output", "proc_records_total", |
1139 | 0 | "Number of processed output records.", |
1140 | 0 | 1, (char *[]) {"name"}); |
1141 | 0 | cmt_counter_set(ins->cmt_proc_records, ts, 0, 1, (char *[]) {name}); |
1142 | | |
1143 | | |
1144 | | /* fluentbit_output_proc_bytes_total */ |
1145 | 0 | ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit", |
1146 | 0 | "output", "proc_bytes_total", |
1147 | 0 | "Number of processed output bytes.", |
1148 | 0 | 1, (char *[]) {"name"}); |
1149 | 0 | cmt_counter_set(ins->cmt_proc_bytes, ts, 0, 1, (char *[]) {name}); |
1150 | | |
1151 | | |
1152 | | /* fluentbit_output_errors_total */ |
1153 | 0 | ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit", |
1154 | 0 | "output", "errors_total", |
1155 | 0 | "Number of output errors.", |
1156 | 0 | 1, (char *[]) {"name"}); |
1157 | 0 | cmt_counter_set(ins->cmt_errors, ts, 0, 1, (char *[]) {name}); |
1158 | | |
1159 | | |
1160 | | /* fluentbit_output_retries_total */ |
1161 | 0 | ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit", |
1162 | 0 | "output", "retries_total", |
1163 | 0 | "Number of output retries.", |
1164 | 0 | 1, (char *[]) {"name"}); |
1165 | 0 | cmt_counter_set(ins->cmt_retries, ts, 0, 1, (char *[]) {name}); |
1166 | | |
1167 | | /* fluentbit_output_retries_failed_total */ |
1168 | 0 | ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit", |
1169 | 0 | "output", "retries_failed_total", |
1170 | 0 | "Number of abandoned batches because " |
1171 | 0 | "the maximum number of re-tries was " |
1172 | 0 | "reached.", |
1173 | 0 | 1, (char *[]) {"name"}); |
1174 | 0 | cmt_counter_set(ins->cmt_retries_failed, ts, 0, 1, (char *[]) {name}); |
1175 | | |
1176 | | |
1177 | | /* fluentbit_output_dropped_records_total */ |
1178 | 0 | ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit", |
1179 | 0 | "output", "dropped_records_total", |
1180 | 0 | "Number of dropped records.", |
1181 | 0 | 1, (char *[]) {"name"}); |
1182 | 0 | cmt_counter_set(ins->cmt_dropped_records, ts, 0, 1, (char *[]) {name}); |
1183 | | |
1184 | | /* fluentbit_output_retried_records_total */ |
1185 | 0 | ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit", |
1186 | 0 | "output", "retried_records_total", |
1187 | 0 | "Number of retried records.", |
1188 | 0 | 1, (char *[]) {"name"}); |
1189 | 0 | cmt_counter_set(ins->cmt_retried_records, ts, 0, 1, (char *[]) {name}); |
1190 | | |
1191 | | /* output_upstream_total_connections */ |
1192 | 0 | ins->cmt_upstream_total_connections = cmt_gauge_create(ins->cmt, |
1193 | 0 | "fluentbit", |
1194 | 0 | "output", |
1195 | 0 | "upstream_total_connections", |
1196 | 0 | "Total Connection count.", |
1197 | 0 | 1, (char *[]) {"name"}); |
1198 | 0 | cmt_gauge_set(ins->cmt_upstream_total_connections, |
1199 | 0 | ts, |
1200 | 0 | 0, |
1201 | 0 | 1, (char *[]) {name}); |
1202 | | |
1203 | | /* output_upstream_total_connections */ |
1204 | 0 | ins->cmt_upstream_busy_connections = cmt_gauge_create(ins->cmt, |
1205 | 0 | "fluentbit", |
1206 | 0 | "output", |
1207 | 0 | "upstream_busy_connections", |
1208 | 0 | "Busy Connection count.", |
1209 | 0 | 1, (char *[]) {"name"}); |
1210 | 0 | cmt_gauge_set(ins->cmt_upstream_busy_connections, |
1211 | 0 | ts, |
1212 | 0 | 0, |
1213 | 0 | 1, (char *[]) {name}); |
1214 | | |
1215 | | /* output_chunk_available_capacity_percent */ |
1216 | 0 | ins->cmt_chunk_available_capacity_percent = cmt_gauge_create(ins->cmt, |
1217 | 0 | "fluentbit", |
1218 | 0 | "output", |
1219 | 0 | "chunk_available_capacity_percent", |
1220 | 0 | "Available chunk capacity (percent)", |
1221 | 0 | 1, (char *[]) {"name"}); |
1222 | 0 | cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, |
1223 | 0 | ts, |
1224 | 0 | 100.0, |
1225 | 0 | 1, (char *[]) {name}); |
1226 | | |
1227 | | /* old API */ |
1228 | 0 | ins->metrics = flb_metrics_create(name); |
1229 | 0 | if (ins->metrics) { |
1230 | 0 | flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS, |
1231 | 0 | "proc_records", ins->metrics); |
1232 | 0 | flb_metrics_add(FLB_METRIC_OUT_OK_BYTES, |
1233 | 0 | "proc_bytes", ins->metrics); |
1234 | 0 | flb_metrics_add(FLB_METRIC_OUT_ERROR, |
1235 | 0 | "errors", ins->metrics); |
1236 | 0 | flb_metrics_add(FLB_METRIC_OUT_RETRY, |
1237 | 0 | "retries", ins->metrics); |
1238 | 0 | flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED, |
1239 | 0 | "retries_failed", ins->metrics); |
1240 | 0 | flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS, |
1241 | 0 | "dropped_records", ins->metrics); |
1242 | 0 | flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS, |
1243 | 0 | "retried_records", ins->metrics); |
1244 | 0 | } |
1245 | 0 | #endif |
1246 | |
|
1247 | 0 | #ifdef FLB_HAVE_TLS |
1248 | 0 | if (ins->use_tls == FLB_TRUE) { |
1249 | 0 | ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, |
1250 | 0 | ins->tls_verify, |
1251 | 0 | ins->tls_debug, |
1252 | 0 | ins->tls_vhost, |
1253 | 0 | ins->tls_ca_path, |
1254 | 0 | ins->tls_ca_file, |
1255 | 0 | ins->tls_crt_file, |
1256 | 0 | ins->tls_key_file, |
1257 | 0 | ins->tls_key_passwd); |
1258 | 0 | if (!ins->tls) { |
1259 | 0 | flb_error("[output %s] error initializing TLS context", |
1260 | 0 | ins->name); |
1261 | 0 | flb_output_instance_destroy(ins); |
1262 | 0 | return -1; |
1263 | 0 | } |
1264 | | |
1265 | 0 | if (ins->tls_verify_hostname == FLB_TRUE) { |
1266 | 0 | ret = flb_tls_set_verify_hostname(ins->tls, ins->tls_verify_hostname); |
1267 | 0 | if (ret == -1) { |
1268 | 0 | flb_error("[output %s] error set up to verify hostname in TLS context", |
1269 | 0 | ins->name); |
1270 | |
|
1271 | 0 | return -1; |
1272 | 0 | } |
1273 | 0 | } |
1274 | 0 | } |
1275 | 0 | #endif |
1276 | | /* |
1277 | | * Before to call the initialization callback, make sure that the received |
1278 | | * configuration parameters are valid if the plugin is registering a config map. |
1279 | | */ |
1280 | 0 | if (flb_output_plugin_property_check(ins, config) == -1) { |
1281 | 0 | flb_output_instance_destroy(ins); |
1282 | 0 | return -1; |
1283 | 0 | } |
1284 | | |
1285 | 0 | #ifdef FLB_HAVE_TLS |
1286 | 0 | struct flb_config_map *m; |
1287 | | |
1288 | | /* TLS config map (just for 'help' formatting purposes) */ |
1289 | 0 | ins->tls_config_map = flb_tls_get_config_map(config); |
1290 | 0 | if (!ins->tls_config_map) { |
1291 | 0 | flb_output_instance_destroy(ins); |
1292 | 0 | return -1; |
1293 | 0 | } |
1294 | | |
1295 | | /* Override first configmap value based on it plugin flag */ |
1296 | 0 | m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head); |
1297 | 0 | if (p->flags & FLB_IO_TLS) { |
1298 | 0 | m->value.val.boolean = FLB_TRUE; |
1299 | 0 | } |
1300 | 0 | else { |
1301 | 0 | m->value.val.boolean = FLB_FALSE; |
1302 | 0 | } |
1303 | 0 | #endif |
1304 | | |
1305 | | /* Init network defaults */ |
1306 | 0 | flb_net_setup_init(&ins->net_setup); |
1307 | |
|
1308 | 0 | if (flb_output_net_property_check(ins, config) == -1) { |
1309 | 0 | flb_output_instance_destroy(ins); |
1310 | 0 | return -1; |
1311 | 0 | } |
1312 | | |
1313 | | /* Initialize plugin through it 'init callback' */ |
1314 | 0 | ret = p->cb_init(ins, config, ins->data); |
1315 | 0 | if (ret == -1) { |
1316 | 0 | flb_error("[output] failed to initialize '%s' plugin", |
1317 | 0 | p->name); |
1318 | 0 | flb_output_instance_destroy(ins); |
1319 | 0 | return -1; |
1320 | 0 | } |
1321 | | |
1322 | 0 | ins->notification_channel = config->notification_channels[1]; |
1323 | | |
1324 | | /* Multi-threading enabled if configured */ |
1325 | 0 | ret = flb_output_enable_multi_threading(ins, config); |
1326 | 0 | if (ret == -1) { |
1327 | 0 | flb_error("[output] could not start thread pool for '%s' plugin", |
1328 | 0 | flb_output_name(ins)); |
1329 | 0 | return -1; |
1330 | 0 | } |
1331 | | |
1332 | 0 | if (ins->is_threaded == FLB_FALSE) { |
1333 | 0 | if (ins->p->cb_worker_init) { |
1334 | 0 | ret = ins->p->cb_worker_init(ins->context, ins->config); |
1335 | 0 | } |
1336 | 0 | } |
1337 | |
|
1338 | 0 | ins->processor->notification_channel = ins->notification_channel; |
1339 | | |
1340 | | /* initialize processors */ |
1341 | 0 | ret = flb_processor_init(ins->processor); |
1342 | 0 | if (ret == -1) { |
1343 | 0 | return -1; |
1344 | 0 | } |
1345 | 0 | } |
1346 | | |
1347 | 0 | return 0; |
1348 | 0 | } |
1349 | | |
1350 | | /* Assign an Configuration context to an Output */ |
1351 | | void flb_output_set_context(struct flb_output_instance *ins, void *context) |
1352 | 0 | { |
1353 | 0 | ins->context = context; |
1354 | 0 | } |
1355 | | |
1356 | | /* Check that at least one Output is enabled */ |
1357 | | int flb_output_check(struct flb_config *config) |
1358 | 0 | { |
1359 | 0 | if (mk_list_is_empty(&config->outputs) == 0) { |
1360 | 0 | return -1; |
1361 | 0 | } |
1362 | 0 | return 0; |
1363 | 0 | } |
1364 | | |
1365 | | /* Check output plugin's log level. |
1366 | | * Not for core plugins but for Golang plugins. |
1367 | | * Golang plugins do not have thread-local flb_worker_ctx information. */ |
1368 | | int flb_output_log_check(struct flb_output_instance *ins, int l) |
1369 | 0 | { |
1370 | 0 | if (ins->log_level < l) { |
1371 | 0 | return FLB_FALSE; |
1372 | 0 | } |
1373 | | |
1374 | 0 | return FLB_TRUE; |
1375 | 0 | } |
1376 | | |
1377 | | /* |
1378 | | * Output plugins might have enabled certain features that have not been passed |
1379 | | * directly to the upstream context. In order to avoid let plugins validate specific |
1380 | | * variables from the instance context like tls, tls.x, keepalive, etc, we populate |
1381 | | * them directly through this function. |
1382 | | */ |
1383 | | int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins) |
1384 | 0 | { |
1385 | 0 | int flags = 0; |
1386 | |
|
1387 | 0 | if (!u) { |
1388 | 0 | return -1; |
1389 | 0 | } |
1390 | | |
1391 | | /* TLS */ |
1392 | 0 | #ifdef FLB_HAVE_TLS |
1393 | 0 | if (ins->use_tls == FLB_TRUE) { |
1394 | 0 | flags |= FLB_IO_TLS; |
1395 | 0 | } |
1396 | 0 | else { |
1397 | 0 | flags |= FLB_IO_TCP; |
1398 | 0 | } |
1399 | | #else |
1400 | | flags |= FLB_IO_TCP; |
1401 | | #endif |
1402 | | |
1403 | | /* IPv6 */ |
1404 | 0 | if (ins->host.ipv6 == FLB_TRUE) { |
1405 | 0 | flags |= FLB_IO_IPV6; |
1406 | 0 | } |
1407 | | /* keepalive */ |
1408 | 0 | if (ins->net_setup.keepalive == FLB_TRUE) { |
1409 | 0 | flags |= FLB_IO_TCP_KA; |
1410 | 0 | } |
1411 | |
|
1412 | 0 | if (ins->net_setup.keepalive == FLB_TRUE) { |
1413 | 0 | flags |= FLB_IO_TCP_KA; |
1414 | 0 | } |
1415 | | |
1416 | | /* Set flags */ |
1417 | 0 | flb_stream_enable_flags(&u->base, flags); |
1418 | |
|
1419 | 0 | flb_upstream_set_total_connections_label(u, |
1420 | 0 | flb_output_name(ins)); |
1421 | |
|
1422 | 0 | flb_upstream_set_total_connections_gauge(u, |
1423 | 0 | ins->cmt_upstream_total_connections); |
1424 | |
|
1425 | 0 | flb_upstream_set_busy_connections_label(u, |
1426 | 0 | flb_output_name(ins)); |
1427 | |
|
1428 | 0 | flb_upstream_set_busy_connections_gauge(u, |
1429 | 0 | ins->cmt_upstream_busy_connections); |
1430 | | |
1431 | | /* |
1432 | | * If the output plugin flush callbacks will run in multiple threads, enable |
1433 | | * the thread safe mode for the Upstream context. |
1434 | | */ |
1435 | 0 | if (ins->tp_workers > 0) { |
1436 | 0 | flb_stream_enable_thread_safety(&u->base); |
1437 | |
|
1438 | 0 | mk_list_add(&u->base._head, &ins->upstreams); |
1439 | 0 | } |
1440 | | |
1441 | | /* Set networking options 'net.*' received through instance properties */ |
1442 | 0 | memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup)); |
1443 | |
|
1444 | 0 | return 0; |
1445 | 0 | } |
1446 | | |
1447 | | int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins) |
1448 | 0 | { |
1449 | 0 | struct mk_list *head; |
1450 | 0 | struct flb_upstream_node *node; |
1451 | 0 | struct flb_upstream_ha *upstream_ha = ha; |
1452 | |
|
1453 | 0 | mk_list_foreach(head, &upstream_ha->nodes) { |
1454 | 0 | node = mk_list_entry(head, struct flb_upstream_node, _head); |
1455 | 0 | flb_output_upstream_set(node->u, ins); |
1456 | 0 | } |
1457 | |
|
1458 | 0 | return 0; |
1459 | 0 | } |
1460 | | |
/*
 * Set the HTTP client debug callbacks from the 'callback' context and the
 * properties of the output instance. When the build has no HTTP client debug
 * support (FLB_HAVE_HTTP_CLIENT_DEBUG undefined) this does nothing and
 * returns 0.
 */
int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins)
{
    int ret = 0;

#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
    ret = flb_http_client_debug_setup(ins->callback, &ins->properties);
#endif

    return ret;
}