/src/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2023 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | |
21 | | #include <fluent-bit/flb_input_plugin.h> |
22 | | #include <fluent-bit/flb_network.h> |
23 | | #include <fluent-bit/flb_config.h> |
24 | | #include <fluent-bit/flb_random.h> |
25 | | |
26 | | #include "in_elasticsearch.h" |
27 | | #include "in_elasticsearch_config.h" |
28 | | #include "in_elasticsearch_bulk_conn.h" |
29 | | |
30 | | /* |
31 | | * For a server event, the collection event means a new client have arrived, we |
32 | | * accept the connection and create a new TCP instance which will wait for |
33 | | * JSON map messages. |
34 | | */ |
35 | | static int in_elasticsearch_bulk_collect(struct flb_input_instance *ins, |
36 | | struct flb_config *config, void *in_context) |
37 | 0 | { |
38 | 0 | struct flb_connection *connection; |
39 | 0 | struct in_elasticsearch_bulk_conn *conn; |
40 | 0 | struct flb_in_elasticsearch *ctx; |
41 | |
|
42 | 0 | ctx = in_context; |
43 | |
|
44 | 0 | connection = flb_downstream_conn_get(ctx->downstream); |
45 | |
|
46 | 0 | if (connection == NULL) { |
47 | 0 | flb_plg_error(ctx->ins, "could not accept new connection"); |
48 | |
|
49 | 0 | return -1; |
50 | 0 | } |
51 | | |
52 | 0 | flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", |
53 | 0 | connection->fd); |
54 | |
|
55 | 0 | conn = in_elasticsearch_bulk_conn_add(connection, ctx); |
56 | |
|
57 | 0 | if (conn == NULL) { |
58 | 0 | flb_downstream_conn_release(connection); |
59 | |
|
60 | 0 | return -1; |
61 | 0 | } |
62 | | |
63 | 0 | return 0; |
64 | 0 | } |
65 | | |
66 | 0 | static void bytes_to_groupname(unsigned char *data, char *buf, size_t len) { |
67 | 0 | int index; |
68 | 0 | char charset[] = "0123456789" |
69 | 0 | "abcdefghijklmnopqrstuvwxyz" |
70 | 0 | "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; |
71 | |
|
72 | 0 | while (len-- > 0) { |
73 | 0 | index = (int) data[len]; |
74 | 0 | index = index % (sizeof(charset) - 1); |
75 | 0 | buf[len] = charset[index]; |
76 | 0 | } |
77 | 0 | } |
78 | | |
79 | 0 | static void bytes_to_nodename(unsigned char *data, char *buf, size_t len) { |
80 | 0 | int index; |
81 | 0 | char charset[] = "0123456789" |
82 | 0 | "abcdefghijklmnopqrstuvwxyz"; |
83 | |
|
84 | 0 | while (len-- > 0) { |
85 | 0 | index = (int) data[len]; |
86 | 0 | index = index % (sizeof(charset) - 1); |
87 | 0 | buf[len] = charset[index]; |
88 | 0 | } |
89 | 0 | } |
90 | | |
91 | | static int in_elasticsearch_bulk_init(struct flb_input_instance *ins, |
92 | | struct flb_config *config, void *data) |
93 | 0 | { |
94 | 0 | unsigned short int port; |
95 | 0 | int ret; |
96 | 0 | struct flb_in_elasticsearch *ctx; |
97 | 0 | unsigned char rand[16]; |
98 | |
|
99 | 0 | (void) data; |
100 | | |
101 | | /* Create context and basic conf */ |
102 | 0 | ctx = in_elasticsearch_config_create(ins); |
103 | 0 | if (!ctx) { |
104 | 0 | return -1; |
105 | 0 | } |
106 | | |
107 | 0 | ctx->collector_id = -1; |
108 | | |
109 | | /* Populate context with config map defaults and incoming properties */ |
110 | 0 | ret = flb_input_config_map_set(ins, (void *) ctx); |
111 | 0 | if (ret == -1) { |
112 | 0 | flb_plg_error(ctx->ins, "configuration error"); |
113 | 0 | in_elasticsearch_config_destroy(ctx); |
114 | 0 | return -1; |
115 | 0 | } |
116 | | |
117 | | /* Set the context */ |
118 | 0 | flb_input_set_context(ins, ctx); |
119 | |
|
120 | 0 | port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10); |
121 | |
|
122 | 0 | if (flb_random_bytes(rand, 16)) { |
123 | 0 | flb_plg_error(ctx->ins, "cannot generate cluster name"); |
124 | 0 | return -1; |
125 | 0 | } |
126 | | |
127 | 0 | bytes_to_groupname(rand, ctx->cluster_name, 16); |
128 | |
|
129 | 0 | if (flb_random_bytes(rand, 12)) { |
130 | 0 | flb_plg_error(ctx->ins, "cannot generate node name"); |
131 | 0 | return -1; |
132 | 0 | } |
133 | | |
134 | 0 | bytes_to_nodename(rand, ctx->node_name, 12); |
135 | |
|
136 | 0 | ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP, |
137 | 0 | ins->flags, |
138 | 0 | ctx->listen, |
139 | 0 | port, |
140 | 0 | ins->tls, |
141 | 0 | config, |
142 | 0 | &ins->net_setup); |
143 | |
|
144 | 0 | if (ctx->downstream == NULL) { |
145 | 0 | flb_plg_error(ctx->ins, |
146 | 0 | "could not initialize downstream on %s:%s. Aborting", |
147 | 0 | ctx->listen, ctx->tcp_port); |
148 | |
|
149 | 0 | in_elasticsearch_config_destroy(ctx); |
150 | |
|
151 | 0 | return -1; |
152 | 0 | } |
153 | | |
154 | 0 | flb_input_downstream_set(ctx->downstream, ctx->ins); |
155 | | |
156 | | /* Collect upon data available on the standard input */ |
157 | 0 | ret = flb_input_set_collector_socket(ins, |
158 | 0 | in_elasticsearch_bulk_collect, |
159 | 0 | ctx->downstream->server_fd, |
160 | 0 | config); |
161 | 0 | if (ret == -1) { |
162 | 0 | flb_plg_error(ctx->ins, "Could not set collector for IN_ELASTICSEARCH input plugin"); |
163 | 0 | in_elasticsearch_config_destroy(ctx); |
164 | |
|
165 | 0 | return -1; |
166 | 0 | } |
167 | | |
168 | 0 | ctx->collector_id = ret; |
169 | |
|
170 | 0 | return 0; |
171 | 0 | } |
172 | | |
173 | | static int in_elasticsearch_bulk_exit(void *data, struct flb_config *config) |
174 | 0 | { |
175 | 0 | struct flb_in_elasticsearch *ctx; |
176 | |
|
177 | 0 | (void) config; |
178 | |
|
179 | 0 | ctx = data; |
180 | |
|
181 | 0 | if (ctx != NULL) { |
182 | 0 | in_elasticsearch_config_destroy(ctx); |
183 | 0 | } |
184 | |
|
185 | 0 | return 0; |
186 | 0 | } |
187 | | |
188 | | /* Configuration properties map */ |
189 | | static struct flb_config_map config_map[] = { |
190 | | { |
191 | | FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, |
192 | | 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, buffer_max_size), |
193 | | "Set the maximum size of buffer" |
194 | | }, |
195 | | |
196 | | { |
197 | | FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE, |
198 | | 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, buffer_chunk_size), |
199 | | "Set the buffer chunk size" |
200 | | }, |
201 | | |
202 | | { |
203 | | FLB_CONFIG_MAP_STR, "tag_key", NULL, |
204 | | 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, tag_key), |
205 | | "Specify a key name for extracting as a tag" |
206 | | }, |
207 | | |
208 | | { |
209 | | FLB_CONFIG_MAP_STR, "meta_key", "@meta", |
210 | | 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, meta_key), |
211 | | "Specify a key name for meta information" |
212 | | }, |
213 | | |
214 | | { |
215 | | FLB_CONFIG_MAP_STR, "hostname", "localhost", |
216 | | 0, FLB_TRUE, offsetof(struct flb_in_elasticsearch, hostname), |
217 | | "Specify hostname or FQDN. This parameter is effective for sniffering node information." |
218 | | }, |
219 | | |
220 | | /* EOF */ |
221 | | {0} |
222 | | }; |
223 | | |
224 | | /* Plugin reference */ |
225 | | struct flb_input_plugin in_elasticsearch_plugin = { |
226 | | .name = "elasticsearch", |
227 | | .description = "HTTP Endpoints for Elasticsearch (Bulk API)", |
228 | | .cb_init = in_elasticsearch_bulk_init, |
229 | | .cb_pre_run = NULL, |
230 | | .cb_collect = in_elasticsearch_bulk_collect, |
231 | | .cb_flush_buf = NULL, |
232 | | .cb_pause = NULL, |
233 | | .cb_resume = NULL, |
234 | | .cb_exit = in_elasticsearch_bulk_exit, |
235 | | .config_map = config_map, |
236 | | .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS |
237 | | }; |