/src/fluent-bit/plugins/out_td/td.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2022 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_network.h> |
22 | | #include <fluent-bit/flb_pack.h> |
23 | | #include <fluent-bit/flb_http_client.h> |
24 | | #include <fluent-bit/flb_time.h> |
25 | | #include <msgpack.h> |
26 | | |
27 | | #include "td.h" |
28 | | #include "td_http.h" |
29 | | #include "td_config.h" |
30 | | |
31 | | #include <stdio.h> |
32 | | #include <stdlib.h> |
33 | | #include <assert.h> |
34 | | #include <errno.h> |
35 | | |
36 | | /* |
37 | | * Convert the internal Fluent Bit data representation to the required |
38 | | * one by Treasure Data cloud service. |
39 | | * |
40 | | * This function returns a new msgpack buffer and store the bytes length |
41 | | * in the out_size variable. |
42 | | */ |
43 | | static char *td_format(const void *data, size_t bytes, int *out_size) |
44 | 0 | { |
45 | 0 | int i; |
46 | 0 | int ret; |
47 | 0 | int n_size; |
48 | 0 | size_t off = 0; |
49 | 0 | time_t atime; |
50 | 0 | char *buf; |
51 | 0 | struct msgpack_sbuffer mp_sbuf; |
52 | 0 | struct msgpack_packer mp_pck; |
53 | 0 | msgpack_unpacked result; |
54 | 0 | msgpack_object root; |
55 | 0 | msgpack_object map; |
56 | 0 | msgpack_sbuffer *sbuf; |
57 | 0 | msgpack_object *obj; |
58 | 0 | struct flb_time tm; |
59 | | |
60 | | /* Initialize contexts for new output */ |
61 | 0 | msgpack_sbuffer_init(&mp_sbuf); |
62 | 0 | msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); |
63 | | |
64 | | /* Iterate the original buffer and perform adjustments */ |
65 | 0 | msgpack_unpacked_init(&result); |
66 | | |
67 | | /* Perform some format validation */ |
68 | 0 | ret = msgpack_unpack_next(&result, data, bytes, &off); |
69 | 0 | if (ret == MSGPACK_UNPACK_CONTINUE) { |
70 | 0 | msgpack_unpacked_destroy(&result); |
71 | 0 | return NULL; |
72 | 0 | } |
73 | | |
74 | | /* We 'should' get an array */ |
75 | 0 | if (result.data.type != MSGPACK_OBJECT_ARRAY) { |
76 | | /* |
77 | | * If we got a different format, we assume the caller knows what he is |
78 | | * doing, we just duplicate the content in a new buffer and cleanup. |
79 | | */ |
80 | 0 | buf = flb_malloc(bytes); |
81 | 0 | if (!buf) { |
82 | 0 | flb_errno(); |
83 | 0 | msgpack_unpacked_destroy(&result); |
84 | 0 | return NULL; |
85 | 0 | } |
86 | | |
87 | 0 | memcpy(buf, data, bytes); |
88 | 0 | *out_size = bytes; |
89 | 0 | msgpack_unpacked_destroy(&result); |
90 | 0 | return buf; |
91 | 0 | } |
92 | | |
93 | 0 | root = result.data; |
94 | 0 | if (root.via.array.size == 0) { |
95 | 0 | msgpack_unpacked_destroy(&result); |
96 | 0 | return NULL; |
97 | 0 | } |
98 | | |
99 | 0 | off = 0; |
100 | 0 | msgpack_unpacked_destroy(&result); |
101 | 0 | msgpack_unpacked_init(&result); |
102 | 0 | while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { |
103 | 0 | if (result.data.type != MSGPACK_OBJECT_ARRAY) { |
104 | 0 | continue; |
105 | 0 | } |
106 | | |
107 | | /* Each array must have two entries: time and record */ |
108 | 0 | root = result.data; |
109 | 0 | if (root.via.array.size != 2) { |
110 | 0 | continue; |
111 | 0 | } |
112 | | |
113 | 0 | flb_time_pop_from_msgpack(&tm, &result, &obj); |
114 | |
|
115 | 0 | atime = tm.tm.tv_sec; |
116 | 0 | map = root.via.array.ptr[1]; |
117 | |
|
118 | 0 | n_size = map.via.map.size + 1; |
119 | 0 | msgpack_pack_map(&mp_pck, n_size); |
120 | 0 | msgpack_pack_str(&mp_pck, 4); |
121 | 0 | msgpack_pack_str_body(&mp_pck, "time", 4); |
122 | 0 | msgpack_pack_int32(&mp_pck, atime); |
123 | |
|
124 | 0 | for (i = 0; i < n_size - 1; i++) { |
125 | 0 | msgpack_pack_object(&mp_pck, map.via.map.ptr[i].key); |
126 | 0 | msgpack_pack_object(&mp_pck, map.via.map.ptr[i].val); |
127 | 0 | } |
128 | 0 | } |
129 | 0 | msgpack_unpacked_destroy(&result); |
130 | | |
131 | | /* Create new buffer */ |
132 | 0 | sbuf = &mp_sbuf; |
133 | 0 | *out_size = sbuf->size; |
134 | 0 | buf = flb_malloc(sbuf->size); |
135 | 0 | if (!buf) { |
136 | 0 | flb_errno(); |
137 | 0 | return NULL; |
138 | 0 | } |
139 | | |
140 | | /* set a new buffer and re-initialize our MessagePack context */ |
141 | 0 | memcpy(buf, sbuf->data, sbuf->size); |
142 | 0 | msgpack_sbuffer_destroy(&mp_sbuf); |
143 | |
|
144 | 0 | return buf; |
145 | 0 | } |
146 | | |
147 | | static int cb_td_init(struct flb_output_instance *ins, struct flb_config *config, |
148 | | void *data) |
149 | 0 | { |
150 | 0 | struct flb_td *ctx; |
151 | 0 | struct flb_upstream *upstream; |
152 | 0 | (void) data; |
153 | |
|
154 | 0 | ctx = td_config_init(ins); |
155 | 0 | if (!ctx) { |
156 | 0 | flb_plg_warn(ins, "Error reading configuration"); |
157 | 0 | return -1; |
158 | 0 | } |
159 | | |
160 | 0 | if (ctx->region == FLB_TD_REGION_US) { |
161 | 0 | flb_output_net_default("api.treasuredata.com", 443, ins); |
162 | 0 | } |
163 | 0 | else if (ctx->region == FLB_TD_REGION_JP) { |
164 | 0 | flb_output_net_default("api.treasuredata.co.jp", 443, ins); |
165 | 0 | } |
166 | |
|
167 | 0 | upstream = flb_upstream_create(config, |
168 | 0 | ins->host.name, |
169 | 0 | ins->host.port, |
170 | 0 | FLB_IO_TLS, ins->tls); |
171 | 0 | if (!upstream) { |
172 | 0 | flb_free(ctx); |
173 | 0 | return -1; |
174 | 0 | } |
175 | 0 | ctx->u = upstream; |
176 | 0 | flb_output_upstream_set(ctx->u, ins); |
177 | |
|
178 | 0 | flb_output_set_context(ins, ctx); |
179 | 0 | return 0; |
180 | 0 | } |
181 | | |
182 | | static void cb_td_flush(struct flb_event_chunk *event_chunk, |
183 | | struct flb_output_flush *out_flush, |
184 | | struct flb_input_instance *i_ins, |
185 | | void *out_context, |
186 | | struct flb_config *config) |
187 | 0 | { |
188 | 0 | int ret; |
189 | 0 | int bytes_out; |
190 | 0 | char *pack; |
191 | 0 | size_t b_sent; |
192 | 0 | char *body = NULL; |
193 | 0 | struct flb_td *ctx = out_context; |
194 | 0 | struct flb_connection *u_conn; |
195 | 0 | struct flb_http_client *c; |
196 | 0 | (void) i_ins; |
197 | | |
198 | | /* Convert format */ |
199 | 0 | pack = td_format(event_chunk->data, event_chunk->size, &bytes_out); |
200 | 0 | if (!pack) { |
201 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
202 | 0 | } |
203 | | |
204 | | /* Lookup an available connection context */ |
205 | 0 | u_conn = flb_upstream_conn_get(ctx->u); |
206 | 0 | if (!u_conn) { |
207 | 0 | flb_plg_error(ctx->ins, "no upstream connections available"); |
208 | 0 | flb_free(pack); |
209 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
210 | 0 | } |
211 | | |
212 | | /* Compose request */ |
213 | 0 | c = td_http_client(u_conn, pack, bytes_out, &body, ctx, config); |
214 | 0 | if (!c) { |
215 | 0 | flb_free(pack); |
216 | 0 | flb_upstream_conn_release(u_conn); |
217 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
218 | 0 | } |
219 | | |
220 | | /* Issue HTTP request */ |
221 | 0 | ret = flb_http_do(c, &b_sent); |
222 | | |
223 | | /* Release Resources */ |
224 | 0 | flb_free(pack); |
225 | 0 | flb_free(body); |
226 | | |
227 | | /* Validate HTTP status */ |
228 | 0 | if (ret == 0) { |
229 | | /* We expect a HTTP 200 OK */ |
230 | 0 | if (c->resp.status != 200) { |
231 | 0 | if (c->resp.payload_size > 0) { |
232 | 0 | flb_plg_warn(ctx->ins, "HTTP status %i\n%s", |
233 | 0 | c->resp.status, c->resp.payload); |
234 | 0 | } |
235 | 0 | else { |
236 | 0 | flb_plg_warn(ctx->ins, "HTTP status %i", c->resp.status); |
237 | 0 | } |
238 | 0 | goto retry; |
239 | 0 | } |
240 | 0 | else { |
241 | 0 | flb_plg_info(ctx->ins, "HTTP status 200 OK"); |
242 | 0 | } |
243 | 0 | } |
244 | 0 | else { |
245 | 0 | flb_plg_error(ctx->ins, "http_do=%i", ret); |
246 | 0 | goto retry; |
247 | 0 | } |
248 | | |
249 | | /* release */ |
250 | 0 | flb_upstream_conn_release(u_conn); |
251 | 0 | flb_http_client_destroy(c); |
252 | |
|
253 | 0 | FLB_OUTPUT_RETURN(FLB_OK); |
254 | | |
255 | 0 | retry: |
256 | 0 | flb_upstream_conn_release(u_conn); |
257 | 0 | flb_http_client_destroy(c); |
258 | |
|
259 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
260 | 0 | } |
261 | | |
262 | | static int cb_td_exit(void *data, struct flb_config *config) |
263 | 0 | { |
264 | 0 | struct flb_td *ctx = data; |
265 | |
|
266 | 0 | if (!ctx) { |
267 | 0 | return 0; |
268 | 0 | } |
269 | | |
270 | 0 | flb_upstream_destroy(ctx->u); |
271 | 0 | flb_free(ctx); |
272 | |
|
273 | 0 | return 0; |
274 | 0 | } |
275 | | |
276 | | static struct flb_config_map config_map[] = { |
277 | | { |
278 | | FLB_CONFIG_MAP_STR, "API", (char *)NULL, |
279 | | 0, FLB_TRUE, offsetof(struct flb_td, api), |
280 | | "Set the API key" |
281 | | }, |
282 | | { |
283 | | FLB_CONFIG_MAP_STR, "Database", (char *)NULL, |
284 | | 0, FLB_TRUE, offsetof(struct flb_td, db_name), |
285 | | "Set the Database file" |
286 | | }, |
287 | | { |
288 | | FLB_CONFIG_MAP_STR, "Table", (char *)NULL, |
289 | | 0, FLB_TRUE, offsetof(struct flb_td, db_table), |
290 | | "Set the Database Table" |
291 | | }, |
292 | | { |
293 | | FLB_CONFIG_MAP_STR, "Region", (char *)NULL, |
294 | | 0, FLB_TRUE, offsetof(struct flb_td, region_str), |
295 | | "Set the Region: us or jp" |
296 | | }, |
297 | | /* EOF */ |
298 | | {0} |
299 | | }; |
300 | | |
301 | | /* Plugin reference */ |
302 | | struct flb_output_plugin out_td_plugin = { |
303 | | .name = "td", |
304 | | .description = "Treasure Data", |
305 | | .cb_init = cb_td_init, |
306 | | .cb_pre_run = NULL, |
307 | | .cb_flush = cb_td_flush, |
308 | | .cb_exit = cb_td_exit, |
309 | | .config_map = config_map, |
310 | | .flags = FLB_IO_TLS, |
311 | | }; |