/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
Line  | Count  | Source (jump to first uncovered line)  | 
1  |  | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */  | 
2  |  |  | 
3  |  | /*  Fluent Bit  | 
4  |  |  *  ==========  | 
5  |  |  *  Copyright (C) 2015-2022 The Fluent Bit Authors  | 
6  |  |  *  | 
7  |  |  *  Licensed under the Apache License, Version 2.0 (the "License");  | 
8  |  |  *  you may not use this file except in compliance with the License.  | 
9  |  |  *  You may obtain a copy of the License at  | 
10  |  |  *  | 
11  |  |  *      http://www.apache.org/licenses/LICENSE-2.0  | 
12  |  |  *  | 
13  |  |  *  Unless required by applicable law or agreed to in writing, software  | 
14  |  |  *  distributed under the License is distributed on an "AS IS" BASIS,  | 
15  |  |  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
16  |  |  *  See the License for the specific language governing permissions and  | 
17  |  |  *  limitations under the License.  | 
18  |  |  */  | 
19  |  |  | 
20  |  | #include <fluent-bit/flb_http_client.h>  | 
21  |  | #include <fluent-bit/flb_kv.h>  | 
22  |  | #include <fluent-bit/flb_oauth2.h>  | 
23  |  | #include <fluent-bit/flb_output_plugin.h>  | 
24  |  | #include <fluent-bit/flb_pack.h>  | 
25  |  | #include <fluent-bit/flb_signv4.h>  | 
26  |  |  | 
27  |  | #include "azure_kusto.h"  | 
28  |  | #include "azure_kusto_conf.h"  | 
29  |  | #include "azure_kusto_ingest.h"  | 
30  |  |  | 
31  |  | /* Create a new oauth2 context and get a oauth2 token */  | 
32  |  | static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)  | 
33  | 0  | { | 
34  | 0  |     int ret;  | 
35  | 0  |     char *token;  | 
36  |  |  | 
37  |  |     /* Clear any previous oauth2 payload content */  | 
38  | 0  |     flb_oauth2_payload_clear(ctx->o);  | 
39  |  | 
  | 
40  | 0  |     ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18);  | 
41  | 0  |     if (ret == -1) { | 
42  | 0  |         flb_plg_error(ctx->ins, "error appending oauth2 params");  | 
43  | 0  |         return -1;  | 
44  | 0  |     }  | 
45  |  |  | 
46  | 0  |     ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);  | 
47  | 0  |     if (ret == -1) { | 
48  | 0  |         flb_plg_error(ctx->ins, "error appending oauth2 params");  | 
49  | 0  |         return -1;  | 
50  | 0  |     }  | 
51  |  |  | 
52  | 0  |     ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1);  | 
53  | 0  |     if (ret == -1) { | 
54  | 0  |         flb_plg_error(ctx->ins, "error appending oauth2 params");  | 
55  | 0  |         return -1;  | 
56  | 0  |     }  | 
57  |  |  | 
58  | 0  |     ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1);  | 
59  | 0  |     if (ret == -1) { | 
60  | 0  |         flb_plg_error(ctx->ins, "error appending oauth2 params");  | 
61  | 0  |         return -1;  | 
62  | 0  |     }  | 
63  |  |  | 
64  |  |     /* Retrieve access token */  | 
65  | 0  |     token = flb_oauth2_token_get(ctx->o);  | 
66  | 0  |     if (!token) { | 
67  | 0  |         flb_plg_error(ctx->ins, "error retrieving oauth2 access token");  | 
68  | 0  |         return -1;  | 
69  | 0  |     }  | 
70  |  |  | 
71  | 0  |     return 0;  | 
72  | 0  | }  | 
73  |  |  | 
74  |  | flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)  | 
75  | 0  | { | 
76  | 0  |     int ret = 0;  | 
77  | 0  |     flb_sds_t output = NULL;  | 
78  |  | 
  | 
79  | 0  |     if (pthread_mutex_lock(&ctx->token_mutex)) { | 
80  | 0  |         flb_plg_error(ctx->ins, "error locking mutex");  | 
81  | 0  |         return NULL;  | 
82  | 0  |     }  | 
83  |  |  | 
84  | 0  |     if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { | 
85  | 0  |         ret = azure_kusto_get_oauth2_token(ctx);  | 
86  | 0  |     }  | 
87  |  |  | 
88  |  |     /* Copy string to prevent race conditions (get_oauth2 can free the string) */  | 
89  | 0  |     if (ret == 0) { | 
90  | 0  |         output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +  | 
91  | 0  |                                      flb_sds_len(ctx->o->access_token) + 2);  | 
92  | 0  |         if (!output) { | 
93  | 0  |             flb_plg_error(ctx->ins, "error creating token buffer");  | 
94  | 0  |             return NULL;  | 
95  | 0  |         }  | 
96  | 0  |         flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,  | 
97  | 0  |                          ctx->o->access_token);  | 
98  | 0  |     }  | 
99  |  |  | 
100  | 0  |     if (pthread_mutex_unlock(&ctx->token_mutex)) { | 
101  | 0  |         flb_plg_error(ctx->ins, "error unlocking mutex");  | 
102  | 0  |         if (output) { | 
103  | 0  |             flb_sds_destroy(output);  | 
104  | 0  |         }  | 
105  | 0  |         return NULL;  | 
106  | 0  |     }  | 
107  |  |  | 
108  | 0  |     return output;  | 
109  | 0  | }  | 
110  |  |  | 
111  |  | /**  | 
112  |  |  * Executes a control command against kusto's endpoint  | 
113  |  |  *  | 
114  |  |  * @param ctx       Plugin's context  | 
115  |  |  * @param csl       Kusto's control command  | 
116  |  |  * @return flb_sds_t      Returns the response or NULL on error.  | 
117  |  |  */  | 
118  |  | flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl)  | 
119  | 0  | { | 
120  | 0  |     flb_sds_t token;  | 
121  | 0  |     flb_sds_t body;  | 
122  | 0  |     size_t b_sent;  | 
123  | 0  |     int ret;  | 
124  | 0  |     struct flb_connection *u_conn;  | 
125  | 0  |     struct flb_http_client *c;  | 
126  | 0  |     flb_sds_t resp = NULL;  | 
127  |  |  | 
128  |  |     /* Get upstream connection */  | 
129  | 0  |     u_conn = flb_upstream_conn_get(ctx->u);  | 
130  |  | 
  | 
131  | 0  |     if (u_conn) { | 
132  | 0  |         token = get_azure_kusto_token(ctx);  | 
133  |  | 
  | 
134  | 0  |         if (token) { | 
135  |  |             /* Compose request body */  | 
136  | 0  |             body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +  | 
137  | 0  |                                        strlen(csl));  | 
138  |  | 
  | 
139  | 0  |             if (body) { | 
140  | 0  |                 flb_sds_snprintf(&body, flb_sds_alloc(body),  | 
141  | 0  |                                  FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl);  | 
142  |  |  | 
143  |  |                 /* Compose HTTP Client request */  | 
144  | 0  |                 c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH,  | 
145  | 0  |                                     body, flb_sds_len(body), NULL, 0, NULL, 0);  | 
146  |  | 
  | 
147  | 0  |                 if (c) { | 
148  |  |                     /* Add headers */  | 
149  | 0  |                     flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);  | 
150  | 0  |                     flb_http_add_header(c, "Content-Type", 12, "application/json", 16);  | 
151  | 0  |                     flb_http_add_header(c, "Accept", 6, "application/json", 16);  | 
152  | 0  |                     flb_http_add_header(c, "Authorization", 13, token,  | 
153  | 0  |                                         flb_sds_len(token));  | 
154  | 0  |                     flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);  | 
155  |  |  | 
156  |  |                     /* Send HTTP request */  | 
157  | 0  |                     ret = flb_http_do(c, &b_sent);  | 
158  | 0  |                     flb_plg_debug(  | 
159  | 0  |                         ctx->ins,  | 
160  | 0  |                         "Kusto ingestion command request http_do=%i, HTTP Status: %i",  | 
161  | 0  |                         ret, c->resp.status);  | 
162  |  | 
  | 
163  | 0  |                     if (ret == 0) { | 
164  | 0  |                         if (c->resp.status == 200) { | 
165  |  |                             /* Copy payload response to the response param */  | 
166  | 0  |                             resp =  | 
167  | 0  |                                 flb_sds_create_len(c->resp.payload, c->resp.payload_size);  | 
168  | 0  |                         }  | 
169  | 0  |                         else if (c->resp.payload_size > 0) { | 
170  | 0  |                             flb_plg_debug(ctx->ins, "Request failed and returned: \n%s",  | 
171  | 0  |                                           c->resp.payload);  | 
172  | 0  |                         }  | 
173  | 0  |                         else { | 
174  | 0  |                             flb_plg_debug(ctx->ins, "Request failed");  | 
175  | 0  |                         }  | 
176  | 0  |                     }  | 
177  | 0  |                     else { | 
178  | 0  |                         flb_plg_error(ctx->ins, "cannot send HTTP request");  | 
179  | 0  |                     }  | 
180  |  | 
  | 
181  | 0  |                     flb_http_client_destroy(c);  | 
182  | 0  |                 }  | 
183  | 0  |                 else { | 
184  | 0  |                     flb_plg_error(ctx->ins, "cannot create HTTP client context");  | 
185  | 0  |                 }  | 
186  |  | 
  | 
187  | 0  |                 flb_sds_destroy(body);  | 
188  | 0  |             }  | 
189  | 0  |             else { | 
190  | 0  |                 flb_plg_error(ctx->ins, "cannot construct request body");  | 
191  | 0  |             }  | 
192  |  | 
  | 
193  | 0  |             flb_sds_destroy(token);  | 
194  | 0  |         }  | 
195  | 0  |         else { | 
196  | 0  |             flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");  | 
197  | 0  |         }  | 
198  |  | 
  | 
199  | 0  |         flb_upstream_conn_release(u_conn);  | 
200  | 0  |     }  | 
201  | 0  |     else { | 
202  | 0  |         flb_plg_error(ctx->ins, "cannot create upstream connection");  | 
203  | 0  |     }  | 
204  |  | 
  | 
205  | 0  |     return resp;  | 
206  | 0  | }  | 
207  |  |  | 
208  |  | static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config,  | 
209  |  |                                void *data)  | 
210  | 0  | { | 
211  | 0  |     int io_flags = FLB_IO_TLS;  | 
212  | 0  |     struct flb_azure_kusto *ctx;  | 
213  |  |  | 
214  |  |     /* Create config context */  | 
215  | 0  |     ctx = flb_azure_kusto_conf_create(ins, config);  | 
216  | 0  |     if (!ctx) { | 
217  | 0  |         flb_plg_error(ins, "configuration failed");  | 
218  | 0  |         return -1;  | 
219  | 0  |     }  | 
220  |  |  | 
221  | 0  |     flb_output_set_context(ins, ctx);  | 
222  |  |  | 
223  |  |     /* Network mode IPv6 */  | 
224  | 0  |     if (ins->host.ipv6 == FLB_TRUE) { | 
225  | 0  |         io_flags |= FLB_IO_IPV6;  | 
226  | 0  |     }  | 
227  |  |  | 
228  |  |     /* Create mutex for acquiring oauth tokens  and getting ingestion resources (they  | 
229  |  |      * are shared across flush coroutines)  | 
230  |  |      */  | 
231  | 0  |     pthread_mutex_init(&ctx->token_mutex, NULL);  | 
232  | 0  |     pthread_mutex_init(&ctx->resources_mutex, NULL);  | 
233  |  |  | 
234  |  |     /*  | 
235  |  |      * Create upstream context for Kusto Ingestion endpoint  | 
236  |  |      */  | 
237  | 0  |     ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);  | 
238  | 0  |     if (!ctx->u) { | 
239  | 0  |         flb_plg_error(ctx->ins, "upstream creation failed");  | 
240  | 0  |         return -1;  | 
241  | 0  |     }  | 
242  |  |  | 
243  |  |     /* Create oauth2 context */  | 
244  | 0  |     ctx->o =  | 
245  | 0  |         flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);  | 
246  | 0  |     if (!ctx->o) { | 
247  | 0  |         flb_plg_error(ctx->ins, "cannot create oauth2 context");  | 
248  | 0  |         return -1;  | 
249  | 0  |     }  | 
250  | 0  |     flb_output_upstream_set(ctx->u, ins);  | 
251  |  | 
  | 
252  | 0  |     return 0;  | 
253  | 0  | }  | 
254  |  |  | 
255  |  | static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len,  | 
256  |  |                               const void *data, size_t bytes, void **out_data,  | 
257  |  |                               size_t *out_size)  | 
258  | 0  | { | 
259  | 0  |     int records = 0;  | 
260  | 0  |     size_t off = 0;  | 
261  | 0  |     msgpack_unpacked result;  | 
262  | 0  |     msgpack_sbuffer mp_sbuf;  | 
263  | 0  |     msgpack_packer mp_pck;  | 
264  |  |     /* for sub msgpack objs */  | 
265  | 0  |     int map_size;  | 
266  | 0  |     struct flb_time tm;  | 
267  | 0  |     struct tm tms;  | 
268  | 0  |     msgpack_object root;  | 
269  | 0  |     msgpack_object *obj;  | 
270  | 0  |     char time_formatted[32];  | 
271  | 0  |     size_t s;  | 
272  | 0  |     int len;  | 
273  |  |  | 
274  |  |     /* output buffer */  | 
275  | 0  |     flb_sds_t out_buf;  | 
276  |  |  | 
277  |  |     /* Create array for all records */  | 
278  | 0  |     records = flb_mp_count(data, bytes);  | 
279  | 0  |     if (records <= 0) { | 
280  | 0  |         flb_plg_error(ctx->ins, "error counting msgpack entries");  | 
281  | 0  |         return -1;  | 
282  | 0  |     }  | 
283  |  |  | 
284  |  |     /* Create temporary msgpack buffer */  | 
285  | 0  |     msgpack_sbuffer_init(&mp_sbuf);  | 
286  | 0  |     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);  | 
287  |  | 
  | 
288  | 0  |     msgpack_pack_array(&mp_pck, records);  | 
289  |  | 
  | 
290  | 0  |     off = 0;  | 
291  | 0  |     msgpack_unpacked_init(&result);  | 
292  | 0  |     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { | 
293  | 0  |         root = result.data;  | 
294  |  |         /* Each array must have two entries: time and record */  | 
295  | 0  |         if (root.type != MSGPACK_OBJECT_ARRAY) { | 
296  | 0  |             flb_plg_debug(ctx->ins, "unexpected msgpack object type: %d", root.type);  | 
297  | 0  |             continue;  | 
298  | 0  |         }  | 
299  | 0  |         if (root.via.array.size != 2) { | 
300  | 0  |             flb_plg_debug(ctx->ins, "unexpected msgpack array size: %d",  | 
301  | 0  |                           root.via.array.size);  | 
302  | 0  |             continue;  | 
303  | 0  |         }  | 
304  |  |  | 
305  |  |         /* Get timestamp and object */  | 
306  | 0  |         flb_time_pop_from_msgpack(&tm, &result, &obj);  | 
307  |  | 
  | 
308  | 0  |         map_size = 1;  | 
309  | 0  |         if (ctx->include_time_key == FLB_TRUE) { | 
310  | 0  |             map_size++;  | 
311  | 0  |         }  | 
312  |  | 
  | 
313  | 0  |         if (ctx->include_tag_key == FLB_TRUE) { | 
314  | 0  |             map_size++;  | 
315  | 0  |         }  | 
316  |  | 
  | 
317  | 0  |         msgpack_pack_map(&mp_pck, map_size);  | 
318  |  |  | 
319  |  |         /* include_time_key */  | 
320  | 0  |         if (ctx->include_time_key == FLB_TRUE) { | 
321  | 0  |             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));  | 
322  | 0  |             msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key));  | 
323  |  |  | 
324  |  |             /* Append the time value as ISO 8601 */  | 
325  | 0  |             gmtime_r(&tm.tm.tv_sec, &tms);  | 
326  | 0  |             s = strftime(time_formatted, sizeof(time_formatted) - 1,  | 
327  | 0  |                          FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);  | 
328  |  | 
  | 
329  | 0  |             len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,  | 
330  | 0  |                            ".%03" PRIu64 "Z", (uint64_t)tm.tm.tv_nsec / 1000000);  | 
331  | 0  |             s += len;  | 
332  | 0  |             msgpack_pack_str(&mp_pck, s);  | 
333  | 0  |             msgpack_pack_str_body(&mp_pck, time_formatted, s);  | 
334  | 0  |         }  | 
335  |  |  | 
336  |  |         /* include_tag_key */  | 
337  | 0  |         if (ctx->include_tag_key == FLB_TRUE) { | 
338  | 0  |             msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key));  | 
339  | 0  |             msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));  | 
340  | 0  |             msgpack_pack_str(&mp_pck, tag_len);  | 
341  | 0  |             msgpack_pack_str_body(&mp_pck, tag, tag_len);  | 
342  | 0  |         }  | 
343  |  | 
  | 
344  | 0  |         msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key));  | 
345  | 0  |         msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key));  | 
346  | 0  |         msgpack_pack_object(&mp_pck, *obj);  | 
347  | 0  |     }  | 
348  |  |  | 
349  |  |     /* Convert from msgpack to JSON */  | 
350  | 0  |     out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);  | 
351  |  |  | 
352  |  |     /* Cleanup */  | 
353  | 0  |     msgpack_sbuffer_destroy(&mp_sbuf);  | 
354  | 0  |     msgpack_unpacked_destroy(&result);  | 
355  |  | 
  | 
356  | 0  |     if (!out_buf) { | 
357  | 0  |         flb_plg_error(ctx->ins, "error formatting JSON payload");  | 
358  | 0  |         return -1;  | 
359  | 0  |     }  | 
360  |  |  | 
361  | 0  |     *out_data = out_buf;  | 
362  | 0  |     *out_size = flb_sds_len(out_buf);  | 
363  |  | 
  | 
364  | 0  |     return 0;  | 
365  | 0  | }  | 
366  |  |  | 
367  |  | static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,  | 
368  |  |                                  struct flb_output_flush *out_flush,  | 
369  |  |                                  struct flb_input_instance *i_ins, void *out_context,  | 
370  |  |                                  struct flb_config *config)  | 
371  | 0  | { | 
372  | 0  |     int ret;  | 
373  | 0  |     flb_sds_t json;  | 
374  | 0  |     size_t json_size;  | 
375  | 0  |     size_t tag_len;  | 
376  | 0  |     struct flb_azure_kusto *ctx = out_context;  | 
377  |  | 
  | 
378  | 0  |     (void)i_ins;  | 
379  | 0  |     (void)config;  | 
380  |  | 
  | 
381  | 0  |     flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);  | 
382  |  | 
  | 
383  | 0  |     tag_len = flb_sds_len(event_chunk->tag);  | 
384  |  |  | 
385  |  |     /* Load or refresh ingestion resources */  | 
386  | 0  |     ret = azure_kusto_load_ingestion_resources(ctx, config);  | 
387  | 0  |     if (ret != 0) { | 
388  | 0  |         flb_plg_error(ctx->ins, "cannot load ingestion resources");  | 
389  | 0  |         FLB_OUTPUT_RETURN(FLB_RETRY);  | 
390  | 0  |     }  | 
391  |  |  | 
392  |  |     /* Reformat msgpack to JSON payload */  | 
393  | 0  |     ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,  | 
394  | 0  |                              event_chunk->size, (void **)&json, &json_size);  | 
395  | 0  |     if (ret != 0) { | 
396  | 0  |         flb_plg_error(ctx->ins, "cannot reformat data into json");  | 
397  | 0  |         FLB_OUTPUT_RETURN(FLB_RETRY);  | 
398  | 0  |     }  | 
399  |  |  | 
400  | 0  |     ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);  | 
401  | 0  |     if (ret != 0) { | 
402  | 0  |         flb_plg_error(ctx->ins, "cannot perform queued ingestion");  | 
403  | 0  |         flb_sds_destroy(json);  | 
404  | 0  |         FLB_OUTPUT_RETURN(FLB_RETRY);  | 
405  | 0  |     }  | 
406  |  |  | 
407  |  |     /* Cleanup */  | 
408  | 0  |     flb_sds_destroy(json);  | 
409  |  |  | 
410  |  |     /* Done */  | 
411  | 0  |     FLB_OUTPUT_RETURN(FLB_OK);  | 
412  | 0  | }  | 
413  |  |  | 
414  |  | static int cb_azure_kusto_exit(void *data, struct flb_config *config)  | 
415  | 0  | { | 
416  | 0  |     struct flb_azure_kusto *ctx = data;  | 
417  |  | 
  | 
418  | 0  |     if (!ctx) { | 
419  | 0  |         return -1;  | 
420  | 0  |     }  | 
421  |  |  | 
422  | 0  |     if (ctx->u) { | 
423  | 0  |         flb_upstream_destroy(ctx->u);  | 
424  | 0  |         ctx->u = NULL;  | 
425  | 0  |     }  | 
426  |  | 
  | 
427  | 0  |     flb_azure_kusto_conf_destroy(ctx);  | 
428  |  | 
  | 
429  | 0  |     return 0;  | 
430  | 0  | }  | 
431  |  |  | 
/*
 * Configuration properties accepted by the plugin. Every entry names the
 * property, its default value (NULL when there is none) and the member of
 * struct flb_azure_kusto it is tied to through offsetof().
 */
static struct flb_config_map config_map[] = {
    /* Credentials of the AAD application used for authentication */
    {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, tenant_id),
     "Set the tenant ID of the AAD application used for authentication"},
    {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, client_id),
     "Set the client ID (Application ID) of the AAD application used for authentication"},
    {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, client_secret),
     "Set the client secret (Application Password) of the AAD application used for "
     "authentication"},
    /* Ingestion target: endpoint, database, table and optional mapping */
    {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, ingestion_endpoint),
     "Set the Kusto cluster's ingestion endpoint URL (e.g. "
     "https://ingest-mycluster.eastus.kusto.windows.net)"},
    {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, database_name), "Set the database name"},
    {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, table_name), "Set the table name"},
    {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, ingestion_mapping_reference),
     "Set the ingestion mapping reference"},
    /* Shape of each emitted record: payload key plus optional tag/time keys */
    {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"},
    {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, include_tag_key),
     "If enabled, tag is appended to output. "
     "The key name is used 'tag_key' property."},
    {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, tag_key),
     "The key name of tag. If 'include_tag_key' is false, "
     "This property is ignored"},
    {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, include_time_key),
     "If enabled, time is appended to output. "
     "The key name is used 'time_key' property."},
    {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE,
     offsetof(struct flb_azure_kusto, time_key),
     "The key name of the time. If 'include_time_key' is false, "
     "This property is ignored"},
    /* EOF */
    {0}};
474  |  |  | 
/*
 * Plugin registration: ties the "azure_kusto" output name to the init, flush
 * and exit callbacks and to the configuration map defined in this file.
 */
struct flb_output_plugin out_azure_kusto_plugin = {
    .name = "azure_kusto",
    .description = "Send events to Kusto (Azure Data Explorer)",
    .cb_init = cb_azure_kusto_init,
    .cb_flush = cb_azure_kusto_flush,
    .cb_exit = cb_azure_kusto_exit,
    .config_map = config_map,
    /* Plugin flags */
    .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
};