/src/fluent-bit/plugins/in_splunk/splunk_prot.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_input_plugin.h> |
21 | | #include <fluent-bit/flb_version.h> |
22 | | #include <fluent-bit/flb_error.h> |
23 | | #include <fluent-bit/flb_pack.h> |
24 | | #include <fluent-bit/flb_gzip.h> |
25 | | |
26 | | #include <monkey/monkey.h> |
27 | | #include <monkey/mk_core.h> |
28 | | |
29 | | #include "splunk.h" |
30 | | #include "splunk_conn.h" |
31 | | #include "splunk_prot.h" |
32 | | |
33 | 0 | #define HTTP_CONTENT_JSON 0 |
34 | 0 | #define HTTP_CONTENT_TEXT 1 |
35 | 0 | #define HTTP_CONTENT_UNKNOWN 2 |
36 | | |
37 | | static int send_response(struct splunk_conn *conn, int http_status, char *message) |
38 | 0 | { |
39 | 0 | struct flb_splunk *context; |
40 | 0 | size_t sent; |
41 | 0 | int len; |
42 | 0 | flb_sds_t out; |
43 | |
|
44 | 0 | context = (struct flb_splunk *) conn->ctx; |
45 | |
|
46 | 0 | out = flb_sds_create_size(256); |
47 | 0 | if (!out) { |
48 | 0 | return -1; |
49 | 0 | } |
50 | | |
51 | 0 | if (message) { |
52 | 0 | len = strlen(message); |
53 | 0 | } |
54 | 0 | else { |
55 | 0 | len = 0; |
56 | 0 | } |
57 | |
|
58 | 0 | if (http_status == 201) { |
59 | 0 | flb_sds_printf(&out, |
60 | 0 | "HTTP/1.1 201 Created \r\n" |
61 | 0 | "Server: Fluent Bit v%s\r\n" |
62 | 0 | "%s" |
63 | 0 | "Content-Length: 0\r\n\r\n", |
64 | 0 | FLB_VERSION_STR, |
65 | 0 | context->success_headers_str); |
66 | 0 | } |
67 | 0 | else if (http_status == 200) { |
68 | 0 | flb_sds_printf(&out, |
69 | 0 | "HTTP/1.1 200 OK\r\n" |
70 | 0 | "Server: Fluent Bit v%s\r\n" |
71 | 0 | "%s" |
72 | 0 | "Content-Length: 0\r\n\r\n", |
73 | 0 | FLB_VERSION_STR, |
74 | 0 | context->success_headers_str); |
75 | 0 | } |
76 | 0 | else if (http_status == 204) { |
77 | 0 | flb_sds_printf(&out, |
78 | 0 | "HTTP/1.1 204 No Content\r\n" |
79 | 0 | "Server: Fluent Bit v%s\r\n" |
80 | 0 | "%s" |
81 | 0 | "\r\n\r\n", |
82 | 0 | FLB_VERSION_STR, |
83 | 0 | context->success_headers_str); |
84 | 0 | } |
85 | 0 | else if (http_status == 400) { |
86 | 0 | flb_sds_printf(&out, |
87 | 0 | "HTTP/1.1 400 Bad Request\r\n" |
88 | 0 | "Server: Fluent Bit v%s\r\n" |
89 | 0 | "Content-Length: %i\r\n\r\n%s", |
90 | 0 | FLB_VERSION_STR, |
91 | 0 | len, message); |
92 | 0 | } |
93 | 0 | else if (http_status == 401) { |
94 | 0 | flb_sds_printf(&out, |
95 | 0 | "HTTP/1.1 401 Unauthorized\r\n" |
96 | 0 | "Server: Fluent Bit v%s\r\n" |
97 | 0 | "Content-Length: %i\r\n\r\n%s", |
98 | 0 | FLB_VERSION_STR, |
99 | 0 | len, message); |
100 | 0 | } |
101 | | /* We should check this operations result */ |
102 | 0 | flb_io_net_write(conn->connection, |
103 | 0 | (void *) out, |
104 | 0 | flb_sds_len(out), |
105 | 0 | &sent); |
106 | |
|
107 | 0 | flb_sds_destroy(out); |
108 | |
|
109 | 0 | return 0; |
110 | 0 | } |
111 | | |
112 | | static int send_json_message_response(struct splunk_conn *conn, int http_status, char *message) |
113 | 0 | { |
114 | 0 | size_t sent; |
115 | 0 | int len; |
116 | 0 | flb_sds_t out; |
117 | |
|
118 | 0 | out = flb_sds_create_size(256); |
119 | 0 | if (!out) { |
120 | 0 | return -1; |
121 | 0 | } |
122 | | |
123 | 0 | if (message) { |
124 | 0 | len = strlen(message); |
125 | 0 | } |
126 | 0 | else { |
127 | 0 | len = 0; |
128 | 0 | } |
129 | |
|
130 | 0 | if (http_status == 200) { |
131 | 0 | flb_sds_printf(&out, |
132 | 0 | "HTTP/1.1 200 OK\r\n" |
133 | 0 | "Content-Type: application/json\r\n" |
134 | 0 | "Content-Length: %i\r\n\r\n%s", |
135 | 0 | len, message); |
136 | 0 | } |
137 | | |
138 | | /* We should check this operations result */ |
139 | 0 | flb_io_net_write(conn->connection, |
140 | 0 | (void *) out, |
141 | 0 | flb_sds_len(out), |
142 | 0 | &sent); |
143 | |
|
144 | 0 | flb_sds_destroy(out); |
145 | |
|
146 | 0 | return 0; |
147 | 0 | } |
148 | | |
149 | | /* implements functionality to get tag from key in record */ |
150 | | static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map) |
151 | 0 | { |
152 | 0 | size_t map_size = map->via.map.size; |
153 | 0 | msgpack_object_kv *kv; |
154 | 0 | msgpack_object key; |
155 | 0 | msgpack_object val; |
156 | 0 | char *key_str = NULL; |
157 | 0 | char *val_str = NULL; |
158 | 0 | size_t key_str_size = 0; |
159 | 0 | size_t val_str_size = 0; |
160 | 0 | int j; |
161 | 0 | int check = FLB_FALSE; |
162 | 0 | int found = FLB_FALSE; |
163 | 0 | flb_sds_t tag; |
164 | |
|
165 | 0 | kv = map->via.map.ptr; |
166 | |
|
167 | 0 | for(j=0; j < map_size; j++) { |
168 | 0 | check = FLB_FALSE; |
169 | 0 | found = FLB_FALSE; |
170 | 0 | key = (kv+j)->key; |
171 | 0 | if (key.type == MSGPACK_OBJECT_BIN) { |
172 | 0 | key_str = (char *) key.via.bin.ptr; |
173 | 0 | key_str_size = key.via.bin.size; |
174 | 0 | check = FLB_TRUE; |
175 | 0 | } |
176 | 0 | if (key.type == MSGPACK_OBJECT_STR) { |
177 | 0 | key_str = (char *) key.via.str.ptr; |
178 | 0 | key_str_size = key.via.str.size; |
179 | 0 | check = FLB_TRUE; |
180 | 0 | } |
181 | |
|
182 | 0 | if (check == FLB_TRUE) { |
183 | 0 | if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) { |
184 | 0 | val = (kv+j)->val; |
185 | 0 | if (val.type == MSGPACK_OBJECT_BIN) { |
186 | 0 | val_str = (char *) val.via.bin.ptr; |
187 | 0 | val_str_size = val.via.str.size; |
188 | 0 | found = FLB_TRUE; |
189 | 0 | break; |
190 | 0 | } |
191 | 0 | if (val.type == MSGPACK_OBJECT_STR) { |
192 | 0 | val_str = (char *) val.via.str.ptr; |
193 | 0 | val_str_size = val.via.str.size; |
194 | 0 | found = FLB_TRUE; |
195 | 0 | break; |
196 | 0 | } |
197 | 0 | } |
198 | 0 | } |
199 | 0 | } |
200 | |
|
201 | 0 | if (found == FLB_TRUE) { |
202 | 0 | tag = flb_sds_create_len(val_str, val_str_size); |
203 | 0 | if (!tag) { |
204 | 0 | flb_errno(); |
205 | 0 | return NULL; |
206 | 0 | } |
207 | 0 | return tag; |
208 | 0 | } |
209 | | |
210 | | |
211 | 0 | flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key); |
212 | 0 | return NULL; |
213 | 0 | } |
214 | | |
215 | | /* |
216 | | * Process a raw text payload for Splunk HEC requests, uses the delimited character to split records, |
217 | | * return the number of processed bytes |
218 | | */ |
219 | | static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) |
220 | 0 | { |
221 | 0 | int ret = FLB_EVENT_ENCODER_SUCCESS; |
222 | |
|
223 | 0 | ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); |
224 | |
|
225 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
226 | 0 | ret = flb_log_event_encoder_set_current_timestamp(&ctx->log_encoder); |
227 | 0 | } |
228 | |
|
229 | 0 | if (ctx->store_token_in_metadata == FLB_TRUE) { |
230 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
231 | 0 | ret = flb_log_event_encoder_append_body_values( |
232 | 0 | &ctx->log_encoder, |
233 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("log"), |
234 | 0 | FLB_LOG_EVENT_STRING_VALUE(buf, size)); |
235 | 0 | } |
236 | 0 | } |
237 | |
|
238 | 0 | if (ctx->store_token_in_metadata == FLB_TRUE) { |
239 | 0 | if (ctx->ingested_auth_header != NULL) { |
240 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
241 | 0 | ret = flb_log_event_encoder_append_metadata_values( |
242 | 0 | &ctx->log_encoder, |
243 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("hec_token"), |
244 | 0 | FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header, |
245 | 0 | ctx->ingested_auth_header_len)); |
246 | 0 | } |
247 | 0 | } |
248 | 0 | } |
249 | 0 | else { |
250 | 0 | if (ctx->ingested_auth_header != NULL) { |
251 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
252 | 0 | ret = flb_log_event_encoder_append_body_values( |
253 | 0 | &ctx->log_encoder, |
254 | 0 | FLB_LOG_EVENT_CSTRING_VALUE(ctx->store_token_key), |
255 | 0 | FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header, |
256 | 0 | ctx->ingested_auth_header_len), |
257 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("log"), |
258 | 0 | FLB_LOG_EVENT_STRING_VALUE(buf, size)); |
259 | |
|
260 | 0 | } |
261 | 0 | } |
262 | 0 | } |
263 | |
|
264 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
265 | 0 | ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); |
266 | 0 | } |
267 | |
|
268 | 0 | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
269 | 0 | flb_log_event_encoder_rollback_record(&ctx->log_encoder); |
270 | 0 | return -1; |
271 | 0 | } |
272 | | |
273 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
274 | 0 | if (tag) { |
275 | 0 | flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), |
276 | 0 | ctx->log_encoder.output_buffer, |
277 | 0 | ctx->log_encoder.output_length); |
278 | 0 | } |
279 | 0 | else { |
280 | | /* use default plugin Tag (it internal name, e.g: http.0 */ |
281 | 0 | flb_input_log_append(ctx->ins, NULL, 0, |
282 | 0 | ctx->log_encoder.output_buffer, |
283 | 0 | ctx->log_encoder.output_length); |
284 | 0 | } |
285 | 0 | } |
286 | 0 | else { |
287 | 0 | flb_plg_error(ctx->ins, "log event encoding error : %d", ret); |
288 | 0 | } |
289 | |
|
290 | 0 | return 0; |
291 | 0 | } |
292 | | |
293 | | static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *record, |
294 | | flb_sds_t tag, flb_sds_t tag_from_record, |
295 | 0 | struct flb_time tm) { |
296 | 0 | int ret; |
297 | 0 | int i; |
298 | 0 | msgpack_object_kv *kv; |
299 | |
|
300 | 0 | ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); |
301 | |
|
302 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
303 | 0 | ret = flb_log_event_encoder_set_timestamp( |
304 | 0 | &ctx->log_encoder, |
305 | 0 | &tm); |
306 | 0 | } |
307 | |
|
308 | 0 | if (ctx->store_token_in_metadata == FLB_TRUE) { |
309 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
310 | 0 | ret = flb_log_event_encoder_set_body_from_msgpack_object( |
311 | 0 | &ctx->log_encoder, |
312 | 0 | record); |
313 | 0 | } |
314 | |
|
315 | 0 | if (ctx->ingested_auth_header != NULL) { |
316 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
317 | 0 | ret = flb_log_event_encoder_append_metadata_values( |
318 | 0 | &ctx->log_encoder, |
319 | 0 | FLB_LOG_EVENT_CSTRING_VALUE("hec_token"), |
320 | 0 | FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header, |
321 | 0 | ctx->ingested_auth_header_len)); |
322 | 0 | } |
323 | 0 | } |
324 | 0 | } |
325 | 0 | else { |
326 | 0 | if (ctx->ingested_auth_header != NULL) { |
327 | | /* iterate through the old record map to create the appendable new buffer */ |
328 | 0 | kv = record->via.map.ptr; |
329 | 0 | for(i = 0; i < record->via.map.size && ret == FLB_EVENT_ENCODER_SUCCESS; i++) { |
330 | 0 | ret = flb_log_event_encoder_append_body_values( |
331 | 0 | &ctx->log_encoder, |
332 | 0 | FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), |
333 | 0 | FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); |
334 | 0 | } |
335 | |
|
336 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
337 | 0 | ret = flb_log_event_encoder_append_body_values( |
338 | 0 | &ctx->log_encoder, |
339 | 0 | FLB_LOG_EVENT_CSTRING_VALUE(ctx->store_token_key), |
340 | 0 | FLB_LOG_EVENT_STRING_VALUE(ctx->ingested_auth_header, |
341 | 0 | ctx->ingested_auth_header_len)); |
342 | 0 | } |
343 | 0 | } |
344 | 0 | else { |
345 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
346 | 0 | ret = flb_log_event_encoder_set_body_from_msgpack_object( |
347 | 0 | &ctx->log_encoder, |
348 | 0 | record); |
349 | 0 | } |
350 | 0 | } |
351 | 0 | } |
352 | |
|
353 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
354 | 0 | ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); |
355 | 0 | } |
356 | |
|
357 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
358 | 0 | if (tag_from_record) { |
359 | 0 | flb_input_log_append(ctx->ins, |
360 | 0 | tag_from_record, |
361 | 0 | flb_sds_len(tag_from_record), |
362 | 0 | ctx->log_encoder.output_buffer, |
363 | 0 | ctx->log_encoder.output_length); |
364 | 0 | } |
365 | 0 | else if (tag) { |
366 | 0 | flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), |
367 | 0 | ctx->log_encoder.output_buffer, |
368 | 0 | ctx->log_encoder.output_length); |
369 | 0 | } |
370 | 0 | else { |
371 | | /* use default plugin Tag (it internal name, e.g: http.0 */ |
372 | 0 | flb_input_log_append(ctx->ins, NULL, 0, |
373 | 0 | ctx->log_encoder.output_buffer, |
374 | 0 | ctx->log_encoder.output_length); |
375 | 0 | } |
376 | 0 | } |
377 | 0 | else { |
378 | 0 | flb_plg_error(ctx->ins, "Error encoding record : %d", ret); |
379 | 0 | } |
380 | |
|
381 | 0 | if (tag_from_record) { |
382 | 0 | flb_sds_destroy(tag_from_record); |
383 | 0 | } |
384 | 0 | } |
385 | | |
386 | | static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) |
387 | 0 | { |
388 | 0 | size_t off = 0; |
389 | 0 | msgpack_unpacked result; |
390 | 0 | struct flb_time tm; |
391 | 0 | int i = 0; |
392 | 0 | msgpack_object *obj; |
393 | 0 | msgpack_object record; |
394 | 0 | flb_sds_t tag_from_record = NULL; |
395 | |
|
396 | 0 | flb_time_get(&tm); |
397 | |
|
398 | 0 | msgpack_unpacked_init(&result); |
399 | 0 | while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { |
400 | 0 | if (result.data.type == MSGPACK_OBJECT_MAP) { |
401 | 0 | tag_from_record = NULL; |
402 | 0 | if (ctx->tag_key) { |
403 | 0 | tag_from_record = tag_key(ctx, &result.data); |
404 | 0 | } |
405 | |
|
406 | 0 | process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm); |
407 | |
|
408 | 0 | flb_log_event_encoder_reset(&ctx->log_encoder); |
409 | 0 | } |
410 | 0 | else if (result.data.type == MSGPACK_OBJECT_ARRAY) { |
411 | 0 | obj = &result.data; |
412 | 0 | for (i = 0; i < obj->via.array.size; i++) |
413 | 0 | { |
414 | 0 | record = obj->via.array.ptr[i]; |
415 | |
|
416 | 0 | tag_from_record = NULL; |
417 | 0 | if (ctx->tag_key) { |
418 | 0 | tag_from_record = tag_key(ctx, &record); |
419 | 0 | } |
420 | |
|
421 | 0 | process_flb_log_append(ctx, &record, tag, tag_from_record, tm); |
422 | | |
423 | | /* TODO : Optimize this |
424 | | * |
425 | | * This is wasteful, considering that we are emitting a series |
426 | | * of records we should start and commit each one and then |
427 | | * emit them all at once after the loop. |
428 | | */ |
429 | |
|
430 | 0 | flb_log_event_encoder_reset(&ctx->log_encoder); |
431 | 0 | } |
432 | |
|
433 | 0 | break; |
434 | 0 | } |
435 | 0 | else { |
436 | 0 | flb_plg_error(ctx->ins, "skip record from invalid type: %i", |
437 | 0 | result.data.type); |
438 | |
|
439 | 0 | msgpack_unpacked_destroy(&result); |
440 | |
|
441 | 0 | return -1; |
442 | 0 | } |
443 | 0 | } |
444 | | |
445 | 0 | msgpack_unpacked_destroy(&result); |
446 | |
|
447 | 0 | return 0; |
448 | 0 | } |
449 | | |
450 | | static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, |
451 | | char *payload, size_t size) |
452 | 0 | { |
453 | 0 | int ret; |
454 | 0 | int out_size; |
455 | 0 | char *pack; |
456 | 0 | struct flb_pack_state pack_state; |
457 | | |
458 | | /* Initialize packer */ |
459 | 0 | flb_pack_state_init(&pack_state); |
460 | | |
461 | | /* Pack JSON as msgpack */ |
462 | 0 | ret = flb_pack_json_state(payload, size, |
463 | 0 | &pack, &out_size, &pack_state); |
464 | 0 | flb_pack_state_reset(&pack_state); |
465 | | |
466 | | /* Handle exceptions */ |
467 | 0 | if (ret == FLB_ERR_JSON_PART) { |
468 | 0 | flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); |
469 | 0 | return -1; |
470 | 0 | } |
471 | 0 | else if (ret == FLB_ERR_JSON_INVAL) { |
472 | 0 | flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); |
473 | 0 | return -1; |
474 | 0 | } |
475 | 0 | else if (ret == -1) { |
476 | 0 | return -1; |
477 | 0 | } |
478 | | |
479 | | /* Process the packaged JSON and return the last byte used */ |
480 | 0 | process_json_payload_pack(ctx, tag, pack, out_size); |
481 | 0 | flb_free(pack); |
482 | |
|
483 | 0 | return 0; |
484 | 0 | } |
485 | | |
486 | | static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request) |
487 | 0 | { |
488 | 0 | int ret = 0; |
489 | 0 | struct mk_list *head; |
490 | 0 | struct mk_http_header *auth_header = NULL; |
491 | 0 | struct flb_splunk_tokens *splunk_token; |
492 | 0 | flb_sds_t authorization = NULL; |
493 | |
|
494 | 0 | if (mk_list_size(&ctx->auth_tokens) == 0) { |
495 | 0 | return SPLUNK_AUTH_UNAUTH; |
496 | 0 | } |
497 | | |
498 | 0 | auth_header = mk_http_header_get(MK_HEADER_AUTHORIZATION, request, NULL, 0); |
499 | 0 | if (auth_header == NULL) { |
500 | 0 | return SPLUNK_AUTH_MISSING_CRED; |
501 | 0 | } |
502 | | |
503 | 0 | authorization = flb_sds_create_len(auth_header->val.data, auth_header->val.len); |
504 | 0 | if (authorization == NULL) { |
505 | 0 | return SPLUNK_AUTH_MISSING_CRED; |
506 | 0 | } |
507 | | |
508 | 0 | if (authorization != NULL && auth_header->val.len > 0) { |
509 | 0 | mk_list_foreach(head, &ctx->auth_tokens) { |
510 | 0 | splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head); |
511 | 0 | if (flb_sds_len(authorization) != splunk_token->length) { |
512 | 0 | ret = SPLUNK_AUTH_UNAUTHORIZED; |
513 | 0 | continue; |
514 | 0 | } |
515 | | |
516 | 0 | if (strncasecmp(splunk_token->header, |
517 | 0 | authorization, |
518 | 0 | splunk_token->length) == 0) { |
519 | 0 | flb_sds_destroy(authorization); |
520 | |
|
521 | 0 | return SPLUNK_AUTH_SUCCESS; |
522 | 0 | } |
523 | 0 | } |
524 | | |
525 | 0 | ret = SPLUNK_AUTH_UNAUTHORIZED; |
526 | 0 | flb_sds_destroy(authorization); |
527 | 0 | return ret; |
528 | 0 | } |
529 | 0 | else { |
530 | 0 | flb_sds_destroy(authorization); |
531 | 0 | return SPLUNK_AUTH_MISSING_CRED; |
532 | 0 | } |
533 | | |
534 | 0 | return SPLUNK_AUTH_SUCCESS; |
535 | 0 | } |
536 | | |
537 | | static int handle_hec_payload(struct flb_splunk *ctx, int content_type, |
538 | | flb_sds_t tag, char *buf, size_t size) |
539 | 0 | { |
540 | 0 | int ret = -1; |
541 | |
|
542 | 0 | if (content_type == HTTP_CONTENT_JSON) { |
543 | 0 | ret = parse_hec_payload_json(ctx, tag, buf, size); |
544 | 0 | } |
545 | 0 | else if (content_type == HTTP_CONTENT_TEXT) { |
546 | 0 | ret = process_raw_payload_pack(ctx, tag, buf, size); |
547 | 0 | } |
548 | 0 | else if (content_type == HTTP_CONTENT_UNKNOWN) { |
549 | 0 | if (buf[0] == '{') { |
550 | 0 | ret = parse_hec_payload_json(ctx, tag, buf, size); |
551 | 0 | } |
552 | 0 | else { |
553 | 0 | ret = process_raw_payload_pack(ctx, tag, buf, size); |
554 | 0 | } |
555 | 0 | } |
556 | |
|
557 | 0 | return ret; |
558 | 0 | } |
559 | | |
560 | | static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, |
561 | | flb_sds_t tag, |
562 | | struct mk_http_session *session, |
563 | | struct mk_http_request *request) |
564 | 0 | { |
565 | 0 | int i = 0; |
566 | 0 | int ret = 0; |
567 | 0 | int type = -1; |
568 | 0 | struct mk_http_header *header; |
569 | 0 | struct mk_http_header *header_auth; |
570 | 0 | int extra_size = -1; |
571 | 0 | struct mk_http_header *headers_extra; |
572 | 0 | int gzip_compressed = FLB_FALSE; |
573 | 0 | void *gz_data = NULL; |
574 | 0 | size_t gz_size = -1; |
575 | |
|
576 | 0 | header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; |
577 | 0 | if (header->key.data == NULL) { |
578 | 0 | flb_plg_debug(ctx->ins, "header 'Content-Type' is not set"); |
579 | 0 | } |
580 | |
|
581 | 0 | if (header->val.len == 16 && |
582 | 0 | strncasecmp(header->val.data, "application/json", 16) == 0) { |
583 | 0 | type = HTTP_CONTENT_JSON; |
584 | 0 | } |
585 | 0 | else if (header->val.len == 10 && |
586 | 0 | strncasecmp(header->val.data, "text/plain", 10) == 0) { |
587 | 0 | type = HTTP_CONTENT_TEXT; |
588 | 0 | } |
589 | 0 | else { |
590 | | /* Not necessary to specify content-type for Splunk HEC. */ |
591 | 0 | flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); |
592 | 0 | type = HTTP_CONTENT_UNKNOWN; |
593 | 0 | } |
594 | |
|
595 | 0 | if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { |
596 | 0 | send_response(conn, 400, "error: no payload found\n"); |
597 | 0 | return -2; |
598 | 0 | } |
599 | | |
600 | 0 | header_auth = &session->parser.headers[MK_HEADER_AUTHORIZATION]; |
601 | 0 | if (header_auth->key.data != NULL) { |
602 | 0 | if (strncasecmp(header_auth->val.data, "Splunk ", 7) == 0) { |
603 | 0 | ctx->ingested_auth_header = header_auth->val.data; |
604 | 0 | ctx->ingested_auth_header_len = header_auth->val.len; |
605 | 0 | } |
606 | 0 | } |
607 | |
|
608 | 0 | extra_size = session->parser.headers_extra_count; |
609 | 0 | if (extra_size > 0) { |
610 | 0 | for (i = 0; i < extra_size; i++) { |
611 | 0 | headers_extra = &session->parser.headers_extra[i]; |
612 | 0 | if (headers_extra->key.len == 16 && |
613 | 0 | strncasecmp(headers_extra->key.data, "Content-Encoding", 16) == 0) { |
614 | 0 | if (headers_extra->val.len == 4 && |
615 | 0 | strncasecmp(headers_extra->val.data, "gzip", 4) == 0) { |
616 | 0 | flb_plg_debug(ctx->ins, "body is gzipped"); |
617 | 0 | gzip_compressed = FLB_TRUE; |
618 | 0 | } |
619 | 0 | } |
620 | 0 | } |
621 | 0 | } |
622 | |
|
623 | 0 | if (gzip_compressed == FLB_TRUE) { |
624 | 0 | ret = flb_gzip_uncompress((void *) request->data.data, request->data.len, |
625 | 0 | &gz_data, &gz_size); |
626 | 0 | if (ret == -1) { |
627 | 0 | flb_plg_error(ctx->ins, "gzip uncompress is failed"); |
628 | 0 | return -1; |
629 | 0 | } |
630 | | |
631 | 0 | ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size); |
632 | 0 | flb_free(gz_data); |
633 | 0 | } |
634 | 0 | else { |
635 | 0 | ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len); |
636 | 0 | } |
637 | | |
638 | 0 | return 0; |
639 | 0 | } |
640 | | |
641 | | static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn, |
642 | | flb_sds_t tag, |
643 | | struct mk_http_session *session, |
644 | | struct mk_http_request *request) |
645 | 0 | { |
646 | 0 | int ret = -1; |
647 | 0 | struct mk_http_header *header; |
648 | 0 | struct mk_http_header *header_auth; |
649 | |
|
650 | 0 | header = &session->parser.headers[MK_HEADER_CONTENT_TYPE]; |
651 | 0 | if (header->key.data == NULL) { |
652 | 0 | send_response(conn, 400, "error: header 'Content-Type' is not set\n"); |
653 | 0 | return -1; |
654 | 0 | } |
655 | 0 | else if (header->val.len != 10 || |
656 | 0 | strncasecmp(header->val.data, "text/plain", 10) != 0) { |
657 | | /* Not necessary to specify content-type for Splunk HEC. */ |
658 | 0 | flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); |
659 | 0 | } |
660 | | |
661 | 0 | if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) { |
662 | 0 | send_response(conn, 400, "2 error: no payload found\n"); |
663 | 0 | return -1; |
664 | 0 | } |
665 | | |
666 | 0 | header_auth = &session->parser.headers[MK_HEADER_AUTHORIZATION]; |
667 | 0 | if (header_auth->key.data != NULL) { |
668 | 0 | if (strncasecmp(header_auth->val.data, "Splunk ", 7) == 0) { |
669 | 0 | ctx->ingested_auth_header = header_auth->val.data; |
670 | 0 | ctx->ingested_auth_header_len = header_auth->val.len; |
671 | 0 | } |
672 | 0 | } |
673 | | |
674 | | /* Always handle as raw type of payloads here */ |
675 | 0 | ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len); |
676 | |
|
677 | 0 | return ret; |
678 | 0 | } |
679 | | |
680 | | static inline int mk_http_point_header(mk_ptr_t *h, |
681 | | struct mk_http_parser *parser, int key) |
682 | 0 | { |
683 | 0 | struct mk_http_header *header; |
684 | |
|
685 | 0 | header = &parser->headers[key]; |
686 | 0 | if (header->type == key) { |
687 | 0 | h->data = header->val.data; |
688 | 0 | h->len = header->val.len; |
689 | 0 | return 0; |
690 | 0 | } |
691 | 0 | else { |
692 | 0 | h->data = NULL; |
693 | 0 | h->len = -1; |
694 | 0 | } |
695 | | |
696 | 0 | return -1; |
697 | 0 | } |
698 | | |
699 | | /* |
700 | | * Handle an incoming request. It perform extra checks over the request, if |
701 | | * everything is OK, it enqueue the incoming payload. |
702 | | */ |
703 | | int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, |
704 | | struct mk_http_session *session, |
705 | | struct mk_http_request *request) |
706 | 0 | { |
707 | 0 | int i; |
708 | 0 | int ret; |
709 | 0 | int len; |
710 | 0 | char *uri; |
711 | 0 | char *qs; |
712 | 0 | char *original_data = NULL; |
713 | 0 | size_t original_data_size = 0; |
714 | 0 | char *out_chunked = NULL; |
715 | 0 | size_t out_chunked_size = 0; |
716 | 0 | off_t diff; |
717 | 0 | flb_sds_t tag; |
718 | 0 | struct mk_http_header *header; |
719 | |
|
720 | 0 | if (request->uri.data[0] != '/') { |
721 | 0 | send_response(conn, 400, "error: invalid request\n"); |
722 | 0 | return -1; |
723 | 0 | } |
724 | | |
725 | | /* Decode URI */ |
726 | 0 | uri = mk_utils_url_decode(request->uri); |
727 | 0 | if (!uri) { |
728 | 0 | uri = mk_mem_alloc_z(request->uri.len + 1); |
729 | 0 | if (!uri) { |
730 | 0 | return -1; |
731 | 0 | } |
732 | 0 | memcpy(uri, request->uri.data, request->uri.len); |
733 | 0 | uri[request->uri.len] = '\0'; |
734 | 0 | } |
735 | | |
736 | | /* Try to match a query string so we can remove it */ |
737 | 0 | qs = strchr(uri, '?'); |
738 | 0 | if (qs) { |
739 | | /* remove the query string part */ |
740 | 0 | diff = qs - uri; |
741 | 0 | uri[diff] = '\0'; |
742 | 0 | } |
743 | | |
744 | | /* Refer the tag at first*/ |
745 | 0 | if (ctx->ins->tag && !ctx->ins->tag_default) { |
746 | 0 | tag = flb_sds_create(ctx->ins->tag); |
747 | 0 | if (tag == NULL) { |
748 | 0 | mk_mem_free(uri); |
749 | 0 | return -1; |
750 | 0 | } |
751 | 0 | } |
752 | 0 | else { |
753 | | /* Compose the query string using the URI */ |
754 | 0 | len = strlen(uri); |
755 | |
|
756 | 0 | if (len == 1) { |
757 | 0 | tag = NULL; /* use default tag */ |
758 | 0 | } |
759 | 0 | else { |
760 | | /* New tag skipping the URI '/' */ |
761 | 0 | tag = flb_sds_create_len(&uri[1], len - 1); |
762 | 0 | if (!tag) { |
763 | 0 | mk_mem_free(uri); |
764 | 0 | return -1; |
765 | 0 | } |
766 | | |
767 | | /* Sanitize, only allow alphanum chars */ |
768 | 0 | for (i = 0; i < flb_sds_len(tag); i++) { |
769 | 0 | if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') { |
770 | 0 | tag[i] = '_'; |
771 | 0 | } |
772 | 0 | } |
773 | 0 | } |
774 | 0 | } |
775 | | |
776 | | /* Check if we have a Host header: Hostname ; port */ |
777 | 0 | mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST); |
778 | | |
779 | | /* Header: Connection */ |
780 | 0 | mk_http_point_header(&request->connection, &session->parser, |
781 | 0 | MK_HEADER_CONNECTION); |
782 | | |
783 | | /* HTTP/1.1 needs Host header */ |
784 | 0 | if (request->host.data == NULL && request->protocol == MK_HTTP_PROTOCOL_11) { |
785 | 0 | flb_sds_destroy(tag); |
786 | 0 | mk_mem_free(uri); |
787 | |
|
788 | 0 | return -1; |
789 | 0 | } |
790 | | |
791 | | /* Should we close the session after this request ? */ |
792 | 0 | mk_http_keepalive_check(session, request, ctx->server); |
793 | | |
794 | | /* Content Length */ |
795 | 0 | header = &session->parser.headers[MK_HEADER_CONTENT_LENGTH]; |
796 | 0 | if (header->type == MK_HEADER_CONTENT_LENGTH) { |
797 | 0 | request->_content_length.data = header->val.data; |
798 | 0 | request->_content_length.len = header->val.len; |
799 | 0 | } |
800 | 0 | else { |
801 | 0 | request->_content_length.data = NULL; |
802 | 0 | } |
803 | |
|
804 | 0 | if (request->method == MK_METHOD_GET) { |
805 | | /* Handle health monitoring of splunk hec endpoint for load balancers */ |
806 | 0 | if (strcasecmp(uri, "/services/collector/health") == 0) { |
807 | 0 | send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":200}"); |
808 | 0 | } |
809 | 0 | else { |
810 | 0 | send_response(conn, 400, "error: invalid HTTP endpoint\n"); |
811 | 0 | } |
812 | |
|
813 | 0 | flb_sds_destroy(tag); |
814 | 0 | mk_mem_free(uri); |
815 | |
|
816 | 0 | return 0; |
817 | 0 | } |
818 | | |
819 | | /* Under services/collector endpoints are required for |
820 | | * authentication if provided splunk_token */ |
821 | 0 | ret = validate_auth_header(ctx, request); |
822 | 0 | if (ret < 0){ |
823 | 0 | send_response(conn, 401, "error: unauthorized\n"); |
824 | 0 | if (ret == SPLUNK_AUTH_MISSING_CRED) { |
825 | 0 | flb_plg_warn(ctx->ins, "missing credentials in request headers"); |
826 | 0 | } |
827 | 0 | else if (ret == SPLUNK_AUTH_UNAUTHORIZED) { |
828 | 0 | flb_plg_warn(ctx->ins, "wrong credentials in request headers"); |
829 | 0 | } |
830 | |
|
831 | 0 | flb_sds_destroy(tag); |
832 | 0 | mk_mem_free(uri); |
833 | |
|
834 | 0 | return -1; |
835 | 0 | } |
836 | | |
837 | | /* If the request contains chunked transfer encoded data, decode it */\ |
838 | 0 | if (mk_http_parser_is_content_chunked(&session->parser)) { |
839 | 0 | ret = mk_http_parser_chunked_decode(&session->parser, |
840 | 0 | conn->buf_data, |
841 | 0 | conn->buf_len, |
842 | 0 | &out_chunked, |
843 | 0 | &out_chunked_size); |
844 | 0 | if (ret == -1) { |
845 | 0 | flb_plg_error(ctx->ins, "failed to decode chunked data"); |
846 | 0 | send_response(conn, 400, "error: invalid chunked data\n"); |
847 | |
|
848 | 0 | flb_sds_destroy(tag); |
849 | 0 | mk_mem_free(uri); |
850 | |
|
851 | 0 | return -1; |
852 | 0 | } |
853 | | |
854 | | /* Update the request data */ |
855 | 0 | original_data = request->data.data; |
856 | 0 | original_data_size = request->data.len; |
857 | | |
858 | | /* assign the chunked one */ |
859 | 0 | request->data.data = out_chunked; |
860 | 0 | request->data.len = out_chunked_size; |
861 | 0 | } |
862 | | |
863 | | /* Handle every ingested payload cleanly */ |
864 | 0 | flb_log_event_encoder_reset(&ctx->log_encoder); |
865 | |
|
866 | 0 | if (request->method == MK_METHOD_POST) { |
867 | 0 | if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 || |
868 | 0 | strcasecmp(uri, "/services/collector/raw") == 0) { |
869 | 0 | ret = process_hec_raw_payload(ctx, conn, tag, session, request); |
870 | |
|
871 | 0 | if (!ret) { |
872 | 0 | send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); |
873 | 0 | } |
874 | 0 | send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); |
875 | 0 | } |
876 | 0 | else if (strcasecmp(uri, "/services/collector/event/1.0") == 0 || |
877 | 0 | strcasecmp(uri, "/services/collector/event") == 0 || |
878 | 0 | strcasecmp(uri, "/services/collector") == 0) { |
879 | |
|
880 | 0 | ret = process_hec_payload(ctx, conn, tag, session, request); |
881 | 0 | if (ret == -2) { |
882 | 0 | flb_sds_destroy(tag); |
883 | 0 | mk_mem_free(uri); |
884 | |
|
885 | 0 | if (out_chunked) { |
886 | 0 | mk_mem_free(out_chunked); |
887 | 0 | } |
888 | 0 | request->data.data = original_data; |
889 | 0 | request->data.len = original_data_size; |
890 | |
|
891 | 0 | return -1; |
892 | 0 | } |
893 | | |
894 | 0 | if (!ret) { |
895 | 0 | send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); |
896 | 0 | } |
897 | 0 | send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); |
898 | 0 | } |
899 | 0 | else { |
900 | 0 | send_response(conn, 400, "error: invalid HTTP endpoint\n"); |
901 | |
|
902 | 0 | flb_sds_destroy(tag); |
903 | 0 | mk_mem_free(uri); |
904 | |
|
905 | 0 | if (out_chunked) { |
906 | 0 | mk_mem_free(out_chunked); |
907 | 0 | } |
908 | 0 | request->data.data = original_data; |
909 | 0 | request->data.len = original_data_size; |
910 | |
|
911 | 0 | return -1; |
912 | 0 | } |
913 | 0 | } |
914 | 0 | else { |
915 | | /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ |
916 | |
|
917 | 0 | flb_sds_destroy(tag); |
918 | 0 | mk_mem_free(uri); |
919 | |
|
920 | 0 | if (out_chunked) { |
921 | 0 | mk_mem_free(out_chunked); |
922 | 0 | } |
923 | 0 | request->data.data = original_data; |
924 | 0 | request->data.len = original_data_size; |
925 | |
|
926 | 0 | send_response(conn, 400, "error: invalid HTTP method\n"); |
927 | 0 | return -1; |
928 | 0 | } |
929 | | |
930 | 0 | flb_sds_destroy(tag); |
931 | 0 | mk_mem_free(uri); |
932 | |
|
933 | 0 | if (out_chunked) { |
934 | 0 | mk_mem_free(out_chunked); |
935 | 0 | } |
936 | 0 | request->data.data = original_data; |
937 | 0 | request->data.len = original_data_size; |
938 | |
|
939 | 0 | return ret; |
940 | 0 | } |
941 | | |
942 | | /* |
943 | | * Handle an incoming request which has resulted in an http parser error. |
944 | | */ |
945 | | int splunk_prot_handle_error(struct flb_splunk *ctx, struct splunk_conn *conn, |
946 | | struct mk_http_session *session, |
947 | | struct mk_http_request *request) |
948 | 0 | { |
949 | 0 | send_response(conn, 400, "error: invalid request\n"); |
950 | 0 | return -1; |
951 | 0 | } |
952 | | |
953 | | |
954 | | |
955 | | |
956 | | |
957 | | |
958 | | |
959 | | /* New gen HTTP server */ |
960 | | |
961 | | static int send_response_ng(struct flb_http_response *response, |
962 | | int http_status, |
963 | | char *message) |
964 | 0 | { |
965 | 0 | flb_http_response_set_status(response, http_status); |
966 | |
|
967 | 0 | if (http_status == 201) { |
968 | 0 | flb_http_response_set_message(response, "Created"); |
969 | 0 | } |
970 | 0 | else if (http_status == 200) { |
971 | 0 | flb_http_response_set_message(response, "OK"); |
972 | 0 | } |
973 | 0 | else if (http_status == 204) { |
974 | 0 | flb_http_response_set_message(response, "No Content"); |
975 | 0 | } |
976 | 0 | else if (http_status == 400) { |
977 | 0 | flb_http_response_set_message(response, "Bad Request"); |
978 | 0 | } |
979 | |
|
980 | 0 | if (message != NULL) { |
981 | 0 | flb_http_response_set_body(response, |
982 | 0 | (unsigned char *) message, |
983 | 0 | strlen(message)); |
984 | 0 | } |
985 | |
|
986 | 0 | flb_http_response_commit(response); |
987 | |
|
988 | 0 | return 0; |
989 | 0 | } |
990 | | |
991 | | static int send_json_message_response_ng(struct flb_http_response *response, |
992 | | int http_status, |
993 | | char *message) |
994 | 0 | { |
995 | 0 | flb_http_response_set_status(response, http_status); |
996 | |
|
997 | 0 | if (http_status == 201) { |
998 | 0 | flb_http_response_set_message(response, "Created"); |
999 | 0 | } |
1000 | 0 | else if (http_status == 200) { |
1001 | 0 | flb_http_response_set_message(response, "OK"); |
1002 | 0 | } |
1003 | 0 | else if (http_status == 204) { |
1004 | 0 | flb_http_response_set_message(response, "No Content"); |
1005 | 0 | } |
1006 | 0 | else if (http_status == 400) { |
1007 | 0 | flb_http_response_set_message(response, "Bad Request"); |
1008 | 0 | } |
1009 | |
|
1010 | 0 | flb_http_response_set_header(response, |
1011 | 0 | "content-type", 0, |
1012 | 0 | "application/json", 0); |
1013 | |
|
1014 | 0 | if (message != NULL) { |
1015 | 0 | flb_http_response_set_body(response, |
1016 | 0 | (unsigned char *) message, |
1017 | 0 | strlen(message)); |
1018 | 0 | } |
1019 | |
|
1020 | 0 | flb_http_response_commit(response); |
1021 | |
|
1022 | 0 | return 0; |
1023 | 0 | } |
1024 | | |
1025 | | static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request) |
1026 | 0 | { |
1027 | 0 | struct mk_list *tmp; |
1028 | 0 | struct mk_list *head; |
1029 | 0 | char *auth_header; |
1030 | 0 | struct flb_splunk_tokens *splunk_token; |
1031 | |
|
1032 | 0 | if (mk_list_size(&ctx->auth_tokens) == 0) { |
1033 | 0 | return SPLUNK_AUTH_UNAUTH; |
1034 | 0 | } |
1035 | | |
1036 | 0 | auth_header = flb_http_request_get_header(request, "authorization"); |
1037 | |
|
1038 | 0 | if (auth_header == NULL) { |
1039 | 0 | return SPLUNK_AUTH_MISSING_CRED; |
1040 | 0 | } |
1041 | | |
1042 | 0 | if (auth_header != NULL && strlen(auth_header) > 0) { |
1043 | 0 | mk_list_foreach_safe(head, tmp, &ctx->auth_tokens) { |
1044 | 0 | splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head); |
1045 | 0 | if (strlen(auth_header) != splunk_token->length) { |
1046 | 0 | continue; |
1047 | 0 | } |
1048 | | |
1049 | 0 | if (strncasecmp(splunk_token->header, |
1050 | 0 | auth_header, |
1051 | 0 | splunk_token->length) == 0) { |
1052 | 0 | return SPLUNK_AUTH_SUCCESS; |
1053 | 0 | } |
1054 | 0 | } |
1055 | | |
1056 | 0 | return SPLUNK_AUTH_UNAUTHORIZED; |
1057 | 0 | } |
1058 | 0 | else { |
1059 | 0 | return SPLUNK_AUTH_MISSING_CRED; |
1060 | 0 | } |
1061 | | |
1062 | 0 | return SPLUNK_AUTH_SUCCESS; |
1063 | 0 | } |
1064 | | |
1065 | | static int process_hec_payload_ng(struct flb_http_request *request, |
1066 | | struct flb_http_response *response, |
1067 | | flb_sds_t tag, |
1068 | | struct flb_splunk *ctx) |
1069 | 0 | { |
1070 | 0 | int type = -1; |
1071 | 0 | int ret = 0; |
1072 | 0 | size_t size = 0; |
1073 | 0 | char *auth_header; |
1074 | |
|
1075 | 0 | type = HTTP_CONTENT_UNKNOWN; |
1076 | |
|
1077 | 0 | if (request->content_type != NULL) { |
1078 | 0 | if (strcasecmp(request->content_type, "application/json") == 0) { |
1079 | 0 | type = HTTP_CONTENT_JSON; |
1080 | 0 | } |
1081 | 0 | else if (strcasecmp(request->content_type, "text/plain") == 0) { |
1082 | 0 | type = HTTP_CONTENT_TEXT; |
1083 | 0 | } |
1084 | 0 | else { |
1085 | | /* Not necessary to specify content-type for Splunk HEC. */ |
1086 | 0 | flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); |
1087 | 0 | } |
1088 | 0 | } |
1089 | |
|
1090 | 0 | ret = flb_hash_table_get(request->headers, "authorization", 13, (void **)&auth_header, &size); |
1091 | 0 | if (ret != 0 && size > 0) { |
1092 | 0 | if (strncasecmp(auth_header, "Splunk ", 7) == 0) { |
1093 | 0 | ctx->ingested_auth_header = auth_header; |
1094 | 0 | ctx->ingested_auth_header_len = strlen(auth_header); |
1095 | 0 | } |
1096 | 0 | } |
1097 | |
|
1098 | 0 | if (request->body == NULL || cfl_sds_len(request->body) <= 0) { |
1099 | 0 | send_response_ng(response, 400, "error: no payload found\n"); |
1100 | |
|
1101 | 0 | return -1; |
1102 | 0 | } |
1103 | | |
1104 | 0 | return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body)); |
1105 | 0 | } |
1106 | | |
1107 | | static int process_hec_raw_payload_ng(struct flb_http_request *request, |
1108 | | struct flb_http_response *response, |
1109 | | flb_sds_t tag, |
1110 | | struct flb_splunk *ctx) |
1111 | 0 | { |
1112 | 0 | int ret = 0; |
1113 | 0 | size_t size = 0; |
1114 | 0 | char *auth_header; |
1115 | |
|
1116 | 0 | if (request->content_type == NULL) { |
1117 | 0 | send_response_ng(response, 400, "error: header 'Content-Type' is not set\n"); |
1118 | |
|
1119 | 0 | return -1; |
1120 | 0 | } |
1121 | 0 | else if (strcasecmp(request->content_type, "text/plain") != 0) { |
1122 | | /* Not necessary to specify content-type for Splunk HEC. */ |
1123 | 0 | flb_plg_debug(ctx->ins, "Mark as unknown type for ingested payloads"); |
1124 | 0 | } |
1125 | | |
1126 | 0 | ret = flb_hash_table_get(request->headers, "authorization", 13, (void **)&auth_header, &size); |
1127 | 0 | if (ret != 0 && size > 0) { |
1128 | 0 | if (strncasecmp(auth_header, "Splunk ", 7) == 0) { |
1129 | 0 | ctx->ingested_auth_header = auth_header; |
1130 | 0 | ctx->ingested_auth_header_len = strlen(auth_header); |
1131 | 0 | } |
1132 | 0 | } |
1133 | |
|
1134 | 0 | if (request->body == NULL || cfl_sds_len(request->body) == 0) { |
1135 | 0 | send_response_ng(response, 400, "error: no payload found\n"); |
1136 | |
|
1137 | 0 | return -1; |
1138 | 0 | } |
1139 | | |
1140 | | /* Always handle as raw type of payloads here */ |
1141 | 0 | return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body)); |
1142 | 0 | } |
1143 | | |
1144 | | int splunk_prot_handle_ng(struct flb_http_request *request, |
1145 | | struct flb_http_response *response) |
1146 | 0 | { |
1147 | 0 | struct flb_splunk *context; |
1148 | 0 | int ret = -1; |
1149 | 0 | flb_sds_t tag; |
1150 | |
|
1151 | 0 | context = (struct flb_splunk *) response->stream->user_data; |
1152 | |
|
1153 | 0 | if (request->path[0] != '/') { |
1154 | 0 | send_response_ng(response, 400, "error: invalid request\n"); |
1155 | 0 | return -1; |
1156 | 0 | } |
1157 | | |
1158 | | /* HTTP/1.1 needs Host header */ |
1159 | 0 | if (request->protocol_version == HTTP_PROTOCOL_VERSION_11 && |
1160 | 0 | request->host == NULL) { |
1161 | |
|
1162 | 0 | return -1; |
1163 | 0 | } |
1164 | | |
1165 | 0 | if (request->method == HTTP_METHOD_GET) { |
1166 | | /* Handle health monitoring of splunk hec endpoint for load balancers */ |
1167 | 0 | if (strcasecmp(request->path, "/services/collector/health") == 0) { |
1168 | 0 | send_json_message_response_ng(response, 200, "{\"text\":\"Success\",\"code\":200}"); |
1169 | 0 | } |
1170 | 0 | else { |
1171 | 0 | send_response_ng(response, 400, "error: invalid HTTP endpoint\n"); |
1172 | 0 | } |
1173 | |
|
1174 | 0 | return 0; |
1175 | 0 | } |
1176 | | |
1177 | | /* Under services/collector endpoints are required for |
1178 | | * authentication if provided splunk_token */ |
1179 | 0 | ret = validate_auth_header_ng(context, request); |
1180 | |
|
1181 | 0 | if (ret < 0) { |
1182 | 0 | send_response_ng(response, 401, "error: unauthorized\n"); |
1183 | |
|
1184 | 0 | if (ret == SPLUNK_AUTH_MISSING_CRED) { |
1185 | 0 | flb_plg_warn(context->ins, "missing credentials in request headers"); |
1186 | 0 | } |
1187 | 0 | else if (ret == SPLUNK_AUTH_UNAUTHORIZED) { |
1188 | 0 | flb_plg_warn(context->ins, "wrong credentials in request headers"); |
1189 | 0 | } |
1190 | |
|
1191 | 0 | return -1; |
1192 | 0 | } |
1193 | | |
1194 | | /* Handle every ingested payload cleanly */ |
1195 | 0 | flb_log_event_encoder_reset(&context->log_encoder); |
1196 | |
|
1197 | 0 | if (request->method != HTTP_METHOD_POST) { |
1198 | | /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ |
1199 | 0 | send_response_ng(response, 400, "error: invalid HTTP method\n"); |
1200 | |
|
1201 | 0 | return -1; |
1202 | 0 | } |
1203 | | |
1204 | 0 | tag = flb_sds_create(context->ins->tag); |
1205 | |
|
1206 | 0 | if (tag == NULL) { |
1207 | 0 | return -1; |
1208 | 0 | } |
1209 | | |
1210 | 0 | if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 || |
1211 | 0 | strcasecmp(request->path, "/services/collector/raw") == 0) { |
1212 | 0 | ret = process_hec_raw_payload_ng(request, response, tag, context); |
1213 | 0 | if (ret != 0) { |
1214 | 0 | send_json_message_response_ng(response, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); |
1215 | 0 | ret = -1; |
1216 | 0 | } |
1217 | 0 | else { |
1218 | 0 | send_json_message_response_ng(response, 200, "{\"text\":\"Success\",\"code\":0}"); |
1219 | 0 | ret = 0; |
1220 | 0 | } |
1221 | 0 | } |
1222 | 0 | else if (strcasecmp(request->path, "/services/collector/event/1.0") == 0 || |
1223 | 0 | strcasecmp(request->path, "/services/collector/event") == 0 || |
1224 | 0 | strcasecmp(request->path, "/services/collector") == 0) { |
1225 | 0 | ret = process_hec_payload_ng(request, response, tag, context); |
1226 | 0 | if (ret != 0) { |
1227 | 0 | send_json_message_response_ng(response, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); |
1228 | 0 | ret = -1; |
1229 | 0 | } |
1230 | 0 | else { |
1231 | 0 | send_json_message_response_ng(response, 200, "{\"text\":\"Success\",\"code\":0}"); |
1232 | 0 | ret = 0; |
1233 | 0 | } |
1234 | 0 | } |
1235 | 0 | else { |
1236 | 0 | send_response_ng(response, 400, "error: invalid HTTP endpoint\n"); |
1237 | 0 | ret = -1; |
1238 | 0 | } |
1239 | |
|
1240 | 0 | flb_sds_destroy(tag); |
1241 | 0 | return ret; |
1242 | 0 | } |