Coverage Report

Created: 2024-09-19 07:08

/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
21
#include <sys/types.h>
22
#include <sys/stat.h>
23
24
#include <fluent-bit/flb_input_plugin.h>
25
#include <fluent-bit/flb_network.h>
26
#include <fluent-bit/flb_pack.h>
27
#include <fluent-bit/flb_utils.h>
28
#include <fluent-bit/flb_error.h>
29
#include <fluent-bit/flb_compat.h>
30
#include <fluent-bit/flb_ra_key.h>
31
#include <fluent-bit/flb_time.h>
32
#include <fluent-bit/flb_strptime.h>
33
#include <fluent-bit/flb_parser.h>
34
#include <fluent-bit/flb_log_event_encoder.h>
35
#include <fluent-bit/flb_compat.h>
36
37
#include "kubernetes_events.h"
38
#include "kubernetes_events_conf.h"
39
40
#ifdef FLB_HAVE_SQLDB
41
#include "kubernetes_events_sql.h"
42
static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item);
43
#endif
44
45
0
#define JSON_ARRAY_DELIM "\r\n"
46
47
/*
 * Load the file at 'path' into a freshly allocated, NUL-terminated buffer.
 *
 * Trailing '\n' and '\r' characters are removed from the end of the data.
 * On success the function returns 0, stores the buffer in *out_buf (the
 * caller releases it with flb_free()) and the trimmed length in *out_size.
 * It returns -1 when the file cannot be opened, stat'ed, allocated for or
 * read as one complete item of st_size bytes.
 */
static int file_to_buffer(const char *path,
                          char **out_buf, size_t *out_size)
{
    int status;
    int end;
    char *data;
    ssize_t items;
    FILE *stream;
    struct stat info;

    stream = fopen(path, "r");
    if (stream == NULL) {
        return -1;
    }

    status = stat(path, &info);
    if (status == -1) {
        /* report errno before fclose() has a chance to overwrite it */
        flb_errno();
        fclose(stream);
        return -1;
    }

    /* one extra byte keeps room for the terminating NUL */
    data = flb_calloc(1, (info.st_size + 1));
    if (data == NULL) {
        flb_errno();
        fclose(stream);
        return -1;
    }

    /* the whole content is requested as a single item */
    items = fread(data, info.st_size, 1, stream);
    if (items < 1) {
        flb_free(data);
        fclose(stream);
        return -1;
    }

    fclose(stream);

    /* walk backwards over trailing line terminators */
    end = info.st_size;
    while (end > 0 && (data[end - 1] == '\n' || data[end - 1] == '\r')) {
        end--;
    }
    data[end] = '\0';

    *out_buf = data;
    *out_size = end;

    return 0;
}
97
98
/* Set K8s Authorization Token and get HTTP Auth Header */
99
static int get_http_auth_header(struct k8s_events *ctx)
100
0
{
101
0
    int ret;
102
0
    char *temp;
103
0
    char *tk = NULL;
104
0
    size_t tk_size = 0;
105
106
0
    if (!ctx->token_file || strlen(ctx->token_file) == 0) {
107
0
        return 0;
108
0
    }
109
110
0
    ret = file_to_buffer(ctx->token_file, &tk, &tk_size);
111
0
    if (ret == -1) {
112
0
        flb_plg_warn(ctx->ins, "cannot open %s", ctx->token_file);
113
0
        return -1;
114
0
    }
115
0
    ctx->token_created = time(NULL);
116
117
    /* Token */
118
0
    if (ctx->token != NULL) {
119
0
        flb_free(ctx->token);
120
0
    }
121
0
    ctx->token = tk;
122
0
    ctx->token_len = tk_size;
123
124
    /* HTTP Auth Header */
125
0
    if (ctx->auth == NULL) {
126
0
        ctx->auth = flb_malloc(tk_size + 32);
127
0
    }
128
0
    else if (ctx->auth_len < tk_size + 32) {
129
0
        temp = flb_realloc(ctx->auth, tk_size + 32);
130
0
        if (temp == NULL) {
131
0
            flb_errno();
132
0
            flb_free(ctx->auth);
133
0
            ctx->auth = NULL;
134
0
            return -1;
135
0
        }
136
0
        ctx->auth = temp;
137
0
    }
138
139
0
    if (!ctx->auth) {
140
0
        return -1;
141
0
    }
142
143
0
    ctx->auth_len = snprintf(ctx->auth, tk_size + 32, "Bearer %s", tk);
144
0
    return 0;
145
0
}
146
147
/* Refresh HTTP Auth Header if K8s Authorization Token is expired */
148
static int refresh_token_if_needed(struct k8s_events *ctx)
149
0
{
150
0
    int expired = FLB_FALSE;
151
0
    int ret;
152
153
0
    if (!ctx->token_file || strlen(ctx->token_file) == 0) {
154
0
        return 0;
155
0
    }
156
157
0
    if (ctx->token_created > 0) {
158
0
        if (time(NULL) > ctx->token_created + ctx->token_ttl) {
159
0
            expired = FLB_TRUE;
160
0
        }
161
0
    }
162
163
0
    if (expired || ctx->token_created == 0) {
164
0
        ret = get_http_auth_header(ctx);
165
0
        if (ret == -1) {
166
0
            return -1;
167
0
        }
168
0
    }
169
170
0
    return 0;
171
0
}
172
173
/*
 * Look up a value inside a msgpack map by key name.
 *
 * Returns a pointer to the value of the first string key that begins with
 * 'fieldname', or NULL when 'obj' is not a map or no key matches. The
 * returned pointer refers to storage owned by 'obj'.
 *
 * msgpack strings carry an explicit size and are not NUL-terminated, so
 * keys shorter than 'fieldname' are skipped before comparing; this keeps
 * the comparison inside the key bytes.
 *
 * NOTE(review): matching is by prefix (a key "metadataX" satisfies
 * "metadata"), preserved from the previous implementation - confirm
 * whether exact matching is intended before tightening it.
 */
static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname)
{
    uint32_t i;
    size_t name_len;
    msgpack_object *k;

    if (obj->type != MSGPACK_OBJECT_MAP) {
        return NULL;
    }

    name_len = strlen(fieldname);

    for (i = 0; i < obj->via.map.size; i++) {
        k = &obj->via.map.ptr[i].key;
        if (k->type != MSGPACK_OBJECT_STR) {
            continue;
        }

        /* a shorter key cannot match and must not be read past its end */
        if (k->via.str.size < name_len) {
            continue;
        }

        if (strncmp(k->via.str.ptr, fieldname, name_len) == 0) {
            return &obj->via.map.ptr[i].val;
        }
    }
    return NULL;
}
196
197
/*
 * Copy a string field of a msgpack map into a new SDS string.
 *
 * Returns 0 and stores the copy in *val when the field exists and is a
 * string (the caller destroys it). Returns 0 and leaves *val untouched
 * when the field does not exist, so callers should initialise *val to NULL
 * to tell the two cases apart. Returns -1 when the field has another type
 * or the copy cannot be allocated; *val is left untouched in that case.
 */
static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_sds_t *val)
{
    msgpack_object *v;
    flb_sds_t copy;

    v = record_get_field_ptr(obj, fieldname);
    if (v == NULL) {
        return 0;
    }
    if (v->type != MSGPACK_OBJECT_STR) {
        return -1;
    }

    copy = flb_sds_create_len(v->via.str.ptr, v->via.str.size);
    if (copy == NULL) {
        /* do not report an allocation failure as a successful lookup */
        flb_errno();
        return -1;
    }

    *val = copy;
    return 0;
}
212
213
/*
 * Parse a "%Y-%m-%dT%H:%M:%SZ" timestamp field of a msgpack map.
 *
 * Returns 0 and fills 'val' (seconds from the parsed time, nanoseconds set
 * to zero) on success, -1 when the field is missing or is not a string,
 * and -2 when the string is too long for the local buffer or does not
 * match the expected format.
 */
static int record_get_field_time(msgpack_object *obj, const char *fieldname, struct flb_time *val)
{
    msgpack_object *v;
    struct flb_tm tm = { 0 };
    char stamp[64];

    v = record_get_field_ptr(obj, fieldname);
    if (v == NULL) {
        return -1;
    }
    if (v->type != MSGPACK_OBJECT_STR) {
        return -1;
    }

    /*
     * msgpack strings are not NUL-terminated: parse a bounded, terminated
     * copy so the parser never reads past the end of the value.
     */
    if (v->via.str.size >= sizeof(stamp)) {
        return -2;
    }
    memcpy(stamp, v->via.str.ptr, v->via.str.size);
    stamp[v->via.str.size] = '\0';

    if (flb_strptime(stamp, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) {
        return -2;
    }

    val->tm.tv_sec = flb_parser_tm2time(&tm, FLB_FALSE);
    val->tm.tv_nsec = 0;

    return 0;
}
235
236
/*
 * Read an unsigned 64-bit number from a field of a msgpack map.
 *
 * Integer values are taken as they are (negative integers are converted
 * with a plain cast). String values must consist entirely of a base-10
 * number. Returns 0 and stores the result in *val on success; returns -1
 * and leaves *val untouched when the field is missing, has an unsupported
 * type, or is a string that is empty, too long, or not fully numeric.
 */
static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, uint64_t *val)
{
    msgpack_object *v;
    char digits[32];
    char *end;
    size_t len;
    unsigned long long parsed;

    v = record_get_field_ptr(obj, fieldname);
    if (v == NULL) {
        return -1;
    }

    /* attempt to parse string as number... */
    if (v->type == MSGPACK_OBJECT_STR) {
        len = v->via.str.size;
        if (len == 0 || len >= sizeof(digits)) {
            return -1;
        }

        /*
         * msgpack strings are not NUL-terminated; convert a terminated copy
         * so the parser cannot consume bytes that follow the value.
         */
        memcpy(digits, v->via.str.ptr, len);
        digits[len] = '\0';

        /* strtoull: unsigned long may be only 32 bits wide */
        parsed = strtoull(digits, &end, 10);
        if (end != digits + len) {
            return -1;
        }
        *val = (uint64_t) parsed;
        return 0;
    }
    if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
        *val = v->via.u64;
        return 0;
    }
    if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
        *val = (uint64_t)v->via.i64;
        return 0;
    }
    return -1;
}
264
265
/*
 * Determine the timestamp of an event item.
 *
 * Some events can have lastTimestamp and firstTimestamp set to NULL while
 * having metadata.creationTimestamp set, so the fields are tried in that
 * order and the first one that parses is used.
 *
 * Returns FLB_TRUE with 'event_time' filled in, or FLB_FALSE when none of
 * the fields yields a valid time. A field that exists but cannot be parsed
 * is skipped rather than accepted, so FLB_TRUE always means 'event_time'
 * has been written.
 */
static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time)
{
    int ret;
    msgpack_object *metadata;

    ret = record_get_field_time(obj, "lastTimestamp", event_time);
    if (ret == 0) {
        return FLB_TRUE;
    }

    ret = record_get_field_time(obj, "firstTimestamp", event_time);
    if (ret == 0) {
        return FLB_TRUE;
    }

    metadata = record_get_field_ptr(obj, "metadata");
    if (metadata == NULL) {
        return FLB_FALSE;
    }

    ret = record_get_field_time(metadata, "creationTimestamp", event_time);
    if (ret == 0) {
        return FLB_TRUE;
    }

    return FLB_FALSE;
}
295
296
/*
 * Decide whether an event item must be skipped.
 *
 * An item is filtered (FLB_TRUE) when:
 *  - its timestamp is older than now minus ctx->retention_time seconds, or
 *  - a database is in use and it already holds the item's uid, or
 *  - no database is in use and the item's resourceVersion is not newer
 *    than ctx->last_resource_version.
 *
 * When metadata, resourceVersion or uid cannot be obtained, or the database
 * lookup yields no row, an error is logged where applicable and FLB_FALSE
 * is returned (the item is not filtered).
 */
static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj,
                                    struct flb_time *event_time)
{
    int ret;
    uint64_t outdated;
    msgpack_object *metadata;
    flb_sds_t uid = NULL;
    uint64_t resource_version;
#ifdef FLB_HAVE_SQLDB
    bool exists;
#endif

    outdated = cfl_time_now() - (ctx->retention_time * 1000000000L);
    if (flb_time_to_nanosec(event_time) < outdated) {
        flb_plg_debug(ctx->ins, "Item is older than retention_time: %" PRIu64 " < %" PRIu64,
                      flb_time_to_nanosec(event_time), outdated);
        return FLB_TRUE;
    }

    metadata = record_get_field_ptr(obj, "metadata");
    if (metadata == NULL) {
        flb_plg_error(ctx->ins, "Cannot unpack item metadata in response");
        return FLB_FALSE;
    }

    ret = record_get_field_uint64(metadata, "resourceVersion", &resource_version);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "Cannot get resourceVersion for item in response");
        return FLB_FALSE;
    }

    /*
     * The lookup reports success without writing 'uid' when the field is
     * absent, hence the NULL initialisation and the explicit NULL test.
     */
    ret = record_get_field_sds(metadata, "uid", &uid);
    if (ret == -1 || uid == NULL) {
        flb_plg_error(ctx->ins, "Cannot get uid for item in response");
        return FLB_FALSE;
    }

#ifdef FLB_HAVE_SQLDB
    if (ctx->db) {
        sqlite3_bind_text(ctx->stmt_get_kubernetes_event_exists_by_uid,
                           1, uid, -1, NULL);
        ret = sqlite3_step(ctx->stmt_get_kubernetes_event_exists_by_uid);
        if (ret != SQLITE_ROW) {
            if (ret != SQLITE_DONE) {
                flb_plg_error(ctx->ins, "cannot execute kubernetes event exists");
            }
            sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid);
            sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid);
            flb_sds_destroy(uid);
            return FLB_FALSE;
        }

        exists = sqlite3_column_int64(ctx->stmt_get_kubernetes_event_exists_by_uid, 0);

        flb_plg_debug(ctx->ins, "is_filtered: uid=%s exists=%d", uid, exists);
        sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid);
        sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid);
        flb_sds_destroy(uid);

        return exists > 0 ? FLB_TRUE : FLB_FALSE;
    }
#endif

    /* check if this is an old event. */
    if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) {
        /* casts make the arguments match the %llu conversions */
        flb_plg_debug(ctx->ins, "skipping old object: %llu (< %llu)",
                      (unsigned long long) resource_version,
                      (unsigned long long) ctx->last_resource_version);
        flb_sds_destroy(uid);
        return FLB_TRUE;
    }

    flb_sds_destroy(uid);
    return FLB_FALSE;
}
371
372
373
static int process_event_object(struct k8s_events *ctx, flb_sds_t action,
374
                         msgpack_object *item)
375
0
{
376
0
    int ret = -1;
377
0
    struct flb_time ts;
378
0
    uint64_t resource_version;
379
0
    msgpack_object* item_metadata;
380
381
0
    if(strncmp(action, "ADDED", 5) != 0 && strncmp(action, "MODIFIED", 8) != 0 ) {
382
        /* We don't process DELETED nor BOOKMARK */
383
0
        return 0;
384
0
    }
385
386
0
    item_metadata = record_get_field_ptr(item, "metadata");
387
0
    if (item_metadata == NULL) {
388
0
        flb_plg_warn(ctx->ins, "Event without metadata");
389
0
        return -1;
390
0
    }
391
0
    ret = record_get_field_uint64(item_metadata, "resourceVersion", &resource_version);
392
0
    if (ret == -1) {
393
0
        return ret;
394
0
    }
395
396
    /* reset the log encoder */
397
0
    flb_log_event_encoder_reset(ctx->encoder);
398
399
    /* print every item from the items array */
400
0
    if (item->type != MSGPACK_OBJECT_MAP) {
401
0
        flb_plg_error(ctx->ins, "Cannot unpack item in response");
402
0
        return -1;
403
0
    }
404
405
    /* get event timestamp */
406
0
    ret = item_get_timestamp(item, &ts);
407
0
    if (ret == FLB_FALSE) {
408
0
        flb_plg_error(ctx->ins, "cannot retrieve event timestamp");
409
0
        return -1;
410
0
    }
411
412
0
    if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) {
413
0
        return 0;
414
0
    }
415
416
0
#ifdef FLB_HAVE_SQLDB
417
0
    if (ctx->db) {
418
0
        k8s_events_sql_insert_event(ctx, item);
419
0
    }
420
0
#endif
421
422
    /* encode content as a log event */
423
0
    flb_log_event_encoder_begin_record(ctx->encoder);
424
0
    flb_log_event_encoder_set_timestamp(ctx->encoder, &ts);
425
426
0
    ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item);
427
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
428
0
        ret = flb_log_event_encoder_commit_record(ctx->encoder);
429
0
    }
430
0
    else {
431
0
        flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version);
432
0
    }
433
434
0
    if (ctx->encoder->output_length > 0) {
435
0
        flb_input_log_append(ctx->ins, NULL, 0,
436
0
                             ctx->encoder->output_buffer,
437
0
                             ctx->encoder->output_length);
438
0
    }
439
440
0
    return 0;
441
0
}
442
443
0
static int process_watched_event(struct k8s_events *ctx, char *buf_data, size_t buf_size) {
444
0
    int ret = -1;
445
0
    size_t off = 0;
446
0
    msgpack_unpacked result;
447
0
    msgpack_object root;
448
0
    msgpack_object *item = NULL;
449
0
    flb_sds_t event_type = NULL;
450
451
    /* unpack */
452
0
    msgpack_unpacked_init(&result);
453
0
    ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
454
0
    if (ret != MSGPACK_UNPACK_SUCCESS) {
455
0
        flb_plg_error(ctx->ins, "Cannot unpack response");
456
0
        return -1;
457
0
    }
458
459
0
    root = result.data;
460
0
    if (root.type != MSGPACK_OBJECT_MAP) {
461
0
        return -1;
462
0
    }
463
464
0
    ret = record_get_field_sds(&root, "type", &event_type);
465
0
    if (ret == -1) {
466
0
        flb_plg_warn(ctx->ins, "Streamed Event 'type' not found");
467
0
        goto msg_error;
468
0
    }
469
470
0
    item = record_get_field_ptr(&root, "object");
471
0
    if (item == NULL || item->type != MSGPACK_OBJECT_MAP) {
472
0
        flb_plg_warn(ctx->ins, "Streamed Event 'object' not found");
473
0
        ret = -1;
474
0
        goto msg_error;
475
0
    }
476
477
0
    ret = process_event_object(ctx, event_type, item);
478
479
0
msg_error:
480
0
    flb_sds_destroy(event_type);
481
0
    msgpack_unpacked_destroy(&result);
482
0
    return ret;
483
0
}
484
485
/*
 * Process the JSON body of an EventList response.
 *
 * The payload is converted to msgpack, the top-level "items" array and
 * "metadata" map are located, metadata.resourceVersion is stored in
 * *max_resource_version, metadata.continue (when present) is copied into
 * *continue_token, and every item is passed to process_event_object() with
 * the action "ADDED".
 *
 * Returns 0 when the list was walked completely and -1 on any failure.
 * All intermediate buffers are released on every path.
 */
static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_size,
                          uint64_t *max_resource_version, flb_sds_t *continue_token)
{
    uint32_t i;
    int ret = -1;
    int root_type;
    size_t consumed = 0;
    char *buf_data;
    size_t buf_size;
    size_t off = 0;
    msgpack_unpacked result;
    msgpack_object root;
    msgpack_object k;
    msgpack_object *items = NULL;
    msgpack_object *item = NULL;
    msgpack_object *metadata = NULL;
    const flb_sds_t action = "ADDED"; /* All items from a k8s list we consider as 'ADDED' */

    ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "could not process payload, incomplete or bad formed JSON");
        goto json_error;
    }

    /* unpack */
    msgpack_unpacked_init(&result);
    ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
    if (ret != MSGPACK_UNPACK_SUCCESS) {
        flb_plg_error(ctx->ins, "Cannot unpack response");
        ret = -1;
        goto msg_error;
    }

    /* every failure below must be reported as -1 */
    ret = -1;

    /* lookup the items array */
    root = result.data;
    if (root.type != MSGPACK_OBJECT_MAP) {
        goto msg_error;
    }

    /* Traverse the EventList for the metadata (for the continue token) and the items.
     * https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList
     */
    for (i = 0; i < root.via.map.size; i++) {
        k = root.via.map.ptr[i].key;
        if (k.type != MSGPACK_OBJECT_STR) {
            continue;
        }

        /* keys are not NUL-terminated: check the size before comparing */
        if (k.via.str.size == 5 && strncmp(k.via.str.ptr, "items", 5) == 0) {
            items = &root.via.map.ptr[i].val;
            if (items->type != MSGPACK_OBJECT_ARRAY) {
                flb_plg_error(ctx->ins, "Cannot unpack items");
                goto msg_error;
            }
        }

        if (k.via.str.size == 8 && strncmp(k.via.str.ptr, "metadata", 8) == 0) {
            metadata = &root.via.map.ptr[i].val;
            if (metadata->type != MSGPACK_OBJECT_MAP) {
                flb_plg_error(ctx->ins, "Cannot unpack metadata");
                goto msg_error;
            }
        }
    }

    if (items == NULL) {
        flb_plg_error(ctx->ins, "Cannot find items in response");
        goto msg_error;
    }

    if (metadata == NULL) {
        flb_plg_error(ctx->ins, "Cannot find metadata in response");
        goto msg_error;
    }

    if (record_get_field_uint64(metadata, "resourceVersion", max_resource_version) == -1) {
        flb_plg_error(ctx->ins, "Cannot find EventList resourceVersion");
        goto msg_error;
    }

    if (record_get_field_sds(metadata, "continue", continue_token) == -1) {
        flb_plg_error(ctx->ins, "Cannot process continue token");
        goto msg_error;
    }

    for (i = 0; i < items->via.array.size; i++) {
        item = &items->via.array.ptr[i];
        if (item->type != MSGPACK_OBJECT_MAP) {
            flb_plg_error(ctx->ins, "Cannot unpack item in response");
            goto msg_error;
        }
        /* a failing item does not abort the rest of the list */
        process_event_object(ctx, action, item);
    }

    ret = 0;

msg_error:
    msgpack_unpacked_destroy(&result);
    flb_free(buf_data);
json_error:
    return ret;
}
587
588
static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx,
589
                                                      uint64_t max_resource_version)
590
0
{
591
0
    flb_sds_t url;
592
0
    struct flb_http_client *c;
593
594
0
    if (ctx->namespace == NULL) {
595
0
        url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
596
0
    }
597
0
    else {
598
0
        url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
599
0
                                  strlen(ctx->namespace));
600
0
        flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
601
0
    }
602
603
0
    flb_sds_printf(&url, "?watch=1&resourceVersion=%llu", max_resource_version);
604
0
    flb_plg_info(ctx->ins, "Requesting %s", url);
605
0
    c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
606
0
                        NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
607
0
    flb_sds_destroy(url);
608
0
    return c;
609
0
 }
610
611
static struct flb_http_client *make_event_list_api_request(struct k8s_events *ctx,
612
                                                      flb_sds_t continue_token)
613
0
{
614
0
    flb_sds_t url;
615
0
    struct flb_http_client *c;
616
617
0
    if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) {
618
0
        return flb_http_client(ctx->current_connection, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
619
0
                            NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
620
0
    }
621
622
0
    if (ctx->namespace == NULL) {
623
0
        url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
624
0
    }
625
0
    else {
626
0
        url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
627
0
                                  strlen(ctx->namespace));
628
0
        flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
629
0
    }
630
631
0
    flb_sds_cat_safe(&url, "?", 1);
632
0
    if (ctx->limit_request) {
633
0
        if (continue_token != NULL) {
634
0
            flb_sds_printf(&url, "continue=%s&", continue_token);
635
0
        }
636
0
        flb_sds_printf(&url, "limit=%d", ctx->limit_request);
637
0
    }
638
0
    c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
639
0
                        NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
640
0
    flb_sds_destroy(url);
641
0
    return c;
642
0
}
643
644
#ifdef FLB_HAVE_SQLDB
645
646
static int k8s_events_cleanup_db(struct flb_input_instance *ins,
647
                                 struct flb_config *config, void *in_context)
648
0
{
649
0
    int ret;
650
0
    struct k8s_events *ctx = (struct k8s_events *)in_context;
651
0
    time_t retention_time_ago;
652
0
    time_t now = (cfl_time_now() / 1000000000);
653
654
0
    if (ctx->db == NULL) {
655
0
        FLB_INPUT_RETURN(0);
656
0
    }
657
658
0
    retention_time_ago = now - (ctx->retention_time);
659
0
    sqlite3_bind_int64(ctx->stmt_delete_old_kubernetes_events,
660
0
                        1, (int64_t)retention_time_ago);
661
0
    ret = sqlite3_step(ctx->stmt_delete_old_kubernetes_events);
662
0
    if (ret != SQLITE_ROW && ret != SQLITE_DONE) {
663
0
        flb_plg_error(ctx->ins, "cannot execute delete old kubernetes events");
664
0
    }
665
666
0
    sqlite3_clear_bindings(ctx->stmt_delete_old_kubernetes_events);
667
0
    sqlite3_reset(ctx->stmt_delete_old_kubernetes_events);
668
669
0
    FLB_INPUT_RETURN(0);
670
0
}
671
672
/*
 * Record an event item in the database.
 *
 * Binds the item's metadata.uid, metadata.resourceVersion and timestamp
 * (in nanoseconds) to the prepared insert statement and runs it.
 *
 * Returns the value of flb_sqldb_last_id() on success and -1 when a
 * required field is missing or the insert fails. The temporary uid string
 * is released on every path.
 */
static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item)
{
    int ret;
    uint64_t resource_version;
    struct flb_time last;
    msgpack_object *meta;
    flb_sds_t uid = NULL;

    meta = record_get_field_ptr(item, "metadata");
    if (meta == NULL) {
        flb_plg_error(ctx->ins, "unable to find metadata to save event");
        return -1;
    }

    ret = record_get_field_uint64(meta, "resourceVersion", &resource_version);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "unable to find resourceVersion in metadata to save event");
        return -1;
    }

    /*
     * The lookup reports success without writing 'uid' when the field is
     * absent, hence the NULL initialisation and the explicit NULL test.
     */
    ret = record_get_field_sds(meta, "uid", &uid);
    if (ret == -1 || uid == NULL) {
        flb_plg_error(ctx->ins, "unable to find uid in metadata to save event");
        return -1;
    }

    ret = item_get_timestamp(item, &last);
    if (ret == FLB_FALSE) {
        flb_plg_error(ctx->ins, "Cannot get timestamp for item to save it");
        flb_sds_destroy(uid);
        return -1;
    }

    /* Bind parameters */
    sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0);
    sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version);
    sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, flb_time_to_nanosec(&last));

    /* Run the insert */
    ret = sqlite3_step(ctx->stmt_insert_kubernetes_event);
    if (ret != SQLITE_DONE) {
        sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
        sqlite3_reset(ctx->stmt_insert_kubernetes_event);
        flb_plg_error(ctx->ins,
                      "cannot execute insert kubernetes event %s resource_version=%llu",
                      uid, (unsigned long long) resource_version);
        flb_sds_destroy(uid);
        return -1;
    }

    /* casts make the arguments match the %llu conversions */
    flb_plg_debug(ctx->ins,
                  "inserted k8s event: uid=%s, resource_version=%llu, last=%llu",
                  uid, (unsigned long long) resource_version,
                  (unsigned long long) flb_time_to_nanosec(&last));
    sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
    sqlite3_reset(ctx->stmt_insert_kubernetes_event);

    flb_sds_destroy(uid);
    return flb_sqldb_last_id(ctx->db);
}
736
737
#endif
738
739
static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
740
                              size_t *bytes_consumed)
741
0
{
742
0
    int ret = 0;
743
0
    int root_type;
744
0
    size_t consumed = 0;
745
0
    char *buf_data = NULL;
746
0
    size_t buf_size;
747
0
    size_t token_size = 0;
748
0
    char *token_start = 0;
749
0
    char *token_end = NULL;
750
751
0
    token_start = c->resp.payload;
752
0
    token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
753
0
    while ( token_end != NULL && ret == 0 ) {
754
0
        token_size = token_end - token_start;
755
0
        ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
756
0
        if (ret == -1) {
757
0
            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
758
0
                          c->resp.payload);
759
0
        }
760
0
        else {
761
0
            *bytes_consumed += token_size + 1;
762
0
            ret = process_watched_event(ctx, buf_data, buf_size);
763
0
        }
764
765
0
        flb_free(buf_data);
766
0
        if (buf_data) {
767
0
            buf_data = NULL;
768
0
        }
769
0
        token_start = token_end+1;
770
0
        token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
771
0
    }
772
773
0
    if (buf_data) {
774
0
        flb_free(buf_data);
775
0
    }
776
0
    return ret;
777
0
}
778
779
static void initialize_http_client(struct flb_http_client* c, struct k8s_events* ctx)
780
0
{
781
0
    flb_http_buffer_size(c, 0);
782
783
0
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
784
0
    if (ctx->auth_len > 0) {
785
0
        flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len);
786
0
    }
787
0
}
788
789
static int check_and_init_stream(struct k8s_events *ctx)
790
0
{
791
    /* Returns FLB_TRUE if stream has been initialized */
792
0
    flb_sds_t continue_token = NULL;
793
0
    uint64_t max_resource_version = 0;
794
0
    size_t b_sent;
795
0
    int ret;
796
0
    struct flb_http_client *c = NULL;
797
798
    /* if the streaming client is already active, just return it */
799
0
    if(ctx->streaming_client) {
800
0
        return FLB_TRUE;
801
0
    }
802
803
    /* setup connection if one does not exist */
804
0
    if(!ctx->current_connection) {
805
0
        ctx->current_connection = flb_upstream_conn_get(ctx->upstream);
806
0
        if (!ctx->current_connection) {
807
0
            flb_plg_error(ctx->ins, "upstream connection initialization error");
808
0
            goto failure;
809
0
        }
810
811
0
        ret = refresh_token_if_needed(ctx);
812
0
        if (ret == -1) {
813
0
            flb_plg_error(ctx->ins, "failed to refresh token");
814
0
            goto failure;
815
0
        }
816
0
    }
817
818
0
    do {
819
0
        c = make_event_list_api_request(ctx, continue_token);
820
0
        if (continue_token != NULL) {
821
0
            flb_sds_destroy(continue_token);
822
0
            continue_token = NULL;
823
0
        }
824
0
        if (!c) {
825
0
            flb_plg_error(ctx->ins, "unable to create http client");
826
0
            goto failure;
827
0
        }
828
0
        initialize_http_client(c, ctx);
829
0
        ret = flb_http_do(c, &b_sent);
830
0
        if (ret != 0) {
831
0
            flb_plg_error(ctx->ins, "http do error");
832
0
            goto failure;
833
0
        }
834
835
0
        if (c->resp.status == 200 && c->resp.payload_size > 0) {
836
0
            ret = process_event_list(ctx, c->resp.payload, c->resp.payload_size,
837
0
                                     &max_resource_version, &continue_token);
838
0
        }
839
0
        else
840
0
        {
841
0
            if (c->resp.payload_size > 0) {
842
0
                flb_plg_error(ctx->ins, "http_status=%i:\n%s", c->resp.status, c->resp.payload);
843
0
            }
844
0
            else {
845
0
                flb_plg_error(ctx->ins, "http_status=%i", c->resp.status);
846
0
            }
847
0
            goto failure;
848
0
        }
849
0
        flb_http_client_destroy(c);
850
0
        c = NULL;
851
0
    } while(continue_token != NULL);
852
853
0
    if (max_resource_version > ctx->last_resource_version) {
854
0
        flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version);
855
0
        ctx->last_resource_version = max_resource_version;
856
0
    }
857
858
    /* Now that we've done a full list, we can use the resource version and do a watch
859
     * to stream updates efficiently
860
     */
861
0
    ctx->streaming_client = make_event_watch_api_request(ctx, max_resource_version);
862
0
    if (!ctx->streaming_client) {
863
0
        flb_plg_error(ctx->ins, "unable to create http client");
864
0
        goto failure;
865
0
    }
866
0
    initialize_http_client(ctx->streaming_client, ctx);
867
868
    /* Watch will stream chunked json data, so we only send
869
     * the http request, then use flb_http_get_response_data
870
     * to attempt processing on available streamed data
871
     */
872
0
    b_sent = 0;
873
0
    ret = flb_http_do_request(ctx->streaming_client, &b_sent);
874
0
    if (ret != 0) {
875
0
        flb_plg_error(ctx->ins, "http do request error");
876
0
        goto failure;
877
0
    }
878
879
0
    return FLB_TRUE;
880
881
0
failure:
882
0
    if (c) {
883
0
        flb_http_client_destroy(c);
884
0
    }
885
0
    if (ctx->streaming_client) {
886
0
        flb_http_client_destroy(ctx->streaming_client);
887
0
        ctx->streaming_client = NULL;
888
0
    }
889
0
    if (ctx->current_connection) {
890
0
        flb_upstream_conn_release(ctx->current_connection);
891
0
        ctx->current_connection = NULL;
892
0
    }
893
0
    return FLB_FALSE;
894
0
}
895
896
static int k8s_events_collect(struct flb_input_instance *ins,
897
                              struct flb_config *config, void *in_context)
898
0
{
899
0
    int ret;
900
0
    struct k8s_events *ctx = in_context;
901
0
    size_t bytes_consumed;
902
0
    int chunk_proc_ret;
903
904
0
    if (pthread_mutex_trylock(&ctx->lock) != 0) {
905
0
        FLB_INPUT_RETURN(0);
906
0
    }
907
908
0
    if (check_and_init_stream(ctx) == FLB_FALSE) {
909
0
        pthread_mutex_unlock(&ctx->lock);
910
0
        FLB_INPUT_RETURN(0);
911
0
    }
912
913
0
    ret = FLB_HTTP_MORE;
914
0
    bytes_consumed = 0;
915
0
    chunk_proc_ret = 0;
916
0
    while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) {
917
0
        ret = flb_http_get_response_data(ctx->streaming_client, bytes_consumed);
918
0
        bytes_consumed = 0;
919
0
        if(ctx->streaming_client->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
920
0
            chunk_proc_ret = process_http_chunk(ctx, ctx->streaming_client, &bytes_consumed);
921
0
        }
922
0
    }
923
    /* NOTE: skipping any processing after streaming socket closes */
924
925
0
    if (ctx->streaming_client->resp.status != 200 || ret == FLB_HTTP_ERROR || ret == FLB_HTTP_OK) {
926
0
        if (ret == FLB_HTTP_ERROR) {
927
0
            flb_plg_warn(ins, "kubernetes chunked stream error.");
928
0
        }
929
0
        else if (ret == FLB_HTTP_OK) {
930
0
            flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
931
0
        }
932
0
        else {
933
0
            flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
934
0
                         ctx->streaming_client->resp.status, ctx->streaming_client->resp.payload);
935
0
        }
936
937
0
        flb_plg_info(ins, "kubernetes stream disconnected, ret=%d", ret);
938
0
        flb_http_client_destroy(ctx->streaming_client);
939
0
        flb_upstream_conn_release(ctx->current_connection);
940
0
        ctx->streaming_client = NULL;
941
0
        ctx->current_connection = NULL;
942
0
    }
943
944
0
    pthread_mutex_unlock(&ctx->lock);
945
0
    FLB_INPUT_RETURN(0);
946
0
}
947
948
static int k8s_events_init(struct flb_input_instance *ins,
949
                           struct flb_config *config, void *data)
950
0
{
951
0
    struct k8s_events *ctx = NULL;
952
953
0
    ctx = k8s_events_conf_create(ins);
954
0
    if (!ctx) {
955
0
        return -1;
956
0
    }
957
958
0
    ctx->coll_id = flb_input_set_collector_time(ins,
959
0
                                                k8s_events_collect,
960
0
                                                ctx->interval_sec,
961
0
                                                ctx->interval_nsec,
962
0
                                                config);
963
964
0
#ifdef FLB_HAVE_SQLDB
965
0
    if (ctx->db) {
966
0
        ctx->coll_cleanup_id = flb_input_set_collector_time(ins,
967
0
                                                            k8s_events_cleanup_db,
968
0
                                                            ctx->interval_sec,
969
0
                                                            ctx->interval_nsec,
970
0
                                                            config);
971
0
    }
972
0
#endif
973
974
0
    return 0;
975
0
}
976
977
/*
 * Plugin exit callback: hand the context (if one was set) to
 * k8s_events_conf_destroy(). Always returns 0.
 */
static int k8s_events_exit(void *data, struct flb_config *config)
{
    struct k8s_events *plugin_ctx = data;

    if (plugin_ctx != NULL) {
        k8s_events_conf_destroy(plugin_ctx);
    }

    return 0;
}
988
989
/* Configuration properties map */
990
static struct flb_config_map config_map[] = {
991
    /* Full Kubernetes API server URL */
992
    {
993
     FLB_CONFIG_MAP_STR, "kube_url", "https://kubernetes.default.svc",
994
     0, FLB_FALSE, 0,
995
     "Kubernetes API server URL"
996
    },
997
998
    /* Refresh interval */
999
    {
1000
      FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC,
1001
      0, FLB_TRUE, offsetof(struct k8s_events, interval_sec),
1002
      "Set the polling interval for each channel"
1003
    },
1004
    {
1005
      FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC,
1006
      0, FLB_TRUE, offsetof(struct k8s_events, interval_nsec),
1007
      "Set the polling interval for each channel (sub seconds)"
1008
    },
1009
1010
    /* TLS: set debug 'level' */
1011
    {
1012
     FLB_CONFIG_MAP_INT, "tls.debug", "0",
1013
     0, FLB_TRUE, offsetof(struct k8s_events, tls_debug),
1014
     "set TLS debug level: 0 (no debug), 1 (error), "
1015
     "2 (state change), 3 (info) and 4 (verbose)"
1016
    },
1017
1018
    /* TLS: enable verification */
1019
    {
1020
     FLB_CONFIG_MAP_BOOL, "tls.verify", "true",
1021
     0, FLB_TRUE, offsetof(struct k8s_events, tls_verify),
1022
     "enable or disable verification of TLS peer certificate"
1023
    },
1024
1025
    /* TLS: set tls.vhost feature */
1026
    {
1027
     FLB_CONFIG_MAP_STR, "tls.vhost", NULL,
1028
     0, FLB_TRUE, offsetof(struct k8s_events, tls_vhost),
1029
     "set optional TLS virtual host"
1030
    },
1031
1032
    /* Kubernetes TLS: CA file */
1033
    {
1034
     FLB_CONFIG_MAP_STR, "kube_ca_file", K8S_EVENTS_KUBE_CA,
1035
     0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_file),
1036
     "Kubernetes TLS CA file"
1037
    },
1038
1039
    /* Kubernetes TLS: CA certs path */
1040
    {
1041
     FLB_CONFIG_MAP_STR, "kube_ca_path", NULL,
1042
     0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_path),
1043
     "Kubernetes TLS ca path"
1044
    },
1045
1046
    /* Kubernetes Token file */
1047
    {
1048
     FLB_CONFIG_MAP_STR, "kube_token_file", K8S_EVENTS_KUBE_TOKEN,
1049
     0, FLB_TRUE, offsetof(struct k8s_events, token_file),
1050
     "Kubernetes authorization token file"
1051
    },
1052
1053
    /* Kubernetes Token file TTL */
1054
    {
1055
     FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m",
1056
     0, FLB_TRUE, offsetof(struct k8s_events, token_ttl),
1057
     "kubernetes token ttl, until it is reread from the token file. Default: 10m"
1058
    },
1059
1060
    {
1061
     FLB_CONFIG_MAP_INT, "kube_request_limit", "0",
1062
     0, FLB_TRUE, offsetof(struct k8s_events, limit_request),
1063
     "kubernetes limit parameter for events query, no limit applied when set to 0"
1064
    },
1065
1066
    {
1067
      FLB_CONFIG_MAP_TIME, "kube_retention_time", "1h",
1068
      0, FLB_TRUE, offsetof(struct k8s_events, retention_time),
1069
      "kubernetes retention time for events. Default: 1h"
1070
    },
1071
1072
    {
1073
      FLB_CONFIG_MAP_STR, "kube_namespace", NULL,
1074
      0, FLB_TRUE, offsetof(struct k8s_events, namespace),
1075
      "kubernetes namespace to get events from, gets event from all namespaces by default."
1076
    },
1077
1078
#ifdef FLB_HAVE_SQLDB
1079
    {
1080
      FLB_CONFIG_MAP_STR, "db", NULL,
1081
      0, FLB_FALSE, 0,
1082
      "set a database file to keep track of recorded kubernetes events."
1083
    },
1084
    {
1085
     FLB_CONFIG_MAP_STR, "db.sync", "normal",
1086
     0, FLB_FALSE, 0,
1087
     "set a database sync method. values: extra, full, normal and off."
1088
    },
1089
#endif
1090
1091
    /* EOF */
1092
    {0}
1093
};
1094
1095
/* Plugin reference */
1096
struct flb_input_plugin in_kubernetes_events_plugin = {
1097
    .name         = "kubernetes_events",
1098
    .description  = "Kubernetes Events",
1099
    .cb_init      = k8s_events_init,
1100
    .cb_pre_run   = NULL,
1101
    .cb_collect   = k8s_events_collect,
1102
    .cb_flush_buf = NULL,
1103
    .cb_exit      = k8s_events_exit,
1104
    .config_map   = config_map,
1105
    .flags        = FLB_INPUT_NET | FLB_INPUT_CORO | FLB_INPUT_THREADED
1106
};