Coverage Report

Created: 2023-11-19 07:36

/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2023 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_version.h>
22
#include <fluent-bit/flb_error.h>
23
#include <fluent-bit/flb_pack.h>
24
#include <fluent-bit/flb_gzip.h>
25
26
#include <monkey/monkey.h>
27
#include <monkey/mk_core.h>
28
29
#include "in_elasticsearch.h"
30
#include "in_elasticsearch_bulk_conn.h"
31
#include "in_elasticsearch_bulk_prot.h"
32
33
0
#define HTTP_CONTENT_JSON   0
34
0
#define HTTP_CONTENT_NDJSON 1
35
36
static int send_empty_response(struct in_elasticsearch_bulk_conn *conn, int http_status)
37
0
{
38
0
    size_t    sent;
39
0
    flb_sds_t out;
40
41
0
    out = flb_sds_create_size(256);
42
0
    if (!out) {
43
0
        return -1;
44
0
    }
45
46
0
    if (http_status == 200) {
47
0
        flb_sds_printf(&out,
48
0
                       "HTTP/1.1 200 OK\r\n"
49
0
                       "Content-Type: application/json\r\n\r\n");
50
0
    }
51
52
    /* We should check this operations result */
53
0
    flb_io_net_write(conn->connection,
54
0
                     (void *) out,
55
0
                     flb_sds_len(out),
56
0
                     &sent);
57
58
0
    flb_sds_destroy(out);
59
60
0
    return 0;
61
0
}
62
63
static int send_json_message_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message)
64
0
{
65
0
    size_t    sent;
66
0
    int       len;
67
0
    flb_sds_t out;
68
69
0
    out = flb_sds_create_size(256);
70
0
    if (!out) {
71
0
        return -1;
72
0
    }
73
74
0
    if (message) {
75
0
        len = strlen(message);
76
0
    }
77
0
    else {
78
0
        len = 0;
79
0
    }
80
81
0
    if (http_status == 200) {
82
0
        flb_sds_printf(&out,
83
0
                       "HTTP/1.1 200 OK\r\n"
84
0
                       "Content-Type: application/json\r\n"
85
0
                       "Content-Length: %i\r\n\r\n%s",
86
0
                       len, message);
87
0
    }
88
89
    /* We should check this operations result */
90
0
    flb_io_net_write(conn->connection,
91
0
                     (void *) out,
92
0
                     flb_sds_len(out),
93
0
                     &sent);
94
95
0
    flb_sds_destroy(out);
96
97
0
    return 0;
98
0
}
99
100
static int send_version_message_response(struct flb_in_elasticsearch *ctx,
101
                                         struct in_elasticsearch_bulk_conn *conn, int http_status)
102
0
{
103
0
    size_t    sent;
104
0
    int       len;
105
0
    flb_sds_t out;
106
0
    flb_sds_t resp;
107
108
0
    out = flb_sds_create_size(256);
109
0
    if (!out) {
110
0
        return -1;
111
0
    }
112
0
    resp = flb_sds_create_size(384);
113
0
    if (!resp) {
114
0
        flb_sds_destroy(out);
115
0
        return -1;
116
0
    }
117
118
0
    flb_sds_printf(&resp,
119
0
                   ES_VERSION_RESPONSE_TEMPLATE,
120
0
                   ctx->es_version);
121
122
0
    len = flb_sds_len(resp);
123
124
0
    if (http_status == 200) {
125
0
        flb_sds_printf(&out,
126
0
                       "HTTP/1.1 200 OK\r\n"
127
0
                       "Content-Type: application/json\r\n"
128
0
                       "Content-Length: %i\r\n\r\n%s",
129
0
                       len, resp);
130
0
    }
131
132
    /* We should check this operations result */
133
0
    flb_io_net_write(conn->connection,
134
0
                     (void *) out,
135
0
                     flb_sds_len(out),
136
0
                     &sent);
137
138
0
    flb_sds_destroy(resp);
139
0
    flb_sds_destroy(out);
140
141
0
    return 0;
142
0
}
143
144
static int send_dummy_sniffer_response(struct in_elasticsearch_bulk_conn *conn, int http_status,
145
                                       struct flb_in_elasticsearch *ctx)
146
0
{
147
0
    size_t    sent;
148
0
    int       len;
149
0
    flb_sds_t out;
150
0
    flb_sds_t resp;
151
0
    flb_sds_t hostname;
152
153
0
    if (ctx->hostname != NULL) {
154
0
        hostname = ctx->hostname;
155
0
    }
156
0
    else {
157
0
        hostname = "localhost";
158
0
    }
159
160
0
    out = flb_sds_create_size(384);
161
0
    if (!out) {
162
0
        return -1;
163
0
    }
164
165
0
    resp = flb_sds_create_size(384);
166
0
    if (!resp) {
167
0
        flb_sds_destroy(out);
168
0
        return -1;
169
0
    }
170
171
0
    flb_sds_printf(&resp,
172
0
                   ES_NODES_TEMPLATE,
173
0
                   ctx->cluster_name, ctx->node_name,
174
0
                   hostname, ctx->tcp_port, ctx->buffer_max_size);
175
176
0
    len = flb_sds_len(resp) ;
177
178
0
    if (http_status == 200) {
179
0
        flb_sds_printf(&out,
180
0
                       "HTTP/1.1 200 OK\r\n"
181
0
                       "Content-Type: application/json\r\n"
182
0
                       "Content-Length: %i\r\n\r\n%s",
183
0
                       len, resp);
184
0
    }
185
186
    /* We should check this operations result */
187
0
    flb_io_net_write(conn->connection,
188
0
                     (void *) out,
189
0
                     flb_sds_len(out),
190
0
                     &sent);
191
192
0
    flb_sds_destroy(resp);
193
0
    flb_sds_destroy(out);
194
195
0
    return 0;
196
0
}
197
198
static int send_response(struct in_elasticsearch_bulk_conn *conn, int http_status, char *message)
199
0
{
200
0
    size_t    sent;
201
0
    int       len;
202
0
    flb_sds_t out;
203
204
0
    out = flb_sds_create_size(256);
205
0
    if (!out) {
206
0
        return -1;
207
0
    }
208
209
0
    if (message) {
210
0
        len = strlen(message);
211
0
    }
212
0
    else {
213
0
        len = 0;
214
0
    }
215
216
0
    if (http_status == 200) {
217
0
        flb_sds_printf(&out,
218
0
                       "HTTP/1.1 200 OK\r\n"
219
0
                       "Server: Fluent Bit v%s\r\n"
220
0
                       "Content-Type: application/json\r\n"
221
0
                       "Content-Length: %i\r\n\r\n%s",
222
0
                       FLB_VERSION_STR,
223
0
                       len, message);
224
0
    }
225
0
    else if (http_status == 400) {
226
0
        flb_sds_printf(&out,
227
0
                       "HTTP/1.1 400 Forbidden\r\n"
228
0
                       "Server: Fluent Bit v%s\r\n"
229
0
                       "Content-Length: %i\r\n\r\n%s",
230
0
                       FLB_VERSION_STR,
231
0
                       len, message);
232
0
    }
233
234
    /* We should check this operations result */
235
0
    flb_io_net_write(conn->connection,
236
0
                     (void *) out,
237
0
                     flb_sds_len(out),
238
0
                     &sent);
239
240
0
    flb_sds_destroy(out);
241
242
0
    return 0;
243
0
}
244
245
/* implements functionality to get tag from key in record */
246
static flb_sds_t tag_key(struct flb_in_elasticsearch *ctx, msgpack_object *map)
247
0
{
248
0
    size_t map_size = map->via.map.size;
249
0
    msgpack_object_kv *kv;
250
0
    msgpack_object  key;
251
0
    msgpack_object  val;
252
0
    char *key_str = NULL;
253
0
    char *val_str = NULL;
254
0
    size_t key_str_size = 0;
255
0
    size_t val_str_size = 0;
256
0
    int j;
257
0
    int check = FLB_FALSE;
258
0
    int found = FLB_FALSE;
259
0
    flb_sds_t tag;
260
261
0
    kv = map->via.map.ptr;
262
263
0
    for(j=0; j < map_size; j++) {
264
0
        check = FLB_FALSE;
265
0
        found = FLB_FALSE;
266
0
        key = (kv+j)->key;
267
0
        if (key.type == MSGPACK_OBJECT_BIN) {
268
0
            key_str  = (char *) key.via.bin.ptr;
269
0
            key_str_size = key.via.bin.size;
270
0
            check = FLB_TRUE;
271
0
        }
272
0
        if (key.type == MSGPACK_OBJECT_STR) {
273
0
            key_str  = (char *) key.via.str.ptr;
274
0
            key_str_size = key.via.str.size;
275
0
            check = FLB_TRUE;
276
0
        }
277
278
0
        if (check == FLB_TRUE) {
279
0
            if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) {
280
0
                val = (kv+j)->val;
281
0
                if (val.type == MSGPACK_OBJECT_BIN) {
282
0
                    val_str  = (char *) val.via.bin.ptr;
283
0
                    val_str_size = val.via.str.size;
284
0
                    found = FLB_TRUE;
285
0
                    break;
286
0
                }
287
0
                if (val.type == MSGPACK_OBJECT_STR) {
288
0
                    val_str  = (char *) val.via.str.ptr;
289
0
                    val_str_size = val.via.str.size;
290
0
                    found = FLB_TRUE;
291
0
                    break;
292
0
                }
293
0
            }
294
0
        }
295
0
    }
296
297
0
    if (found == FLB_TRUE) {
298
0
        tag = flb_sds_create_len(val_str, val_str_size);
299
0
        if (!tag) {
300
0
            flb_errno();
301
0
            return NULL;
302
0
        }
303
0
        return tag;
304
0
    }
305
306
307
0
    flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key);
308
0
    return NULL;
309
0
}
310
311
static int get_write_op(struct flb_in_elasticsearch *ctx, msgpack_object *map, flb_sds_t *out_write_op, size_t *out_key_size)
312
0
{
313
0
    char *op_str = NULL;
314
0
    size_t op_str_size = 0;
315
0
    msgpack_object_kv *kv;
316
0
    msgpack_object key;
317
0
    int check = FLB_FALSE;
318
319
0
    kv = map->via.map.ptr;
320
0
    key = kv[0].key;
321
0
    if (key.type == MSGPACK_OBJECT_BIN) {
322
0
        op_str  = (char *) key.via.bin.ptr;
323
0
        op_str_size = key.via.bin.size;
324
0
        check = FLB_TRUE;
325
0
    }
326
0
    if (key.type == MSGPACK_OBJECT_STR) {
327
0
        op_str  = (char *) key.via.str.ptr;
328
0
        op_str_size = key.via.str.size;
329
0
        check = FLB_TRUE;
330
0
    }
331
332
0
    if (check == FLB_TRUE) {
333
0
        *out_write_op = flb_sds_create_len(op_str, op_str_size);
334
0
        *out_key_size = op_str_size;
335
0
    }
336
337
0
    return check;
338
0
}
339
340
static int status_buffer_avail(struct flb_in_elasticsearch *ctx, flb_sds_t bulk_statuses, size_t threshold)
341
0
{
342
0
    if (flb_sds_avail(bulk_statuses) < threshold) {
343
0
        flb_plg_warn(ctx->ins, "left buffer for bulk status(es) is too small");
344
345
0
        return FLB_FALSE;
346
0
    }
347
348
0
    return FLB_TRUE;
349
0
}
350
351
static int process_ndpack(struct flb_in_elasticsearch *ctx, flb_sds_t tag, char *buf, size_t size, flb_sds_t bulk_statuses)
352
0
{
353
0
    int ret;
354
0
    size_t off = 0;
355
0
    size_t map_copy_index;
356
0
    msgpack_object_kv *map_copy_entry;
357
0
    msgpack_unpacked result;
358
0
    struct flb_time tm;
359
0
    msgpack_object *obj;
360
0
    flb_sds_t tag_from_record = NULL;
361
0
    int idx = 0;
362
0
    flb_sds_t write_op;
363
0
    size_t op_str_size = 0;
364
0
    int op_ret = FLB_FALSE;
365
0
    int error_op = FLB_FALSE;
366
367
0
    flb_time_get(&tm);
368
369
0
    msgpack_unpacked_init(&result);
370
0
    while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) {
371
0
        if (result.data.type == MSGPACK_OBJECT_MAP) {
372
0
            if (idx > 0 && idx % 2 == 0) {
373
0
                flb_sds_cat(bulk_statuses, ",", 1);
374
0
            }
375
0
            if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) {
376
0
                break;
377
0
            }
378
0
            if (idx % 2 == 0) {
379
0
                op_ret = get_write_op(ctx, &result.data, &write_op, &op_str_size);
380
381
0
                if (op_ret) {
382
0
                    if (flb_sds_cmp(write_op, "index", op_str_size) == 0) {
383
0
                        flb_sds_cat(bulk_statuses, "{\"index\":", 9);
384
0
                        error_op = FLB_FALSE;
385
0
                    }
386
0
                    else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) {
387
0
                        flb_sds_cat(bulk_statuses, "{\"create\":", 10);
388
0
                        error_op = FLB_FALSE;
389
0
                    }
390
0
                    else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) {
391
0
                        flb_sds_cat(bulk_statuses, "{\"update\":", 10);
392
0
                        error_op = FLB_TRUE;
393
0
                    }
394
0
                    else if (flb_sds_cmp(write_op, "delete", op_str_size) == 0) {
395
0
                        flb_sds_cat(bulk_statuses, "{\"delete\":{\"status\":404,\"result\":\"not_found\"}}", 46);
396
0
                        error_op = FLB_TRUE;
397
0
                        idx += 1; /* Prepare to adjust to multiple of two
398
                                   * in the end of the loop.
399
                                   * Due to delete actions include only one line. */
400
0
                        flb_sds_destroy(write_op);
401
402
0
                        goto proceed;
403
0
                    }
404
0
                    else {
405
0
                        flb_sds_cat(bulk_statuses, "{\"unknown\":{\"status\":400,\"result\":\"bad_request\"}}", 49);
406
0
                        error_op = FLB_TRUE;
407
408
0
                        flb_sds_destroy(write_op);
409
410
0
                        break;
411
0
                    }
412
0
                } else {
413
0
                    flb_sds_destroy(write_op);
414
0
                    flb_plg_error(ctx->ins, "meta information line is missing");
415
0
                    error_op = FLB_TRUE;
416
417
0
                    break;
418
0
                }
419
420
0
                if (error_op == FLB_FALSE) {
421
0
                    flb_log_event_encoder_reset(&ctx->log_encoder);
422
423
0
                    ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
424
425
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
426
0
                        flb_sds_destroy(write_op);
427
0
                        flb_plg_error(ctx->ins, "event encoder error : %d", ret);
428
0
                        error_op = FLB_TRUE;
429
430
0
                        break;
431
0
                    }
432
433
0
                    ret = flb_log_event_encoder_set_timestamp(
434
0
                            &ctx->log_encoder,
435
0
                            &tm);
436
437
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
438
0
                        flb_sds_destroy(write_op);
439
0
                        flb_plg_error(ctx->ins, "event encoder error : %d", ret);
440
0
                        error_op = FLB_TRUE;
441
442
0
                        break;
443
0
                    }
444
445
0
                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
446
0
                        ret = flb_log_event_encoder_append_body_values(
447
0
                                &ctx->log_encoder,
448
0
                                FLB_LOG_EVENT_CSTRING_VALUE((char *) ctx->meta_key),
449
0
                                FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&result.data));
450
0
                    }
451
452
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
453
0
                        flb_sds_destroy(write_op);
454
0
                        flb_plg_error(ctx->ins, "event encoder error : %d", ret);
455
0
                        error_op = FLB_TRUE;
456
457
0
                        break;
458
0
                    }
459
0
                }
460
0
            }
461
0
            else if (idx % 2 == 1) {
462
0
                if (error_op == FLB_FALSE) {
463
                    /* Pack body */
464
465
0
                    for (map_copy_index = 0 ;
466
0
                         map_copy_index < result.data.via.map.size &&
467
0
                         ret == FLB_EVENT_ENCODER_SUCCESS ;
468
0
                         map_copy_index++) {
469
0
                        map_copy_entry = &result.data.via.map.ptr[map_copy_index];
470
471
0
                        ret = flb_log_event_encoder_append_body_values(
472
0
                                &ctx->log_encoder,
473
0
                                FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&map_copy_entry->key),
474
0
                                FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&map_copy_entry->val));
475
0
                    }
476
477
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
478
0
                        flb_plg_error(ctx->ins, "event encoder error : %d", ret);
479
0
                        error_op = FLB_TRUE;
480
481
0
                        break;
482
0
                    }
483
484
0
                    ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);
485
486
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
487
0
                        flb_plg_error(ctx->ins, "event encoder error : %d", ret);
488
0
                        error_op = FLB_TRUE;
489
490
0
                        break;
491
0
                    }
492
493
0
                    tag_from_record = NULL;
494
495
0
                    if (ctx->tag_key) {
496
0
                        obj = &result.data;
497
0
                        tag_from_record = tag_key(ctx, obj);
498
0
                    }
499
500
0
                    if (tag_from_record) {
501
0
                        flb_input_log_append(ctx->ins,
502
0
                                             tag_from_record,
503
0
                                             flb_sds_len(tag_from_record),
504
0
                                             ctx->log_encoder.output_buffer,
505
0
                                             ctx->log_encoder.output_length);
506
507
0
                        flb_sds_destroy(tag_from_record);
508
0
                    }
509
0
                    else if (tag) {
510
0
                        flb_input_log_append(ctx->ins,
511
0
                                             tag,
512
0
                                             flb_sds_len(tag),
513
0
                                             ctx->log_encoder.output_buffer,
514
0
                                             ctx->log_encoder.output_length);
515
0
                    }
516
0
                    else {
517
                        /* use default plugin Tag (it internal name, e.g: http.0 */
518
0
                        flb_input_log_append(ctx->ins, NULL, 0,
519
0
                                             ctx->log_encoder.output_buffer,
520
0
                                             ctx->log_encoder.output_length);
521
0
                    }
522
523
0
                    flb_log_event_encoder_reset(&ctx->log_encoder);
524
0
                }
525
0
                if (op_ret) {
526
0
                    if (flb_sds_cmp(write_op, "index", op_str_size) == 0) {
527
0
                        flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34);
528
0
                    }
529
0
                    else if (flb_sds_cmp(write_op, "create", op_str_size) == 0) {
530
0
                        flb_sds_cat(bulk_statuses, "{\"status\":201,\"result\":\"created\"}}", 34);
531
0
                    }
532
0
                    else if (flb_sds_cmp(write_op, "update", op_str_size) == 0) {
533
0
                        flb_sds_cat(bulk_statuses, "{\"status\":403,\"result\":\"forbidden\"}}", 36);
534
0
                    }
535
0
                    if (status_buffer_avail(ctx, bulk_statuses, 50) == FLB_FALSE) {
536
0
                        flb_sds_destroy(write_op);
537
538
0
                        break;
539
0
                    }
540
0
                }
541
0
                flb_sds_destroy(write_op);
542
0
            }
543
544
0
        proceed:
545
0
            idx++;
546
0
        }
547
0
        else {
548
0
            flb_plg_error(ctx->ins, "skip record from invalid type: %i",
549
0
                         result.data.type);
550
0
            msgpack_unpacked_destroy(&result);
551
0
            return -1;
552
0
        }
553
0
    }
554
555
0
    if (idx % 2 != 0) {
556
0
        flb_plg_warn(ctx->ins, "decode payload of Bulk API is failed");
557
0
        msgpack_unpacked_destroy(&result);
558
0
        if (error_op == FLB_FALSE) {
559
            /* On lacking of body case in non-error case, there is no
560
             * releasing memory code paths. We should proceed to do
561
             * it here. */
562
0
            flb_sds_destroy(write_op);
563
0
        }
564
565
0
        return -1;
566
0
    }
567
568
0
    msgpack_unpacked_destroy(&result);
569
570
0
    return 0;
571
0
}
572
573
static ssize_t parse_payload_ndjson(struct flb_in_elasticsearch *ctx, flb_sds_t tag,
574
                                    char *payload, size_t size, flb_sds_t bulk_statuses)
575
0
{
576
0
    int ret;
577
0
    int out_size;
578
0
    char *pack;
579
0
    struct flb_pack_state pack_state;
580
581
    /* Initialize packer */
582
0
    flb_pack_state_init(&pack_state);
583
584
    /* Pack JSON as msgpack */
585
0
    ret = flb_pack_json_state(payload, size,
586
0
                              &pack, &out_size, &pack_state);
587
0
    flb_pack_state_reset(&pack_state);
588
589
    /* Handle exceptions */
590
0
    if (ret == FLB_ERR_JSON_PART) {
591
0
        flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
592
0
        return -1;
593
0
    }
594
0
    else if (ret == FLB_ERR_JSON_INVAL) {
595
0
        flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
596
0
        return -1;
597
0
    }
598
0
    else if (ret == -1) {
599
0
        return -1;
600
0
    }
601
602
    /* Process the packaged JSON and return the last byte used */
603
0
    process_ndpack(ctx, tag, pack, out_size, bulk_statuses);
604
0
    flb_free(pack);
605
606
0
    return 0;
607
0
}
608
609
static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticsearch_bulk_conn *conn,
610
                           flb_sds_t tag,
611
                           struct mk_http_session *session,
612
                           struct mk_http_request *request,
613
                           flb_sds_t bulk_statuses)
614
0
{
615
0
    int type = -1;
616
0
    int i = 0;
617
0
    int ret = 0;
618
0
    struct mk_http_header *header;
619
0
    int extra_size = -1;
620
0
    struct mk_http_header *headers_extra;
621
0
    int gzip_compressed = FLB_FALSE;
622
0
    void *gz_data = NULL;
623
0
    size_t gz_size = -1;
624
625
0
    header = &session->parser.headers[MK_HEADER_CONTENT_TYPE];
626
0
    if (header->key.data == NULL) {
627
0
        send_response(conn, 400, "error: header 'Content-Type' is not set\n");
628
0
        return -1;
629
0
    }
630
631
0
    if (header->val.len >= 20 &&
632
0
        strncasecmp(header->val.data, "application/x-ndjson", 20) == 0) {
633
0
        type = HTTP_CONTENT_NDJSON;
634
0
    }
635
636
0
    if (header->val.len >= 16 &&
637
0
        strncasecmp(header->val.data, "application/json", 16) == 0) {
638
0
        type = HTTP_CONTENT_JSON;
639
0
    }
640
641
0
    if (type == -1) {
642
0
        send_response(conn, 400, "error: invalid 'Content-Type'\n");
643
0
        return -1;
644
0
    }
645
646
0
    if (request->data.len <= 0) {
647
0
        send_response(conn, 400, "error: no payload found\n");
648
0
        return -1;
649
0
    }
650
651
0
    extra_size = session->parser.headers_extra_count;
652
0
    if (extra_size > 0) {
653
0
        for (i = 0; i < extra_size; i++) {
654
0
            headers_extra = &session->parser.headers_extra[i];
655
0
            if (headers_extra->key.len == 16 &&
656
0
                strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) {
657
0
                if (headers_extra->val.len == 4 &&
658
0
                    strncasecmp(headers_extra->val.data, "gzip", 4) == 0) {
659
0
                    flb_debug("[elasticsearch_bulk_prot] body is gzipped");
660
0
                    gzip_compressed = FLB_TRUE;
661
0
                }
662
0
            }
663
0
        }
664
0
    }
665
666
0
    if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) {
667
0
        if (gzip_compressed == FLB_TRUE) {
668
0
            ret = flb_gzip_uncompress((void *) request->data.data, request->data.len,
669
0
                                      &gz_data, &gz_size);
670
0
            if (ret == -1) {
671
0
                flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed");
672
0
                return -1;
673
0
            }
674
0
            parse_payload_ndjson(ctx, tag, gz_data, gz_size, bulk_statuses);
675
0
            flb_free(gz_data);
676
0
        }
677
0
        else {
678
0
            parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses);
679
0
        }
680
0
    }
681
682
0
    return 0;
683
0
}
684
685
static inline int mk_http_point_header(mk_ptr_t *h,
686
                                       struct mk_http_parser *parser, int key)
687
0
{
688
0
    struct mk_http_header *header;
689
690
0
    header = &parser->headers[key];
691
0
    if (header->type == key) {
692
0
        h->data = header->val.data;
693
0
        h->len  = header->val.len;
694
0
        return 0;
695
0
    }
696
0
    else {
697
0
        h->data = NULL;
698
0
        h->len  = -1;
699
0
    }
700
701
0
    return -1;
702
0
}
703
704
/*
705
 * Handle an incoming request. It perform extra checks over the request, if
706
 * everything is OK, it enqueue the incoming payload.
707
 */
708
int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx,
709
                                      struct in_elasticsearch_bulk_conn *conn,
710
                                      struct mk_http_session *session,
711
                                      struct mk_http_request *request)
712
0
{
713
0
    int i;
714
0
    int ret;
715
0
    int len;
716
0
    char *uri;
717
0
    char *qs;
718
0
    off_t diff;
719
0
    flb_sds_t tag;
720
0
    struct mk_http_header *header;
721
0
    flb_sds_t bulk_statuses = NULL;
722
0
    flb_sds_t bulk_response = NULL;
723
0
    char *error_str = NULL;
724
725
0
    if (request->uri.data[0] != '/') {
726
0
        send_response(conn, 400, "error: invalid request\n");
727
0
        return -1;
728
0
    }
729
730
    /* Decode URI */
731
0
    uri = mk_utils_url_decode(request->uri);
732
0
    if (!uri) {
733
0
        uri = mk_mem_alloc_z(request->uri.len + 1);
734
0
        if (!uri) {
735
0
            return -1;
736
0
        }
737
0
        memcpy(uri, request->uri.data, request->uri.len);
738
0
        uri[request->uri.len] = '\0';
739
0
    }
740
741
    /* Try to match a query string so we can remove it */
742
0
    qs = strchr(uri, '?');
743
0
    if (qs) {
744
        /* remove the query string part */
745
0
        diff = qs - uri;
746
0
        uri[diff] = '\0';
747
0
    }
748
749
    /* Refer the tag at first*/
750
0
    if (ctx->ins->tag && !ctx->ins->tag_default) {
751
0
        tag = flb_sds_create(ctx->ins->tag);
752
0
        if (tag == NULL) {
753
0
            return -1;
754
0
        }
755
0
    }
756
0
    else {
757
        /* Compose the query string using the URI */
758
0
        len = strlen(uri);
759
760
0
        if (len == 1) {
761
0
            tag = NULL; /* use default tag */
762
0
        }
763
0
        else {
764
            /* New tag skipping the URI '/' */
765
0
            tag = flb_sds_create_len(&uri[1], len - 1);
766
0
            if (!tag) {
767
0
                mk_mem_free(uri);
768
0
                return -1;
769
0
            }
770
771
            /* Sanitize, only allow alphanum chars */
772
0
            for (i = 0; i < flb_sds_len(tag); i++) {
773
0
                if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
774
0
                    tag[i] = '_';
775
0
                }
776
0
            }
777
0
        }
778
0
    }
779
780
    /* Check if we have a Host header: Hostname ; port */
781
0
    mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);
782
783
    /* Header: Connection */
784
0
    mk_http_point_header(&request->connection, &session->parser,
785
0
                         MK_HEADER_CONNECTION);
786
787
    /* HTTP/1.1 needs Host header */
788
0
    if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) {
789
0
        flb_sds_destroy(tag);
790
0
        mk_mem_free(uri);
791
0
        return -1;
792
0
    }
793
794
    /* Should we close the session after this request ? */
795
0
    mk_http_keepalive_check(session, request, ctx->server);
796
797
    /* Content Length */
798
0
    header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH];
799
0
    if (header->type == MK_HEADER_CONTENT_LENGTH) {
800
0
        request->_content_length.data = header->val.data;
801
0
        request->_content_length.len  = header->val.len;
802
0
    }
803
0
    else {
804
0
        request->_content_length.data = NULL;
805
0
    }
806
807
0
    if (request->method == MK_METHOD_HEAD) {
808
0
        send_empty_response(conn, 200);
809
810
0
        flb_sds_destroy(tag);
811
0
        mk_mem_free(uri);
812
813
0
        return 0;
814
0
    }
815
816
0
    if (request->method == MK_METHOD_PUT) {
817
0
        send_json_message_response(conn, 200, "{}");
818
819
0
        flb_sds_destroy(tag);
820
0
        mk_mem_free(uri);
821
822
0
        return 0;
823
0
    }
824
825
0
    if (request->method == MK_METHOD_GET) {
826
0
        if (strncmp(uri, "/_nodes/http", 12) == 0) {
827
0
            send_dummy_sniffer_response(conn, 200, ctx);
828
0
        }
829
0
        else if (strlen(uri) == 1 && strncmp(uri, "/", 1) == 0) {
830
0
            send_version_message_response(ctx, conn, 200);
831
0
        }
832
0
        else {
833
0
            send_json_message_response(conn, 200, "{}");
834
0
        }
835
836
0
        flb_sds_destroy(tag);
837
0
        mk_mem_free(uri);
838
839
0
        return 0;
840
0
    }
841
842
0
    if (request->method == MK_METHOD_POST) {
843
0
        if (strncmp(uri, "/_bulk", 6) == 0) {
844
0
            bulk_statuses = flb_sds_create_size(ctx->buffer_max_size);
845
0
            if (!bulk_statuses) {
846
0
                flb_sds_destroy(tag);
847
0
                mk_mem_free(uri);
848
0
                return -1;
849
0
            }
850
851
0
            bulk_response = flb_sds_create_size(ctx->buffer_max_size);
852
0
            if (!bulk_response) {
853
0
                flb_sds_destroy(bulk_statuses);
854
0
                flb_sds_destroy(tag);
855
0
                mk_mem_free(uri);
856
0
                return -1;
857
0
            }
858
0
        } else {
859
0
            flb_sds_destroy(tag);
860
0
            mk_mem_free(uri);
861
862
0
            send_response(conn, 400, "error: invaild HTTP endpoint\n");
863
864
0
            return -1;
865
0
        }
866
0
    }
867
868
0
    if (request->method != MK_METHOD_POST &&
869
0
        request->method != MK_METHOD_GET &&
870
0
        request->method != MK_METHOD_HEAD &&
871
0
        request->method != MK_METHOD_PUT) {
872
873
0
        if (bulk_statuses) {
874
0
            flb_sds_destroy(bulk_statuses);
875
0
        }
876
0
        if (bulk_response) {
877
0
            flb_sds_destroy(bulk_response);
878
0
        }
879
880
0
        flb_sds_destroy(tag);
881
0
        mk_mem_free(uri);
882
883
0
        send_response(conn, 400, "error: invalid HTTP method\n");
884
0
        return -1;
885
0
    }
886
887
0
    ret = process_payload(ctx, conn, tag, session, request, bulk_statuses);
888
0
    flb_sds_destroy(tag);
889
890
0
    len = flb_sds_len(bulk_statuses);
891
0
    if (flb_sds_alloc(bulk_response) < len + 27) {
892
0
        bulk_response = flb_sds_increase(bulk_response, len + 27 - flb_sds_alloc(bulk_response));
893
0
    }
894
0
    error_str = strstr(bulk_statuses, "\"status\":40");
895
0
    if (error_str){
896
0
        flb_sds_cat(bulk_response, "{\"errors\":true,\"items\":[", 24);
897
0
    }
898
0
    else {
899
0
        flb_sds_cat(bulk_response, "{\"errors\":false,\"items\":[", 25);
900
0
    }
901
0
    flb_sds_cat(bulk_response, bulk_statuses, flb_sds_len(bulk_statuses));
902
0
    flb_sds_cat(bulk_response, "]}", 2);
903
0
    send_response(conn, 200, bulk_response);
904
905
0
    mk_mem_free(uri);
906
0
    flb_sds_destroy(bulk_statuses);
907
0
    flb_sds_destroy(bulk_response);
908
909
0
    return ret;
910
0
}
911
912
/*
913
 * Handle an incoming request which has resulted in an http parser error.
914
 */
915
int in_elasticsearch_bulk_prot_handle_error(struct flb_in_elasticsearch *ctx,
916
                                            struct in_elasticsearch_bulk_conn *conn,
917
                                            struct mk_http_session *session,
918
                                            struct mk_http_request *request)
919
0
{
920
0
    send_response(conn, 400, "error: invalid request\n");
921
0
    return -1;
922
0
}