Coverage Report

Created: 2023-03-10 07:33

/src/fluent-bit/plugins/in_opentelemetry/opentelemetry_prot.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_version.h>
22
#include <fluent-bit/flb_error.h>
23
#include <fluent-bit/flb_pack.h>
24
25
#include <monkey/monkey.h>
26
#include <monkey/mk_core.h>
27
#include <cmetrics/cmt_decode_opentelemetry.h>
28
29
#include <fluent-otel-proto/fluent-otel.h>
30
#include "opentelemetry.h"
31
#include "http_conn.h"
32
33
#define HTTP_CONTENT_JSON  0
34
35
static int otlp_pack_any_value(msgpack_packer *mp_pck,
36
                               Opentelemetry__Proto__Common__V1__AnyValue *body);
37
38
static int send_response(struct http_conn *conn, int http_status, char *message)
39
0
{
40
0
    int len;
41
0
    flb_sds_t out;
42
0
    size_t sent;
43
44
0
    out = flb_sds_create_size(256);
45
0
    if (!out) {
46
0
        return -1;
47
0
    }
48
49
0
    if (message) {
50
0
        len = strlen(message);
51
0
    }
52
0
    else {
53
0
        len = 0;
54
0
    }
55
56
0
    if (http_status == 201) {
57
0
        flb_sds_printf(&out,
58
0
                       "HTTP/1.1 201 Created \r\n"
59
0
                       "Server: Fluent Bit v%s\r\n"
60
0
                       "Content-Length: 0\r\n\r\n",
61
0
                       FLB_VERSION_STR);
62
0
    }
63
0
    else if (http_status == 200) {
64
0
        flb_sds_printf(&out,
65
0
                       "HTTP/1.1 200 OK\r\n"
66
0
                       "Server: Fluent Bit v%s\r\n"
67
0
                       "Content-Length: 0\r\n\r\n",
68
0
                       FLB_VERSION_STR);
69
0
    }
70
0
    else if (http_status == 204) {
71
0
        flb_sds_printf(&out,
72
0
                       "HTTP/1.1 204 No Content\r\n"
73
0
                       "Server: Fluent Bit v%s\r\n"
74
0
                       "Content-Length: 0\r\n\r\n",
75
0
                       FLB_VERSION_STR);
76
0
    }
77
0
    else if (http_status == 400) {
78
0
        flb_sds_printf(&out,
79
0
                       "HTTP/1.1 400 Forbidden\r\n"
80
0
                       "Server: Fluent Bit v%s\r\n"
81
0
                       "Content-Length: %i\r\n\r\n%s",
82
0
                       FLB_VERSION_STR,
83
0
                       len, message);
84
0
    }
85
86
    /* We should check the outcome of this operation */
87
0
    flb_io_net_write(conn->connection,
88
0
                     (void *) out,
89
0
                     flb_sds_len(out),
90
0
                     &sent);
91
92
0
    flb_sds_destroy(out);
93
94
0
    return 0;
95
0
}
96
97
static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
98
                                   flb_sds_t tag,
99
                                   struct mk_http_session *session,
100
                                   struct mk_http_request *request)
101
0
{
102
0
    struct cfl_list  decoded_contexts;
103
0
    struct cfl_list *iterator;
104
0
    struct cmt      *context;
105
0
    size_t           offset;
106
0
    int              result;
107
108
0
    offset = 0;
109
110
0
    result = cmt_decode_opentelemetry_create(&decoded_contexts,
111
0
                                             request->data.data,
112
0
                                             request->data.len,
113
0
                                             &offset);
114
115
0
    if (result == CMT_DECODE_OPENTELEMETRY_SUCCESS) {
116
0
        cfl_list_foreach(iterator, &decoded_contexts) {
117
0
            context = cfl_list_entry(iterator, struct cmt, _head);
118
119
0
            result = flb_input_metrics_append(ctx->ins, NULL, 0, context);
120
121
0
            if (result != 0) {
122
0
                flb_plg_debug(ctx->ins, "could not ingest metrics context : %d", result);
123
0
            }
124
0
        }
125
126
0
        cmt_decode_opentelemetry_destroy(&decoded_contexts);
127
0
    }
128
129
0
    return 0;
130
0
}
131
132
static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
133
                                        flb_sds_t tag,
134
                                        struct mk_http_session *session,
135
                                        struct mk_http_request *request)
136
0
{
137
0
    struct ctrace *decoded_context;
138
0
    size_t         offset;
139
0
    int            result;
140
141
0
    offset = 0;
142
0
    result = ctr_decode_opentelemetry_create(&decoded_context,
143
0
                                             request->data.data,
144
0
                                             request->data.len,
145
0
                                             &offset);
146
0
    if (result == 0) {
147
0
        result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
148
0
        ctr_decode_opentelemetry_destroy(decoded_context);
149
0
    }
150
151
0
    return result;
152
0
}
153
154
static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
155
                                      flb_sds_t tag,
156
                                      struct mk_http_session *session,
157
                                      struct mk_http_request *request)
158
0
{
159
0
    int ret;
160
0
    int root_type;
161
0
    char *out_buf = NULL;
162
0
    size_t out_size;
163
164
0
    msgpack_packer mp_pck;
165
0
    msgpack_sbuffer mp_sbuf;
166
167
0
    msgpack_sbuffer_init(&mp_sbuf);
168
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
169
170
0
    msgpack_pack_array(&mp_pck, 2);
171
0
    flb_pack_time_now(&mp_pck);
172
173
    /* Check if the incoming payload is a valid JSON message and convert it to msgpack */
174
0
    ret = flb_pack_json(request->data.data, request->data.len, &out_buf, &out_size, &root_type);
175
176
0
    if (ret == 0 && root_type == JSMN_OBJECT) {
177
        /* JSON found, pack it msgpack representation */
178
0
        msgpack_sbuffer_write(&mp_sbuf, out_buf, out_size);
179
0
    }
180
0
    else {
181
        /* the content might be a binary payload or invalid JSON */
182
0
        msgpack_pack_map(&mp_pck, 1);
183
0
        msgpack_pack_str_with_body(&mp_pck, "trace", 5);
184
0
        msgpack_pack_str_with_body(&mp_pck, request->data.data, request->data.len);
185
0
    }
186
187
    /* release 'out_buf' if it was allocated */
188
0
    if (out_buf) {
189
0
        flb_free(out_buf);
190
0
    }
191
192
0
    flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
193
0
    msgpack_sbuffer_destroy(&mp_sbuf);
194
195
0
    return 0;
196
0
}
197
198
static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
199
                                  flb_sds_t tag,
200
                                  struct mk_http_session *session,
201
                                  struct mk_http_request *request)
202
0
{
203
0
    int result;
204
205
0
    if (ctx->raw_traces) {
206
0
        result = process_payload_raw_traces(ctx, conn, tag, session, request);
207
0
    }
208
0
    else {
209
0
        result = process_payload_traces_proto(ctx, conn, tag, session, request);
210
0
    }
211
212
0
    return result;
213
0
}
214
215
/*
 * Pack a NUL-terminated string as a msgpack str.
 * Returns the status of msgpack_pack_str_with_body().
 */
static int otel_pack_string(msgpack_packer *mp_pck, char *str)
{
    size_t length;

    length = strlen(str);

    return msgpack_pack_str_with_body(mp_pck, str, length);
}
219
220
/*
 * Pack a boolean as msgpack true/false.
 * Returns the status of the msgpack call that was made.
 */
static int otel_pack_bool(msgpack_packer *mp_pck, bool val)
{
    return val ? msgpack_pack_true(mp_pck) : msgpack_pack_false(mp_pck);
}
229
230
/*
 * Pack an integer as a msgpack int64.
 *
 * The parameter is 64-bit wide so that values handed over from the OTLP
 * int_value field are not truncated to the width of 'int' on the way in.
 *
 * Returns the status of msgpack_pack_int64().
 */
static int otel_pack_int(msgpack_packer *mp_pck, int64_t val)
{
    return msgpack_pack_int64(mp_pck, val);
}
234
235
/*
 * Pack a double as a msgpack float64.
 * Returns the status of msgpack_pack_double().
 */
static int otel_pack_double(msgpack_packer *mp_pck, double val)
{
    int status;

    status = msgpack_pack_double(mp_pck, val);

    return status;
}
239
240
static int otel_pack_kvlist(msgpack_packer *mp_pck,
241
                            Opentelemetry__Proto__Common__V1__KeyValueList *kv_list)
242
0
{
243
0
    int kv_index;
244
0
    int ret;
245
0
    char *key;
246
0
    Opentelemetry__Proto__Common__V1__AnyValue *value;
247
248
0
    ret = msgpack_pack_map(mp_pck, kv_list->n_values);
249
0
    if (ret != 0) {
250
0
        return ret;
251
0
    }
252
253
0
    for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) {
254
0
        key = kv_list->values[kv_index]->key;
255
0
        value = kv_list->values[kv_index]->value;
256
257
0
        ret = otel_pack_string(mp_pck, key);
258
259
0
        if(ret == 0) {
260
0
           ret = otlp_pack_any_value(mp_pck, value);
261
0
        }
262
0
    }
263
264
0
    return ret;
265
0
}
266
267
static int otel_pack_array(msgpack_packer *mp_pck,
268
                           Opentelemetry__Proto__Common__V1__ArrayValue *array)
269
0
{
270
0
    int ret;
271
0
    int array_index;
272
273
0
    ret = 0;
274
275
0
    for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) {
276
0
        ret = otlp_pack_any_value(mp_pck, array->values[array_index]);
277
0
    }
278
279
0
    return ret;
280
0
}
281
282
static int otel_pack_bytes(msgpack_packer *mp_pck,
283
                           ProtobufCBinaryData bytes)
284
0
{
285
0
    return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len);
286
0
}
287
288
static int otlp_pack_any_value(msgpack_packer *mp_pck,
289
                               Opentelemetry__Proto__Common__V1__AnyValue *body)
290
0
{
291
0
    int result;
292
293
0
    result = -2;
294
295
0
    switch(body->value_case){
296
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE:
297
0
            result = otel_pack_string(mp_pck, body->string_value);
298
0
            break;
299
300
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE:
301
0
            result =  otel_pack_bool(mp_pck, body->bool_value);
302
0
            break;
303
304
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE:
305
0
            result = otel_pack_int(mp_pck, body->int_value);
306
0
            break;
307
308
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE:
309
0
            result = otel_pack_double(mp_pck, body->double_value);
310
0
            break;
311
312
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE:
313
0
            result = otel_pack_array(mp_pck, body->array_value);
314
0
            break;
315
316
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE:
317
0
            result = otel_pack_kvlist(mp_pck, body->kvlist_value);
318
0
            break;
319
320
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE:
321
0
            result = otel_pack_bytes(mp_pck, body->bytes_value);
322
0
            break;
323
324
0
        default:
325
0
            break;
326
0
    }
327
328
0
    if (result == -2) {
329
0
        flb_error("[otel]: invalid value type in pack_any_value");
330
0
        result = -1;
331
0
    }
332
333
0
    return result;
334
0
}
335
336
/*
 * Unpack a protobuf ExportLogsServiceRequest and pack every log record it
 * contains as a msgpack [timestamp, body] array, where the timestamp is the
 * current time and the body is the record's AnyValue.
 *
 * mp_pck  : packer receiving the records.
 * in_buf  : serialized protobuf message.
 * in_size : number of bytes in in_buf.
 *
 * Returns 0 on success, -1 when the message cannot be unpacked, a required
 * list is missing, a record has no body or a body cannot be converted. The
 * unpacked message is released on every path.
 */
static int binary_payload_to_msgpack(msgpack_packer *mp_pck,
                                     uint8_t *in_buf,
                                     size_t in_size)
{
    int ret;
    size_t resource_logs_index;
    size_t scope_log_index;
    size_t log_record_index;

    Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
    Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
    Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log;
    Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs;
    Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log;
    Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
    Opentelemetry__Proto__Logs__V1__LogRecord *log_record;

    input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
    if (input_logs == NULL) {
        flb_error("[otel] Failed to unpack input logs");
        return -1;
    }

    /* assume failure until every record has been packed */
    ret = -1;

    resource_logs = input_logs->resource_logs;
    if (resource_logs == NULL) {
        flb_error("[otel] No resource logs found");
        goto cleanup;
    }

    for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
        resource_log = resource_logs[resource_logs_index];
        scope_logs = resource_log->scope_logs;

        if (resource_log->n_scope_logs > 0 && scope_logs == NULL) {
            flb_error("[otel] No scope logs found");
            goto cleanup;
        }

        for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) {
            scope_log = scope_logs[scope_log_index];
            log_records = scope_log->log_records;

            if (log_records == NULL) {
                flb_error("[otel] No log records found");
                goto cleanup;
            }

            for (log_record_index = 0; log_record_index < scope_log->n_log_records; log_record_index++) {
                log_record = log_records[log_record_index];

                /* reject a record without body before writing its envelope */
                if (log_record == NULL || log_record->body == NULL) {
                    flb_error("[otel] Log record has no body");
                    goto cleanup;
                }

                msgpack_pack_array(mp_pck, 2);
                flb_pack_time_now(mp_pck);

                if (otlp_pack_any_value(mp_pck, log_record->body) != 0) {
                    flb_error("[otel] Failed to convert log record body");
                    goto cleanup;
                }
            }
        }
    }

    ret = 0;

cleanup:
    /* release the message allocated by the unpack call above */
    opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(input_logs, NULL);

    return ret;
}
400
401
0
/* Return the length in bytes of the text spanned by a jsmn token (end - start). */
static int get_token_length(jsmntok_t token)
{
    int length;

    length = token.end - token.start;

    return length;
}
404
405
/*
 * Copy the text of tokens[pos] out of 'body' into a newly allocated,
 * NUL-terminated buffer.
 *
 * The caller owns the returned buffer and must release it with flb_free().
 * The caller is also responsible for 'pos' being a valid index into 'tokens'.
 *
 * Returns NULL when the token has a negative length or the allocation fails.
 */
static char *get_value_from_token(jsmntok_t *tokens,
                                  const char *body,
                                  int pos)
{
    char *tmp;
    jsmntok_t token;
    int token_len;

    token = tokens[pos];
    token_len = get_token_length(token);

    if (token_len < 0) {
        return NULL;
    }

    /* calloc zero-fills, so the extra byte is the string terminator */
    tmp = flb_calloc(1, token_len + 1);
    if (tmp == NULL) {
        return NULL;
    }

    memcpy(tmp, body + token.start, token_len);

    return tmp;
}
420
421
/*
 * Pack one log record as [timestamp, value] according to the OTLP JSON value
 * type name (compared case-insensitively).
 *
 * Returns the status of the value packer, or -2 when the type name is not
 * supported; in that case nothing is written to the packer.
 */
static int json_pack_log_body(msgpack_packer *mp_pck,
                              const char *value_type,
                              char *value)
{
    int is_string;
    int is_int;
    int is_double;
    int is_bool;

    /* bytes values are packed as strings, like string values */
    is_string = strcasecmp(value_type, "stringvalue") == 0 ||
                strcasecmp(value_type, "bytesvalue") == 0;
    is_int    = strcasecmp(value_type, "intvalue") == 0;
    is_double = strcasecmp(value_type, "doublevalue") == 0;
    is_bool   = strcasecmp(value_type, "boolvalue") == 0;

    /* decide before writing the envelope so no half record is emitted */
    if (!is_string && !is_int && !is_double && !is_bool) {
        return -2;
    }

    msgpack_pack_array(mp_pck, 2);
    flb_pack_time_now(mp_pck);

    if (is_string) {
        return otel_pack_string(mp_pck, value);
    }
    if (is_int) {
        return otel_pack_int(mp_pck, atoi(value));
    }
    if (is_double) {
        return otel_pack_double(mp_pck, atof(value));
    }

    return otel_pack_bool(mp_pck, strcasecmp(value, "true") == 0);
}

/*
 * Scan a JSON payload with jsmn and pack every "body" entry found inside an
 * object as a [timestamp, value] record.
 *
 * For a key token equal to "body" the parser is expected to have produced, in
 * order: the key, the nested object, the value-type key (e.g. "stringValue")
 * and the value itself. Positions are checked against the number of parsed
 * tokens before they are read.
 *
 * Returns 0 when every record was packed, -1 when the payload cannot be
 * parsed, memory runs out or the payload is truncated, or the first non-zero
 * status reported while packing a record. Bodies with an unsupported value
 * type are logged and skipped.
 */
static int json_payload_to_msgpack(msgpack_packer *mp_pck,
                                   const char *body,
                                   size_t len)
{
    int n_tokens;
    int token_index;
    int kv_index;
    int key_pos;
    int is_body;
    int ret;
    int result;

    char *key;
    char *otel_value_type;
    char *otel_log_record;

    jsmn_parser parser;
    jsmntok_t tokens[1024];
    jsmntok_t token;

    result = 0;

    jsmn_init(&parser);
    n_tokens = jsmn_parse(&parser, body, len, tokens, 1024);

    if (n_tokens < 0) {
        flb_error("[otel] Failed to parse JSON payload, jsmn error %d", n_tokens);
        return -1;
    }

    /* position 0 is the root object, skip it */
    for (token_index = 1; token_index < n_tokens; token_index++) {
        token = tokens[token_index];

        if (token.type != JSMN_OBJECT) {
            continue;
        }

        for (kv_index = 0; kv_index < token.size; kv_index++) {
            key_pos = token_index + kv_index + 1;

            /* never read past the tokens jsmn actually filled in */
            if (key_pos >= n_tokens) {
                break;
            }

            key = get_value_from_token(tokens, body, key_pos);
            if (key == NULL) {
                return -1;
            }

            is_body = (strcmp(key, "body") == 0);
            flb_free(key);

            if (!is_body) {
                continue;
            }

            /* type key sits at key_pos + 2, the value at key_pos + 3 */
            if (key_pos + 3 >= n_tokens) {
                flb_error("[otel] Truncated log record body in JSON payload");
                return -1;
            }

            otel_value_type = get_value_from_token(tokens, body, key_pos + 2);
            otel_log_record = get_value_from_token(tokens, body, key_pos + 3);

            if (otel_value_type == NULL || otel_log_record == NULL) {
                if (otel_value_type != NULL) {
                    flb_free(otel_value_type);
                }
                if (otel_log_record != NULL) {
                    flb_free(otel_log_record);
                }
                return -1;
            }

            ret = json_pack_log_body(mp_pck, otel_value_type, otel_log_record);

            if (ret == -2) {
                flb_error("[otel] Unsupported log record body type '%s'", otel_value_type);
            }
            else if (ret != 0 && result == 0) {
                /* keep the first failure instead of the last status */
                result = ret;
            }

            flb_free(otel_value_type);
            flb_free(otel_log_record);
        }
    }

    return result;
}
503
504
static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn,
505
                                flb_sds_t tag,
506
                                struct mk_http_session *session,
507
                                struct mk_http_request *request)
508
0
{
509
0
    int ret;
510
0
    char *out_buf = NULL;
511
512
0
    msgpack_packer mp_pck;
513
0
    msgpack_sbuffer mp_sbuf;
514
515
0
    msgpack_sbuffer_init(&mp_sbuf);
516
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
517
518
    /* Check if the incoming payload is a valid JSON message and convert it to msgpack */
519
0
    if (strncasecmp(request->content_type.data,
520
0
                    "application/json",
521
0
                    request->content_type.len) == 0) {
522
0
        ret = json_payload_to_msgpack(&mp_pck, request->data.data, request->data.len);
523
0
    }
524
0
    else if (strncasecmp(request->content_type.data,
525
0
                         "application/x-protobuf",
526
0
                         request->content_type.len) == 0) {
527
0
        ret =  binary_payload_to_msgpack(&mp_pck, (uint8_t *)request->data.data, request->data.len);
528
0
    }
529
0
    else {
530
0
        flb_error("[otel] Unsupported content type %.*s", request->content_type.len, request->content_type.data);
531
0
        ret = -1;
532
0
    }
533
534
    /* release 'out_buf' if it was allocated */
535
0
    if (out_buf) {
536
0
        flb_free(out_buf);
537
0
    }
538
539
0
    flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);
540
541
0
    msgpack_sbuffer_destroy(&mp_sbuf);
542
0
    return ret;
543
0
}
544
545
/*
 * Point 'h' at the value of the parsed header stored in slot 'key'.
 *
 * Returns 0 and fills h->data / h->len when the slot holds a header of that
 * type; otherwise sets h->data to NULL and h->len to -1 and returns -1.
 */
static inline int mk_http_point_header(mk_ptr_t *h,
                                       struct mk_http_parser *parser, int key)
{
    struct mk_http_header *entry = &parser->headers[key];

    if (entry->type != key) {
        /* header not present in this request */
        h->data = NULL;
        h->len  = -1;
        return -1;
    }

    h->data = entry->val.data;
    h->len  = entry->val.len;

    return 0;
}
563
564
/*
565
 * Handle an incoming request. It perform extra checks over the request, if
566
 * everything is OK, it enqueue the incoming payload.
567
 */
568
int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *conn,
569
                              struct mk_http_session *session,
570
                              struct mk_http_request *request)
571
0
{
572
0
    int i;
573
0
    int ret = -1;
574
0
    int len;
575
0
    char *uri;
576
0
    char *qs;
577
0
    off_t diff;
578
0
    flb_sds_t tag;
579
0
    struct mk_http_header *header;
580
581
0
    if (request->uri.data[0] != '/') {
582
0
        send_response(conn, 400, "error: invalid request\n");
583
0
        return -1;
584
0
    }
585
586
    /* Decode URI */
587
0
    uri = mk_utils_url_decode(request->uri);
588
0
    if (!uri) {
589
0
        uri = mk_mem_alloc_z(request->uri.len + 1);
590
0
        if (!uri) {
591
0
            return -1;
592
0
        }
593
0
        memcpy(uri, request->uri.data, request->uri.len);
594
0
        uri[request->uri.len] = '\0';
595
0
    }
596
597
0
    if (strcmp(uri, "/v1/metrics") != 0 &&
598
0
        strcmp(uri, "/v1/traces") != 0  &&
599
0
        strcmp(uri, "/v1/logs") != 0) {
600
601
0
        send_response(conn, 400, "error: invalid endpoint\n");
602
0
        mk_mem_free(uri);
603
604
0
        return -1;
605
0
    }
606
607
    /* Try to match a query string so we can remove it */
608
0
    qs = strchr(uri, '?');
609
0
    if (qs) {
610
        /* remove the query string part */
611
0
        diff = qs - uri;
612
0
        uri[diff] = '\0';
613
0
    }
614
615
    /* Compose the query string using the URI */
616
0
    len = strlen(uri);
617
618
0
    if (len == 1) {
619
0
        tag = NULL; /* use default tag */
620
0
    }
621
0
    else {
622
0
        tag = flb_sds_create_size(len);
623
0
        if (!tag) {
624
0
            mk_mem_free(uri);
625
0
            return -1;
626
0
        }
627
628
        /* New tag skipping the URI '/' */
629
0
        flb_sds_cat(tag, uri + 1, len - 1);
630
631
        /* Sanitize, only allow alphanum chars */
632
0
        for (i = 0; i < flb_sds_len(tag); i++) {
633
0
            if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
634
0
                tag[i] = '_';
635
0
            }
636
0
        }
637
0
    }
638
639
    /* Check if we have a Host header: Hostname ; port */
640
0
    mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);
641
642
    /* Header: Connection */
643
0
    mk_http_point_header(&request->connection, &session->parser,
644
0
                         MK_HEADER_CONNECTION);
645
646
    /* HTTP/1.1 needs Host header */
647
0
    if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) {
648
0
        flb_sds_destroy(tag);
649
0
        mk_mem_free(uri);
650
0
        return -1;
651
0
    }
652
653
    /* Should we close the session after this request ? */
654
0
    mk_http_keepalive_check(session, request, ctx->server);
655
656
    /* Content Length */
657
0
    header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH];
658
0
    if (header->type == MK_HEADER_CONTENT_LENGTH) {
659
0
        request->_content_length.data = header->val.data;
660
0
        request->_content_length.len  = header->val.len;
661
0
    }
662
0
    else {
663
0
        request->_content_length.data = NULL;
664
0
    }
665
666
0
    mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE);
667
668
0
    if (request->method != MK_METHOD_POST) {
669
0
        flb_sds_destroy(tag);
670
0
        mk_mem_free(uri);
671
0
        send_response(conn, 400, "error: invalid HTTP method\n");
672
0
        return -1;
673
0
    }
674
675
0
    if (strcmp(uri, "/v1/metrics") == 0) {
676
0
        ret = process_payload_metrics(ctx, conn, tag, session, request);
677
0
    }
678
0
    else if (strcmp(uri, "/v1/traces") == 0) {
679
0
        ret = process_payload_traces(ctx, conn, tag, session, request);
680
0
    }
681
0
    else if (strcmp(uri, "/v1/logs") == 0) {
682
0
        ret = process_payload_logs(ctx, conn, tag, session, request);
683
0
    }
684
685
0
    mk_mem_free(uri);
686
0
    flb_sds_destroy(tag);
687
0
    send_response(conn, ctx->successful_response_code, NULL);
688
0
    return ret;
689
0
}
690
691
/*
 * Handle an incoming request which has resulted in an HTTP parser error:
 * answer with a 400 response and report failure to the caller.
 *
 * Only 'conn' is used; the remaining parameters are part of the handler
 * signature.
 */
int opentelemetry_prot_handle_error(struct flb_opentelemetry *ctx, struct http_conn *conn,
                                    struct mk_http_session *session,
                                    struct mk_http_request *request)
{
    (void) ctx;
    (void) session;
    (void) request;

    send_response(conn, 400, "error: invalid request\n");

    return -1;
}