Coverage Report

Created: 2025-07-04 07:08

/src/fluent-bit/plugins/in_splunk/splunk_prot.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_version.h>
22
#include <fluent-bit/flb_error.h>
23
#include <fluent-bit/flb_pack.h>
24
#include <fluent-bit/flb_gzip.h>
25
26
#include <monkey/monkey.h>
27
#include <monkey/mk_core.h>
28
29
#include "splunk.h"
30
#include "splunk_conn.h"
31
#include "splunk_prot.h"
32
33
0
#define HTTP_CONTENT_JSON  0
34
0
#define HTTP_CONTENT_TEXT  1
35
0
#define HTTP_CONTENT_UNKNOWN 2
36
37
static int send_response(struct splunk_conn *conn, int http_status, char *message)
38
0
{
39
0
    struct flb_splunk *context;
40
0
    size_t           sent;
41
0
    int              len;
42
0
    flb_sds_t        out;
43
44
0
    context = (struct flb_splunk *) conn->ctx;
45
46
0
    out = flb_sds_create_size(256);
47
0
    if (!out) {
48
0
        return -1;
49
0
    }
50
51
0
    if (message) {
52
0
        len = strlen(message);
53
0
    }
54
0
    else {
55
0
        len = 0;
56
0
    }
57
58
0
    if (http_status == 201) {
59
0
        flb_sds_printf(&out,
60
0
                       "HTTP/1.1 201 Created \r\n"
61
0
                       "Server: Fluent Bit v%s\r\n"
62
0
                       "%s"
63
0
                       "Content-Length: 0\r\n\r\n",
64
0
                       FLB_VERSION_STR,
65
0
                       context->success_headers_str);
66
0
    }
67
0
    else if (http_status == 200) {
68
0
        flb_sds_printf(&out,
69
0
                       "HTTP/1.1 200 OK\r\n"
70
0
                       "Server: Fluent Bit v%s\r\n"
71
0
                       "%s"
72
0
                       "Content-Length: 0\r\n\r\n",
73
0
                       FLB_VERSION_STR,
74
0
                       context->success_headers_str);
75
0
    }
76
0
    else if (http_status == 204) {
77
0
        flb_sds_printf(&out,
78
0
                       "HTTP/1.1 204 No Content\r\n"
79
0
                       "Server: Fluent Bit v%s\r\n"
80
0
                       "%s"
81
0
                       "\r\n\r\n",
82
0
                       FLB_VERSION_STR,
83
0
                       context->success_headers_str);
84
0
    }
85
0
    else if (http_status == 400) {
86
0
        flb_sds_printf(&out,
87
0
                       "HTTP/1.1 400 Bad Request\r\n"
88
0
                       "Server: Fluent Bit v%s\r\n"
89
0
                       "Content-Length: %i\r\n\r\n%s",
90
0
                       FLB_VERSION_STR,
91
0
                       len, message);
92
0
    }
93
0
    else if (http_status == 401) {
94
0
        flb_sds_printf(&out,
95
0
                       "HTTP/1.1 401 Unauthorized\r\n"
96
0
                       "Server: Fluent Bit v%s\r\n"
97
0
                       "Content-Length: %i\r\n\r\n%s",
98
0
                       FLB_VERSION_STR,
99
0
                       len, message);
100
0
    }
101
    /* We should check this operations result */
102
0
    flb_io_net_write(conn->connection,
103
0
                     (void *) out,
104
0
                     flb_sds_len(out),
105
0
                     &sent);
106
107
0
    flb_sds_destroy(out);
108
109
0
    return 0;
110
0
}
111
112
static int send_json_message_response(struct splunk_conn *conn, int http_status, char *message)
113
0
{
114
0
    size_t    sent;
115
0
    int       len;
116
0
    flb_sds_t out;
117
118
0
    out = flb_sds_create_size(256);
119
0
    if (!out) {
120
0
        return -1;
121
0
    }
122
123
0
    if (message) {
124
0
        len = strlen(message);
125
0
    }
126
0
    else {
127
0
        len = 0;
128
0
    }
129
130
0
    if (http_status == 200) {
131
0
        flb_sds_printf(&out,
132
0
                       "HTTP/1.1 200 OK\r\n"
133
0
                       "Content-Type: application/json\r\n"
134
0
                       "Content-Length: %i\r\n\r\n%s",
135
0
                       len, message);
136
0
    }
137
138
    /* We should check this operations result */
139
0
    flb_io_net_write(conn->connection,
140
0
                     (void *) out,
141
0
                     flb_sds_len(out),
142
0
                     &sent);
143
144
0
    flb_sds_destroy(out);
145
146
0
    return 0;
147
0
}
148
149
/* implements functionality to get tag from key in record */
150
static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map)
151
0
{
152
0
    size_t map_size = map->via.map.size;
153
0
    msgpack_object_kv *kv;
154
0
    msgpack_object  key;
155
0
    msgpack_object  val;
156
0
    char *key_str = NULL;
157
0
    char *val_str = NULL;
158
0
    size_t key_str_size = 0;
159
0
    size_t val_str_size = 0;
160
0
    int j;
161
0
    int check = FLB_FALSE;
162
0
    int found = FLB_FALSE;
163
0
    flb_sds_t tag;
164
165
0
    kv = map->via.map.ptr;
166
167
0
    for(j=0; j < map_size; j++) {
168
0
        check = FLB_FALSE;
169
0
        found = FLB_FALSE;
170
0
        key = (kv+j)->key;
171
0
        if (key.type == MSGPACK_OBJECT_BIN) {
172
0
            key_str  = (char *) key.via.bin.ptr;
173
0
            key_str_size = key.via.bin.size;
174
0
            check = FLB_TRUE;
175
0
        }
176
0
        if (key.type == MSGPACK_OBJECT_STR) {
177
0
            key_str  = (char *) key.via.str.ptr;
178
0
            key_str_size = key.via.str.size;
179
0
            check = FLB_TRUE;
180
0
        }
181
182
0
        if (check == FLB_TRUE) {
183
0
            if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) {
184
0
                val = (kv+j)->val;
185
0
                if (val.type == MSGPACK_OBJECT_BIN) {
186
0
                    val_str  = (char *) val.via.bin.ptr;
187
0
                    val_str_size = val.via.str.size;
188
0
                    found = FLB_TRUE;
189
0
                    break;
190
0
                }
191
0
                if (val.type == MSGPACK_OBJECT_STR) {
192
0
                    val_str  = (char *) val.via.str.ptr;
193
0
                    val_str_size = val.via.str.size;
194
0
                    found = FLB_TRUE;
195
0
                    break;
196
0
                }
197
0
            }
198
0
        }
199
0
    }
200
201
0
    if (found == FLB_TRUE) {
202
0
        tag = flb_sds_create_len(val_str, val_str_size);
203
0
        if (!tag) {
204
0
            flb_errno();
205
0
            return NULL;
206
0
        }
207
0
        return tag;
208
0
    }
209
210
211
0
    flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key);
212
0
    return NULL;
213
0
}
214
215
/*
216
 * Process a raw text payload for Splunk HEC requests, uses the delimited character to split records,
217
 * return the number of processed bytes
218
 */
219
static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size)
220
0
{
221
0
    int ret = FLB_EVENT_ENCODER_SUCCESS;
222
223
0
    ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
224
225
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
226
0
        ret = flb_log_event_encoder_set_current_timestamp(&ctx->log_encoder);
227
0
    }
228
229
0
    if (ctx->store_token_in_metadata == FLB_TRUE) {
230
0
        if (ret == FLB_EVENT_ENCODER_SUCCESS) {
231
0
            ret = flb_log_event_encoder_append_body_values(
232
0
                    &ctx->log_encoder,
233
0
                    FLB_LOG_EVENT_CSTRING_VALUE("log"),
234
0
                    FLB_LOG_EVENT_STRING_VALUE(buf, size));
235
0
        }
236
0
    }
237
238
0
    if (ctx->store_token_in_metadata == FLB_TRUE) {
239
0
        if (ctx->ingested_auth_header != NULL) {
240
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
241
0
                ret = flb_log_event_encoder_append_metadata_values(
242
0
                    &ctx->log_encoder,
243
0
                    FLB_LOG_EVENT_CSTRING_VALUE("hec_token"),
244
0
                    FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header,
245
0
                                               ctx->ingested_auth_header_len));
246
0
            }
247
0
        }
248
0
    }
249
0
    else {
250
0
        if (ctx->ingested_auth_header != NULL) {
251
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
252
0
                ret = flb_log_event_encoder_append_body_values(
253
0
                    &ctx->log_encoder,
254
0
                    FLB_LOG_EVENT_CSTRING_VALUE(ctx->store_token_key),
255
0
                    FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header,
256
0
                                               ctx->ingested_auth_header_len),
257
0
                    FLB_LOG_EVENT_CSTRING_VALUE("log"),
258
0
                    FLB_LOG_EVENT_STRING_VALUE(buf, size));
259
260
0
            }
261
0
        }
262
0
    }
263
264
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
265
0
        ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
266
0
    }
267
268
0
    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
269
0
        flb_log_event_encoder_rollback_record(&ctx->log_encoder);
270
0
        return -1;
271
0
    }
272
273
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
274
0
        if (tag) {
275
0
            flb_input_log_append(ctx->ins, tag, flb_sds_len(tag),
276
0
                                 ctx->log_encoder.output_buffer,
277
0
                                 ctx->log_encoder.output_length);
278
0
        }
279
0
        else {
280
            /* use default plugin Tag (it internal name, e.g: http.0 */
281
0
            flb_input_log_append(ctx->ins, NULL, 0,
282
0
                                 ctx->log_encoder.output_buffer,
283
0
                                 ctx->log_encoder.output_length);
284
0
        }
285
0
    }
286
0
    else {
287
0
        flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
288
0
    }
289
290
0
    return 0;
291
0
}
292
293
static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *record,
294
                                   flb_sds_t tag, flb_sds_t tag_from_record,
295
0
                                   struct flb_time tm) {
296
0
    int ret;
297
0
    int i;
298
0
    msgpack_object_kv *kv;
299
300
0
    ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
301
302
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
303
0
        ret = flb_log_event_encoder_set_timestamp(
304
0
                &ctx->log_encoder,
305
0
                &tm);
306
0
    }
307
308
0
    if (ctx->store_token_in_metadata == FLB_TRUE) {
309
0
        if (ret == FLB_EVENT_ENCODER_SUCCESS) {
310
0
            ret = flb_log_event_encoder_set_body_from_msgpack_object(
311
0
                    &ctx->log_encoder,
312
0
                    record);
313
0
        }
314
315
0
        if (ctx->ingested_auth_header != NULL) {
316
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
317
0
                ret = flb_log_event_encoder_append_metadata_values(
318
0
                    &ctx->log_encoder,
319
0
                    FLB_LOG_EVENT_CSTRING_VALUE("hec_token"),
320
0
                    FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header,
321
0
                                               ctx->ingested_auth_header_len));
322
0
            }
323
0
        }
324
0
    }
325
0
    else {
326
0
        if (ctx->ingested_auth_header != NULL) {
327
            /* iterate through the old record map to create the appendable new buffer */
328
0
            kv = record->via.map.ptr;
329
0
            for(i = 0; i < record->via.map.size && ret == FLB_EVENT_ENCODER_SUCCESS; i++) {
330
0
                ret = flb_log_event_encoder_append_body_values(
331
0
                        &ctx->log_encoder,
332
0
                        FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
333
0
                        FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
334
0
            }
335
336
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
337
0
                ret = flb_log_event_encoder_append_body_values(
338
0
                    &ctx->log_encoder,
339
0
                    FLB_LOG_EVENT_CSTRING_VALUE(ctx->store_token_key),
340
0
                    FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header,
341
0
                                               ctx->ingested_auth_header_len));
342
0
            }
343
0
        }
344
0
        else {
345
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
346
0
                ret = flb_log_event_encoder_set_body_from_msgpack_object(
347
0
                        &ctx->log_encoder,
348
0
                        record);
349
0
            }
350
0
        }
351
0
    }
352
353
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
354
0
        ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
355
0
    }
356
357
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
358
0
        if (tag_from_record) {
359
0
            flb_input_log_append(ctx->ins,
360
0
                                 tag_from_record,
361
0
                                 flb_sds_len(tag_from_record),
362
0
                                 ctx->log_encoder.output_buffer,
363
0
                                 ctx->log_encoder.output_length);
364
0
        }
365
0
        else if (tag) {
366
0
            flb_input_log_append(ctx->ins, tag, flb_sds_len(tag),
367
0
                                 ctx->log_encoder.output_buffer,
368
0
                                 ctx->log_encoder.output_length);
369
0
        }
370
0
        else {
371
            /* use default plugin Tag (it internal name, e.g: http.0 */
372
0
            flb_input_log_append(ctx->ins, NULL, 0,
373
0
                                 ctx->log_encoder.output_buffer,
374
0
                                 ctx->log_encoder.output_length);
375
0
        }
376
0
    }
377
0
    else {
378
0
        flb_plg_error(ctx->ins, "Error encoding record : %d", ret);
379
0
    }
380
381
0
    if (tag_from_record) {
382
0
        flb_sds_destroy(tag_from_record);
383
0
    }
384
0
}
385
386
static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size)
387
0
{
388
0
    size_t off = 0;
389
0
    msgpack_unpacked result;
390
0
    struct flb_time tm;
391
0
    int i = 0;
392
0
    msgpack_object *obj;
393
0
    msgpack_object record;
394
0
    flb_sds_t tag_from_record = NULL;
395
396
0
    flb_time_get(&tm);
397
398
0
    msgpack_unpacked_init(&result);
399
0
    while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) {
400
0
        if (result.data.type == MSGPACK_OBJECT_MAP) {
401
0
            tag_from_record = NULL;
402
0
            if (ctx->tag_key) {
403
0
                tag_from_record = tag_key(ctx, &result.data);
404
0
            }
405
406
0
            process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm);
407
408
0
            flb_log_event_encoder_reset(&ctx->log_encoder);
409
0
        }
410
0
        else if (result.data.type == MSGPACK_OBJECT_ARRAY) {
411
0
            obj = &result.data;
412
0
            for (i = 0; i < obj->via.array.size; i++)
413
0
            {
414
0
                record = obj->via.array.ptr[i];
415
416
0
                tag_from_record = NULL;
417
0
                if (ctx->tag_key) {
418
0
                    tag_from_record = tag_key(ctx, &record);
419
0
                }
420
421
0
                process_flb_log_append(ctx, &record, tag, tag_from_record, tm);
422
423
                /* TODO : Optimize this
424
                 *
425
                 * This is wasteful, considering that we are emitting a series
426
                 * of records we should start and commit each one and then
427
                 * emit them all at once after the loop.
428
                 */
429
430
0
                flb_log_event_encoder_reset(&ctx->log_encoder);
431
0
            }
432
433
0
            break;
434
0
        }
435
0
        else {
436
0
            flb_plg_error(ctx->ins, "skip record from invalid type: %i",
437
0
                         result.data.type);
438
439
0
            msgpack_unpacked_destroy(&result);
440
441
0
            return -1;
442
0
        }
443
0
    }
444
445
0
    msgpack_unpacked_destroy(&result);
446
447
0
    return 0;
448
0
}
449
450
static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
451
                                      char *payload, size_t size)
452
0
{
453
0
    int ret;
454
0
    int out_size;
455
0
    char *pack;
456
0
    struct flb_pack_state pack_state;
457
458
    /* Initialize packer */
459
0
    flb_pack_state_init(&pack_state);
460
461
    /* Pack JSON as msgpack */
462
0
    ret = flb_pack_json_state(payload, size,
463
0
                              &pack, &out_size, &pack_state);
464
0
    flb_pack_state_reset(&pack_state);
465
466
    /* Handle exceptions */
467
0
    if (ret == FLB_ERR_JSON_PART) {
468
0
        flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
469
0
        return -1;
470
0
    }
471
0
    else if (ret == FLB_ERR_JSON_INVAL) {
472
0
        flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
473
0
        return -1;
474
0
    }
475
0
    else if (ret == -1) {
476
0
        return -1;
477
0
    }
478
479
    /* Process the packaged JSON and return the last byte used */
480
0
    process_json_payload_pack(ctx, tag, pack, out_size);
481
0
    flb_free(pack);
482
483
0
    return 0;
484
0
}
485
486
static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request)
487
0
{
488
0
    int ret = 0;
489
0
    struct mk_list *head;
490
0
    struct mk_http_header *auth_header = NULL;
491
0
    struct flb_splunk_tokens *splunk_token;
492
0
    flb_sds_t authorization = NULL;
493
494
0
    if (mk_list_size(&ctx->auth_tokens) == 0) {
495
0
        return SPLUNK_AUTH_UNAUTH;
496
0
    }
497
498
0
    auth_header = mk_http_header_get(MK_HEADER_AUTHORIZATION, request, NULL, 0);
499
0
    if (auth_header == NULL) {
500
0
        return SPLUNK_AUTH_MISSING_CRED;
501
0
    }
502
503
0
    authorization = flb_sds_create_len(auth_header->val.data, auth_header->val.len);
504
0
    if (authorization == NULL) {
505
0
        return SPLUNK_AUTH_MISSING_CRED;
506
0
    }
507
508
0
    if (authorization != NULL && auth_header->val.len > 0) {
509
0
        mk_list_foreach(head, &ctx->auth_tokens) {
510
0
            splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head);
511
0
            if (flb_sds_len(authorization) != splunk_token->length) {
512
0
                ret = SPLUNK_AUTH_UNAUTHORIZED;
513
0
                continue;
514
0
            }
515
516
0
            if (strncasecmp(splunk_token->header,
517
0
                        authorization,
518
0
                        splunk_token->length) == 0) {
519
0
                flb_sds_destroy(authorization);
520
521
0
                return SPLUNK_AUTH_SUCCESS;
522
0
            }
523
0
        }
524
525
0
        ret = SPLUNK_AUTH_UNAUTHORIZED;
526
0
        flb_sds_destroy(authorization);
527
0
        return ret;
528
0
    }
529
0
    else {
530
0
        flb_sds_destroy(authorization);
531
0
        return SPLUNK_AUTH_MISSING_CRED;
532
0
    }
533
534
0
    return SPLUNK_AUTH_SUCCESS;
535
0
}
536
537
static int handle_hec_payload(struct flb_splunk *ctx, int content_type,
538
                              flb_sds_t tag, char *buf, size_t size)
539
0
{
540
0
    int ret = -1;
541
542
0
    if (content_type == HTTP_CONTENT_JSON) {
543
0
        ret = parse_hec_payload_json(ctx, tag, buf, size);
544
0
    }
545
0
    else if (content_type == HTTP_CONTENT_TEXT) {
546
0
        ret = process_raw_payload_pack(ctx, tag, buf, size);
547
0
    }
548
0
    else if (content_type == HTTP_CONTENT_UNKNOWN) {
549
0
        if (buf[0] == '{') {
550
0
            ret = parse_hec_payload_json(ctx, tag, buf, size);
551
0
        }
552
0
        else {
553
0
            ret = process_raw_payload_pack(ctx, tag, buf, size);
554
0
        }
555
0
    }
556
557
0
    return ret;
558
0
}
559
560
static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
561
                               flb_sds_t tag,
562
                               struct mk_http_session *session,
563
                               struct mk_http_request *request)
564
0
{
565
0
    int i = 0;
566
0
    int ret = 0;
567
0
    int type = -1;
568
0
    struct mk_http_header *header;
569
0
    struct mk_http_header *header_auth;
570
0
    int extra_size = -1;
571
0
    struct mk_http_header *headers_extra;
572
0
    int gzip_compressed = FLB_FALSE;
573
0
    void *gz_data = NULL;
574
0
    size_t gz_size = -1;
575
576
0
    header = &session->parser.headers[MK_HEADER_CONTENT_TYPE];
577
0
    if (header->key.data == NULL) {
578
0
        flb_plg_debug(ctx->ins, "header 'Content-Type' is not set");
579
0
    }
580
581
0
    if (header->val.len == 16 &&
582
0
        strncasecmp(header->val.data, "application/json", 16) == 0) {
583
0
        type = HTTP_CONTENT_JSON;
584
0
    }
585
0
    else if (header->val.len == 10 &&
586
0
        strncasecmp(header->val.data, "text/plain", 10) == 0) {
587
0
        type = HTTP_CONTENT_TEXT;
588
0
    }
589
0
    else {
590
        /* Not necessary to specify content-type for Splunk HEC. */
591
0
        flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads");
592
0
        type = HTTP_CONTENT_UNKNOWN;
593
0
    }
594
595
0
    if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) {
596
0
        send_response(conn, 400, "error: no payload found\n");
597
0
        return -2;
598
0
    }
599
600
0
    header_auth = &session->parser.headers[MK_HEADER_AUTHORIZATION];
601
0
    if (header_auth->key.data != NULL) {
602
0
        if (strncasecmp(header_auth->val.data, "Splunk ", 7) == 0) {
603
0
            ctx->ingested_auth_header = header_auth->val.data;
604
0
            ctx->ingested_auth_header_len = header_auth->val.len;
605
0
        }
606
0
    }
607
608
0
    extra_size = session->parser.headers_extra_count;
609
0
    if (extra_size > 0) {
610
0
        for (i = 0; i < extra_size; i++) {
611
0
            headers_extra = &session->parser.headers_extra[i];
612
0
            if (headers_extra->key.len == 16 &&
613
0
                strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) {
614
0
                if (headers_extra->val.len == 4 &&
615
0
                    strncasecmp(headers_extra->val.data, "gzip", 4) == 0) {
616
0
                    flb_plg_debug(ctx->ins, "body is gzipped");
617
0
                    gzip_compressed = FLB_TRUE;
618
0
                }
619
0
            }
620
0
        }
621
0
    }
622
623
0
    if (gzip_compressed == FLB_TRUE) {
624
0
        ret = flb_gzip_uncompress((void *) request->data.data, request->data.len,
625
0
                                  &gz_data, &gz_size);
626
0
        if (ret == -1) {
627
0
            flb_plg_error(ctx->ins, "gzip uncompress is failed");
628
0
            return -1;
629
0
        }
630
631
0
        ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size);
632
0
        flb_free(gz_data);
633
0
    }
634
0
    else {
635
0
        ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len);
636
0
    }
637
638
0
    return 0;
639
0
}
640
641
static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
642
                                   flb_sds_t tag,
643
                                   struct mk_http_session *session,
644
                                   struct mk_http_request *request)
645
0
{
646
0
    int ret = -1;
647
0
    struct mk_http_header *header;
648
0
    struct mk_http_header *header_auth;
649
650
0
    header = &session->parser.headers[MK_HEADER_CONTENT_TYPE];
651
0
    if (header->key.data == NULL) {
652
0
        send_response(conn, 400, "error: header 'Content-Type' is not set\n");
653
0
        return -1;
654
0
    }
655
0
    else if (header->val.len != 10 ||
656
0
             strncasecmp(header->val.data, "text/plain", 10) != 0) {
657
        /* Not necessary to specify content-type for Splunk HEC. */
658
0
        flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads");
659
0
    }
660
661
0
    if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) {
662
0
        send_response(conn, 400, "2 error: no payload found\n");
663
0
        return -1;
664
0
    }
665
666
0
    header_auth = &session->parser.headers[MK_HEADER_AUTHORIZATION];
667
0
    if (header_auth->key.data != NULL) {
668
0
        if (strncasecmp(header_auth->val.data, "Splunk ", 7) == 0) {
669
0
            ctx->ingested_auth_header = header_auth->val.data;
670
0
            ctx->ingested_auth_header_len = header_auth->val.len;
671
0
        }
672
0
    }
673
674
    /* Always handle as raw type of payloads here */
675
0
    ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len);
676
677
0
    return ret;
678
0
}
679
680
static inline int mk_http_point_header(mk_ptr_t *h,
681
                                       struct mk_http_parser *parser, int key)
682
0
{
683
0
    struct mk_http_header *header;
684
685
0
    header = &parser->headers[key];
686
0
    if (header->type == key) {
687
0
        h->data = header->val.data;
688
0
        h->len  = header->val.len;
689
0
        return 0;
690
0
    }
691
0
    else {
692
0
        h->data = NULL;
693
0
        h->len  = -1;
694
0
    }
695
696
0
    return -1;
697
0
}
698
699
/*
700
 * Handle an incoming request. It perform extra checks over the request, if
701
 * everything is OK, it enqueue the incoming payload.
702
 */
703
int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
704
                       struct mk_http_session *session,
705
                       struct mk_http_request *request)
706
0
{
707
0
    int i;
708
0
    int ret;
709
0
    int len;
710
0
    char *uri;
711
0
    char *qs;
712
0
    char *original_data = NULL;
713
0
    size_t original_data_size = 0;
714
0
    char *out_chunked = NULL;
715
0
    size_t out_chunked_size = 0;
716
0
    off_t diff;
717
0
    flb_sds_t tag;
718
0
    struct mk_http_header *header;
719
720
0
    if (request->uri.data[0] != '/') {
721
0
        send_response(conn, 400, "error: invalid request\n");
722
0
        return -1;
723
0
    }
724
725
    /* Decode URI */
726
0
    uri = mk_utils_url_decode(request->uri);
727
0
    if (!uri) {
728
0
        uri = mk_mem_alloc_z(request->uri.len + 1);
729
0
        if (!uri) {
730
0
            return -1;
731
0
        }
732
0
        memcpy(uri, request->uri.data, request->uri.len);
733
0
        uri[request->uri.len] = '\0';
734
0
    }
735
736
    /* Try to match a query string so we can remove it */
737
0
    qs = strchr(uri, '?');
738
0
    if (qs) {
739
        /* remove the query string part */
740
0
        diff = qs - uri;
741
0
        uri[diff] = '\0';
742
0
    }
743
744
    /* Refer the tag at first*/
745
0
    if (ctx->ins->tag && !ctx->ins->tag_default) {
746
0
        tag = flb_sds_create(ctx->ins->tag);
747
0
        if (tag == NULL) {
748
0
            mk_mem_free(uri);
749
0
            return -1;
750
0
        }
751
0
    }
752
0
    else {
753
        /* Compose the query string using the URI */
754
0
        len = strlen(uri);
755
756
0
        if (len == 1) {
757
0
            tag = NULL; /* use default tag */
758
0
        }
759
0
        else {
760
            /* New tag skipping the URI '/' */
761
0
            tag = flb_sds_create_len(&uri[1], len - 1);
762
0
            if (!tag) {
763
0
                mk_mem_free(uri);
764
0
                return -1;
765
0
            }
766
767
            /* Sanitize, only allow alphanum chars */
768
0
            for (i = 0; i < flb_sds_len(tag); i++) {
769
0
                if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
770
0
                    tag[i] = '_';
771
0
                }
772
0
            }
773
0
        }
774
0
    }
775
776
    /* Check if we have a Host header: Hostname ; port */
777
0
    mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);
778
779
    /* Header: Connection */
780
0
    mk_http_point_header(&request->connection, &session->parser,
781
0
                         MK_HEADER_CONNECTION);
782
783
    /* HTTP/1.1 needs Host header */
784
0
    if (request->host.data == NULL && request->protocol == MK_HTTP_PROTOCOL_11) {
785
0
        flb_sds_destroy(tag);
786
0
        mk_mem_free(uri);
787
788
0
        return -1;
789
0
    }
790
791
    /* Should we close the session after this request ? */
792
0
    mk_http_keepalive_check(session, request, ctx->server);
793
794
    /* Content Length */
795
0
    header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH];
796
0
    if (header->type == MK_HEADER_CONTENT_LENGTH) {
797
0
        request->_content_length.data = header->val.data;
798
0
        request->_content_length.len  = header->val.len;
799
0
    }
800
0
    else {
801
0
        request->_content_length.data = NULL;
802
0
    }
803
804
0
    if (request->method == MK_METHOD_GET) {
805
        /* Handle health monitoring of splunk hec endpoint for load balancers */
806
0
        if (strcasecmp(uri, "/services/collector/health") == 0) {
807
0
            send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":200}");
808
0
        }
809
0
        else {
810
0
            send_response(conn, 400, "error: invalid HTTP endpoint\n");
811
0
        }
812
813
0
        flb_sds_destroy(tag);
814
0
        mk_mem_free(uri);
815
816
0
        return 0;
817
0
    }
818
819
    /* Under services/collector endpoints are required for
820
     * authentication if provided splunk_token */
821
0
    ret = validate_auth_header(ctx, request);
822
0
    if (ret < 0){
823
0
        send_response(conn, 401, "error: unauthorized\n");
824
0
        if (ret == SPLUNK_AUTH_MISSING_CRED) {
825
0
            flb_plg_warn(ctx->ins, "missing credentials in request headers");
826
0
        }
827
0
        else if (ret == SPLUNK_AUTH_UNAUTHORIZED) {
828
0
            flb_plg_warn(ctx->ins, "wrong credentials in request headers");
829
0
        }
830
831
0
        flb_sds_destroy(tag);
832
0
        mk_mem_free(uri);
833
834
0
        return -1;
835
0
    }
836
837
    /* If the request contains chunked transfer encoded data, decode it */\
838
0
    if (mk_http_parser_is_content_chunked(&session->parser)) {
839
0
        ret = mk_http_parser_chunked_decode(&session->parser,
840
0
                                            conn->buf_data,
841
0
                                            conn->buf_len,
842
0
                                            &out_chunked,
843
0
                                            &out_chunked_size);
844
0
        if (ret == -1) {
845
0
            flb_plg_error(ctx->ins, "failed to decode chunked data");
846
0
            send_response(conn, 400, "error: invalid chunked data\n");
847
848
0
            flb_sds_destroy(tag);
849
0
            mk_mem_free(uri);
850
851
0
            return -1;
852
0
        }
853
854
        /* Update the request data */
855
0
        original_data = request->data.data;
856
0
        original_data_size = request->data.len;
857
858
        /* assign the chunked one */
859
0
        request->data.data = out_chunked;
860
0
        request->data.len = out_chunked_size;
861
0
    }
862
863
    /* Handle every ingested payload cleanly */
864
0
    flb_log_event_encoder_reset(&ctx->log_encoder);
865
866
0
    if (request->method == MK_METHOD_POST) {
867
0
        if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 ||
868
0
            strcasecmp(uri, "/services/collector/raw") == 0) {
869
0
            ret = process_hec_raw_payload(ctx, conn, tag, session, request);
870
871
0
            if (!ret) {
872
0
                send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}");
873
0
            }
874
0
            send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}");
875
0
        }
876
0
        else if (strcasecmp(uri, "/services/collector/event/1.0") == 0 ||
877
0
                 strcasecmp(uri, "/services/collector/event") == 0 ||
878
0
                 strcasecmp(uri, "/services/collector") == 0) {
879
880
0
            ret = process_hec_payload(ctx, conn, tag, session, request);
881
0
            if (ret == -2) {
882
0
                flb_sds_destroy(tag);
883
0
                mk_mem_free(uri);
884
885
0
                if (out_chunked) {
886
0
                    mk_mem_free(out_chunked);
887
0
                }
888
0
                request->data.data = original_data;
889
0
                request->data.len = original_data_size;
890
891
0
                return -1;
892
0
            }
893
894
0
            if (!ret) {
895
0
                send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}");
896
0
            }
897
0
            send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}");
898
0
        }
899
0
        else {
900
0
            send_response(conn, 400, "error: invalid HTTP endpoint\n");
901
902
0
            flb_sds_destroy(tag);
903
0
            mk_mem_free(uri);
904
905
0
            if (out_chunked) {
906
0
                mk_mem_free(out_chunked);
907
0
            }
908
0
            request->data.data = original_data;
909
0
            request->data.len = original_data_size;
910
911
0
            return -1;
912
0
        }
913
0
    }
914
0
    else {
915
        /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/
916
917
0
        flb_sds_destroy(tag);
918
0
        mk_mem_free(uri);
919
920
0
        if (out_chunked) {
921
0
            mk_mem_free(out_chunked);
922
0
        }
923
0
        request->data.data = original_data;
924
0
        request->data.len = original_data_size;
925
926
0
        send_response(conn, 400, "error: invalid HTTP method\n");
927
0
        return -1;
928
0
    }
929
930
0
    flb_sds_destroy(tag);
931
0
    mk_mem_free(uri);
932
933
0
    if (out_chunked) {
934
0
        mk_mem_free(out_chunked);
935
0
    }
936
0
    request->data.data = original_data;
937
0
    request->data.len = original_data_size;
938
939
0
    return ret;
940
0
}
941
942
/*
943
 * Handle an incoming request which has resulted in an http parser error.
944
 */
945
int splunk_prot_handle_error(struct flb_splunk *ctx, struct splunk_conn *conn,
946
                             struct mk_http_session *session,
947
                             struct mk_http_request *request)
948
0
{
949
0
    send_response(conn, 400, "error: invalid request\n");
950
0
    return -1;
951
0
}
952
953
954
955
956
957
958
959
/* New gen HTTP server */
960
961
static int send_response_ng(struct flb_http_response *response,
962
                            int http_status,
963
                            char *message)
964
0
{
965
0
    flb_http_response_set_status(response, http_status);
966
967
0
    if (http_status == 201) {
968
0
        flb_http_response_set_message(response, "Created");
969
0
    }
970
0
    else if (http_status == 200) {
971
0
        flb_http_response_set_message(response, "OK");
972
0
    }
973
0
    else if (http_status == 204) {
974
0
        flb_http_response_set_message(response, "No Content");
975
0
    }
976
0
    else if (http_status == 400) {
977
0
        flb_http_response_set_message(response, "Bad Request");
978
0
    }
979
980
0
    if (message != NULL) {
981
0
        flb_http_response_set_body(response,
982
0
                                   (unsigned char *) message,
983
0
                                   strlen(message));
984
0
    }
985
986
0
    flb_http_response_commit(response);
987
988
0
    return 0;
989
0
}
990
991
static int send_json_message_response_ng(struct flb_http_response *response,
992
                                         int http_status,
993
                                         char *message)
994
0
{
995
0
    flb_http_response_set_status(response, http_status);
996
997
0
    if (http_status == 201) {
998
0
        flb_http_response_set_message(response, "Created");
999
0
    }
1000
0
    else if (http_status == 200) {
1001
0
        flb_http_response_set_message(response, "OK");
1002
0
    }
1003
0
    else if (http_status == 204) {
1004
0
        flb_http_response_set_message(response, "No Content");
1005
0
    }
1006
0
    else if (http_status == 400) {
1007
0
        flb_http_response_set_message(response, "Bad Request");
1008
0
    }
1009
1010
0
    flb_http_response_set_header(response,
1011
0
                                "content-type", 0,
1012
0
                                "application/json", 0);
1013
1014
0
    if (message != NULL) {
1015
0
        flb_http_response_set_body(response,
1016
0
                                   (unsigned char *) message,
1017
0
                                   strlen(message));
1018
0
    }
1019
1020
0
    flb_http_response_commit(response);
1021
1022
0
    return 0;
1023
0
}
1024
1025
static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request)
1026
0
{
1027
0
    struct mk_list *tmp;
1028
0
    struct mk_list *head;
1029
0
    char *auth_header;
1030
0
    struct flb_splunk_tokens *splunk_token;
1031
1032
0
    if (mk_list_size(&ctx->auth_tokens) == 0) {
1033
0
        return SPLUNK_AUTH_UNAUTH;
1034
0
    }
1035
1036
0
    auth_header = flb_http_request_get_header(request, "authorization");
1037
1038
0
    if (auth_header == NULL) {
1039
0
        return SPLUNK_AUTH_MISSING_CRED;
1040
0
    }
1041
1042
0
    if (auth_header != NULL && strlen(auth_header) > 0) {
1043
0
        mk_list_foreach_safe(head, tmp, &ctx->auth_tokens) {
1044
0
            splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head);
1045
0
            if (strlen(auth_header) != splunk_token->length) {
1046
0
                continue;
1047
0
            }
1048
1049
0
            if (strncasecmp(splunk_token->header,
1050
0
                        auth_header,
1051
0
                        splunk_token->length) == 0) {
1052
0
                return SPLUNK_AUTH_SUCCESS;
1053
0
            }
1054
0
        }
1055
1056
0
        return SPLUNK_AUTH_UNAUTHORIZED;
1057
0
    }
1058
0
    else {
1059
0
        return SPLUNK_AUTH_MISSING_CRED;
1060
0
    }
1061
1062
0
    return SPLUNK_AUTH_SUCCESS;
1063
0
}
1064
1065
static int process_hec_payload_ng(struct flb_http_request *request,
1066
                                  struct flb_http_response *response,
1067
                                  flb_sds_t tag,
1068
                                  struct flb_splunk *ctx)
1069
0
{
1070
0
    int type = -1;
1071
0
    int ret = 0;
1072
0
    size_t size = 0;
1073
0
    char *auth_header;
1074
1075
0
    type = HTTP_CONTENT_UNKNOWN;
1076
1077
0
    if (request->content_type != NULL) {
1078
0
        if (strcasecmp(request->content_type, "application/json") == 0) {
1079
0
            type = HTTP_CONTENT_JSON;
1080
0
        }
1081
0
        else if (strcasecmp(request->content_type, "text/plain") == 0) {
1082
0
            type = HTTP_CONTENT_TEXT;
1083
0
        }
1084
0
        else {
1085
            /* Not necessary to specify content-type for Splunk HEC. */
1086
0
            flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads");
1087
0
        }
1088
0
    }
1089
1090
0
    ret = flb_hash_table_get(request->headers, "authorization", 13, (void **)&auth_header, &size);
1091
0
    if (ret != 0 && size > 0) {
1092
0
        if (strncasecmp(auth_header, "Splunk ", 7) == 0) {
1093
0
            ctx->ingested_auth_header = auth_header;
1094
0
            ctx->ingested_auth_header_len = strlen(auth_header);
1095
0
        }
1096
0
    }
1097
1098
0
    if (request->body == NULL || cfl_sds_len(request->body) <= 0) {
1099
0
        send_response_ng(response, 400, "error: no payload found\n");
1100
1101
0
        return -1;
1102
0
    }
1103
1104
0
    return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body));
1105
0
}
1106
1107
static int process_hec_raw_payload_ng(struct flb_http_request *request,
1108
                                      struct flb_http_response *response,
1109
                                      flb_sds_t tag,
1110
                                      struct flb_splunk *ctx)
1111
0
{
1112
0
    int ret = 0;
1113
0
    size_t size = 0;
1114
0
    char *auth_header;
1115
1116
0
    if (request->content_type == NULL) {
1117
0
        send_response_ng(response, 400, "error: header 'Content-Type' is not set\n");
1118
1119
0
        return -1;
1120
0
    }
1121
0
    else if (strcasecmp(request->content_type, "text/plain") != 0) {
1122
        /* Not necessary to specify content-type for Splunk HEC. */
1123
0
        flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads");
1124
0
    }
1125
1126
0
    ret = flb_hash_table_get(request->headers, "authorization", 13, (void **)&auth_header, &size);
1127
0
    if (ret != 0 && size > 0) {
1128
0
        if (strncasecmp(auth_header, "Splunk ", 7) == 0) {
1129
0
            ctx->ingested_auth_header = auth_header;
1130
0
            ctx->ingested_auth_header_len = strlen(auth_header);
1131
0
        }
1132
0
    }
1133
1134
0
    if (request->body == NULL || cfl_sds_len(request->body) == 0) {
1135
0
        send_response_ng(response, 400, "error: no payload found\n");
1136
1137
0
        return -1;
1138
0
    }
1139
1140
    /* Always handle as raw type of payloads here */
1141
0
    return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body));
1142
0
}
1143
1144
int splunk_prot_handle_ng(struct flb_http_request *request,
1145
                          struct flb_http_response *response)
1146
0
{
1147
0
    struct flb_splunk *context;
1148
0
    int                ret = -1;
1149
0
    flb_sds_t          tag;
1150
1151
0
    context = (struct flb_splunk *) response->stream->user_data;
1152
1153
0
    if (request->path[0] != '/') {
1154
0
        send_response_ng(response, 400, "error: invalid request\n");
1155
0
        return -1;
1156
0
    }
1157
1158
    /* HTTP/1.1 needs Host header */
1159
0
    if (request->protocol_version == HTTP_PROTOCOL_VERSION_11 &&
1160
0
        request->host == NULL) {
1161
1162
0
        return -1;
1163
0
    }
1164
1165
0
    if (request->method == HTTP_METHOD_GET) {
1166
        /* Handle health monitoring of splunk hec endpoint for load balancers */
1167
0
        if (strcasecmp(request->path, "/services/collector/health") == 0) {
1168
0
            send_json_message_response_ng(response, 200, "{\"text\":\"Success\",\"code\":200}");
1169
0
        }
1170
0
        else {
1171
0
            send_response_ng(response, 400, "error: invalid HTTP endpoint\n");
1172
0
        }
1173
1174
0
        return 0;
1175
0
    }
1176
1177
    /* Under services/collector endpoints are required for
1178
     * authentication if provided splunk_token */
1179
0
    ret = validate_auth_header_ng(context, request);
1180
1181
0
    if (ret < 0) {
1182
0
        send_response_ng(response, 401, "error: unauthorized\n");
1183
1184
0
        if (ret == SPLUNK_AUTH_MISSING_CRED) {
1185
0
            flb_plg_warn(context->ins, "missing credentials in request headers");
1186
0
        }
1187
0
        else if (ret == SPLUNK_AUTH_UNAUTHORIZED) {
1188
0
            flb_plg_warn(context->ins, "wrong credentials in request headers");
1189
0
        }
1190
1191
0
        return -1;
1192
0
    }
1193
1194
    /* Handle every ingested payload cleanly */
1195
0
    flb_log_event_encoder_reset(&context->log_encoder);
1196
1197
0
    if (request->method != HTTP_METHOD_POST) {
1198
        /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/
1199
0
        send_response_ng(response, 400, "error: invalid HTTP method\n");
1200
1201
0
        return -1;
1202
0
    }
1203
1204
0
    tag = flb_sds_create(context->ins->tag);
1205
1206
0
    if (tag == NULL) {
1207
0
        return -1;
1208
0
    }
1209
1210
0
    if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 ||
1211
0
        strcasecmp(request->path, "/services/collector/raw") == 0) {
1212
0
        ret = process_hec_raw_payload_ng(request, response, tag, context);
1213
0
        if (ret != 0) {
1214
0
            send_json_message_response_ng(response, 400, "{\"text\":\"Invalid data format\",\"code\":6}");
1215
0
            ret = -1;
1216
0
        }
1217
0
        else {
1218
0
            send_json_message_response_ng(response, 200, "{\"text\":\"Success\",\"code\":0}");
1219
0
            ret = 0;
1220
0
        }
1221
0
    }
1222
0
    else if (strcasecmp(request->path, "/services/collector/event/1.0") == 0 ||
1223
0
             strcasecmp(request->path, "/services/collector/event") == 0 ||
1224
0
             strcasecmp(request->path, "/services/collector") == 0) {
1225
0
        ret = process_hec_payload_ng(request, response, tag, context);
1226
0
        if (ret != 0) {
1227
0
            send_json_message_response_ng(response, 400, "{\"text\":\"Invalid data format\",\"code\":6}");
1228
0
            ret = -1;
1229
0
        }
1230
0
        else {
1231
0
            send_json_message_response_ng(response, 200, "{\"text\":\"Success\",\"code\":0}");
1232
0
            ret = 0;
1233
0
        }
1234
0
    }
1235
0
    else {
1236
0
        send_response_ng(response, 400, "error: invalid HTTP endpoint\n");
1237
0
        ret = -1;
1238
0
    }
1239
1240
0
    flb_sds_destroy(tag);
1241
0
    return ret;
1242
0
}