Coverage Report

Created: 2026-03-22 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/lib/monkey/mk_server/mk_net.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Monkey HTTP Server
4
 *  ==================
5
 *  Copyright 2001-2016 Monkey Software LLC <eduardo@monkey.io>
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <monkey/mk_core.h>
21
#include <monkey/mk_net.h>
22
#include <monkey/mk_scheduler.h>
23
#include <monkey/mk_plugin.h>
24
#include <monkey/mk_thread.h>
25
#include <monkey/mk_tls.h>
26
27
#ifdef _WIN32
28
#include <winsock2.h>
29
#include <afunix.h>
30
#else
31
#include <sys/socket.h>
32
#include <netinet/tcp.h>
33
#endif
34
35
/* Initialize the network stack*/
36
int mk_net_init()
37
0
{
38
#ifdef _WIN32
39
    int result;
40
    WSADATA wsa_data;
41
    static int initialized = 0;
42
43
    if(0 != initialized) {
44
        return 0;
45
    }
46
47
    result = WSAStartup(MAKEWORD(2, 2), &wsa_data);
48
49
    if(0 != result) {
50
        if(WSAEINPROGRESS == result)
51
        {
52
            Sleep(100); /* Let the other thread finish initializing the stack */
53
54
            return 0;
55
        }
56
57
        return -1;
58
    }
59
60
    initialized = 1;    
61
#endif
62
63
0
    return 0;
64
0
}
65
66
/* Connect to a TCP socket server */
67
static int mk_net_fd_connect(int fd, char *host, unsigned long port)
68
0
{
69
0
    int ret;
70
0
    struct addrinfo hints;
71
0
    struct addrinfo *res;
72
0
    char _port[6];
73
74
0
    memset(&hints, 0, sizeof hints);
75
0
    hints.ai_family = AF_UNSPEC;
76
0
    hints.ai_socktype = SOCK_STREAM;
77
78
0
    snprintf(_port, sizeof(_port), "%lu", port);
79
0
    ret = getaddrinfo(host, _port, &hints, &res);
80
0
    if (ret != 0) {
81
0
        return -1;
82
0
    }
83
84
0
    ret = connect(fd, res->ai_addr, res->ai_addrlen);
85
0
    freeaddrinfo(res);
86
87
0
    return ret;
88
0
}
89
90
struct mk_net_connection *mk_net_conn_create(char *addr, int port)
91
0
{
92
0
    int fd;
93
0
    int ret;
94
0
    int error = 0;
95
0
    socklen_t len = sizeof(error);
96
0
    struct mk_sched_worker *sched;
97
0
    struct mk_net_connection *conn;
98
99
    /* Allocate connection context */
100
0
    conn = mk_mem_alloc(sizeof(struct mk_net_connection));
101
0
    if (!conn) {
102
0
        return NULL;
103
0
    }
104
105
    /* Create socket */
106
0
    fd = mk_socket_create(AF_INET, SOCK_STREAM, 0);
107
0
    if (fd == -1) {
108
0
        mk_mem_free(conn);
109
0
        return NULL;
110
0
    }
111
112
    /* Make socket async */
113
0
    mk_socket_set_nonblocking(fd);
114
0
    conn->fd = fd;
115
116
0
    ret = mk_net_fd_connect(conn->fd, addr, port);
117
0
    if (ret == -1) {
118
0
        if (errno != EINPROGRESS) {
119
0
            close(fd);
120
0
            mk_mem_free(conn);
121
0
            return NULL;
122
0
        }
123
124
0
        MK_EVENT_NEW(&conn->event);
125
126
0
        sched = mk_sched_get_thread_conf();
127
        // FIXME: not including the thread
128
        //conn->thread = mk_thread_get();
129
0
        ret = mk_event_add(sched->loop, conn->fd, MK_EVENT_THREAD,
130
0
                           MK_EVENT_WRITE, &conn->event);
131
0
        if (ret == -1) {
132
0
            close(fd);
133
0
            mk_mem_free(conn);
134
0
            return NULL;
135
0
        }
136
137
        /*
138
         * Return the control to the parent caller, we need to wait for
139
         * the event loop to get back to us.
140
         */
141
0
        mk_thread_yield(conn->thread);
142
143
        /* We got a notification, remove the event registered */
144
0
        ret = mk_event_del(sched->loop, &conn->event);
145
146
        /* Check the connection status */
147
0
        if (conn->event.mask & MK_EVENT_WRITE) {
148
0
            ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
149
0
            if (ret == -1) {
150
0
                close(fd);
151
0
                mk_mem_free(conn);
152
0
                return NULL;
153
0
            }
154
155
0
            if (error != 0) {
156
                /* Connection is broken, not much to do here */
157
0
                fprintf(stderr, "Async connection failed %s:%i\n",
158
0
                        conn->host, conn->port);
159
0
                close(fd);
160
0
                mk_mem_free(conn);
161
0
                return NULL;
162
0
            }
163
0
            MK_EVENT_NEW(&conn->event);
164
0
            return conn;
165
0
        }
166
0
        else {
167
0
            close(fd);
168
0
            mk_mem_free(conn);
169
0
            return NULL;
170
0
        }
171
0
    }
172
173
0
    return NULL;
174
0
}
175
176
int mk_net_conn_write(struct mk_channel *channel,
177
                      void *data, size_t len)
178
0
{
179
0
    int ret = 0;
180
0
    int error;
181
0
    ssize_t bytes;
182
0
    size_t total = 0;
183
0
    size_t send;
184
0
    socklen_t slen = sizeof(error);
185
0
    struct mk_thread *th = MK_TLS_GET(mk_thread);
186
0
    struct mk_sched_worker *sched;
187
188
0
    sched = mk_sched_get_thread_conf();
189
0
    if (!sched) {
190
0
        return -1;
191
0
    }
192
193
0
 retry:
194
0
    error = 0;
195
196
0
    if (len - total > 524288) {
197
0
        send = 524288;
198
0
    }
199
0
    else {
200
0
        send = (len - total);
201
0
    }
202
203
0
    send = len - total;
204
0
    bytes = channel->io->write(channel->io->plugin, channel->fd, (uint8_t *)data + total, send);
205
0
    if (bytes == -1) {
206
0
        if (errno == EAGAIN) {
207
0
            MK_EVENT_NEW(channel->event);
208
0
            channel->thread = th;
209
0
            ret = mk_event_add(sched->loop,
210
0
                               channel->fd,
211
0
                               MK_EVENT_THREAD,
212
0
                               MK_EVENT_WRITE, channel->event);
213
0
            if (ret == -1) {
214
                /*
215
                 * If we failed here there no much that we can do, just
216
                 * let the caller we failed
217
                 */
218
0
                return -1;
219
0
            }
220
221
            /*
222
             * Return the control to the parent caller, we need to wait for
223
             * the event loop to get back to us.
224
             */
225
0
            mk_thread_yield(th);
226
227
            /* We got a notification, remove the event registered */
228
0
            ret = mk_event_del(sched->loop, channel->event);
229
0
            if (ret == -1) {
230
0
                return -1;
231
0
            }
232
233
            /* Check the connection status */
234
0
            if (channel->event->mask & MK_EVENT_WRITE) {
235
0
                ret = getsockopt(channel->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
236
0
                if (ret == -1) {
237
0
                    fprintf(stderr, "[io] could not validate socket status");
238
0
                    return -1;
239
0
                }
240
241
0
                if (error != 0) {
242
0
                    return -1;
243
0
                }
244
245
0
                MK_EVENT_NEW(channel->event);
246
0
                goto retry;
247
0
            }
248
0
            else {
249
0
                return -1;
250
0
            }
251
252
0
        }
253
0
        else {
254
0
            return -1;
255
0
        }
256
0
    }
257
258
    /* Update counters */
259
0
    total += bytes;
260
0
    if (total < len) {
261
0
        channel->thread = th;
262
0
        ret = mk_event_add(sched->loop,
263
0
                           channel->fd,
264
0
                           MK_EVENT_THREAD,
265
0
                           MK_EVENT_WRITE, channel->event);
266
0
        if (ret == -1) {
267
            /*
268
             * If we failed here there no much that we can do, just
269
             * let the caller we failed
270
             */
271
0
            return -1;
272
0
        }
273
274
0
        mk_thread_yield(th);
275
0
        goto retry;
276
0
    }
277
278
0
    if (channel->event->status & MK_EVENT_REGISTERED) {
279
        /* We got a notification, remove the event registered */
280
0
        ret = mk_event_del(sched->loop, channel->event);
281
0
    }
282
283
0
    return total;
284
0
}