Coverage Report

Created: 2026-03-09 07:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/in_kubernetes_events/kubernetes_events.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
21
#include <sys/types.h>
22
#include <sys/stat.h>
23
#include <inttypes.h>
24
25
#include <fluent-bit/flb_input_plugin.h>
26
#include <fluent-bit/flb_network.h>
27
#include <fluent-bit/flb_pack.h>
28
#include <fluent-bit/flb_utils.h>
29
#include <fluent-bit/flb_error.h>
30
#include <fluent-bit/flb_compat.h>
31
#include <fluent-bit/flb_ra_key.h>
32
#include <fluent-bit/flb_time.h>
33
#include <fluent-bit/flb_strptime.h>
34
#include <fluent-bit/flb_parser.h>
35
#include <fluent-bit/flb_log_event_encoder.h>
36
#include <fluent-bit/flb_compat.h>
37
38
#include "kubernetes_events.h"
39
#include "kubernetes_events_conf.h"
40
41
#ifdef FLB_HAVE_SQLDB
42
#include "kubernetes_events_sql.h"
43
static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item);
44
#endif
45
46
0
#define JSON_ARRAY_DELIM "\r\n"
47
48
/*
 * Load the complete content of 'path' into a freshly allocated buffer that is
 * always NUL-terminated, then strip any trailing '\n' / '\r' characters.
 *
 * On success 0 is returned, *out_buf receives the buffer (the caller releases
 * it with flb_free()) and *out_size the length after trimming.
 *
 * -1 is returned when the file cannot be opened, stat'ed or read in one piece,
 * or when the allocation fails. An empty file also ends up as -1 since
 * fread() reports zero items for a zero-sized element.
 */
static int file_to_buffer(const char *path,
                          char **out_buf, size_t *out_size)
{
    int end;
    size_t items;
    char *data = NULL;
    FILE *stream;
    struct stat info;

    stream = fopen(path, "r");
    if (stream == NULL) {
        return -1;
    }

    if (stat(path, &info) == -1) {
        flb_errno();
        goto error;
    }

    /* one extra zeroed byte guarantees the terminator */
    data = flb_calloc(1, (info.st_size + 1));
    if (data == NULL) {
        flb_errno();
        goto error;
    }

    /* the file is requested as a single item: anything but 1 is a failure */
    items = fread(data, info.st_size, 1, stream);
    if (items < 1) {
        goto error;
    }

    fclose(stream);

    /* walk back over trailing line terminators */
    end = info.st_size;
    while (end > 0 && (data[end - 1] == '\n' || data[end - 1] == '\r')) {
        end--;
    }
    data[end] = '\0';

    *out_buf = data;
    *out_size = end;

    return 0;

error:
    if (data != NULL) {
        flb_free(data);
    }
    fclose(stream);
    return -1;
}
98
99
/* Set K8s Authorization Token and get HTTP Auth Header */
100
static int get_http_auth_header(struct k8s_events *ctx)
101
0
{
102
0
    int ret;
103
0
    char *temp;
104
0
    char *tk = NULL;
105
0
    size_t tk_size = 0;
106
107
0
    if (!ctx->token_file || strlen(ctx->token_file) == 0) {
108
0
        return 0;
109
0
    }
110
111
0
    ret = file_to_buffer(ctx->token_file, &tk, &tk_size);
112
0
    if (ret == -1) {
113
0
        flb_plg_warn(ctx->ins, "cannot open %s", ctx->token_file);
114
0
        return -1;
115
0
    }
116
0
    ctx->token_created = time(NULL);
117
118
    /* Token */
119
0
    if (ctx->token != NULL) {
120
0
        flb_free(ctx->token);
121
0
    }
122
0
    ctx->token = tk;
123
0
    ctx->token_len = tk_size;
124
125
    /* HTTP Auth Header */
126
0
    if (ctx->auth == NULL) {
127
0
        ctx->auth = flb_malloc(tk_size + 32);
128
0
    }
129
0
    else if (ctx->auth_len < tk_size + 32) {
130
0
        temp = flb_realloc(ctx->auth, tk_size + 32);
131
0
        if (temp == NULL) {
132
0
            flb_errno();
133
0
            flb_free(ctx->auth);
134
0
            ctx->auth = NULL;
135
0
            return -1;
136
0
        }
137
0
        ctx->auth = temp;
138
0
    }
139
140
0
    if (!ctx->auth) {
141
0
        return -1;
142
0
    }
143
144
0
    ctx->auth_len = snprintf(ctx->auth, tk_size + 32, "Bearer %s", tk);
145
0
    return 0;
146
0
}
147
148
/* Refresh HTTP Auth Header if K8s Authorization Token is expired */
149
static int refresh_token_if_needed(struct k8s_events *ctx)
150
0
{
151
0
    int expired = FLB_FALSE;
152
0
    int ret;
153
154
0
    if (!ctx->token_file || strlen(ctx->token_file) == 0) {
155
0
        return 0;
156
0
    }
157
158
0
    if (ctx->token_created > 0) {
159
0
        if (time(NULL) > ctx->token_created + ctx->token_ttl) {
160
0
            expired = FLB_TRUE;
161
0
        }
162
0
    }
163
164
0
    if (expired || ctx->token_created == 0) {
165
0
        ret = get_http_auth_header(ctx);
166
0
        if (ret == -1) {
167
0
            return -1;
168
0
        }
169
0
    }
170
171
0
    return 0;
172
0
}
173
174
/*
 * Look up a value inside a msgpack map by key name.
 *
 * Returns a pointer to the value of the first string key that starts with
 * 'fieldname', or NULL when 'obj' is not a map or no key matches. The returned
 * pointer refers to memory owned by 'obj'.
 *
 * NOTE(review): the comparison is a prefix match, not an exact match (a key
 * "metadataX" satisfies a lookup for "metadata"). That historical behavior is
 * kept on purpose because lookups with abbreviated names depend on it; moving
 * to an exact match requires auditing every caller first.
 */
static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname)
{
    uint32_t i;
    size_t name_len;
    msgpack_object *k;

    if (obj->type != MSGPACK_OBJECT_MAP) {
        return NULL;
    }

    name_len = strlen(fieldname);

    for (i = 0; i < obj->via.map.size; i++) {
        k = &obj->via.map.ptr[i].key;
        if (k->type != MSGPACK_OBJECT_STR) {
            continue;
        }

        /*
         * msgpack strings carry no terminator: a key shorter than the name
         * can never match and comparing it would read past its end.
         */
        if (k->via.str.size < name_len) {
            continue;
        }

        if (strncmp(k->via.str.ptr, fieldname, name_len) == 0) {
            return &obj->via.map.ptr[i].val;
        }
    }

    return NULL;
}
197
198
/*
 * Copy a string field of a msgpack map into a new SDS string.
 *
 * Returns:
 *    0  the field does not exist (*val is left untouched, so callers must
 *       initialize it, typically to NULL), or it was copied into *val; the
 *       caller then owns *val and releases it with flb_sds_destroy().
 *   -1  the field exists but is not a string, or the copy could not be
 *       allocated (*val is left untouched).
 */
static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_sds_t *val)
{
    flb_sds_t copy;
    msgpack_object *v;

    v = record_get_field_ptr(obj, fieldname);
    if (v == NULL) {
        return 0;
    }
    if (v->type != MSGPACK_OBJECT_STR) {
        return -1;
    }

    copy = flb_sds_create_len(v->via.str.ptr, v->via.str.size);
    if (copy == NULL) {
        /* do not report success without a usable string */
        flb_errno();
        return -1;
    }

    *val = copy;
    return 0;
}
213
214
/*
 * Parse a string field formatted as "%Y-%m-%dT%H:%M:%SZ" into 'val'.
 *
 * Returns:
 *    0  success, val->tm holds the parsed seconds and zero nanoseconds
 *   -1  the field is missing or is not a string
 *   -2  the string could not be parsed with the format above
 *
 * 'val' is only written on success.
 */
static int record_get_field_time(msgpack_object *obj, const char *fieldname, struct flb_time *val)
{
    size_t len;
    char text[64];
    msgpack_object *v;
    struct flb_tm tm = { 0 };

    v = record_get_field_ptr(obj, fieldname);
    if (v == NULL) {
        return -1;
    }
    if (v->type != MSGPACK_OBJECT_STR) {
        return -1;
    }

    /*
     * msgpack strings are not NUL-terminated: parse a bounded, terminated
     * copy so the parser cannot run into the bytes that follow the value.
     * Longer values are truncated; the format consumes far fewer characters.
     */
    len = v->via.str.size;
    if (len >= sizeof(text)) {
        len = sizeof(text) - 1;
    }
    if (len > 0) {
        memcpy(text, v->via.str.ptr, len);
    }
    text[len] = '\0';

    if (flb_strptime(text, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) {
        return -2;
    }

    val->tm.tv_sec = flb_parser_tm2time(&tm, FLB_FALSE);
    val->tm.tv_nsec = 0;

    return 0;
}
236
237
/*
 * Read a field of a msgpack map as an unsigned 64-bit number.
 *
 * Accepted representations:
 *   - string made only of decimal digits (at least one) that fits in 64 bits
 *   - positive integer
 *   - negative integer, stored as its two's complement cast to uint64_t
 *
 * Returns 0 and writes *val on success. Returns -1, leaving *val untouched,
 * when the field is missing, has another type, or the string is empty,
 * contains a non-digit or overflows.
 */
static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, uint64_t *val)
{
    uint32_t i;
    uint64_t digit;
    uint64_t number;
    unsigned char c;
    msgpack_object *v;

    v = record_get_field_ptr(obj, fieldname);
    if (v == NULL) {
        return -1;
    }

    /* attempt to parse string as number... */
    if (v->type == MSGPACK_OBJECT_STR) {
        if (v->via.str.size == 0) {
            return -1;
        }

        /*
         * The string is not NUL-terminated and ASCII digits are valid
         * msgpack bytes, so the value is parsed strictly inside its own
         * bounds instead of relying on a terminator.
         */
        number = 0;
        for (i = 0; i < v->via.str.size; i++) {
            c = (unsigned char) v->via.str.ptr[i];
            if (c < '0' || c > '9') {
                return -1;
            }

            digit = (uint64_t) (c - '0');
            if (number > (UINT64_MAX - digit) / 10) {
                /* would not fit in 64 bits */
                return -1;
            }
            number = (number * 10) + digit;
        }

        *val = number;
        return 0;
    }
    if (v->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
        *val = v->via.u64;
        return 0;
    }
    if (v->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
        *val = (uint64_t)v->via.i64;
        return 0;
    }
    return -1;
}
265
266
/*
 * Resolve the timestamp of an event item.
 *
 * Candidates are tried in order: "lastTimestamp", "firstTimestamp" and
 * finally "creationTimestamp" inside the "metadata" map. Some events have
 * lastTimestamp and firstTimestamp set to NULL while having
 * metadata.creationTimestamp set.
 *
 * A candidate is used only if it was found AND parsed; a value that fails to
 * parse is skipped so 'event_time' is never reported valid while unset.
 *
 * Returns FLB_TRUE when 'event_time' was filled, FLB_FALSE otherwise.
 */
static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time)
{
    int ret;
    msgpack_object *metadata;

    ret = record_get_field_time(obj, "lastTimestamp", event_time);
    if (ret == 0) {
        return FLB_TRUE;
    }

    ret = record_get_field_time(obj, "firstTimestamp", event_time);
    if (ret == 0) {
        return FLB_TRUE;
    }

    metadata = record_get_field_ptr(obj, "metadata");
    if (metadata == NULL) {
        return FLB_FALSE;
    }

    ret = record_get_field_time(metadata, "creationTimestamp", event_time);
    if (ret == 0) {
        return FLB_TRUE;
    }

    return FLB_FALSE;
}
296
297
/*
 * Decide whether an event has to be skipped.
 *
 * Returns FLB_TRUE (skip) when:
 *   - the event time is older than now minus ctx->retention_time seconds, or
 *   - a database is in use, the event has a uid and that uid is already
 *     stored, or
 *   - the database lookup was not used and the item resourceVersion is not
 *     newer than ctx->last_resource_version.
 *
 * Returns FLB_FALSE (process it) otherwise, including when the metadata,
 * resourceVersion or uid cannot be read or the database query fails.
 */
static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj,
                                    struct flb_time *event_time)
{
    int ret;
    uint64_t outdated;
    msgpack_object *metadata;
    /* stays NULL when the item has no uid: the getter leaves it untouched */
    flb_sds_t uid = NULL;
    uint64_t resource_version;
#ifdef FLB_HAVE_SQLDB
    bool exists;
#endif

    /* 64-bit arithmetic: seconds to nanoseconds overflows a 32-bit long */
    outdated = cfl_time_now() - ((uint64_t) ctx->retention_time * 1000000000ULL);
    if (flb_time_to_nanosec(event_time) < outdated) {
        flb_plg_debug(ctx->ins, "Item is older than retention_time: %" PRIu64 " < %" PRIu64,
                      flb_time_to_nanosec(event_time), outdated);
        return FLB_TRUE;
    }

    metadata = record_get_field_ptr(obj, "metadata");
    if (metadata == NULL) {
        flb_plg_error(ctx->ins, "Cannot unpack item metadata in response");
        return FLB_FALSE;
    }

    ret = record_get_field_uint64(metadata, "resourceVersion", &resource_version);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "Cannot get resourceVersion for item in response");
        return FLB_FALSE;
    }

    ret = record_get_field_sds(metadata, "uid", &uid);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "Cannot get uid for item in response");
        return FLB_FALSE;
    }

#ifdef FLB_HAVE_SQLDB
    /* without a uid there is nothing to look up: use the version check */
    if (ctx->db && uid != NULL) {
        sqlite3_bind_text(ctx->stmt_get_kubernetes_event_exists_by_uid,
                           1, uid, -1, NULL);
        ret = sqlite3_step(ctx->stmt_get_kubernetes_event_exists_by_uid);
        if (ret != SQLITE_ROW) {
            if (ret != SQLITE_DONE) {
                flb_plg_error(ctx->ins, "cannot execute kubernetes event exists");
            }
            sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid);
            sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid);
            flb_sds_destroy(uid);
            return FLB_FALSE;
        }

        exists = sqlite3_column_int64(ctx->stmt_get_kubernetes_event_exists_by_uid, 0);

        flb_plg_debug(ctx->ins, "is_filtered: uid=%s exists=%d", uid, exists);
        sqlite3_clear_bindings(ctx->stmt_get_kubernetes_event_exists_by_uid);
        sqlite3_reset(ctx->stmt_get_kubernetes_event_exists_by_uid);
        flb_sds_destroy(uid);

        return exists > 0 ? FLB_TRUE : FLB_FALSE;
    }
#endif

    /* check if this is an old event. */
    if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) {
        flb_plg_debug(ctx->ins, "skipping old object: %" PRIu64 " (< %" PRIu64 ")",
                      resource_version, ctx->last_resource_version);
        flb_sds_destroy(uid);
        return FLB_TRUE;
    }

    flb_sds_destroy(uid);
    return FLB_FALSE;
}
372
373
374
static int process_event_object(struct k8s_events *ctx, flb_sds_t action,
375
                         msgpack_object *item)
376
0
{
377
0
    int ret = -1;
378
0
    struct flb_time ts;
379
0
    uint64_t resource_version;
380
0
    msgpack_object* item_metadata;
381
382
0
    if(strncmp(action, "ADDED", 5) != 0 && strncmp(action, "MODIFIED", 8) != 0 ) {
383
        /* We don't process DELETED nor BOOKMARK */
384
0
        return 0;
385
0
    }
386
387
0
    item_metadata = record_get_field_ptr(item, "metadata");
388
0
    if (item_metadata == NULL) {
389
0
        flb_plg_warn(ctx->ins, "Event without metadata");
390
0
        return -1;
391
0
    }
392
0
    ret = record_get_field_uint64(item_metadata, "resourceVersion", &resource_version);
393
0
    if (ret == -1) {
394
0
        return ret;
395
0
    }
396
397
    /* reset the log encoder */
398
0
    flb_log_event_encoder_reset(ctx->encoder);
399
400
    /* print every item from the items array */
401
0
    if (item->type != MSGPACK_OBJECT_MAP) {
402
0
        flb_plg_error(ctx->ins, "Cannot unpack item in response");
403
0
        return -1;
404
0
    }
405
406
    /* get event timestamp */
407
0
    ret = item_get_timestamp(item, &ts);
408
0
    if (ret == FLB_FALSE) {
409
0
        flb_plg_error(ctx->ins, "cannot retrieve event timestamp");
410
0
        return -1;
411
0
    }
412
413
0
    if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) {
414
0
        return 0;
415
0
    }
416
417
0
#ifdef FLB_HAVE_SQLDB
418
0
    if (ctx->db) {
419
0
        k8s_events_sql_insert_event(ctx, item);
420
0
    }
421
0
#endif
422
423
    /* encode content as a log event */
424
0
    flb_log_event_encoder_begin_record(ctx->encoder);
425
0
    flb_log_event_encoder_set_timestamp(ctx->encoder, &ts);
426
427
0
    ret = flb_log_event_encoder_set_body_from_msgpack_object(ctx->encoder, item);
428
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
429
0
        ret = flb_log_event_encoder_commit_record(ctx->encoder);
430
0
    }
431
0
    else {
432
0
        flb_plg_warn(ctx->ins, "unable to encode: %" PRIu64, resource_version);
433
0
    }
434
435
0
    if (ctx->encoder->output_length > 0) {
436
0
        flb_input_log_append(ctx->ins, NULL, 0,
437
0
                             ctx->encoder->output_buffer,
438
0
                             ctx->encoder->output_length);
439
0
    }
440
441
0
    return 0;
442
0
}
443
444
0
static int process_watched_event(struct k8s_events *ctx, char *buf_data, size_t buf_size) {
445
0
    int ret = -1;
446
0
    size_t off = 0;
447
0
    msgpack_unpacked result;
448
0
    msgpack_object root;
449
0
    msgpack_object *item = NULL;
450
0
    flb_sds_t event_type = NULL;
451
452
    /* unpack */
453
0
    msgpack_unpacked_init(&result);
454
0
    ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
455
0
    if (ret != MSGPACK_UNPACK_SUCCESS) {
456
0
        flb_plg_error(ctx->ins, "Cannot unpack response");
457
0
        return -1;
458
0
    }
459
460
0
    root = result.data;
461
0
    if (root.type != MSGPACK_OBJECT_MAP) {
462
0
        return -1;
463
0
    }
464
465
0
    ret = record_get_field_sds(&root, "type", &event_type);
466
0
    if (ret == -1) {
467
0
        flb_plg_warn(ctx->ins, "Streamed Event 'type' not found");
468
0
        goto msg_error;
469
0
    }
470
471
0
    item = record_get_field_ptr(&root, "object");
472
0
    if (item == NULL || item->type != MSGPACK_OBJECT_MAP) {
473
0
        flb_plg_warn(ctx->ins, "Streamed Event 'object' not found");
474
0
        ret = -1;
475
0
        goto msg_error;
476
0
    }
477
478
0
    ret = process_event_object(ctx, event_type, item);
479
480
0
msg_error:
481
0
    flb_sds_destroy(event_type);
482
0
    msgpack_unpacked_destroy(&result);
483
0
    return ret;
484
0
}
485
486
/*
 * Process the JSON body of an EventList response.
 * https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/#EventList
 *
 * The payload is converted to msgpack, the top-level "items" array and
 * "metadata" map are located, then:
 *   - metadata.resourceVersion is stored in *max_resource_version
 *   - metadata.continue, when present, is copied into *continue_token (the
 *     caller owns it; it is left untouched when absent)
 *   - every item is handed to process_event_object() as an "ADDED" event;
 *     failures of individual items are not reported.
 *
 * Returns 0 on success and -1 on any structural problem. All temporary
 * buffers are released on every path.
 */
static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_size,
                          uint64_t *max_resource_version, flb_sds_t *continue_token)
{
    uint32_t i;
    int ret;
    int status = -1;
    int root_type;
    size_t consumed = 0;
    char *buf_data;
    size_t buf_size;
    size_t off = 0;
    msgpack_unpacked result;
    msgpack_object *root;
    msgpack_object *k;
    msgpack_object *items = NULL;
    msgpack_object *item = NULL;
    msgpack_object *metadata = NULL;
    const flb_sds_t action = "ADDED"; /* All items from a k8s list we consider as 'ADDED' */

    ret = flb_pack_json(in_data, in_size, &buf_data, &buf_size, &root_type, &consumed);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "could not process payload, incomplete or bad formed JSON");
        return -1;
    }

    /* unpack */
    msgpack_unpacked_init(&result);
    ret = msgpack_unpack_next(&result, buf_data, buf_size, &off);
    if (ret != MSGPACK_UNPACK_SUCCESS) {
        flb_plg_error(ctx->ins, "Cannot unpack response");
        goto cleanup;
    }

    root = &result.data;
    if (root->type != MSGPACK_OBJECT_MAP) {
        goto cleanup;
    }

    /* Traverse the EventList for the metadata (continue token) and the items */
    for (i = 0; i < root->via.map.size; i++) {
        k = &root->via.map.ptr[i].key;
        if (k->type != MSGPACK_OBJECT_STR) {
            continue;
        }

        /* keys are not NUL-terminated: compare the length first */
        if (k->via.str.size == sizeof("items") - 1 &&
            strncmp(k->via.str.ptr, "items", sizeof("items") - 1) == 0) {
            items = &root->via.map.ptr[i].val;
            if (items->type != MSGPACK_OBJECT_ARRAY) {
                flb_plg_error(ctx->ins, "Cannot unpack items");
                goto cleanup;
            }
        }
        else if (k->via.str.size == sizeof("metadata") - 1 &&
                 strncmp(k->via.str.ptr, "metadata", sizeof("metadata") - 1) == 0) {
            metadata = &root->via.map.ptr[i].val;
            if (metadata->type != MSGPACK_OBJECT_MAP) {
                flb_plg_error(ctx->ins, "Cannot unpack metadata");
                goto cleanup;
            }
        }
    }

    if (items == NULL) {
        flb_plg_error(ctx->ins, "Cannot find items in response");
        goto cleanup;
    }

    if (metadata == NULL) {
        flb_plg_error(ctx->ins, "Cannot find metadata in response");
        goto cleanup;
    }

    ret = record_get_field_uint64(metadata, "resourceVersion", max_resource_version);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "Cannot find EventList resourceVersion");
        goto cleanup;
    }

    ret = record_get_field_sds(metadata, "continue", continue_token);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "Cannot process continue token");
        goto cleanup;
    }

    for (i = 0; i < items->via.array.size; i++) {
        item = &items->via.array.ptr[i];
        if (item->type != MSGPACK_OBJECT_MAP) {
            flb_plg_error(ctx->ins, "Cannot unpack item in response");
            goto cleanup;
        }
        process_event_object(ctx, action, item);
    }

    status = 0;

cleanup:
    /* safe after a failed unpack too: it only frees an allocated zone */
    msgpack_unpacked_destroy(&result);
    flb_free(buf_data);
    return status;
}
588
589
static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx,
590
                                                      uint64_t max_resource_version)
591
0
{
592
0
    flb_sds_t url;
593
0
    struct flb_http_client *c;
594
595
0
    if (ctx->namespace == NULL) {
596
0
        url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
597
0
    }
598
0
    else {
599
0
        url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
600
0
                                  strlen(ctx->namespace));
601
0
        flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
602
0
    }
603
604
0
    flb_sds_printf(&url, "?watch=1&resourceVersion=%" PRIu64, max_resource_version);
605
0
    flb_plg_info(ctx->ins, "Requesting %s", url);
606
0
    c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
607
0
                        NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
608
0
    flb_sds_destroy(url);
609
0
    return c;
610
0
 }
611
612
static struct flb_http_client *make_event_list_api_request(struct k8s_events *ctx,
613
                                                      flb_sds_t continue_token)
614
0
{
615
0
    flb_sds_t url;
616
0
    struct flb_http_client *c;
617
618
0
    if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) {
619
0
        return flb_http_client(ctx->current_connection, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
620
0
                            NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
621
0
    }
622
623
0
    if (ctx->namespace == NULL) {
624
0
        url = flb_sds_create(K8S_EVENTS_KUBE_API_URI);
625
0
    }
626
0
    else {
627
0
        url = flb_sds_create_size(strlen(K8S_EVENTS_KUBE_NAMESPACE_API_URI) +
628
0
                                  strlen(ctx->namespace));
629
0
        flb_sds_printf(&url, K8S_EVENTS_KUBE_NAMESPACE_API_URI, ctx->namespace);
630
0
    }
631
632
0
    flb_sds_cat_safe(&url, "?", 1);
633
0
    if (ctx->limit_request) {
634
0
        if (continue_token != NULL) {
635
0
            flb_sds_printf(&url, "continue=%s&", continue_token);
636
0
        }
637
0
        flb_sds_printf(&url, "limit=%d", ctx->limit_request);
638
0
    }
639
0
    c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
640
0
                        NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
641
0
    flb_sds_destroy(url);
642
0
    return c;
643
0
}
644
645
#ifdef FLB_HAVE_SQLDB
646
647
static int k8s_events_cleanup_db(struct flb_input_instance *ins,
648
                                 struct flb_config *config, void *in_context)
649
0
{
650
0
    int ret;
651
0
    struct k8s_events *ctx = (struct k8s_events *)in_context;
652
0
    uint64_t retention_time_ago;
653
654
0
    if (ctx->db == NULL) {
655
0
        FLB_INPUT_RETURN(0);
656
0
    }
657
658
0
    retention_time_ago = cfl_time_now() - (ctx->retention_time * 1000000000L);
659
0
    sqlite3_bind_int64(ctx->stmt_delete_old_kubernetes_events,
660
0
                        1, (int64_t)retention_time_ago);
661
0
    ret = sqlite3_step(ctx->stmt_delete_old_kubernetes_events);
662
0
    if (ret != SQLITE_ROW && ret != SQLITE_DONE) {
663
0
        flb_plg_error(ctx->ins, "cannot execute delete old kubernetes events");
664
0
    }
665
666
0
    sqlite3_clear_bindings(ctx->stmt_delete_old_kubernetes_events);
667
0
    sqlite3_reset(ctx->stmt_delete_old_kubernetes_events);
668
669
0
    FLB_INPUT_RETURN(0);
670
0
}
671
672
/*
 * Store an event in the database.
 *
 * The row is made of metadata.uid, metadata.resourceVersion and the event
 * timestamp (as resolved by item_get_timestamp()) in nanoseconds.
 *
 * Returns the value of flb_sqldb_last_id() after a successful insert, or -1
 * when a required field is missing or invalid or the insert fails. The
 * temporary uid string is released on every path.
 */
static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *item)
{
    int ret;
    uint64_t resource_version;
    struct flb_time last;
    msgpack_object *meta;
    /* stays NULL when the item has no uid: the getter leaves it untouched */
    flb_sds_t uid = NULL;

    meta = record_get_field_ptr(item, "metadata");
    if (meta == NULL) {
        flb_plg_error(ctx->ins, "unable to find metadata to save event");
        return -1;
    }

    ret = record_get_field_uint64(meta, "resourceVersion", &resource_version);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "unable to find resourceVersion in metadata to save event");
        return -1;
    }

    ret = record_get_field_sds(meta, "uid", &uid);
    if (ret == -1 || uid == NULL) {
        flb_plg_error(ctx->ins, "unable to find uid in metadata to save event");
        return -1;
    }

    ret = item_get_timestamp(item, &last);
    if (ret == FLB_FALSE) {
        flb_plg_error(ctx->ins, "Cannot get timestamp for item to save it");
        flb_sds_destroy(uid);
        return -1;
    }

    /* Bind parameters; uid must stay alive until the bindings are cleared */
    sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0);
    sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version);
    sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, flb_time_to_nanosec(&last));

    /* Run the insert */
    ret = sqlite3_step(ctx->stmt_insert_kubernetes_event);
    if (ret != SQLITE_DONE) {
        sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
        sqlite3_reset(ctx->stmt_insert_kubernetes_event);
        flb_plg_error(ctx->ins,
                      "cannot execute insert kubernetes event %s resource_version=%" PRIu64,
                      uid, resource_version);
        flb_sds_destroy(uid);
        return -1;
    }

    flb_plg_debug(ctx->ins,
                  "inserted k8s event: uid=%s, resource_version=%" PRIu64 ", last=%" PRIu64,
                  uid, resource_version, flb_time_to_nanosec(&last));
    sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event);
    sqlite3_reset(ctx->stmt_insert_kubernetes_event);

    flb_sds_destroy(uid);
    return flb_sqldb_last_id(ctx->db);
}
736
737
#endif
738
739
static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
740
                              size_t *bytes_consumed)
741
0
{
742
0
    int ret = 0;
743
0
    int root_type;
744
0
    size_t consumed = 0;
745
0
    char *buf_data = NULL;
746
0
    size_t buf_size;
747
0
    size_t token_size = 0;
748
0
    char *token_start = 0;
749
0
    char *token_end = NULL;
750
751
0
    token_start = c->resp.payload;
752
0
    token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
753
0
    while ( token_end != NULL && ret == 0 ) {
754
0
        token_size = token_end - token_start;
755
0
        ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
756
0
        if (ret == -1) {
757
0
            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
758
0
                          c->resp.payload);
759
0
        }
760
0
        else {
761
0
            *bytes_consumed += token_size + 1;
762
0
            ret = process_watched_event(ctx, buf_data, buf_size);
763
0
        }
764
765
0
        flb_free(buf_data);
766
0
        if (buf_data) {
767
0
            buf_data = NULL;
768
0
        }
769
0
        token_start = token_end+1;
770
0
        token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
771
0
    }
772
773
0
    if (buf_data) {
774
0
        flb_free(buf_data);
775
0
    }
776
0
    return ret;
777
0
}
778
779
static void initialize_http_client(struct flb_http_client* c, struct k8s_events* ctx)
780
0
{
781
0
    flb_http_buffer_size(c, 0);
782
783
0
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
784
0
    if (ctx->auth_len > 0) {
785
0
        flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len);
786
0
    }
787
0
}
788
789
static int check_and_init_stream(struct k8s_events *ctx)
790
0
{
791
    /* Returns FLB_TRUE if stream has been initialized */
792
0
    flb_sds_t continue_token = NULL;
793
0
    uint64_t max_resource_version = 0;
794
0
    size_t b_sent;
795
0
    int ret;
796
0
    struct flb_http_client *c = NULL;
797
798
    /* if the streaming client is already active, just return it */
799
0
    if(ctx->streaming_client) {
800
0
        return FLB_TRUE;
801
0
    }
802
803
    /* setup connection if one does not exist */
804
0
    if(!ctx->current_connection) {
805
0
        ctx->current_connection = flb_upstream_conn_get(ctx->upstream);
806
0
        if (!ctx->current_connection) {
807
0
            flb_plg_error(ctx->ins, "upstream connection initialization error");
808
0
            goto failure;
809
0
        }
810
811
0
        ret = refresh_token_if_needed(ctx);
812
0
        if (ret == -1) {
813
0
            flb_plg_error(ctx->ins, "failed to refresh token");
814
0
            goto failure;
815
0
        }
816
0
    }
817
818
0
    do {
819
0
        c = make_event_list_api_request(ctx, continue_token);
820
0
        if (continue_token != NULL) {
821
0
            flb_sds_destroy(continue_token);
822
0
            continue_token = NULL;
823
0
        }
824
0
        if (!c) {
825
0
            flb_plg_error(ctx->ins, "unable to create http client");
826
0
            goto failure;
827
0
        }
828
0
        initialize_http_client(c, ctx);
829
0
        ret = flb_http_do(c, &b_sent);
830
0
        if (ret != 0) {
831
0
            flb_plg_error(ctx->ins, "http do error");
832
0
            goto failure;
833
0
        }
834
835
0
        if (c->resp.status == 200 && c->resp.payload_size > 0) {
836
0
            ret = process_event_list(ctx, c->resp.payload, c->resp.payload_size,
837
0
                                     &max_resource_version, &continue_token);
838
0
        }
839
0
        else
840
0
        {
841
0
            if (c->resp.payload_size > 0) {
842
0
                flb_plg_error(ctx->ins, "http_status=%i:\n%s", c->resp.status, c->resp.payload);
843
0
            }
844
0
            else {
845
0
                flb_plg_error(ctx->ins, "http_status=%i", c->resp.status);
846
0
            }
847
0
            goto failure;
848
0
        }
849
0
        flb_http_client_destroy(c);
850
0
        c = NULL;
851
0
    } while(continue_token != NULL);
852
853
0
    if (max_resource_version > ctx->last_resource_version) {
854
0
        flb_plg_debug(ctx->ins, "set last resourceVersion=%" PRIu64, max_resource_version);
855
0
        ctx->last_resource_version = max_resource_version;
856
0
    }
857
858
    /* Now that we've done a full list, we can use the resource version and do a watch
859
     * to stream updates efficiently
860
     */
861
0
    ctx->streaming_client = make_event_watch_api_request(ctx, max_resource_version);
862
0
    if (!ctx->streaming_client) {
863
0
        flb_plg_error(ctx->ins, "unable to create http client");
864
0
        goto failure;
865
0
    }
866
0
    initialize_http_client(ctx->streaming_client, ctx);
867
868
    /* Watch will stream chunked json data, so we only send
869
     * the http request, then use flb_http_get_response_data
870
     * to attempt processing on available streamed data
871
     */
872
0
    b_sent = 0;
873
0
    ret = flb_http_do_request(ctx->streaming_client, &b_sent);
874
0
    if (ret != 0) {
875
0
        flb_plg_error(ctx->ins, "http do request error");
876
0
        goto failure;
877
0
    }
878
879
0
    return FLB_TRUE;
880
881
0
failure:
882
0
    if (c) {
883
0
        flb_http_client_destroy(c);
884
0
    }
885
0
    if (ctx->streaming_client) {
886
0
        flb_http_client_destroy(ctx->streaming_client);
887
0
        ctx->streaming_client = NULL;
888
0
    }
889
0
    if (ctx->current_connection) {
890
0
        flb_upstream_conn_release(ctx->current_connection);
891
0
        ctx->current_connection = NULL;
892
0
    }
893
0
    return FLB_FALSE;
894
0
}
895
896
static int k8s_events_collect(struct flb_input_instance *ins,
897
                              struct flb_config *config, void *in_context)
898
0
{
899
0
    int ret;
900
0
    struct k8s_events *ctx = in_context;
901
0
    size_t bytes_consumed;
902
0
    int chunk_proc_ret;
903
904
0
    if (pthread_mutex_trylock(&ctx->lock) != 0) {
905
0
        FLB_INPUT_RETURN(0);
906
0
    }
907
908
0
    if (check_and_init_stream(ctx) == FLB_FALSE) {
909
0
        pthread_mutex_unlock(&ctx->lock);
910
0
        FLB_INPUT_RETURN(0);
911
0
    }
912
913
0
    ret = FLB_HTTP_MORE;
914
0
    bytes_consumed = 0;
915
0
    chunk_proc_ret = 0;
916
0
    while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) {
917
0
        ret = flb_http_get_response_data(ctx->streaming_client, bytes_consumed);
918
0
        bytes_consumed = 0;
919
0
        if(ctx->streaming_client->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
920
0
            chunk_proc_ret = process_http_chunk(ctx, ctx->streaming_client, &bytes_consumed);
921
0
        }
922
0
    }
923
    /* NOTE: skipping any processing after streaming socket closes */
924
925
0
    if (ctx->streaming_client->resp.status != 200 || ret == FLB_HTTP_ERROR || ret == FLB_HTTP_OK) {
926
0
        if (ret == FLB_HTTP_ERROR) {
927
0
            flb_plg_warn(ins, "kubernetes chunked stream error.");
928
0
        }
929
0
        else if (ret == FLB_HTTP_OK) {
930
0
            flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
931
0
        }
932
0
        else {
933
0
            flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
934
0
                         ctx->streaming_client->resp.status, ctx->streaming_client->resp.payload);
935
0
        }
936
937
0
        flb_plg_info(ins, "kubernetes stream disconnected, ret=%d", ret);
938
0
        flb_http_client_destroy(ctx->streaming_client);
939
0
        flb_upstream_conn_release(ctx->current_connection);
940
0
        ctx->streaming_client = NULL;
941
0
        ctx->current_connection = NULL;
942
0
    }
943
944
0
    pthread_mutex_unlock(&ctx->lock);
945
0
    FLB_INPUT_RETURN(0);
946
0
}
947
948
static int k8s_events_init(struct flb_input_instance *ins,
949
                           struct flb_config *config, void *data)
950
0
{
951
0
    struct k8s_events *ctx = NULL;
952
953
0
    ctx = k8s_events_conf_create(ins);
954
0
    if (!ctx) {
955
0
        return -1;
956
0
    }
957
958
0
    ctx->coll_id = flb_input_set_collector_time(ins,
959
0
                                                k8s_events_collect,
960
0
                                                ctx->interval_sec,
961
0
                                                ctx->interval_nsec,
962
0
                                                config);
963
964
0
#ifdef FLB_HAVE_SQLDB
965
0
    if (ctx->db) {
966
0
        ctx->coll_cleanup_id = flb_input_set_collector_time(ins,
967
0
                                                            k8s_events_cleanup_db,
968
0
                                                            ctx->interval_sec,
969
0
                                                            ctx->interval_nsec,
970
0
                                                            config);
971
0
    }
972
0
#endif
973
974
0
    return 0;
975
0
}
976
977
/*
 * Plugin exit callback: destroys the plugin context when one was given.
 * Always returns 0.
 */
static int k8s_events_exit(void *data, struct flb_config *config)
{
    struct k8s_events *ctx = data;

    if (ctx != NULL) {
        k8s_events_conf_destroy(ctx);
    }

    return 0;
}
988
989
/* Configuration properties map */
990
static struct flb_config_map config_map[] = {
991
    /* Full Kubernetes API server URL */
992
    {
993
     FLB_CONFIG_MAP_STR, "kube_url", "https://kubernetes.default.svc",
994
     0, FLB_FALSE, 0,
995
     "Kubernetes API server URL"
996
    },
997
998
    /* Refresh interval */
999
    {
1000
      FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC,
1001
      0, FLB_TRUE, offsetof(struct k8s_events, interval_sec),
1002
      "Set the polling interval for each channel"
1003
    },
1004
    {
1005
      FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC,
1006
      0, FLB_TRUE, offsetof(struct k8s_events, interval_nsec),
1007
      "Set the polling interval for each channel (sub seconds)"
1008
    },
1009
1010
    /* TLS: set debug 'level' */
1011
    {
1012
     FLB_CONFIG_MAP_INT, "tls.debug", "0",
1013
     0, FLB_TRUE, offsetof(struct k8s_events, tls_debug),
1014
     "set TLS debug level: 0 (no debug), 1 (error), "
1015
     "2 (state change), 3 (info) and 4 (verbose)"
1016
    },
1017
1018
    /* TLS: enable verification */
1019
    {
1020
     FLB_CONFIG_MAP_BOOL, "tls.verify", "true",
1021
     0, FLB_TRUE, offsetof(struct k8s_events, tls_verify),
1022
     "enable or disable verification of TLS peer certificate"
1023
    },
1024
1025
    /* TLS: set tls.vhost feature */
1026
    {
1027
     FLB_CONFIG_MAP_STR, "tls.vhost", NULL,
1028
     0, FLB_TRUE, offsetof(struct k8s_events, tls_vhost),
1029
     "set optional TLS virtual host"
1030
    },
1031
1032
    /* Kubernetes TLS: CA file */
1033
    {
1034
     FLB_CONFIG_MAP_STR, "kube_ca_file", K8S_EVENTS_KUBE_CA,
1035
     0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_file),
1036
     "Kubernetes TLS CA file"
1037
    },
1038
1039
    /* Kubernetes TLS: CA certs path */
1040
    {
1041
     FLB_CONFIG_MAP_STR, "kube_ca_path", NULL,
1042
     0, FLB_TRUE, offsetof(struct k8s_events, tls_ca_path),
1043
     "Kubernetes TLS ca path"
1044
    },
1045
1046
    /* Kubernetes Token file */
1047
    {
1048
     FLB_CONFIG_MAP_STR, "kube_token_file", K8S_EVENTS_KUBE_TOKEN,
1049
     0, FLB_TRUE, offsetof(struct k8s_events, token_file),
1050
     "Kubernetes authorization token file"
1051
    },
1052
1053
    /* Kubernetes Token file TTL */
1054
    {
1055
     FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m",
1056
     0, FLB_TRUE, offsetof(struct k8s_events, token_ttl),
1057
     "kubernetes token ttl, until it is reread from the token file. Default: 10m"
1058
    },
1059
1060
    {
1061
     FLB_CONFIG_MAP_INT, "kube_request_limit", "0",
1062
     0, FLB_TRUE, offsetof(struct k8s_events, limit_request),
1063
     "kubernetes limit parameter for events query, no limit applied when set to 0"
1064
    },
1065
1066
    {
1067
      FLB_CONFIG_MAP_TIME, "kube_retention_time", "1h",
1068
      0, FLB_TRUE, offsetof(struct k8s_events, retention_time),
1069
      "kubernetes retention time for events. Default: 1h"
1070
    },
1071
1072
    {
1073
      FLB_CONFIG_MAP_STR, "kube_namespace", NULL,
1074
      0, FLB_TRUE, offsetof(struct k8s_events, namespace),
1075
      "kubernetes namespace to get events from, gets event from all namespaces by default."
1076
    },
1077
1078
#ifdef FLB_HAVE_SQLDB
1079
    {
1080
      FLB_CONFIG_MAP_STR, "db", NULL,
1081
      0, FLB_FALSE, 0,
1082
      "set a database file to keep track of recorded kubernetes events."
1083
    },
1084
    {
1085
     FLB_CONFIG_MAP_STR, "db.sync", "normal",
1086
     0, FLB_FALSE, 0,
1087
     "set a database sync method. values: extra, full, normal and off."
1088
    },
1089
    {
1090
     FLB_CONFIG_MAP_BOOL, "db.locking", "false",
1091
     0, FLB_TRUE, offsetof(struct k8s_events, db_locking),
1092
     "set exclusive locking mode, increase performance but don't allow "
1093
     "external connections to the database file."
1094
    },
1095
    {
1096
     FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL",
1097
     0, FLB_TRUE, offsetof(struct k8s_events, db_journal_mode),
1098
     "set the journal mode for the database. values: DELETE, TRUNCATE, "
1099
     "PERSIST, MEMORY, WAL, OFF."
1100
    },
1101
#endif
1102
1103
    /* EOF */
1104
    {0}
1105
};
1106
1107
/* Plugin reference */
1108
struct flb_input_plugin in_kubernetes_events_plugin = {
1109
    .name         = "kubernetes_events",
1110
    .description  = "Kubernetes Events",
1111
    .cb_init      = k8s_events_init,
1112
    .cb_pre_run   = NULL,
1113
    .cb_collect   = k8s_events_collect,
1114
    .cb_flush_buf = NULL,
1115
    .cb_exit      = k8s_events_exit,
1116
    .config_map   = config_map,
1117
    .flags        = FLB_INPUT_NET | FLB_INPUT_CORO | FLB_INPUT_THREADED
1118
};