Coverage Report

Created: 2025-01-28 07:34

/src/fluent-bit/plugins/out_chronicle/chronicle.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_http_client.h>
22
#include <fluent-bit/flb_pack.h>
23
#include <fluent-bit/flb_utils.h>
24
#include <fluent-bit/flb_time.h>
25
#include <fluent-bit/flb_oauth2.h>
26
#include <fluent-bit/flb_base64.h>
27
#include <fluent-bit/flb_hash.h>
28
#include <fluent-bit/flb_crypto.h>
29
#include <fluent-bit/flb_signv4.h>
30
#include <fluent-bit/flb_kv.h>
31
#include <fluent-bit/flb_log_event_encoder.h>
32
#include <fluent-bit/flb_log_event_decoder.h>
33
34
#include <msgpack.h>
35
36
#include "chronicle.h"
37
#include "chronicle_conf.h"
38
39
// TODO: The following code is copied from the Stackdriver plugin and should be
40
//       factored into common library functions.
41
42
/*
43
 * Base64 Encoding in JWT must:
44
 *
45
 * - remove any trailing padding '=' character
46
 * - replace '+' with '-'
47
 * - replace '/' with '_'
48
 *
49
 * ref: https://www.rfc-editor.org/rfc/rfc7515.txt Appendix C
50
 */
51
int chronicle_jwt_base64_url_encode(unsigned char *out_buf, size_t out_size,
52
                          unsigned char *in_buf, size_t in_size,
53
                          size_t *olen)
54
55
0
{
56
0
    int i;
57
0
    size_t len;
58
0
    int    result;
59
60
    /* do normal base64 encoding */
61
0
    result = flb_base64_encode((unsigned char *) out_buf, out_size - 1,
62
0
                               &len, in_buf, in_size);
63
0
    if (result != 0) {
64
0
        return -1;
65
0
    }
66
67
    /* Replace '+' and '/' characters */
68
0
    for (i = 0; i < len && out_buf[i] != '='; i++) {
69
0
        if (out_buf[i] == '+') {
70
0
            out_buf[i] = '-';
71
0
        }
72
0
        else if (out_buf[i] == '/') {
73
0
            out_buf[i] = '_';
74
0
        }
75
0
    }
76
77
    /* Now 'i' becomes the new length */
78
0
    *olen = i;
79
0
    return 0;
80
0
}
81
82
static int chronicle_jwt_encode(struct flb_chronicle *ctx,
83
                               char *payload, char *secret,
84
                               char **out_signature, size_t *out_size)
85
0
{
86
0
    int ret;
87
0
    int len;
88
0
    int buf_size;
89
0
    size_t olen;
90
0
    char *buf;
91
0
    char *sigd;
92
0
    char *headers = "{\"alg\": \"RS256\", \"typ\": \"JWT\"}";
93
0
    unsigned char sha256_buf[32] = {0};
94
0
    flb_sds_t out;
95
0
    unsigned char sig[256] = {0};
96
0
    size_t sig_len;
97
98
0
    buf_size = (strlen(payload) + strlen(secret)) * 2;
99
0
    buf = flb_malloc(buf_size);
100
0
    if (!buf) {
101
0
        flb_errno();
102
0
        return -1;
103
0
    }
104
105
    /* Encode header */
106
0
    len = strlen(headers);
107
0
    ret = flb_base64_encode((unsigned char *) buf, buf_size - 1,
108
0
                            &olen, (unsigned char *) headers, len);
109
0
    if (ret != 0) {
110
0
        flb_free(buf);
111
112
0
        return ret;
113
0
    }
114
115
    /* Create buffer to store JWT */
116
0
    out = flb_sds_create_size(2048);
117
0
    if (!out) {
118
0
        flb_errno();
119
0
        flb_free(buf);
120
0
        return -1;
121
0
    }
122
123
    /* Append header */
124
0
    flb_sds_cat_safe(&out, buf, olen);
125
0
    flb_sds_cat_safe(&out, ".", 1);
126
127
    /* Encode Payload */
128
0
    len = strlen(payload);
129
0
    chronicle_jwt_base64_url_encode((unsigned char *) buf, buf_size,
130
0
                                    (unsigned char *) payload, len, &olen);
131
132
    /* Append Payload */
133
0
    flb_sds_cat_safe(&out, buf, olen);
134
135
    /* do sha256() of base64(header).base64(payload) */
136
0
    ret = flb_hash_simple(FLB_HASH_SHA256,
137
0
                          (unsigned char *) out, flb_sds_len(out),
138
0
                          sha256_buf, sizeof(sha256_buf));
139
140
0
    if (ret != FLB_CRYPTO_SUCCESS) {
141
0
        flb_plg_error(ctx->ins, "error hashing token");
142
0
        flb_free(buf);
143
0
        flb_sds_destroy(out);
144
0
        return -1;
145
0
    }
146
147
0
    len = strlen(secret);
148
0
    sig_len = sizeof(sig);
149
150
0
    ret = flb_crypto_sign_simple(FLB_CRYPTO_PRIVATE_KEY,
151
0
                                 FLB_CRYPTO_PADDING_PKCS1,
152
0
                                 FLB_HASH_SHA256,
153
0
                                 (unsigned char *) secret, len,
154
0
                                 sha256_buf, sizeof(sha256_buf),
155
0
                                 sig, &sig_len);
156
157
0
    if (ret != FLB_CRYPTO_SUCCESS) {
158
0
        flb_plg_error(ctx->ins, "error creating RSA context");
159
0
        flb_free(buf);
160
0
        flb_sds_destroy(out);
161
0
        return -1;
162
0
    }
163
164
0
    sigd = flb_malloc(2048);
165
0
    if (!sigd) {
166
0
        flb_errno();
167
0
        flb_free(buf);
168
0
        flb_sds_destroy(out);
169
0
        return -1;
170
0
    }
171
172
0
    chronicle_jwt_base64_url_encode((unsigned char *) sigd, 2048, sig, 256, &olen);
173
174
0
    flb_sds_cat_safe(&out, ".", 1);
175
0
    flb_sds_cat_safe(&out, sigd, olen);
176
177
0
    *out_signature = out;
178
0
    *out_size = flb_sds_len(out);
179
180
0
    flb_free(buf);
181
0
    flb_free(sigd);
182
183
0
    return 0;
184
0
}
185
186
/* Create a new oauth2 context and get a oauth2 token */
187
static int chronicle_get_oauth2_token(struct flb_chronicle *ctx)
188
0
{
189
0
    int ret;
190
0
    char *token;
191
0
    char *sig_data;
192
0
    size_t sig_size;
193
0
    time_t issued;
194
0
    time_t expires;
195
0
    char payload[1024];
196
197
    /* Clear any previous oauth2 payload content */
198
0
    flb_oauth2_payload_clear(ctx->o);
199
200
    /* JWT encode for oauth2 */
201
0
    issued = time(NULL);
202
0
    expires = issued + FLB_CHRONICLE_TOKEN_REFRESH;
203
204
0
    snprintf(payload, sizeof(payload) - 1,
205
0
             "{\"iss\": \"%s\", \"scope\": \"%s\", "
206
0
             "\"aud\": \"%s\", \"exp\": %lu, \"iat\": %lu}",
207
0
             ctx->oauth_credentials->client_email, FLB_CHRONICLE_SCOPE,
208
0
             FLB_CHRONICLE_AUTH_URL,
209
0
             expires, issued);
210
211
    /* Compose JWT signature */
212
0
    ret = chronicle_jwt_encode(ctx, payload, ctx->oauth_credentials->private_key,
213
0
                              &sig_data, &sig_size);
214
0
    if (ret != 0) {
215
0
        flb_plg_error(ctx->ins, "JWT signature generation failed");
216
0
        return -1;
217
0
    }
218
219
0
    flb_plg_debug(ctx->ins, "JWT signature:\n%s", sig_data);
220
221
0
    ret = flb_oauth2_payload_append(ctx->o,
222
0
                                    "grant_type", -1,
223
0
                                    "urn%3Aietf%3Aparams%3Aoauth%3A"
224
0
                                    "grant-type%3Ajwt-bearer", -1);
225
0
    if (ret == -1) {
226
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
227
0
        flb_sds_destroy(sig_data);
228
0
        return -1;
229
0
    }
230
231
0
    ret = flb_oauth2_payload_append(ctx->o,
232
0
                                    "assertion", -1,
233
0
                                    sig_data, sig_size);
234
0
    if (ret == -1) {
235
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
236
0
        flb_sds_destroy(sig_data);
237
0
        return -1;
238
0
    }
239
0
    flb_sds_destroy(sig_data);
240
241
    /* Retrieve access token */
242
0
    token = flb_oauth2_token_get(ctx->o);
243
0
    if (!token) {
244
0
        flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
245
0
        return -1;
246
0
    }
247
248
0
    return 0;
249
0
}
250
251
static flb_sds_t get_google_token(struct flb_chronicle *ctx)
252
0
{
253
0
    int ret = 0;
254
0
    flb_sds_t output = NULL;
255
256
0
    if (pthread_mutex_lock(&ctx->token_mutex)){
257
0
        flb_plg_error(ctx->ins, "error locking mutex");
258
0
        return NULL;
259
0
    }
260
261
0
    if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
262
0
        ret = chronicle_get_oauth2_token(ctx);
263
0
    }
264
265
    /* Copy string to prevent race conditions (get_oauth2 can free the string) */
266
0
    if (ret == 0) {
267
0
        output = flb_sds_create(ctx->o->token_type);
268
0
        flb_sds_printf(&output, " %s", ctx->o->access_token);
269
0
    }
270
271
0
    if (pthread_mutex_unlock(&ctx->token_mutex)){
272
0
        flb_plg_error(ctx->ins, "error unlocking mutex");
273
0
        if (output) {
274
0
            flb_sds_destroy(output);
275
0
        }
276
0
        return NULL;
277
0
    }
278
279
0
    return output;
280
0
}
281
282
static int validate_log_type(struct flb_chronicle *ctx, struct flb_config *config,
283
                             const char *body, size_t len)
284
0
{
285
0
    int ret = -1;
286
0
    int root_type;
287
0
    char *msgpack_buf = NULL;
288
0
    size_t msgpack_size;
289
0
    size_t off = 0;
290
0
    msgpack_unpacked result;
291
0
    int i, j, k;
292
0
    msgpack_object key;
293
0
    msgpack_object val;
294
0
    msgpack_object root;
295
0
    msgpack_object *array;
296
0
    msgpack_object *supported_type;
297
0
    int root_map_size;
298
0
    int array_size = 0;
299
300
301
0
    ret = flb_pack_json(body, len,
302
0
                        &msgpack_buf, &msgpack_size,
303
0
                        &root_type, NULL);
304
305
0
    if (ret != 0 || root_type != JSMN_OBJECT) {
306
0
        flb_plg_error(ctx->ins, "json to msgpack conversion error");
307
0
    }
308
309
0
    ret = -1;
310
0
    msgpack_unpacked_init(&result);
311
0
    while (msgpack_unpack_next(&result, msgpack_buf, msgpack_size, &off) == MSGPACK_UNPACK_SUCCESS) {
312
0
        if (result.data.type != MSGPACK_OBJECT_MAP) {
313
0
            flb_plg_error(ctx->ins, "Invalid log_type payload");
314
0
            ret = -2;
315
316
0
            goto cleanup;
317
0
        }
318
319
0
        root = result.data;
320
0
        root_map_size = root.via.map.size;
321
322
0
        for (i = 0; i < root_map_size; i++) {
323
0
            key = root.via.map.ptr[i].key;
324
0
            val = root.via.map.ptr[i].val;
325
326
0
            if (val.type != MSGPACK_OBJECT_ARRAY) {
327
0
                flb_plg_error(ctx->ins, "Invalid inner array type of log_type payload");
328
0
                ret = -2;
329
330
0
                goto cleanup;
331
0
            }
332
333
0
            array = val.via.array.ptr;
334
0
            array_size = val.via.array.size;
335
336
0
            for (j = 0; j < array_size; j++) {
337
0
                supported_type = &array[j];
338
339
0
                if (supported_type->type != MSGPACK_OBJECT_MAP) {
340
0
                    flb_plg_error(ctx->ins, "Invalid inner maps of log_type payload");
341
0
                    ret = -2;
342
343
0
                    continue;
344
0
                }
345
346
0
                for (k = 0; k < supported_type->via.map.size; k++) {
347
0
                    key = supported_type->via.map.ptr[k].key;
348
0
                    val = supported_type->via.map.ptr[k].val;
349
350
0
                    if (strncmp("logType", key.via.str.ptr, key.via.str.size) == 0) {
351
0
                        if (strncmp(ctx->log_type, val.via.bin.ptr, val.via.str.size) == 0) {
352
0
                            ret = 0;
353
0
                            goto cleanup;
354
0
                        }
355
0
                    }
356
0
                }
357
0
            }
358
0
        }
359
0
    }
360
361
0
cleanup:
362
0
    msgpack_unpacked_destroy(&result);
363
364
    /* release 'out_buf' if it was allocated */
365
0
    if (msgpack_buf) {
366
0
        flb_free(msgpack_buf);
367
0
    }
368
369
0
    return ret;
370
0
}
371
372
static int check_chronicle_log_type(struct flb_chronicle *ctx, struct flb_config *config)
373
0
{
374
0
    int ret;
375
0
    size_t b_sent;
376
0
    flb_sds_t token;
377
0
    struct flb_connection *u_conn;
378
0
    struct flb_http_client *c;
379
380
    /* Get upstream connection */
381
0
    u_conn = flb_upstream_conn_get(ctx->u);
382
0
    if (!u_conn) {
383
0
        return -1;
384
0
    }
385
386
    /* Get or renew Token */
387
0
    token = get_google_token(ctx);
388
389
0
    if (!token) {
390
0
        flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
391
0
        flb_upstream_conn_release(u_conn);
392
0
        return -1;
393
0
    }
394
395
    /* Compose HTTP Client request */
396
0
    c = flb_http_client(u_conn, FLB_HTTP_GET, FLB_CHRONICLE_LOG_TYPE_ENDPOINT,
397
0
                        NULL, 0, NULL, 0, NULL, 0);
398
0
    if (!c) {
399
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
400
0
        flb_upstream_conn_release(u_conn);
401
0
        flb_sds_destroy(token);
402
403
0
        return -1;
404
0
    }
405
406
    /* Chronicle supported types are growing. Not to specify the read limit. */
407
0
    flb_http_buffer_size(c, 0);
408
0
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
409
0
    flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
410
411
    /* Compose and append Authorization header */
412
0
    flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
413
414
    /* Send HTTP request */
415
0
    ret = flb_http_do(c, &b_sent);
416
417
    /* validate response */
418
0
    if (ret != 0) {
419
0
        flb_plg_warn(ctx->ins, "http_do=%i", ret);
420
0
        goto cleanup;
421
0
    }
422
0
    else {
423
        /* The request was issued successfully, validate the 'error' field */
424
0
        flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
425
0
        if (c->resp.status == 200) {
426
0
            ret = validate_log_type(ctx, config, c->resp.payload, c->resp.payload_size);
427
0
            if (ret != 0) {
428
0
                flb_plg_error(ctx->ins, "Validate log_type is failed");
429
0
                goto cleanup;
430
0
            }
431
0
        }
432
0
        else {
433
0
            if (c->resp.payload && c->resp.payload_size > 0) {
434
                /* we got an error */
435
0
                flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload);
436
0
            }
437
438
0
            goto cleanup;
439
0
        }
440
0
    }
441
442
0
cleanup:
443
444
    /* Cleanup */
445
0
    flb_sds_destroy(token);
446
0
    flb_http_client_destroy(c);
447
0
    flb_upstream_conn_release(u_conn);
448
449
0
    return ret;
450
0
}
451
452
static int cb_chronicle_init(struct flb_output_instance *ins,
453
                             struct flb_config *config, void *data)
454
0
{
455
0
    char *token;
456
0
    int io_flags = FLB_IO_TLS;
457
0
    struct flb_chronicle *ctx;
458
0
    int ret;
459
460
    /* Create config context */
461
0
    ctx = flb_chronicle_conf_create(ins, config);
462
0
    if (!ctx) {
463
0
        flb_plg_error(ins, "configuration failed");
464
0
        return -1;
465
0
    }
466
467
0
    flb_output_set_context(ins, ctx);
468
469
    /* Network mode IPv6 */
470
0
    if (ins->host.ipv6 == FLB_TRUE) {
471
0
        io_flags |= FLB_IO_IPV6;
472
0
    }
473
474
    /* Create mutex for acquiring oauth tokens (they are shared across flush coroutines) */
475
0
    pthread_mutex_init(&ctx->token_mutex, NULL);
476
477
    /*
478
     * Create upstream context for Chronicle Streaming Inserts
479
     * (no oauth2 service)
480
     */
481
0
    ctx->u = flb_upstream_create_url(config, ctx->uri,
482
0
                                     io_flags, ins->tls);
483
0
    if (!ctx->u) {
484
0
        flb_plg_error(ctx->ins, "upstream creation failed");
485
0
        return -1;
486
0
    }
487
488
    /* Create oauth2 context */
489
0
    ctx->o = flb_oauth2_create(ctx->config, FLB_CHRONICLE_AUTH_URL, 3000);
490
0
    if (!ctx->o) {
491
0
        flb_plg_error(ctx->ins, "cannot create oauth2 context");
492
0
        return -1;
493
0
    }
494
0
    flb_output_upstream_set(ctx->u, ins);
495
496
    /* Get or renew the OAuth2 token */
497
0
    token = get_google_token(ctx);
498
499
0
    if (!token) {
500
0
        flb_plg_warn(ctx->ins, "token retrieval failed");
501
0
    }
502
0
    else {
503
0
        flb_sds_destroy(token);
504
0
    }
505
506
0
    ret = check_chronicle_log_type(ctx, config);
507
0
    if (ret != 0) {
508
0
        flb_plg_error(ctx->ins, "Validate log_type failed. '%s' is not supported. ret = %d",
509
0
                      ctx->log_type, ret);
510
0
        return -1;
511
0
    }
512
513
0
    return 0;
514
0
}
515
516
static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, uint64_t bytes, struct flb_log_event log_event)
517
0
{
518
0
    int i;
519
0
    int map_size;
520
0
    int check = FLB_FALSE;
521
0
    int log_key_missing = 0;
522
0
    int ret;
523
0
    struct flb_chronicle *ctx = out_context;
524
0
    char *val_buf;
525
0
    char *key_str = NULL;
526
0
    size_t key_str_size = 0;
527
0
    size_t msgpack_size = bytes + bytes / 4;
528
0
    size_t val_offset = 0;
529
0
    flb_sds_t out_buf;
530
0
    msgpack_object map;
531
0
    msgpack_object key;
532
0
    msgpack_object val;
533
534
    /* Allocate buffer to store log_key contents */
535
0
    val_buf = flb_calloc(1, msgpack_size);
536
0
    if (val_buf == NULL) {
537
0
        flb_plg_error(ctx->ins, "Could not allocate enough "
538
0
                      "memory to read record");
539
0
        flb_errno();
540
0
        return NULL;
541
0
    }
542
543
    /* Get the record/map */
544
0
    map = *log_event.body;
545
546
0
    if (map.type != MSGPACK_OBJECT_MAP) {
547
0
        return NULL;
548
0
    }
549
550
0
    map_size = map.via.map.size;
551
552
    /* Extract log_key from record and append to output buffer */
553
0
    for (i = 0; i < map_size; i++) {
554
0
        key = map.via.map.ptr[i].key;
555
0
        val = map.via.map.ptr[i].val;
556
557
0
        if (key.type == MSGPACK_OBJECT_BIN) {
558
0
            key_str  = (char *) key.via.bin.ptr;
559
0
            key_str_size = key.via.bin.size;
560
0
            check = FLB_TRUE;
561
0
        }
562
0
        if (key.type == MSGPACK_OBJECT_STR) {
563
0
            key_str  = (char *) key.via.str.ptr;
564
0
            key_str_size = key.via.str.size;
565
0
            check = FLB_TRUE;
566
0
        }
567
568
0
        if (check == FLB_TRUE) {
569
0
            if (strncmp(ctx->log_key, key_str, key_str_size) == 0) {
570
571
                /*
572
                 * Copy contents of value into buffer. Necessary to copy
573
                 * strings because flb_msgpack_to_json does not handle nested
574
                 * JSON gracefully and double escapes them.
575
                 */
576
0
                if (val.type == MSGPACK_OBJECT_BIN) {
577
0
                    memcpy(val_buf + val_offset, val.via.bin.ptr, val.via.bin.size);
578
0
                    val_offset += val.via.bin.size;
579
0
                    val_buf[val_offset] = '\0';
580
0
                    val_offset++;
581
0
                }
582
0
                else if (val.type == MSGPACK_OBJECT_STR) {
583
0
                    memcpy(val_buf + val_offset, val.via.str.ptr, val.via.str.size);
584
0
                    val_offset += val.via.str.size;
585
0
                    val_buf[val_offset] = '\0';
586
0
                    val_offset++;
587
0
                }
588
0
                else {
589
0
                    ret = flb_msgpack_to_json(val_buf + val_offset,
590
0
                                              msgpack_size - val_offset, &val);
591
0
                    if (ret < 0) {
592
0
                        break;
593
0
                    }
594
0
                    val_offset += ret;
595
0
                    val_buf[val_offset] = '\0';
596
0
                    val_offset++;
597
0
                }
598
                /* Exit early once log_key has been found for current record */
599
0
                break;
600
0
            }
601
0
        }
602
603
        /* If log_key was not found in the current record, mark log key as missing */
604
0
        log_key_missing++;
605
0
    }
606
607
0
    if (log_key_missing > 0) {
608
0
        flb_plg_error(ctx->ins, "Could not find log_key '%s' in %d records",
609
0
                      ctx->log_key, log_key_missing);
610
0
    }
611
612
    /* If nothing was read, destroy buffer */
613
0
    if (val_offset == 0) {
614
0
        flb_free(val_buf);
615
0
        return NULL;
616
0
    }
617
0
    val_buf[val_offset] = '\0';
618
619
    /* Create output buffer to store contents */
620
0
    out_buf = flb_sds_create(val_buf);
621
0
    if (out_buf == NULL) {
622
0
        flb_plg_error(ctx->ins, "Error creating buffer to store log_key contents.");
623
0
        flb_errno();
624
0
    }
625
0
    flb_free(val_buf);
626
627
0
    return out_buf;
628
0
}
629
630
static int count_mp_with_threshold(size_t last_offset, size_t threshold,
631
                                   struct flb_log_event_decoder *log_decoder,
632
                                   struct flb_chronicle *ctx)
633
0
{
634
0
    int ret;
635
0
    int array_size = 0;
636
0
    size_t off = 0;
637
0
    struct flb_log_event log_event;
638
639
    /* Adjust decoder offset */
640
0
    if (last_offset != 0) {
641
0
        log_decoder->offset = last_offset;
642
0
    }
643
644
0
    while ((ret = flb_log_event_decoder_next(
645
0
                    log_decoder,
646
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
647
0
        off = log_decoder->offset;
648
0
        array_size++;
649
650
0
        if (off >= (threshold + last_offset)) {
651
0
            flb_plg_debug(ctx->ins,
652
0
                          "the offset %zu is exceeded the threshold %zu. "
653
0
                          "Splitting the payload over the threshold so the processed array size is %d",
654
0
                          off, threshold, array_size);
655
656
0
            break;
657
0
        }
658
0
    }
659
660
0
    return array_size;
661
0
}
662
663
static int chronicle_format(const void *data, size_t bytes,
664
                            const char *tag, size_t tag_len,
665
                            char **out_data, size_t *out_size,
666
                            size_t last_offset,
667
                            size_t threshold, size_t *out_offset,
668
                            struct flb_log_event_decoder *log_decoder,
669
                            struct flb_chronicle *ctx)
670
0
{
671
0
    int len;
672
0
    int ret;
673
0
    int array_size = 0;
674
0
    size_t off = 0;
675
0
    size_t last_off = 0;
676
0
    size_t alloc_size = 0;
677
0
    size_t s;
678
0
    char time_formatted[255];
679
    /* Parameters for Timestamp */
680
0
    struct tm tm;
681
0
    flb_sds_t out_buf;
682
0
    struct flb_log_event log_event;
683
0
    msgpack_sbuffer mp_sbuf;
684
0
    msgpack_packer mp_pck;
685
0
    flb_sds_t log_text = NULL;
686
0
    int log_text_size;
687
688
0
    array_size = count_mp_with_threshold(last_offset, threshold, log_decoder, ctx);
689
690
    /* Reset the decoder state */
691
0
    flb_log_event_decoder_reset(log_decoder, (char *) data, bytes);
692
693
    /* Create temporary msgpack buffer */
694
0
    msgpack_sbuffer_init(&mp_sbuf);
695
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
696
697
    /*
698
     * Pack root map (unstructured log):
699
     * see: https://cloud.google.com/chronicle/docs/reference/ingestion-api#request_body_2
700
     * {
701
     *   "customer_id": "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c",
702
     *   "log_type": "BIND_DNS",
703
     *   "entries": [
704
     *     {
705
     *       "log_text": "26-Feb-2019 13:35:02.187 client 10.120.20.32#4238: query: altostrat.com IN A + (203.0.113.102)",
706
     *       "ts_epoch_microseconds": 1551188102187000
707
     *     },
708
     *     {
709
     *       "log_text": "26-Feb-2019 13:37:04.523 client 10.50.100.33#1116: query: examplepetstore.com IN A + (203.0.113.102)",
710
     *       "ts_rfc3339": "2019-26-02T13:37:04.523-08:00"
711
     *     },
712
     *     {
713
     *       "log_text": "26-Feb-2019 13:39:01.115 client 10.1.2.3#3333: query: www.example.com IN A + (203.0.113.102)"
714
     *     },
715
     *   ]
716
     * }
717
     */
718
0
    msgpack_pack_map(&mp_pck, 3);
719
720
0
    msgpack_pack_str(&mp_pck, 11);
721
0
    msgpack_pack_str_body(&mp_pck, "customer_id", 11);
722
723
0
    msgpack_pack_str(&mp_pck, strlen(ctx->customer_id));
724
0
    msgpack_pack_str_body(&mp_pck, ctx->customer_id, strlen(ctx->customer_id));
725
726
0
    msgpack_pack_str(&mp_pck, 8);
727
0
    msgpack_pack_str_body(&mp_pck, "log_type", 8);
728
729
0
    msgpack_pack_str(&mp_pck, strlen(ctx->log_type));
730
0
    msgpack_pack_str_body(&mp_pck, ctx->log_type, strlen(ctx->log_type));
731
732
0
    msgpack_pack_str(&mp_pck, 7);
733
0
    msgpack_pack_str_body(&mp_pck, "entries", 7);
734
735
    /* Append entries */
736
0
    msgpack_pack_array(&mp_pck, array_size);
737
738
0
    flb_plg_trace(ctx->ins, "last offset is %zu", last_offset);
739
    /* Adjust decoder offset */
740
0
    if (last_offset != 0) {
741
0
        log_decoder->offset = last_offset;
742
0
    }
743
744
0
    while ((ret = flb_log_event_decoder_next(
745
0
                    log_decoder,
746
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
747
0
        off = log_decoder->offset;
748
0
        alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */
749
0
        last_off = off;
750
751
        /*
752
         * Pack entries
753
         *
754
         * {
755
         *  "log_text": {...},
756
         *  "ts_rfc3339": "..."
757
         * }
758
         *
759
         */
760
0
        msgpack_pack_map(&mp_pck, 2);
761
762
        /* log_text */
763
0
        msgpack_pack_str(&mp_pck, 8);
764
0
        msgpack_pack_str_body(&mp_pck, "log_text", 8);
765
0
        if (ctx->log_key != NULL) {
766
0
            log_text = flb_pack_msgpack_extract_log_key(ctx, bytes, log_event);
767
0
            log_text_size = flb_sds_len(log_text);
768
0
        }
769
0
        else {
770
0
            log_text = flb_msgpack_to_json_str(alloc_size, log_event.body);
771
0
            log_text_size = strlen(log_text);
772
0
        }
773
774
0
        if (log_text == NULL) {
775
0
            flb_plg_error(ctx->ins, "Could not marshal msgpack to output string");
776
0
            return -1;
777
0
        }
778
0
        msgpack_pack_str(&mp_pck, log_text_size);
779
0
        msgpack_pack_str_body(&mp_pck, log_text, log_text_size);
780
781
0
        if (ctx->log_key != NULL) {
782
0
            flb_sds_destroy(log_text);
783
0
        }
784
0
        else {
785
0
            flb_free(log_text);
786
0
        }
787
        /* timestamp */
788
0
        msgpack_pack_str(&mp_pck, 10);
789
0
        msgpack_pack_str_body(&mp_pck, "ts_rfc3339", 10);
790
791
0
        gmtime_r(&log_event.timestamp.tm.tv_sec, &tm);
792
0
        s = strftime(time_formatted, sizeof(time_formatted) - 1,
793
0
                        FLB_STD_TIME_FMT, &tm);
794
0
        len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
795
0
                       ".%03" PRIu64 "Z",
796
0
                       (uint64_t) log_event.timestamp.tm.tv_nsec);
797
0
        s += len;
798
799
0
        msgpack_pack_str(&mp_pck, s);
800
0
        msgpack_pack_str_body(&mp_pck, time_formatted, s);
801
802
0
        if (off >= (threshold + last_offset)) {
803
0
            flb_plg_debug(ctx->ins,
804
0
                          "the offset %zu is exceeded the threshold %zu. "
805
0
                          "Splitting the payload over the threshold so the processed array size has %d.",
806
0
                          off, threshold, array_size);
807
808
0
            break;
809
0
        }
810
0
    }
811
812
    /* Convert from msgpack to JSON */
813
0
    out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
814
0
    msgpack_sbuffer_destroy(&mp_sbuf);
815
816
0
    if (!out_buf) {
817
0
        flb_plg_error(ctx->ins, "error formatting JSON payload");
818
0
        return -1;
819
0
    }
820
821
0
    *out_offset = last_off;
822
0
    *out_data = out_buf;
823
0
    *out_size = flb_sds_len(out_buf);
824
825
0
    return 0;
826
0
}
827
828
static void cb_chronicle_flush(struct flb_event_chunk *event_chunk,
829
                              struct flb_output_flush *out_flush,
830
                              struct flb_input_instance *i_ins,
831
                              void *out_context,
832
                              struct flb_config *config)
833
0
{
834
0
    (void) i_ins;
835
0
    (void) config;
836
0
    int ret;
837
0
    int ret_code = FLB_RETRY;
838
0
    size_t b_sent;
839
0
    flb_sds_t token;
840
0
    flb_sds_t payload_buf;
841
0
    size_t payload_size;
842
0
    struct flb_chronicle *ctx = out_context;
843
0
    struct flb_connection *u_conn;
844
0
    struct flb_http_client *c;
845
0
    struct flb_log_event_decoder log_decoder;
846
0
    size_t threshold = 0.8 * 1024 * 1024;
847
0
    size_t offset = 0;
848
0
    size_t out_offset = 0;
849
0
    int need_loop = FLB_TRUE;
850
0
    const int retry_limit = 8;
851
0
    int retries = 0;
852
0
    const size_t one_mebibyte = 1024 * 1024;
853
854
0
    flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);
855
856
    /* Get upstream connection */
857
0
    u_conn = flb_upstream_conn_get(ctx->u);
858
0
    if (!u_conn) {
859
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
860
0
    }
861
862
    /* Get or renew Token */
863
0
    token = get_google_token(ctx);
864
865
0
    if (!token) {
866
0
        flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
867
0
        flb_upstream_conn_release(u_conn);
868
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
869
0
    }
870
871
0
    flb_plg_trace(ctx->ins, "msgpack payload size is %zu", event_chunk->size);
872
873
    /* Prepare log decoder */
874
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size);
875
876
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
877
0
        flb_plg_error(ctx->ins,
878
0
                      "Log event decoder initialization error : %d", ret);
879
880
        /* Cleanup token and conn */
881
0
        flb_sds_destroy(token);
882
0
        flb_upstream_conn_release(u_conn);
883
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
884
0
    }
885
886
0
    while (need_loop) {
887
0
    retry:
888
0
        if (retries > 0) {
889
            /* (retry_limit - retries)/10.0 is a factor to reduce the
890
             * formatting payloads.
891
             * For the first attempt, it will get:
892
             * (8 - 1) / 10.0 = 0.7
893
             * For the second attempt, it will get:
894
             * (8 - 2) / 10.0 = 0.6
895
             * ...
896
             * For 7th attempt, it will get:
897
             * (8 - 7) / 10.0 = 0.1
898
             * For 8th attempt, it won't happen. Just give up for
899
             * formating though. :)
900
             */
901
0
            threshold = (retry_limit - retries)/10.0 * one_mebibyte;
902
0
        }
903
904
        /* Reformat msgpack to chronicle JSON payload */
905
0
        ret = chronicle_format(event_chunk->data, event_chunk->size,
906
0
                               event_chunk->tag, flb_sds_len(event_chunk->tag),
907
0
                               &payload_buf, &payload_size,
908
0
                               offset, threshold, &out_offset,
909
0
                               &log_decoder, ctx);
910
0
        if (ret != 0) {
911
0
            flb_upstream_conn_release(u_conn);
912
0
            flb_sds_destroy(token);
913
0
            flb_sds_destroy(payload_buf);
914
0
            flb_log_event_decoder_destroy(&log_decoder);
915
916
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
917
0
        }
918
919
0
        flb_plg_debug(ctx->ins, "the last offset of msgpack decoder is %zu", out_offset);
920
921
0
        if (payload_size >= one_mebibyte) {
922
0
            retries++;
923
0
            if (retries >= retry_limit) {
924
0
                flb_plg_error(ctx->ins, "Retry limit is exeeced for chronicle_format");
925
926
0
                flb_upstream_conn_release(u_conn);
927
0
                flb_sds_destroy(token);
928
0
                flb_sds_destroy(payload_buf);
929
0
                flb_log_event_decoder_destroy(&log_decoder);
930
931
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
932
0
            }
933
934
0
            flb_plg_debug(ctx->ins,
935
0
                          "HTTP request body is exeeded to %zd bytes. actual: %zu. left attempt(s): %d",
936
0
                          one_mebibyte, payload_size, retry_limit - retries);
937
0
            flb_sds_destroy(payload_buf);
938
939
0
            goto retry;
940
0
        }
941
0
        else {
942
0
            retries = 0;
943
0
        }
944
945
        /* Compose HTTP Client request */
946
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->endpoint,
947
0
                            payload_buf, payload_size, NULL, 0, NULL, 0);
948
0
        if (!c) {
949
0
            flb_plg_error(ctx->ins, "cannot create HTTP client context");
950
0
            flb_upstream_conn_release(u_conn);
951
0
            flb_sds_destroy(token);
952
0
            flb_sds_destroy(payload_buf);
953
0
            flb_log_event_decoder_destroy(&log_decoder);
954
955
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
956
0
        }
957
958
0
        flb_http_buffer_size(c, 4192);
959
0
        flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
960
0
        flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
961
962
        /* Compose and append Authorization header */
963
0
        flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
964
965
        /* Send HTTP request */
966
0
        ret = flb_http_do(c, &b_sent);
967
968
        /* validate response */
969
0
        if (ret != 0) {
970
0
            flb_plg_warn(ctx->ins, "http_do=%i", ret);
971
0
            ret_code = FLB_RETRY;
972
0
        }
973
0
        else {
974
            /* The request was issued successfully, validate the 'error' field */
975
0
            flb_plg_debug(ctx->ins, "HTTP Status=%i", c->resp.status);
976
0
            if (c->resp.status == 200) {
977
0
                ret_code = FLB_OK;
978
0
            }
979
0
            else {
980
0
                if (c->resp.payload && c->resp.payload_size > 0) {
981
                    /* we got an error */
982
0
                    flb_plg_warn(ctx->ins, "response\n%s", c->resp.payload);
983
0
                }
984
0
                ret_code = FLB_RETRY;
985
0
            }
986
0
        }
987
988
        /* Validate all chunks are processed or not */
989
0
        if (out_offset >= event_chunk->size) {
990
0
            need_loop = FLB_FALSE;
991
0
        }
992
        /* Clean up HTTP client stuffs */
993
0
        flb_sds_destroy(payload_buf);
994
0
        flb_http_client_destroy(c);
995
996
        /* The next loop uses the returned offset */
997
0
        offset = out_offset;
998
0
    }
999
1000
    /* Cleanup decoder */
1001
0
    flb_log_event_decoder_destroy(&log_decoder);
1002
1003
    /* Cleanup token and conn */
1004
0
    flb_sds_destroy(token);
1005
0
    flb_upstream_conn_release(u_conn);
1006
1007
    /* Done */
1008
0
    FLB_OUTPUT_RETURN(ret_code);
1009
0
}
1010
1011
static int cb_chronicle_exit(void *data, struct flb_config *config)
1012
0
{
1013
0
    struct flb_chronicle *ctx = data;
1014
1015
0
    if (!ctx) {
1016
0
        return -1;
1017
0
    }
1018
1019
0
    if (ctx->u) {
1020
0
        flb_upstream_destroy(ctx->u);
1021
0
    }
1022
1023
0
    flb_chronicle_conf_destroy(ctx);
1024
0
    return 0;
1025
0
}
1026
1027
static struct flb_config_map config_map[] = {
1028
    {
1029
      FLB_CONFIG_MAP_STR, "google_service_credentials", (char *)NULL,
1030
      0, FLB_TRUE, offsetof(struct flb_chronicle, credentials_file),
1031
      "Set the path for the google service credentials file"
1032
    },
1033
    // set in flb_chronicle_oauth_credentials
1034
    {
1035
      FLB_CONFIG_MAP_STR, "service_account_email", (char *)NULL,
1036
      0, FLB_FALSE, 0,
1037
      "Set the service account email"
1038
    },
1039
    // set in flb_chronicle_oauth_credentials
1040
    {
1041
      FLB_CONFIG_MAP_STR, "service_account_secret", (char *)NULL,
1042
      0, FLB_FALSE, 0,
1043
      "Set the service account secret"
1044
    },
1045
    {
1046
      FLB_CONFIG_MAP_STR, "project_id", (char *)NULL,
1047
      0, FLB_TRUE, offsetof(struct flb_chronicle, project_id),
1048
      "Set the project id"
1049
    },
1050
    {
1051
      FLB_CONFIG_MAP_STR, "customer_id", (char *)NULL,
1052
      0, FLB_TRUE, offsetof(struct flb_chronicle, customer_id),
1053
      "Set the customer id"
1054
    },
1055
    {
1056
      FLB_CONFIG_MAP_STR, "log_type", (char *)NULL,
1057
      0, FLB_TRUE, offsetof(struct flb_chronicle, log_type),
1058
      "Set the log type"
1059
    },
1060
    {
1061
      FLB_CONFIG_MAP_STR, "region", (char *)NULL,
1062
      0, FLB_TRUE, offsetof(struct flb_chronicle, region),
1063
      "Set the region"
1064
    },
1065
    {
1066
      FLB_CONFIG_MAP_STR, "log_key", NULL,
1067
      0, FLB_TRUE, offsetof(struct flb_chronicle, log_key),
1068
      "Set the log key"
1069
    },
1070
    /* EOF */
1071
    {0}
1072
};
1073
1074
struct flb_output_plugin out_chronicle_plugin = {
1075
    .name         = "chronicle",
1076
    .description  = "Send logs to Google Chronicle as unstructured log",
1077
    .cb_init      = cb_chronicle_init,
1078
    .cb_flush     = cb_chronicle_flush,
1079
    .cb_exit      = cb_chronicle_exit,
1080
    .config_map   = config_map,
1081
    /* Plugin flags */
1082
    .flags          = FLB_OUTPUT_NET | FLB_IO_TLS,
1083
};