/src/fluent-bit/lib/monkey/mk_server/mk_net.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Monkey HTTP Server |
4 | | * ================== |
5 | | * Copyright 2001-2016 Monkey Software LLC <eduardo@monkey.io> |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <monkey/mk_core.h> |
21 | | #include <monkey/mk_net.h> |
22 | | #include <monkey/mk_scheduler.h> |
23 | | #include <monkey/mk_plugin.h> |
24 | | #include <monkey/mk_thread.h> |
25 | | #include <monkey/mk_tls.h> |
26 | | |
27 | | #ifdef _WIN32 |
28 | | #include <winsock2.h> |
29 | | #include <afunix.h> |
30 | | #else |
31 | | #include <sys/socket.h> |
32 | | #include <netinet/tcp.h> |
33 | | #endif |
34 | | |
/*
 * Initialize the network stack.
 *
 * On Windows this starts Winsock 2.2 once per process (tracked through a
 * function-local static flag). On every other platform there is nothing to
 * set up and the function simply reports success.
 *
 * Returns 0 on success (or when nothing had to be done), -1 if WSAStartup()
 * failed with anything other than WSAEINPROGRESS.
 */
int mk_net_init()
{
#ifdef _WIN32
    static int wsa_ready = 0;
    WSADATA wsa_info;
    int status;

    /* Winsock was already brought up by a previous call */
    if (wsa_ready != 0) {
        return 0;
    }

    status = WSAStartup(MAKEWORD(2, 2), &wsa_info);

    if (status == WSAEINPROGRESS) {
        /* Let the other thread finish initializing the stack */
        Sleep(100);
        return 0;
    }

    if (status != 0) {
        return -1;
    }

    wsa_ready = 1;
#endif

    return 0;
}
65 | | |
/*
 * Connect an existing socket to a TCP server.
 *
 * fd   : socket descriptor to connect.
 * host : node name or numeric address handed to getaddrinfo().
 * port : TCP port, must be in the range 0-65535.
 *
 * The host/port pair is resolved with getaddrinfo() (AF_UNSPEC, SOCK_STREAM)
 * and connect() is attempted on the first address returned only.
 *
 * Returns the value of connect() (0 or -1). When -1 is returned errno is
 * always meaningful so callers can test it for EINPROGRESS:
 *   - EINVAL if the port is out of range or the name could not be resolved,
 *   - otherwise the errno value left by connect().
 */
static int mk_net_fd_connect(int fd, char *host, unsigned long port)
{
    int ret;
    int connect_errno;
    struct addrinfo hints;
    struct addrinfo *res = NULL;
    char _port[6];

    /*
     * _port only holds five digits: a larger value would be silently
     * truncated by snprintf() and we would dial a different port.
     */
    if (port > 65535) {
        errno = EINVAL;
        return -1;
    }

    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    snprintf(_port, sizeof(_port), "%lu", port);
    ret = getaddrinfo(host, _port, &hints, &res);
    if (ret != 0) {
        /*
         * getaddrinfo() reports failures through its return code, not
         * errno. Set errno explicitly so a stale EINPROGRESS is never
         * mistaken for a connection in progress.
         */
        errno = EINVAL;
        return -1;
    }

    ret = connect(fd, res->ai_addr, res->ai_addrlen);

    /* Keep connect()'s errno intact across the cleanup call */
    connect_errno = errno;
    freeaddrinfo(res);
    errno = connect_errno;

    return ret;
}
89 | | |
90 | | struct mk_net_connection *mk_net_conn_create(char *addr, int port) |
91 | 0 | { |
92 | 0 | int fd; |
93 | 0 | int ret; |
94 | 0 | int error = 0; |
95 | 0 | socklen_t len = sizeof(error); |
96 | 0 | struct mk_sched_worker *sched; |
97 | 0 | struct mk_net_connection *conn; |
98 | | |
99 | | /* Allocate connection context */ |
100 | 0 | conn = mk_mem_alloc(sizeof(struct mk_net_connection)); |
101 | 0 | if (!conn) { |
102 | 0 | return NULL; |
103 | 0 | } |
104 | | |
105 | | /* Create socket */ |
106 | 0 | fd = mk_socket_create(AF_INET, SOCK_STREAM, 0); |
107 | 0 | if (fd == -1) { |
108 | 0 | mk_mem_free(conn); |
109 | 0 | return NULL; |
110 | 0 | } |
111 | | |
112 | | /* Make socket async */ |
113 | 0 | mk_socket_set_nonblocking(fd); |
114 | 0 | conn->fd = fd; |
115 | |
|
116 | 0 | ret = mk_net_fd_connect(conn->fd, addr, port); |
117 | 0 | if (ret == -1) { |
118 | 0 | if (errno != EINPROGRESS) { |
119 | 0 | close(fd); |
120 | 0 | mk_mem_free(conn); |
121 | 0 | return NULL; |
122 | 0 | } |
123 | | |
124 | 0 | MK_EVENT_NEW(&conn->event); |
125 | |
|
126 | 0 | sched = mk_sched_get_thread_conf(); |
127 | | // FIXME: not including the thread |
128 | | //conn->thread = mk_thread_get(); |
129 | 0 | ret = mk_event_add(sched->loop, conn->fd, MK_EVENT_THREAD, |
130 | 0 | MK_EVENT_WRITE, &conn->event); |
131 | 0 | if (ret == -1) { |
132 | 0 | close(fd); |
133 | 0 | mk_mem_free(conn); |
134 | 0 | return NULL; |
135 | 0 | } |
136 | | |
137 | | /* |
138 | | * Return the control to the parent caller, we need to wait for |
139 | | * the event loop to get back to us. |
140 | | */ |
141 | 0 | mk_thread_yield(conn->thread); |
142 | | |
143 | | /* We got a notification, remove the event registered */ |
144 | 0 | ret = mk_event_del(sched->loop, &conn->event); |
145 | | |
146 | | /* Check the connection status */ |
147 | 0 | if (conn->event.mask & MK_EVENT_WRITE) { |
148 | 0 | ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); |
149 | 0 | if (ret == -1) { |
150 | 0 | close(fd); |
151 | 0 | mk_mem_free(conn); |
152 | 0 | return NULL; |
153 | 0 | } |
154 | | |
155 | 0 | if (error != 0) { |
156 | | /* Connection is broken, not much to do here */ |
157 | 0 | fprintf(stderr, "Async connection failed %s:%i\n", |
158 | 0 | conn->host, conn->port); |
159 | 0 | close(fd); |
160 | 0 | mk_mem_free(conn); |
161 | 0 | return NULL; |
162 | 0 | } |
163 | 0 | MK_EVENT_NEW(&conn->event); |
164 | 0 | return conn; |
165 | 0 | } |
166 | 0 | else { |
167 | 0 | close(fd); |
168 | 0 | mk_mem_free(conn); |
169 | 0 | return NULL; |
170 | 0 | } |
171 | 0 | } |
172 | | |
173 | 0 | return NULL; |
174 | 0 | } |
175 | | |
176 | | int mk_net_conn_write(struct mk_channel *channel, |
177 | | void *data, size_t len) |
178 | 0 | { |
179 | 0 | int ret = 0; |
180 | 0 | int error; |
181 | 0 | ssize_t bytes; |
182 | 0 | size_t total = 0; |
183 | 0 | size_t send; |
184 | 0 | socklen_t slen = sizeof(error); |
185 | 0 | struct mk_thread *th = MK_TLS_GET(mk_thread); |
186 | 0 | struct mk_sched_worker *sched; |
187 | |
|
188 | 0 | sched = mk_sched_get_thread_conf(); |
189 | 0 | if (!sched) { |
190 | 0 | return -1; |
191 | 0 | } |
192 | | |
193 | 0 | retry: |
194 | 0 | error = 0; |
195 | |
|
196 | 0 | if (len - total > 524288) { |
197 | 0 | send = 524288; |
198 | 0 | } |
199 | 0 | else { |
200 | 0 | send = (len - total); |
201 | 0 | } |
202 | |
|
203 | 0 | send = len - total; |
204 | 0 | bytes = channel->io->write(channel->io->plugin, channel->fd, (uint8_t *)data + total, send); |
205 | 0 | if (bytes == -1) { |
206 | 0 | if (errno == EAGAIN) { |
207 | 0 | MK_EVENT_NEW(channel->event); |
208 | 0 | channel->thread = th; |
209 | 0 | ret = mk_event_add(sched->loop, |
210 | 0 | channel->fd, |
211 | 0 | MK_EVENT_THREAD, |
212 | 0 | MK_EVENT_WRITE, channel->event); |
213 | 0 | if (ret == -1) { |
214 | | /* |
215 | | * If we failed here there no much that we can do, just |
216 | | * let the caller we failed |
217 | | */ |
218 | 0 | return -1; |
219 | 0 | } |
220 | | |
221 | | /* |
222 | | * Return the control to the parent caller, we need to wait for |
223 | | * the event loop to get back to us. |
224 | | */ |
225 | 0 | mk_thread_yield(th); |
226 | | |
227 | | /* We got a notification, remove the event registered */ |
228 | 0 | ret = mk_event_del(sched->loop, channel->event); |
229 | 0 | if (ret == -1) { |
230 | 0 | return -1; |
231 | 0 | } |
232 | | |
233 | | /* Check the connection status */ |
234 | 0 | if (channel->event->mask & MK_EVENT_WRITE) { |
235 | 0 | ret = getsockopt(channel->fd, SOL_SOCKET, SO_ERROR, &error, &slen); |
236 | 0 | if (ret == -1) { |
237 | 0 | fprintf(stderr, "[io] could not validate socket status"); |
238 | 0 | return -1; |
239 | 0 | } |
240 | | |
241 | 0 | if (error != 0) { |
242 | 0 | return -1; |
243 | 0 | } |
244 | | |
245 | 0 | MK_EVENT_NEW(channel->event); |
246 | 0 | goto retry; |
247 | 0 | } |
248 | 0 | else { |
249 | 0 | return -1; |
250 | 0 | } |
251 | |
|
252 | 0 | } |
253 | 0 | else { |
254 | 0 | return -1; |
255 | 0 | } |
256 | 0 | } |
257 | | |
258 | | /* Update counters */ |
259 | 0 | total += bytes; |
260 | 0 | if (total < len) { |
261 | 0 | channel->thread = th; |
262 | 0 | ret = mk_event_add(sched->loop, |
263 | 0 | channel->fd, |
264 | 0 | MK_EVENT_THREAD, |
265 | 0 | MK_EVENT_WRITE, channel->event); |
266 | 0 | if (ret == -1) { |
267 | | /* |
268 | | * If we failed here there no much that we can do, just |
269 | | * let the caller we failed |
270 | | */ |
271 | 0 | return -1; |
272 | 0 | } |
273 | | |
274 | 0 | mk_thread_yield(th); |
275 | 0 | goto retry; |
276 | 0 | } |
277 | | |
278 | 0 | if (channel->event->status & MK_EVENT_REGISTERED) { |
279 | | /* We got a notification, remove the event registered */ |
280 | 0 | ret = mk_event_del(sched->loop, channel->event); |
281 | 0 | } |
282 | |
|
283 | 0 | return total; |
284 | 0 | } |