/src/fluent-bit/plugins/out_chronicle/chronicle.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_http_client.h> |
22 | | #include <fluent-bit/flb_pack.h> |
23 | | #include <fluent-bit/flb_utils.h> |
24 | | #include <fluent-bit/flb_time.h> |
25 | | #include <fluent-bit/flb_oauth2.h> |
26 | | #include <fluent-bit/flb_base64.h> |
27 | | #include <fluent-bit/flb_hash.h> |
28 | | #include <fluent-bit/flb_crypto.h> |
29 | | #include <fluent-bit/flb_signv4.h> |
30 | | #include <fluent-bit/flb_kv.h> |
31 | | #include <fluent-bit/flb_log_event_encoder.h> |
32 | | #include <fluent-bit/flb_log_event_decoder.h> |
33 | | |
34 | | #include <msgpack.h> |
35 | | |
36 | | #include "chronicle.h" |
37 | | #include "chronicle_conf.h" |
38 | | |
39 | | // TODO: The following code is copied from the Stackdriver plugin and should be |
40 | | // factored into common library functions. |
41 | | |
42 | | /* |
43 | | * Base64 Encoding in JWT must: |
44 | | * |
45 | | * - remove any trailing padding '=' character |
46 | | * - replace '+' with '-' |
47 | | * - replace '/' with '_' |
48 | | * |
49 | | * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C |
50 | | */ |
51 | | int chronicle_jwt_base64_url_encode(unsigned char *out_buf, size_t out_size, |
52 | | unsigned char *in_buf, size_t in_size, |
53 | | size_t *olen) |
54 | | |
55 | 0 | { |
56 | 0 | int i; |
57 | 0 | size_t len; |
58 | 0 | int result; |
59 | | |
60 | | /* do normal base64 encoding */ |
61 | 0 | result = flb_base64_encode((unsigned char *) out_buf, out_size - 1, |
62 | 0 | &len, in_buf, in_size); |
63 | 0 | if (result != 0) { |
64 | 0 | return -1; |
65 | 0 | } |
66 | | |
67 | | /* Replace '+' and '/' characters */ |
68 | 0 | for (i = 0; i < len && out_buf[i] != '='; i++) { |
69 | 0 | if (out_buf[i] == '+') { |
70 | 0 | out_buf[i] = '-'; |
71 | 0 | } |
72 | 0 | else if (out_buf[i] == '/') { |
73 | 0 | out_buf[i] = '_'; |
74 | 0 | } |
75 | 0 | } |
76 | | |
77 | | /* Now 'i' becomes the new length */ |
78 | 0 | *olen = i; |
79 | 0 | return 0; |
80 | 0 | } |
81 | | |
82 | | static int chronicle_jwt_encode(struct flb_chronicle *ctx, |
83 | | char *payload, char *secret, |
84 | | char **out_signature, size_t *out_size) |
85 | 0 | { |
86 | 0 | int ret; |
87 | 0 | int len; |
88 | 0 | int buf_size; |
89 | 0 | size_t olen; |
90 | 0 | char *buf; |
91 | 0 | char *sigd; |
92 | 0 | char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}"; |
93 | 0 | unsigned char sha256_buf[32] = {0}; |
94 | 0 | flb_sds_t out; |
95 | 0 | unsigned char sig[256] = {0}; |
96 | 0 | size_t sig_len; |
97 | |
|
98 | 0 | buf_size = (strlen(payload) + strlen(secret)) * 2; |
99 | 0 | buf = flb_malloc(buf_size); |
100 | 0 | if (!buf) { |
101 | 0 | flb_errno(); |
102 | 0 | return -1; |
103 | 0 | } |
104 | | |
105 | | /* Encode header */ |
106 | 0 | len = strlen(headers); |
107 | 0 | ret = flb_base64_encode((unsigned char *) buf, buf_size - 1, |
108 | 0 | &olen, (unsigned char *) headers, len); |
109 | 0 | if (ret != 0) { |
110 | 0 | flb_free(buf); |
111 | |
|
112 | 0 | return ret; |
113 | 0 | } |
114 | | |
115 | | /* Create buffer to store JWT */ |
116 | 0 | out = flb_sds_create_size(2048); |
117 | 0 | if (!out) { |
118 | 0 | flb_errno(); |
119 | 0 | flb_free(buf); |
120 | 0 | return -1; |
121 | 0 | } |
122 | | |
123 | | /* Append header */ |
124 | 0 | flb_sds_cat_safe(&out, buf, olen); |
125 | 0 | flb_sds_cat_safe(&out, ".", 1); |
126 | | |
127 | | /* Encode Payload */ |
128 | 0 | len = strlen(payload); |
129 | 0 | chronicle_jwt_base64_url_encode((unsigned char *) buf, buf_size, |
130 | 0 | (unsigned char *) payload, len, &olen); |
131 | | |
132 | | /* Append Payload */ |
133 | 0 | flb_sds_cat_safe(&out, buf, olen); |
134 | | |
135 | | /* do sha256() of base64(header).base64(payload) */ |
136 | 0 | ret = flb_hash_simple(FLB_HASH_SHA256, |
137 | 0 | (unsigned char *) out, flb_sds_len(out), |
138 | 0 | sha256_buf, sizeof(sha256_buf)); |
139 | |
|
140 | 0 | if (ret != FLB_CRYPTO_SUCCESS) { |
141 | 0 | flb_plg_error(ctx->ins, "error hashing token"); |
142 | 0 | flb_free(buf); |
143 | 0 | flb_sds_destroy(out); |
144 | 0 | return -1; |
145 | 0 | } |
146 | | |
147 | 0 | len = strlen(secret); |
148 | 0 | sig_len = sizeof(sig); |
149 | |
|
150 | 0 | ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY, |
151 | 0 | FLB_CRYPTO_PADDING_PKCS1, |
152 | 0 | FLB_HASH_SHA256, |
153 | 0 | (unsigned char *) secret, len, |
154 | 0 | sha256_buf, sizeof(sha256_buf), |
155 | 0 | sig, &sig_len); |
156 | |
|
157 | 0 | if (ret != FLB_CRYPTO_SUCCESS) { |
158 | 0 | flb_plg_error(ctx->ins, "error creating RSA context"); |
159 | 0 | flb_free(buf); |
160 | 0 | flb_sds_destroy(out); |
161 | 0 | return -1; |
162 | 0 | } |
163 | | |
164 | 0 | sigd = flb_malloc(2048); |
165 | 0 | if (!sigd) { |
166 | 0 | flb_errno(); |
167 | 0 | flb_free(buf); |
168 | 0 | flb_sds_destroy(out); |
169 | 0 | return -1; |
170 | 0 | } |
171 | | |
172 | 0 | chronicle_jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen); |
173 | |
|
174 | 0 | flb_sds_cat_safe(&out, ".", 1); |
175 | 0 | flb_sds_cat_safe(&out, sigd, olen); |
176 | |
|
177 | 0 | *out_signature = out; |
178 | 0 | *out_size = flb_sds_len(out); |
179 | |
|
180 | 0 | flb_free(buf); |
181 | 0 | flb_free(sigd); |
182 | |
|
183 | 0 | return 0; |
184 | 0 | } |
185 | | |
186 | | /* Create a new oauth2 context and get a oauth2 token */ |
187 | | static int chronicle_get_oauth2_token(struct flb_chronicle *ctx) |
188 | 0 | { |
189 | 0 | int ret; |
190 | 0 | char *token; |
191 | 0 | char *sig_data; |
192 | 0 | size_t sig_size; |
193 | 0 | time_t issued; |
194 | 0 | time_t expires; |
195 | 0 | char payload[1024]; |
196 | | |
197 | | /* Clear any previous oauth2 payload content */ |
198 | 0 | flb_oauth2_payload_clear(ctx->o); |
199 | | |
200 | | /* JWT encode for oauth2 */ |
201 | 0 | issued = time(NULL); |
202 | 0 | expires = issued + FLB_CHRONICLE_TOKEN_REFRESH; |
203 | |
|
204 | 0 | snprintf(payload, sizeof(payload) - 1, |
205 | 0 | "{\"iss\": \"%s\", \"scope\": \"%s\", " |
206 | 0 | "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}", |
207 | 0 | ctx->oauth_credentials->client_email, FLB_CHRONICLE_SCOPE, |
208 | 0 | FLB_CHRONICLE_AUTH_URL, |
209 | 0 | expires, issued); |
210 | | |
211 | | /* Compose JWT signature */ |
212 | 0 | ret = chronicle_jwt_encode(ctx, payload, ctx->oauth_credentials->private_key, |
213 | 0 | &sig_data, &sig_size); |
214 | 0 | if (ret != 0) { |
215 | 0 | flb_plg_error(ctx->ins, "JWT signature generation failed"); |
216 | 0 | return -1; |
217 | 0 | } |
218 | | |
219 | 0 | flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data); |
220 | |
|
221 | 0 | ret = flb_oauth2_payload_append(ctx->o, |
222 | 0 | "grant_type", -1, |
223 | 0 | "urn%3Aietf%3Aparams%3Aoauth%3A" |
224 | 0 | "grant-type%3Ajwt-bearer", -1); |
225 | 0 | if (ret == -1) { |
226 | 0 | flb_plg_error(ctx->ins, "error appending oauth2 params"); |
227 | 0 | flb_sds_destroy(sig_data); |
228 | 0 | return -1; |
229 | 0 | } |
230 | | |
231 | 0 | ret = flb_oauth2_payload_append(ctx->o, |
232 | 0 | "assertion", -1, |
233 | 0 | sig_data, sig_size); |
234 | 0 | if (ret == -1) { |
235 | 0 | flb_plg_error(ctx->ins, "error appending oauth2 params"); |
236 | 0 | flb_sds_destroy(sig_data); |
237 | 0 | return -1; |
238 | 0 | } |
239 | 0 | flb_sds_destroy(sig_data); |
240 | | |
241 | | /* Retrieve access token */ |
242 | 0 | token = flb_oauth2_token_get(ctx->o); |
243 | 0 | if (!token) { |
244 | 0 | flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); |
245 | 0 | return -1; |
246 | 0 | } |
247 | | |
248 | 0 | return 0; |
249 | 0 | } |
250 | | |
251 | | static flb_sds_t get_google_token(struct flb_chronicle *ctx) |
252 | 0 | { |
253 | 0 | int ret = 0; |
254 | 0 | flb_sds_t output = NULL; |
255 | |
|
256 | 0 | if (pthread_mutex_lock(&ctx->token_mutex)){ |
257 | 0 | flb_plg_error(ctx->ins, "error locking mutex"); |
258 | 0 | return NULL; |
259 | 0 | } |
260 | | |
261 | 0 | if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { |
262 | 0 | ret = chronicle_get_oauth2_token(ctx); |
263 | 0 | } |
264 | | |
265 | | /* Copy string to prevent race conditions (get_oauth2 can free the string) */ |
266 | 0 | if (ret == 0) { |
267 | 0 | output = flb_sds_create(ctx->o->token_type); |
268 | 0 | flb_sds_printf(&output, " %s", ctx->o->access_token); |
269 | 0 | } |
270 | |
|
271 | 0 | if (pthread_mutex_unlock(&ctx->token_mutex)){ |
272 | 0 | flb_plg_error(ctx->ins, "error unlocking mutex"); |
273 | 0 | if (output) { |
274 | 0 | flb_sds_destroy(output); |
275 | 0 | } |
276 | 0 | return NULL; |
277 | 0 | } |
278 | | |
279 | 0 | return output; |
280 | 0 | } |
281 | | |
282 | | static int validate_log_type(struct flb_chronicle *ctx, struct flb_config *config, |
283 | | const char *body, size_t len) |
284 | 0 | { |
285 | 0 | int ret = -1; |
286 | 0 | int root_type; |
287 | 0 | char *msgpack_buf = NULL; |
288 | 0 | size_t msgpack_size; |
289 | 0 | size_t off = 0; |
290 | 0 | msgpack_unpacked result; |
291 | 0 | int i, j, k; |
292 | 0 | msgpack_object key; |
293 | 0 | msgpack_object val; |
294 | 0 | msgpack_object root; |
295 | 0 | msgpack_object *array; |
296 | 0 | msgpack_object *supported_type; |
297 | 0 | int root_map_size; |
298 | 0 | int array_size = 0; |
299 | | |
300 | |
|
301 | 0 | ret = flb_pack_json(body, len, |
302 | 0 | &msgpack_buf, &msgpack_size, |
303 | 0 | &root_type, NULL); |
304 | |
|
305 | 0 | if (ret != 0 || root_type != JSMN_OBJECT) { |
306 | 0 | flb_plg_error(ctx->ins, "json to msgpack conversion error"); |
307 | 0 | } |
308 | |
|
309 | 0 | ret = -1; |
310 | 0 | msgpack_unpacked_init(&result); |
311 | 0 | while (msgpack_unpack_next(&result, msgpack_buf, msgpack_size, &off) == MSGPACK_UNPACK_SUCCESS) { |
312 | 0 | if (result.data.type != MSGPACK_OBJECT_MAP) { |
313 | 0 | flb_plg_error(ctx->ins, "Invalid log_type payload"); |
314 | 0 | ret = -2; |
315 | |
|
316 | 0 | goto cleanup; |
317 | 0 | } |
318 | | |
319 | 0 | root = result.data; |
320 | 0 | root_map_size = root.via.map.size; |
321 | |
|
322 | 0 | for (i = 0; i < root_map_size; i++) { |
323 | 0 | key = root.via.map.ptr[i].key; |
324 | 0 | val = root.via.map.ptr[i].val; |
325 | |
|
326 | 0 | if (val.type != MSGPACK_OBJECT_ARRAY) { |
327 | 0 | flb_plg_error(ctx->ins, "Invalid inner array type of log_type payload"); |
328 | 0 | ret = -2; |
329 | |
|
330 | 0 | goto cleanup; |
331 | 0 | } |
332 | | |
333 | 0 | array = val.via.array.ptr; |
334 | 0 | array_size = val.via.array.size; |
335 | |
|
336 | 0 | for (j = 0; j < array_size; j++) { |
337 | 0 | supported_type = &array[j]; |
338 | |
|
339 | 0 | if (supported_type->type != MSGPACK_OBJECT_MAP) { |
340 | 0 | flb_plg_error(ctx->ins, "Invalid inner maps of log_type payload"); |
341 | 0 | ret = -2; |
342 | |
|
343 | 0 | continue; |
344 | 0 | } |
345 | | |
346 | 0 | for (k = 0; k < supported_type->via.map.size; k++) { |
347 | 0 | key = supported_type->via.map.ptr[k].key; |
348 | 0 | val = supported_type->via.map.ptr[k].val; |
349 | |
|
350 | 0 | if (strncmp("logType", key.via.str.ptr, key.via.str.size) == 0) { |
351 | 0 | if (strncmp(ctx->log_type, val.via.bin.ptr, val.via.str.size) == 0) { |
352 | 0 | ret = 0; |
353 | 0 | goto cleanup; |
354 | 0 | } |
355 | 0 | } |
356 | 0 | } |
357 | 0 | } |
358 | 0 | } |
359 | 0 | } |
360 | | |
361 | 0 | cleanup: |
362 | 0 | msgpack_unpacked_destroy(&result); |
363 | | |
364 | | /* release 'out_buf' if it was allocated */ |
365 | 0 | if (msgpack_buf) { |
366 | 0 | flb_free(msgpack_buf); |
367 | 0 | } |
368 | |
|
369 | 0 | return ret; |
370 | 0 | } |
371 | | |
372 | | static int check_chronicle_log_type(struct flb_chronicle *ctx, struct flb_config *config) |
373 | 0 | { |
374 | 0 | int ret; |
375 | 0 | size_t b_sent; |
376 | 0 | flb_sds_t token; |
377 | 0 | struct flb_connection *u_conn; |
378 | 0 | struct flb_http_client *c; |
379 | | |
380 | | /* Get upstream connection */ |
381 | 0 | u_conn = flb_upstream_conn_get(ctx->u); |
382 | 0 | if (!u_conn) { |
383 | 0 | return -1; |
384 | 0 | } |
385 | | |
386 | | /* Get or renew Token */ |
387 | 0 | token = get_google_token(ctx); |
388 | |
|
389 | 0 | if (!token) { |
390 | 0 | flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); |
391 | 0 | flb_upstream_conn_release(u_conn); |
392 | 0 | return -1; |
393 | 0 | } |
394 | | |
395 | | /* Compose HTTP Client request */ |
396 | 0 | c = flb_http_client(u_conn, FLB_HTTP_GET, FLB_CHRONICLE_LOG_TYPE_ENDPOINT, |
397 | 0 | NULL, 0, NULL, 0, NULL, 0); |
398 | 0 | if (!c) { |
399 | 0 | flb_plg_error(ctx->ins, "cannot create HTTP client context"); |
400 | 0 | flb_upstream_conn_release(u_conn); |
401 | 0 | flb_sds_destroy(token); |
402 | |
|
403 | 0 | return -1; |
404 | 0 | } |
405 | | |
406 | | /* Chronicle supported types are growing. Not to specify the read limit. */ |
407 | 0 | flb_http_buffer_size(c, 0); |
408 | 0 | flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); |
409 | 0 | flb_http_add_header(c, "Content-Type", 12, "application/json", 16); |
410 | | |
411 | | /* Compose and append Authorization header */ |
412 | 0 | flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); |
413 | | |
414 | | /* Send HTTP request */ |
415 | 0 | ret = flb_http_do(c, &b_sent); |
416 | | |
417 | | /* validate response */ |
418 | 0 | if (ret != 0) { |
419 | 0 | flb_plg_warn(ctx->ins, "http_do=%i", ret); |
420 | 0 | goto cleanup; |
421 | 0 | } |
422 | 0 | else { |
423 | | /* The request was issued successfully, validate the 'error' field */ |
424 | 0 | flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); |
425 | 0 | if (c->resp.status == 200) { |
426 | 0 | ret = validate_log_type(ctx, config, c->resp.payload, c->resp.payload_size); |
427 | 0 | if (ret != 0) { |
428 | 0 | flb_plg_error(ctx->ins, "Validate log_type is failed"); |
429 | 0 | goto cleanup; |
430 | 0 | } |
431 | 0 | } |
432 | 0 | else { |
433 | 0 | if (c->resp.payload && c->resp.payload_size > 0) { |
434 | | /* we got an error */ |
435 | 0 | flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); |
436 | 0 | } |
437 | |
|
438 | 0 | goto cleanup; |
439 | 0 | } |
440 | 0 | } |
441 | | |
442 | 0 | cleanup: |
443 | | |
444 | | /* Cleanup */ |
445 | 0 | flb_sds_destroy(token); |
446 | 0 | flb_http_client_destroy(c); |
447 | 0 | flb_upstream_conn_release(u_conn); |
448 | |
|
449 | 0 | return ret; |
450 | 0 | } |
451 | | |
452 | | static int cb_chronicle_init(struct flb_output_instance *ins, |
453 | | struct flb_config *config, void *data) |
454 | 0 | { |
455 | 0 | char *token; |
456 | 0 | int io_flags = FLB_IO_TLS; |
457 | 0 | struct flb_chronicle *ctx; |
458 | 0 | int ret; |
459 | | |
460 | | /* Create config context */ |
461 | 0 | ctx = flb_chronicle_conf_create(ins, config); |
462 | 0 | if (!ctx) { |
463 | 0 | flb_plg_error(ins, "configuration failed"); |
464 | 0 | return -1; |
465 | 0 | } |
466 | | |
467 | 0 | flb_output_set_context(ins, ctx); |
468 | | |
469 | | /* Network mode IPv6 */ |
470 | 0 | if (ins->host.ipv6 == FLB_TRUE) { |
471 | 0 | io_flags |= FLB_IO_IPV6; |
472 | 0 | } |
473 | | |
474 | | /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */ |
475 | 0 | pthread_mutex_init(&ctx->token_mutex, NULL); |
476 | | |
477 | | /* |
478 | | * Create upstream context for Chronicle Streaming Inserts |
479 | | * (no oauth2 service) |
480 | | */ |
481 | 0 | ctx->u = flb_upstream_create_url(config, ctx->uri, |
482 | 0 | io_flags, ins->tls); |
483 | 0 | if (!ctx->u) { |
484 | 0 | flb_plg_error(ctx->ins, "upstream creation failed"); |
485 | 0 | return -1; |
486 | 0 | } |
487 | | |
488 | | /* Create oauth2 context */ |
489 | 0 | ctx->o = flb_oauth2_create(ctx->config, FLB_CHRONICLE_AUTH_URL, 3000); |
490 | 0 | if (!ctx->o) { |
491 | 0 | flb_plg_error(ctx->ins, "cannot create oauth2 context"); |
492 | 0 | return -1; |
493 | 0 | } |
494 | 0 | flb_output_upstream_set(ctx->u, ins); |
495 | | |
496 | | /* Get or renew the OAuth2 token */ |
497 | 0 | token = get_google_token(ctx); |
498 | |
|
499 | 0 | if (!token) { |
500 | 0 | flb_plg_warn(ctx->ins, "token retrieval failed"); |
501 | 0 | } |
502 | 0 | else { |
503 | 0 | flb_sds_destroy(token); |
504 | 0 | } |
505 | |
|
506 | 0 | ret = check_chronicle_log_type(ctx, config); |
507 | 0 | if (ret != 0) { |
508 | 0 | flb_plg_error(ctx->ins, "Validate log_type failed. '%s' is not supported. ret = %d", |
509 | 0 | ctx->log_type, ret); |
510 | 0 | return -1; |
511 | 0 | } |
512 | | |
513 | 0 | return 0; |
514 | 0 | } |
515 | | |
516 | | static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, uint64_t bytes, struct flb_log_event log_event) |
517 | 0 | { |
518 | 0 | int i; |
519 | 0 | int map_size; |
520 | 0 | int check = FLB_FALSE; |
521 | 0 | int log_key_missing = 0; |
522 | 0 | int ret; |
523 | 0 | struct flb_chronicle *ctx = out_context; |
524 | 0 | char *val_buf; |
525 | 0 | char *key_str = NULL; |
526 | 0 | size_t key_str_size = 0; |
527 | 0 | size_t msgpack_size = bytes + bytes / 4; |
528 | 0 | size_t val_offset = 0; |
529 | 0 | flb_sds_t out_buf; |
530 | 0 | msgpack_object map; |
531 | 0 | msgpack_object key; |
532 | 0 | msgpack_object val; |
533 | | |
534 | | /* Allocate buffer to store log_key contents */ |
535 | 0 | val_buf = flb_calloc(1, msgpack_size); |
536 | 0 | if (val_buf == NULL) { |
537 | 0 | flb_plg_error(ctx->ins, "Could not allocate enough " |
538 | 0 | "memory to read record"); |
539 | 0 | flb_errno(); |
540 | 0 | return NULL; |
541 | 0 | } |
542 | | |
543 | | /* Get the record/map */ |
544 | 0 | map = *log_event.body; |
545 | |
|
546 | 0 | if (map.type != MSGPACK_OBJECT_MAP) { |
547 | 0 | return NULL; |
548 | 0 | } |
549 | | |
550 | 0 | map_size = map.via.map.size; |
551 | | |
552 | | /* Extract log_key from record and append to output buffer */ |
553 | 0 | for (i = 0; i < map_size; i++) { |
554 | 0 | key = map.via.map.ptr[i].key; |
555 | 0 | val = map.via.map.ptr[i].val; |
556 | |
|
557 | 0 | if (key.type == MSGPACK_OBJECT_BIN) { |
558 | 0 | key_str = (char *) key.via.bin.ptr; |
559 | 0 | key_str_size = key.via.bin.size; |
560 | 0 | check = FLB_TRUE; |
561 | 0 | } |
562 | 0 | if (key.type == MSGPACK_OBJECT_STR) { |
563 | 0 | key_str = (char *) key.via.str.ptr; |
564 | 0 | key_str_size = key.via.str.size; |
565 | 0 | check = FLB_TRUE; |
566 | 0 | } |
567 | |
|
568 | 0 | if (check == FLB_TRUE) { |
569 | 0 | if (strncmp(ctx->log_key, key_str, key_str_size) == 0) { |
570 | | |
571 | | /* |
572 | | * Copy contents of value into buffer. Necessary to copy |
573 | | * strings because flb_msgpack_to_json does not handle nested |
574 | | * JSON gracefully and double escapes them. |
575 | | */ |
576 | 0 | if (val.type == MSGPACK_OBJECT_BIN) { |
577 | 0 | memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size); |
578 | 0 | val_offset += val.via.bin.size; |
579 | 0 | val_buf[val_offset] = '\0'; |
580 | 0 | val_offset++; |
581 | 0 | } |
582 | 0 | else if (val.type == MSGPACK_OBJECT_STR) { |
583 | 0 | memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size); |
584 | 0 | val_offset += val.via.str.size; |
585 | 0 | val_buf[val_offset] = '\0'; |
586 | 0 | val_offset++; |
587 | 0 | } |
588 | 0 | else { |
589 | 0 | ret = flb_msgpack_to_json(val_buf + val_offset, |
590 | 0 | msgpack_size - val_offset, &val); |
591 | 0 | if (ret < 0) { |
592 | 0 | break; |
593 | 0 | } |
594 | 0 | val_offset += ret; |
595 | 0 | val_buf[val_offset] = '\0'; |
596 | 0 | val_offset++; |
597 | 0 | } |
598 | | /* Exit early once log_key has been found for current record */ |
599 | 0 | break; |
600 | 0 | } |
601 | 0 | } |
602 | | |
603 | | /* If log_key was not found in the current record, mark log key as missing */ |
604 | 0 | log_key_missing++; |
605 | 0 | } |
606 | |
|
607 | 0 | if (log_key_missing > 0) { |
608 | 0 | flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records", |
609 | 0 | ctx->log_key, log_key_missing); |
610 | 0 | } |
611 | | |
612 | | /* If nothing was read, destroy buffer */ |
613 | 0 | if (val_offset == 0) { |
614 | 0 | flb_free(val_buf); |
615 | 0 | return NULL; |
616 | 0 | } |
617 | 0 | val_buf[val_offset] = '\0'; |
618 | | |
619 | | /* Create output buffer to store contents */ |
620 | 0 | out_buf = flb_sds_create(val_buf); |
621 | 0 | if (out_buf == NULL) { |
622 | 0 | flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents."); |
623 | 0 | flb_errno(); |
624 | 0 | } |
625 | 0 | flb_free(val_buf); |
626 | |
|
627 | 0 | return out_buf; |
628 | 0 | } |
629 | | |
630 | | static int count_mp_with_threshold(size_t last_offset, size_t threshold, |
631 | | struct flb_log_event_decoder *log_decoder, |
632 | | struct flb_chronicle *ctx) |
633 | 0 | { |
634 | 0 | int ret; |
635 | 0 | int array_size = 0; |
636 | 0 | size_t off = 0; |
637 | 0 | struct flb_log_event log_event; |
638 | | |
639 | | /* Adjust decoder offset */ |
640 | 0 | if (last_offset != 0) { |
641 | 0 | log_decoder->offset = last_offset; |
642 | 0 | } |
643 | |
|
644 | 0 | while ((ret = flb_log_event_decoder_next( |
645 | 0 | log_decoder, |
646 | 0 | &log_event)) == FLB_EVENT_DECODER_SUCCESS) { |
647 | 0 | off = log_decoder->offset; |
648 | 0 | array_size++; |
649 | |
|
650 | 0 | if (off >= (threshold + last_offset)) { |
651 | 0 | flb_plg_debug(ctx->ins, |
652 | 0 | "the offset %zu is exceeded the threshold %zu. " |
653 | 0 | "Splitting the payload over the threshold so the processed array size is %d", |
654 | 0 | off, threshold, array_size); |
655 | |
|
656 | 0 | break; |
657 | 0 | } |
658 | 0 | } |
659 | |
|
660 | 0 | return array_size; |
661 | 0 | } |
662 | | |
663 | | static int chronicle_format(const void *data, size_t bytes, |
664 | | const char *tag, size_t tag_len, |
665 | | char **out_data, size_t *out_size, |
666 | | size_t last_offset, |
667 | | size_t threshold, size_t *out_offset, |
668 | | struct flb_log_event_decoder *log_decoder, |
669 | | struct flb_chronicle *ctx) |
670 | 0 | { |
671 | 0 | int len; |
672 | 0 | int ret; |
673 | 0 | int array_size = 0; |
674 | 0 | size_t off = 0; |
675 | 0 | size_t last_off = 0; |
676 | 0 | size_t alloc_size = 0; |
677 | 0 | size_t s; |
678 | 0 | char time_formatted[255]; |
679 | | /* Parameters for Timestamp */ |
680 | 0 | struct tm tm; |
681 | 0 | flb_sds_t out_buf; |
682 | 0 | struct flb_log_event log_event; |
683 | 0 | msgpack_sbuffer mp_sbuf; |
684 | 0 | msgpack_packer mp_pck; |
685 | 0 | flb_sds_t log_text = NULL; |
686 | 0 | int log_text_size; |
687 | |
|
688 | 0 | array_size = count_mp_with_threshold(last_offset, threshold, log_decoder, ctx); |
689 | | |
690 | | /* Reset the decoder state */ |
691 | 0 | flb_log_event_decoder_reset(log_decoder, (char *) data, bytes); |
692 | | |
693 | | /* Create temporary msgpack buffer */ |
694 | 0 | msgpack_sbuffer_init(&mp_sbuf); |
695 | 0 | msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); |
696 | | |
697 | | /* |
698 | | * Pack root map (unstructured log): |
699 | | * see: https://cloud.google.com/chronicle/docs/reference/ingestion-api#request_body_2 |
700 | | * { |
701 | | * "customer_id": "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c", |
702 | | * "log_type": "BIND_DNS", |
703 | | * "entries": [ |
704 | | * { |
705 | | * "log_text": "26-Feb-2019 13:35:02.187 client 10.120.20.32#4238: query: altostrat.com IN A + (203.0.113.102)", |
706 | | * "ts_epoch_microseconds": 1551188102187000 |
707 | | * }, |
708 | | * { |
709 | | * "log_text": "26-Feb-2019 13:37:04.523 client 10.50.100.33#1116: query: examplepetstore.com IN A + (203.0.113.102)", |
710 | | * "ts_rfc3339": "2019-26-02T13:37:04.523-08:00" |
711 | | * }, |
712 | | * { |
713 | | * "log_text": "26-Feb-2019 13:39:01.115 client 10.1.2.3#3333: query: www.example.com IN A + (203.0.113.102)" |
714 | | * }, |
715 | | * ] |
716 | | * } |
717 | | */ |
718 | 0 | msgpack_pack_map(&mp_pck, 3); |
719 | |
|
720 | 0 | msgpack_pack_str(&mp_pck, 11); |
721 | 0 | msgpack_pack_str_body(&mp_pck, "customer_id", 11); |
722 | |
|
723 | 0 | msgpack_pack_str(&mp_pck, strlen(ctx->customer_id)); |
724 | 0 | msgpack_pack_str_body(&mp_pck, ctx->customer_id, strlen(ctx->customer_id)); |
725 | |
|
726 | 0 | msgpack_pack_str(&mp_pck, 8); |
727 | 0 | msgpack_pack_str_body(&mp_pck, "log_type", 8); |
728 | |
|
729 | 0 | msgpack_pack_str(&mp_pck, strlen(ctx->log_type)); |
730 | 0 | msgpack_pack_str_body(&mp_pck, ctx->log_type, strlen(ctx->log_type)); |
731 | |
|
732 | 0 | msgpack_pack_str(&mp_pck, 7); |
733 | 0 | msgpack_pack_str_body(&mp_pck, "entries", 7); |
734 | | |
735 | | /* Append entries */ |
736 | 0 | msgpack_pack_array(&mp_pck, array_size); |
737 | |
|
738 | 0 | flb_plg_trace(ctx->ins, "last offset is %zu", last_offset); |
739 | | /* Adjust decoder offset */ |
740 | 0 | if (last_offset != 0) { |
741 | 0 | log_decoder->offset = last_offset; |
742 | 0 | } |
743 | |
|
744 | 0 | while ((ret = flb_log_event_decoder_next( |
745 | 0 | log_decoder, |
746 | 0 | &log_event)) == FLB_EVENT_DECODER_SUCCESS) { |
747 | 0 | off = log_decoder->offset; |
748 | 0 | alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ |
749 | 0 | last_off = off; |
750 | | |
751 | | /* |
752 | | * Pack entries |
753 | | * |
754 | | * { |
755 | | * "log_text": {...}, |
756 | | * "ts_rfc3339": "..." |
757 | | * } |
758 | | * |
759 | | */ |
760 | 0 | msgpack_pack_map(&mp_pck, 2); |
761 | | |
762 | | /* log_text */ |
763 | 0 | msgpack_pack_str(&mp_pck, 8); |
764 | 0 | msgpack_pack_str_body(&mp_pck, "log_text", 8); |
765 | 0 | if (ctx->log_key != NULL) { |
766 | 0 | log_text = flb_pack_msgpack_extract_log_key(ctx, bytes, log_event); |
767 | 0 | log_text_size = flb_sds_len(log_text); |
768 | 0 | } |
769 | 0 | else { |
770 | 0 | log_text = flb_msgpack_to_json_str(alloc_size, log_event.body); |
771 | 0 | log_text_size = strlen(log_text); |
772 | 0 | } |
773 | |
|
774 | 0 | if (log_text == NULL) { |
775 | 0 | flb_plg_error(ctx->ins, "Could not marshal msgpack to output string"); |
776 | 0 | return -1; |
777 | 0 | } |
778 | 0 | msgpack_pack_str(&mp_pck, log_text_size); |
779 | 0 | msgpack_pack_str_body(&mp_pck, log_text, log_text_size); |
780 | |
|
781 | 0 | if (ctx->log_key != NULL) { |
782 | 0 | flb_sds_destroy(log_text); |
783 | 0 | } |
784 | 0 | else { |
785 | 0 | flb_free(log_text); |
786 | 0 | } |
787 | | /* timestamp */ |
788 | 0 | msgpack_pack_str(&mp_pck, 10); |
789 | 0 | msgpack_pack_str_body(&mp_pck, "ts_rfc3339", 10); |
790 | |
|
791 | 0 | gmtime_r(&log_event.timestamp.tm.tv_sec, &tm); |
792 | 0 | s = strftime(time_formatted, sizeof(time_formatted) - 1, |
793 | 0 | FLB_STD_TIME_FMT, &tm); |
794 | 0 | len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, |
795 | 0 | ".%03" PRIu64 "Z", |
796 | 0 | (uint64_t) log_event.timestamp.tm.tv_nsec); |
797 | 0 | s += len; |
798 | |
|
799 | 0 | msgpack_pack_str(&mp_pck, s); |
800 | 0 | msgpack_pack_str_body(&mp_pck, time_formatted, s); |
801 | |
|
802 | 0 | if (off >= (threshold + last_offset)) { |
803 | 0 | flb_plg_debug(ctx->ins, |
804 | 0 | "the offset %zu is exceeded the threshold %zu. " |
805 | 0 | "Splitting the payload over the threshold so the processed array size has %d.", |
806 | 0 | off, threshold, array_size); |
807 | |
|
808 | 0 | break; |
809 | 0 | } |
810 | 0 | } |
811 | | |
812 | | /* Convert from msgpack to JSON */ |
813 | 0 | out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); |
814 | 0 | msgpack_sbuffer_destroy(&mp_sbuf); |
815 | |
|
816 | 0 | if (!out_buf) { |
817 | 0 | flb_plg_error(ctx->ins, "error formatting JSON payload"); |
818 | 0 | return -1; |
819 | 0 | } |
820 | | |
821 | 0 | *out_offset = last_off; |
822 | 0 | *out_data = out_buf; |
823 | 0 | *out_size = flb_sds_len(out_buf); |
824 | |
|
825 | 0 | return 0; |
826 | 0 | } |
827 | | |
828 | | static void cb_chronicle_flush(struct flb_event_chunk *event_chunk, |
829 | | struct flb_output_flush *out_flush, |
830 | | struct flb_input_instance *i_ins, |
831 | | void *out_context, |
832 | | struct flb_config *config) |
833 | 0 | { |
834 | 0 | (void) i_ins; |
835 | 0 | (void) config; |
836 | 0 | int ret; |
837 | 0 | int ret_code = FLB_RETRY; |
838 | 0 | size_t b_sent; |
839 | 0 | flb_sds_t token; |
840 | 0 | flb_sds_t payload_buf; |
841 | 0 | size_t payload_size; |
842 | 0 | struct flb_chronicle *ctx = out_context; |
843 | 0 | struct flb_connection *u_conn; |
844 | 0 | struct flb_http_client *c; |
845 | 0 | struct flb_log_event_decoder log_decoder; |
846 | 0 | size_t threshold = 0.8 * 1024 * 1024; |
847 | 0 | size_t offset = 0; |
848 | 0 | size_t out_offset = 0; |
849 | 0 | int need_loop = FLB_TRUE; |
850 | 0 | const int retry_limit = 8; |
851 | 0 | int retries = 0; |
852 | 0 | const size_t one_mebibyte = 1024 * 1024; |
853 | |
|
854 | 0 | flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size); |
855 | | |
856 | | /* Get upstream connection */ |
857 | 0 | u_conn = flb_upstream_conn_get(ctx->u); |
858 | 0 | if (!u_conn) { |
859 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
860 | 0 | } |
861 | | |
862 | | /* Get or renew Token */ |
863 | 0 | token = get_google_token(ctx); |
864 | |
|
865 | 0 | if (!token) { |
866 | 0 | flb_plg_error(ctx->ins, "cannot retrieve oauth2 token"); |
867 | 0 | flb_upstream_conn_release(u_conn); |
868 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
869 | 0 | } |
870 | | |
871 | 0 | flb_plg_trace(ctx->ins, "msgpack payload size is %zu", event_chunk->size); |
872 | | |
873 | | /* Prepare log decoder */ |
874 | 0 | ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size); |
875 | |
|
876 | 0 | if (ret != FLB_EVENT_DECODER_SUCCESS) { |
877 | 0 | flb_plg_error(ctx->ins, |
878 | 0 | "Log event decoder initialization error : %d", ret); |
879 | | |
880 | | /* Cleanup token and conn */ |
881 | 0 | flb_sds_destroy(token); |
882 | 0 | flb_upstream_conn_release(u_conn); |
883 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
884 | 0 | } |
885 | | |
886 | 0 | while (need_loop) { |
887 | 0 | retry: |
888 | 0 | if (retries > 0) { |
889 | | /* (retry_limit - retries)/10.0 is a factor to reduce the |
890 | | * formatting payloads. |
891 | | * For the first attempt, it will get: |
892 | | * (8 - 1) / 10.0 = 0.7 |
893 | | * For the second attempt, it will get: |
894 | | * (8 - 2) / 10.0 = 0.6 |
895 | | * ... |
896 | | * For 7th attempt, it will get: |
897 | | * (8 - 7) / 10.0 = 0.1 |
898 | | * For 8th attempt, it won't happen. Just give up for |
899 | | * formating though. :) |
900 | | */ |
901 | 0 | threshold = (retry_limit - retries)/10.0 * one_mebibyte; |
902 | 0 | } |
903 | | |
904 | | /* Reformat msgpack to chronicle JSON payload */ |
905 | 0 | ret = chronicle_format(event_chunk->data, event_chunk->size, |
906 | 0 | event_chunk->tag, flb_sds_len(event_chunk->tag), |
907 | 0 | &payload_buf, &payload_size, |
908 | 0 | offset, threshold, &out_offset, |
909 | 0 | &log_decoder, ctx); |
910 | 0 | if (ret != 0) { |
911 | 0 | flb_upstream_conn_release(u_conn); |
912 | 0 | flb_sds_destroy(token); |
913 | 0 | flb_sds_destroy(payload_buf); |
914 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
915 | |
|
916 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
917 | 0 | } |
918 | | |
919 | 0 | flb_plg_debug(ctx->ins, "the last offset of msgpack decoder is %zu", out_offset); |
920 | |
|
921 | 0 | if (payload_size >= one_mebibyte) { |
922 | 0 | retries++; |
923 | 0 | if (retries >= retry_limit) { |
924 | 0 | flb_plg_error(ctx->ins, "Retry limit is exeeced for chronicle_format"); |
925 | |
|
926 | 0 | flb_upstream_conn_release(u_conn); |
927 | 0 | flb_sds_destroy(token); |
928 | 0 | flb_sds_destroy(payload_buf); |
929 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
930 | |
|
931 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
932 | 0 | } |
933 | | |
934 | 0 | flb_plg_debug(ctx->ins, |
935 | 0 | "HTTP request body is exeeded to %zd bytes. actual: %zu. left attempt(s): %d", |
936 | 0 | one_mebibyte, payload_size, retry_limit - retries); |
937 | 0 | flb_sds_destroy(payload_buf); |
938 | |
|
939 | 0 | goto retry; |
940 | 0 | } |
941 | 0 | else { |
942 | 0 | retries = 0; |
943 | 0 | } |
944 | | |
945 | | /* Compose HTTP Client request */ |
946 | 0 | c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->endpoint, |
947 | 0 | payload_buf, payload_size, NULL, 0, NULL, 0); |
948 | 0 | if (!c) { |
949 | 0 | flb_plg_error(ctx->ins, "cannot create HTTP client context"); |
950 | 0 | flb_upstream_conn_release(u_conn); |
951 | 0 | flb_sds_destroy(token); |
952 | 0 | flb_sds_destroy(payload_buf); |
953 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
954 | |
|
955 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
956 | 0 | } |
957 | | |
958 | 0 | flb_http_buffer_size(c, 4192); |
959 | 0 | flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); |
960 | 0 | flb_http_add_header(c, "Content-Type", 12, "application/json", 16); |
961 | | |
962 | | /* Compose and append Authorization header */ |
963 | 0 | flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); |
964 | | |
965 | | /* Send HTTP request */ |
966 | 0 | ret = flb_http_do(c, &b_sent); |
967 | | |
968 | | /* validate response */ |
969 | 0 | if (ret != 0) { |
970 | 0 | flb_plg_warn(ctx->ins, "http_do=%i", ret); |
971 | 0 | ret_code = FLB_RETRY; |
972 | 0 | } |
973 | 0 | else { |
974 | | /* The request was issued successfully, validate the 'error' field */ |
975 | 0 | flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status); |
976 | 0 | if (c->resp.status == 200) { |
977 | 0 | ret_code = FLB_OK; |
978 | 0 | } |
979 | 0 | else { |
980 | 0 | if (c->resp.payload && c->resp.payload_size > 0) { |
981 | | /* we got an error */ |
982 | 0 | flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload); |
983 | 0 | } |
984 | 0 | ret_code = FLB_RETRY; |
985 | 0 | } |
986 | 0 | } |
987 | | |
988 | | /* Validate all chunks are processed or not */ |
989 | 0 | if (out_offset >= event_chunk->size) { |
990 | 0 | need_loop = FLB_FALSE; |
991 | 0 | } |
992 | | /* Clean up HTTP client stuffs */ |
993 | 0 | flb_sds_destroy(payload_buf); |
994 | 0 | flb_http_client_destroy(c); |
995 | | |
996 | | /* The next loop uses the returned offset */ |
997 | 0 | offset = out_offset; |
998 | 0 | } |
999 | | |
1000 | | /* Cleanup decoder */ |
1001 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
1002 | | |
1003 | | /* Cleanup token and conn */ |
1004 | 0 | flb_sds_destroy(token); |
1005 | 0 | flb_upstream_conn_release(u_conn); |
1006 | | |
1007 | | /* Done */ |
1008 | 0 | FLB_OUTPUT_RETURN(ret_code); |
1009 | 0 | } |
1010 | | |
1011 | | static int cb_chronicle_exit(void *data, struct flb_config *config) |
1012 | 0 | { |
1013 | 0 | struct flb_chronicle *ctx = data; |
1014 | |
|
1015 | 0 | if (!ctx) { |
1016 | 0 | return -1; |
1017 | 0 | } |
1018 | | |
1019 | 0 | if (ctx->u) { |
1020 | 0 | flb_upstream_destroy(ctx->u); |
1021 | 0 | } |
1022 | |
|
1023 | 0 | flb_chronicle_conf_destroy(ctx); |
1024 | 0 | return 0; |
1025 | 0 | } |
1026 | | |
1027 | | static struct flb_config_map config_map[] = { |
1028 | | { |
1029 | | FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL, |
1030 | | 0, FLB_TRUE, offsetof(struct flb_chronicle, credentials_file), |
1031 | | "Set the path for the google service credentials file" |
1032 | | }, |
1033 | | // set in flb_chronicle_oauth_credentials |
1034 | | { |
1035 | | FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL, |
1036 | | 0, FLB_FALSE, 0, |
1037 | | "Set the service account email" |
1038 | | }, |
1039 | | // set in flb_chronicle_oauth_credentials |
1040 | | { |
1041 | | FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL, |
1042 | | 0, FLB_FALSE, 0, |
1043 | | "Set the service account secret" |
1044 | | }, |
1045 | | { |
1046 | | FLB_CONFIG_MAP_STR, "project_id", (char *)NULL, |
1047 | | 0, FLB_TRUE, offsetof(struct flb_chronicle, project_id), |
1048 | | "Set the project id" |
1049 | | }, |
1050 | | { |
1051 | | FLB_CONFIG_MAP_STR, "customer_id", (char *)NULL, |
1052 | | 0, FLB_TRUE, offsetof(struct flb_chronicle, customer_id), |
1053 | | "Set the customer id" |
1054 | | }, |
1055 | | { |
1056 | | FLB_CONFIG_MAP_STR, "log_type", (char *)NULL, |
1057 | | 0, FLB_TRUE, offsetof(struct flb_chronicle, log_type), |
1058 | | "Set the log type" |
1059 | | }, |
1060 | | { |
1061 | | FLB_CONFIG_MAP_STR, "region", (char *)NULL, |
1062 | | 0, FLB_TRUE, offsetof(struct flb_chronicle, region), |
1063 | | "Set the region" |
1064 | | }, |
1065 | | { |
1066 | | FLB_CONFIG_MAP_STR, "log_key", NULL, |
1067 | | 0, FLB_TRUE, offsetof(struct flb_chronicle, log_key), |
1068 | | "Set the log key" |
1069 | | }, |
1070 | | /* EOF */ |
1071 | | {0} |
1072 | | }; |
1073 | | |
1074 | | struct flb_output_plugin out_chronicle_plugin = { |
1075 | | .name = "chronicle", |
1076 | | .description = "Send logs to Google Chronicle as unstructured log", |
1077 | | .cb_init = cb_chronicle_init, |
1078 | | .cb_flush = cb_chronicle_flush, |
1079 | | .cb_exit = cb_chronicle_exit, |
1080 | | .config_map = config_map, |
1081 | | /* Plugin flags */ |
1082 | | .flags = FLB_OUTPUT_NET | FLB_IO_TLS, |
1083 | | }; |