Coverage Report

Created: 2025-11-16 06:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/h2o/lib/common/socketpool.c
Line
Count
Source
1
/*
2
 * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <assert.h>
23
#include <errno.h>
24
#include <netdb.h>
25
#include <stdlib.h>
26
#include <sys/socket.h>
27
#include <sys/types.h>
28
#include <sys/un.h>
29
#include <netinet/in.h>
30
#include "h2o/hostinfo.h"
31
#include "h2o/linklist.h"
32
#include "h2o/socketpool.h"
33
#include "h2o/string_.h"
34
#include "h2o/socket.h"
35
#include "h2o/balancer.h"
36
37
/**
 * Lower bound for the delay handed to h2o_timer_link when the expiration timer is re-armed; a computed delay smaller than this
 * is raised to this value. Also used as the initial delay when a loop is registered.
 */
#define CHECK_EXPIRATION_MIN_INTERVAL 1000
41
42
struct pool_entry_t {
43
    h2o_socket_export_t sockinfo;
44
    size_t target;
45
    h2o_linklist_t all_link;
46
    h2o_linklist_t target_link;
47
    uint64_t added_at;
48
};
49
50
struct st_h2o_socketpool_connect_request_t {
51
    void *data;
52
    h2o_socketpool_connect_cb cb;
53
    h2o_socketpool_t *pool;
54
    h2o_loop_t *loop;
55
    h2o_hostinfo_getaddr_req_t *getaddr_req;
56
    h2o_socket_t *sock;
57
    h2o_multithread_receiver_t *getaddr_receiver;
58
    size_t selected_target;
59
    size_t remaining_try_count;
60
    struct {
61
        char *tried;
62
    } lb;
63
    h2o_iovec_t alpn_protos;
64
};
65
66
struct on_close_data_t {
67
    h2o_socketpool_t *pool;
68
    size_t target;
69
};
70
71
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen);
72
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req);
73
74
static void destroy_detached(struct pool_entry_t *entry)
75
0
{
76
0
    h2o_socket_dispose_export(&entry->sockinfo);
77
0
    free(entry);
78
0
}
79
80
static void destroy_attached(struct pool_entry_t *entry)
81
0
{
82
0
    h2o_linklist_unlink(&entry->all_link);
83
0
    h2o_linklist_unlink(&entry->target_link);
84
0
    destroy_detached(entry);
85
0
}
86
87
/* caller should lock the mutex */
88
static uint64_t destroy_expired_locked(h2o_socketpool_t *pool)
89
2.97k
{
90
2.97k
    if (pool->_interval_cb.loop != NULL) {
91
2.97k
        uint64_t now_ms = h2o_now(pool->_interval_cb.loop);
92
2.97k
        uint64_t expire_before = now_ms - pool->timeout;
93
2.97k
        while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
94
0
            struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
95
0
            if (entry->added_at > expire_before) {
96
0
                return entry->added_at + pool->timeout - now_ms;
97
0
            }
98
0
            destroy_attached(entry);
99
0
            __sync_sub_and_fetch(&pool->_shared.count, 1);
100
0
            __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
101
0
        }
102
2.97k
    }
103
2.97k
    return UINT64_MAX;
104
2.97k
}
105
106
/* caller should lock the mutex */
107
static void check_pool_expired_locked(h2o_socketpool_t *pool, h2o_loop_t *this_loop)
108
2.97k
{
109
2.97k
    uint64_t next_expired = destroy_expired_locked(pool);
110
2.97k
    if (next_expired != UINT64_MAX) {
111
0
        if (this_loop == pool->_interval_cb.loop && !h2o_timer_is_linked(&pool->_interval_cb.timeout)) {
112
0
            if (next_expired < CHECK_EXPIRATION_MIN_INTERVAL)
113
0
                next_expired = CHECK_EXPIRATION_MIN_INTERVAL;
114
0
            h2o_timer_link(pool->_interval_cb.loop, next_expired, &pool->_interval_cb.timeout);
115
0
        }
116
0
    }
117
2.97k
}
118
119
/**
 * Timer callback of the pool. Runs the expiration check if the mutex can be taken without blocking; otherwise it does nothing,
 * since the same check is also performed by the connect path.
 */
static void on_timeout(h2o_timer_t *timeout)
{
    h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.timeout, timeout);

    /* never block the loop on the mutex */
    if (pthread_mutex_trylock(&pool->_shared.mutex) != 0)
        return;

    check_pool_expired_locked(pool, pool->_interval_cb.loop);
    pthread_mutex_unlock(&pool->_shared.mutex);
}
131
132
static void common_init(h2o_socketpool_t *pool, h2o_socketpool_target_t **targets, size_t num_targets, size_t capacity,
133
                        h2o_balancer_t *balancer)
134
6
{
135
6
    memset(pool, 0, sizeof(*pool));
136
137
6
    pool->capacity = capacity;
138
6
    pool->timeout = 2000;
139
140
6
    pthread_mutex_init(&pool->_shared.mutex, NULL);
141
6
    h2o_linklist_init_anchor(&pool->_shared.sockets);
142
143
6
    h2o_vector_reserve(NULL, &pool->targets, num_targets);
144
9
    for (; pool->targets.size < num_targets; ++pool->targets.size)
145
3
        pool->targets.entries[pool->targets.size] = targets[pool->targets.size];
146
147
6
    pool->balancer = balancer;
148
6
    pool->address_family = AF_UNSPEC;
149
6
}
150
151
/**
 * Classifies the host of `url` and fills `sa` / `salen` accordingly.
 *
 * - host is accepted by h2o_url_host_to_sun: `*salen` is sizeof(struct sockaddr_un), returns H2O_SOCKETPOOL_TYPE_SOCKADDR
 * - host is accepted by h2o_hostinfo_aton: `sa` becomes a sockaddr_in with address and port, returns H2O_SOCKETPOOL_TYPE_SOCKADDR
 * - otherwise: returns H2O_SOCKETPOOL_TYPE_NAMED (`sa->ss_family` is AF_INET and `*salen` is sizeof(struct sockaddr_in))
 */
h2o_socketpool_target_type_t detect_target_type(h2o_url_t *url, struct sockaddr_storage *sa, socklen_t *salen)
{
    memset(sa, 0, sizeof(*sa));
    const char *to_sun_err = h2o_url_host_to_sun(url->host, (struct sockaddr_un *)sa);

    if (to_sun_err != h2o_url_host_to_sun_err_is_not_unix_socket) {
        /* unix socket; any other error from the conversion is not expected here */
        assert(to_sun_err == NULL);
        *salen = sizeof(struct sockaddr_un);
        return H2O_SOCKETPOOL_TYPE_SOCKADDR;
    }

    struct sockaddr_in *sin = (struct sockaddr_in *)sa;
    sa->ss_family = AF_INET;
    *salen = sizeof(*sin);

    if (h2o_hostinfo_aton(url->host, &sin->sin_addr) != 0)
        return H2O_SOCKETPOOL_TYPE_NAMED;

    sin->sin_port = htons(h2o_url_get_port(url));
    return H2O_SOCKETPOOL_TYPE_SOCKADDR;
}
172
173
/**
 * Allocates a target for `origin`.
 *
 * The URL is copied; authority and host are lower-cased unless the target is a unix socket address. For a named target the port
 * is stored as a decimal string in `peer.named_serv`; for a sockaddr target the address is copied into `peer.sockaddr`.
 *
 * @param origin          URL of the target; its host must be NUL-terminated
 * @param lb_target_conf  optional per-target configuration; when NULL, `weight_m1` defaults to 0
 * @return the newly allocated target
 */
h2o_socketpool_target_t *h2o_socketpool_create_target(h2o_url_t *origin, h2o_socketpool_target_conf_t *lb_target_conf)
{
    struct sockaddr_storage sa;
    socklen_t salen;
    h2o_socketpool_target_t *target = h2o_mem_alloc(sizeof(*target));

    h2o_url_copy(NULL, &target->url, origin);
    assert(target->url.host.base[target->url.host.len] == '\0'); /* needs to be null-terminated in order to be used in SNI */

    target->type = detect_target_type(origin, &sa, &salen);
    if (target->type != H2O_SOCKETPOOL_TYPE_SOCKADDR || sa.ss_family != AF_UNIX) {
        /* everything but a unix socket path gets lower-cased */
        h2o_strtolower(target->url.authority.base, target->url.authority.len);
        h2o_strtolower(target->url.host.base, target->url.host.len);
    }

    if (target->type == H2O_SOCKETPOOL_TYPE_NAMED) {
        target->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR));
        target->peer.named_serv.len = sprintf(target->peer.named_serv.base, "%u", (unsigned)h2o_url_get_port(&target->url));
    } else if (target->type == H2O_SOCKETPOOL_TYPE_SOCKADDR) {
        assert(salen <= sizeof(target->peer.sockaddr.bytes));
        memcpy(&target->peer.sockaddr.bytes, &sa, salen);
        target->peer.sockaddr.len = salen;
    }

    target->_shared.leased_count = 0;
    target->conf.weight_m1 = lb_target_conf != NULL ? lb_target_conf->weight_m1 : 0;

    h2o_linklist_init_anchor(&target->_shared.sockets);
    return target;
}
208
209
void h2o_socketpool_init_specific(h2o_socketpool_t *pool, size_t capacity, h2o_socketpool_target_t **targets, size_t num_targets,
210
                                  h2o_balancer_t *balancer)
211
3
{
212
3
    if (balancer == NULL)
213
3
        balancer = h2o_balancer_create_rr();
214
3
    common_init(pool, targets, num_targets, capacity, balancer);
215
3
}
216
217
/**
 * Returns non-zero if the pool has no balancer, which is how a pool initialized by h2o_socketpool_init_global is identified.
 */
int h2o_socketpool_is_global(h2o_socketpool_t *pool)
{
    return !pool->balancer;
}
221
222
/**
 * Initializes a global pool: no predefined targets and no balancer.
 */
void h2o_socketpool_init_global(h2o_socketpool_t *pool, size_t capacity)
{
    h2o_socketpool_target_t **no_targets = NULL;
    h2o_balancer_t *no_balancer = NULL;

    common_init(pool, no_targets, 0, capacity, no_balancer);
}
226
227
/**
 * Frees a target: the service string of a named target, the copied URL components (authority, host, path) and the target itself.
 */
void h2o_socketpool_destroy_target(h2o_socketpool_target_t *target)
{
    /* only named targets own an extra allocation; sockaddr targets store the address inline */
    if (target->type == H2O_SOCKETPOOL_TYPE_NAMED)
        free(target->peer.named_serv.base);

    free(target->url.authority.base);
    free(target->url.host.base);
    free(target->url.path.base);
    free(target);
}
241
242
/**
 * Releases everything owned by the pool: all pooled sockets, the mutex, the balancer (if any), the SSL context reference (if
 * any), the loop registration (if any), and the targets together with the target vector.
 */
void h2o_socketpool_dispose(h2o_socketpool_t *pool)
{
    h2o_linklist_t *anchor = &pool->_shared.sockets;

    /* drain the pooled sockets under the lock */
    pthread_mutex_lock(&pool->_shared.mutex);
    for (;;) {
        if (h2o_linklist_is_empty(anchor))
            break;
        destroy_attached(H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, anchor->next));
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);
    pthread_mutex_destroy(&pool->_shared.mutex);

    if (pool->balancer != NULL)
        pool->balancer->callbacks->destroy(pool->balancer);

    if (pool->_ssl_ctx != NULL)
        SSL_CTX_free(pool->_ssl_ctx);

    if (pool->_interval_cb.loop != NULL)
        h2o_socketpool_unregister_loop(pool, pool->_interval_cb.loop);

    size_t idx = 0;
    while (idx < pool->targets.size) {
        h2o_socketpool_destroy_target(pool->targets.entries[idx]);
        idx++;
    }
    free(pool->targets.entries);
}
271
272
/**
 * Replaces the SSL context used by the pool.
 *
 * The pool holds its own reference to the context: a reference is taken on `ssl_ctx` (when non-NULL) and the reference to the
 * previously installed context (when any) is dropped.
 *
 * @param pool     the pool
 * @param ssl_ctx  context to install, or NULL to clear
 */
void h2o_socketpool_set_ssl_ctx(h2o_socketpool_t *pool, SSL_CTX *ssl_ctx)
{
    SSL_CTX *previous = pool->_ssl_ctx;

    /* Take the new reference before dropping the old one: if `ssl_ctx` is the context already installed and the pool holds the
     * only reference, freeing first would destroy the object before the up-ref. */
    if (ssl_ctx != NULL)
        SSL_CTX_up_ref(ssl_ctx);
    if (previous != NULL)
        SSL_CTX_free(previous);

    pool->_ssl_ctx = ssl_ctx;
}
280
281
/**
 * Associates the pool with `loop` and arms the expiration timer on it. Has no effect if a loop is already registered.
 */
void h2o_socketpool_register_loop(h2o_socketpool_t *pool, h2o_loop_t *loop)
{
    if (pool->_interval_cb.loop == NULL) {
        pool->_interval_cb.loop = loop;
        h2o_timer_init(&pool->_interval_cb.timeout, on_timeout);
        h2o_timer_link(loop, CHECK_EXPIRATION_MIN_INTERVAL, &pool->_interval_cb.timeout);
    }
}
290
291
/**
 * Unlinks the expiration timer and clears the registered loop. Has no effect unless `loop` is the loop currently registered.
 */
void h2o_socketpool_unregister_loop(h2o_socketpool_t *pool, h2o_loop_t *loop)
{
    if (pool->_interval_cb.loop == loop) {
        h2o_timer_unlink(&pool->_interval_cb.timeout);
        pool->_interval_cb.loop = NULL;
    }
}
298
299
/**
 * Finishes a connect request: releases the request object (including the tried-map) and invokes the user callback with the
 * socket (may be NULL), the error string, the user data and the URL of the selected target.
 */
static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr)
{
    /* everything the callback needs is copied out, because `req` is freed before the callback runs */
    h2o_socketpool_connect_cb notify = req->cb;
    h2o_socket_t *sock = req->sock;
    void *user_data = req->data;
    h2o_url_t *target_url = &req->pool->targets.entries[req->selected_target]->url;

    free(req->lb.tried); /* free(NULL) is a no-op */
    free(req);

    /* the socket no longer refers to the (freed) request */
    if (sock != NULL)
        sock->data = NULL;
    notify(sock, errstr, user_data, target_url);
}
316
317
/**
 * Performs one connection attempt. Consumes one try, selects the target (through the balancer when a tried-map exists and the
 * pool has more than one target), takes a lease on the target, and then either starts name resolution or connects directly.
 */
static void try_connect(h2o_socketpool_connect_request_t *req)
{
    h2o_socketpool_t *pool = req->pool;

    req->remaining_try_count--;

    if (req->lb.tried != NULL) {
        if (pool->targets.size <= 1) {
            req->selected_target = 0;
        } else {
            req->selected_target = pool->balancer->callbacks->select_(pool->balancer, &pool->targets, req->lb.tried);
            /* the balancer must not hand out a target that was tried already */
            assert(!req->lb.tried[req->selected_target]);
            req->lb.tried[req->selected_target] = 1;
        }
    }

    h2o_socketpool_target_t *target = pool->targets.entries[req->selected_target];
    __sync_add_and_fetch(&target->_shared.leased_count, 1);

    if (target->type == H2O_SOCKETPOOL_TYPE_NAMED) {
        /* resolve the name, and connect */
        req->getaddr_req =
            h2o_hostinfo_getaddr(req->getaddr_receiver, target->url.host, target->peer.named_serv, pool->address_family,
                                 SOCK_STREAM, IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req);
    } else if (target->type == H2O_SOCKETPOOL_TYPE_SOCKADDR) {
        /* connect using the stored socket address */
        start_connect(req, (void *)&target->peer.sockaddr.bytes, target->peer.sockaddr.len);
    }
}
348
349
/**
 * Callback invoked when the TLS handshake finishes. A certificate name mismatch is tolerated when the pool's SSL context does not
 * verify the peer; any other error closes the socket. The request is then completed with `err` as-is.
 */
static void on_handshake_complete(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;

    assert(req->sock == sock);

    /* ignore CN mismatch if we are not verifying peer */
    int tolerated =
        err == h2o_socket_error_ssl_cert_name_mismatch && (SSL_CTX_get_verify_mode(req->pool->_ssl_ctx) & SSL_VERIFY_PEER) == 0;

    if (!tolerated && err != NULL) {
        h2o_socket_close(sock);
        req->sock = NULL;
    }

    call_connect_cb(req, err);
}
364
365
/**
 * Callback invoked when the connection attempt finishes.
 *
 * On success, an SSL target continues with the TLS handshake; any other target completes the request right away. On failure, the
 * lease on the target is released and the socket is closed; the next target is tried if tries remain, otherwise the request is
 * completed with the error.
 */
static void on_connect(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;

    assert(req->sock == sock);

    if (err == NULL) {
        h2o_url_t *target_url = &req->pool->targets.entries[req->selected_target]->url;
        if (target_url->scheme->is_ssl) {
            assert(req->pool->_ssl_ctx != NULL && "h2o_socketpool_set_ssl_ctx must be called for a pool that contains SSL target");
            h2o_socket_ssl_handshake(sock, req->pool->_ssl_ctx, target_url->host.base, req->alpn_protos, on_handshake_complete);
            return; /* completion is reported by on_handshake_complete */
        }
    } else {
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        h2o_socket_close(sock);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        /* out of tries */
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        req->sock = NULL;
    }

    call_connect_cb(req, err);
}
391
392
static void on_close(void *data)
393
2.96k
{
394
2.96k
    struct on_close_data_t *close_data = data;
395
2.96k
    h2o_socketpool_t *pool = close_data->pool;
396
2.96k
    __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1);
397
2.96k
    free(close_data);
398
2.96k
    __sync_sub_and_fetch(&pool->_shared.count, 1);
399
2.96k
}
400
401
/**
 * Starts connecting to `addr`. On success the request is attached to the socket and the `on_close` hook is installed. If the
 * socket cannot be created, the lease on the target is released and either the next target is tried or the request is completed
 * with h2o_socket_error_conn_fail.
 */
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen)
{
    req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect, NULL);

    if (req->sock != NULL) {
        struct on_close_data_t *close_data = h2o_mem_alloc(sizeof(*close_data));
        close_data->pool = req->pool;
        close_data->target = req->selected_target;
        req->sock->data = req;
        req->sock->on_close.cb = on_close;
        req->sock->on_close.data = close_data;
        return;
    }

    /* could not even create the socket */
    __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
    if (req->remaining_try_count > 0) {
        try_connect(req);
        return;
    }
    __sync_sub_and_fetch(&req->pool->_shared.count, 1);
    call_connect_cb(req, h2o_socket_error_conn_fail);
}
423
424
/**
 * Callback invoked when name resolution finishes. On success, connects to one of the resolved addresses. On failure, the lease on
 * the target is released and either the next target is tried or the request is completed with the resolver's error.
 */
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req)
{
    h2o_socketpool_connect_request_t *req = _req;

    assert(getaddr_req == req->getaddr_req);
    req->getaddr_req = NULL;

    if (errstr == NULL) {
        struct addrinfo *selected = h2o_hostinfo_select_one(res);
        start_connect(req, selected->ai_addr, selected->ai_addrlen);
        return;
    }

    /* resolution failed */
    __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
    if (req->remaining_try_count > 0) {
        try_connect(req);
        return;
    }
    __sync_sub_and_fetch(&req->pool->_shared.count, 1);
    call_connect_cb(req, errstr);
}
445
446
/**
 * Searches the pool's targets for one that has the same scheme, port and host as `url`.
 *
 * @return index of the first matching target, or SIZE_MAX if there is none
 */
static size_t lookup_target(h2o_socketpool_t *pool, h2o_url_t *url)
{
    uint16_t wanted_port = h2o_url_get_port(url);

    for (size_t idx = 0; idx != pool->targets.size; ++idx) {
        h2o_socketpool_target_t *candidate = pool->targets.entries[idx];
        if (candidate->url.scheme == url->scheme && h2o_url_get_port(&candidate->url) == wanted_port &&
            h2o_url_hosts_are_equal(&candidate->url, url))
            return idx;
    }

    return SIZE_MAX;
}
462
463
void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_url_t *url, h2o_loop_t *loop,
464
                            h2o_multithread_receiver_t *getaddr_receiver, h2o_iovec_t alpn_protos, h2o_socketpool_connect_cb cb,
465
                            void *data)
466
2.96k
{
467
2.96k
    struct pool_entry_t *entry = NULL;
468
2.96k
    struct on_close_data_t *close_data;
469
470
2.96k
    if (_req != NULL)
471
2.96k
        *_req = NULL;
472
473
2.96k
    size_t target = SIZE_MAX;
474
2.96k
    h2o_linklist_t *sockets = NULL;
475
476
    /* As an optimization, avoid trying to reuse (and obtaining lock for the purpose) when the pool is not going to retain sockets.
477
     * The optimization is limited to non-global socket pools, as the rest of the logic depends on a valid entry within
478
     * `pool->targets` to be targeted, that a global pool allocates dynamically. */
479
2.96k
    if (pool->timeout == 0 && !h2o_socketpool_is_global(pool))
480
0
        goto SkipLookup;
481
482
    /* fetch an entry and return it */
483
2.96k
    pthread_mutex_lock(&pool->_shared.mutex);
484
2.96k
    check_pool_expired_locked(pool, loop);
485
486
    /* TODO lookup outside this critical section */
487
2.96k
    if (h2o_socketpool_is_global(pool)) {
488
0
        target = lookup_target(pool, url);
489
0
        if (target == SIZE_MAX) {
490
0
            h2o_vector_reserve(NULL, &pool->targets, pool->targets.size + 1);
491
0
            pool->targets.entries[pool->targets.size++] = h2o_socketpool_create_target(url, NULL);
492
0
            target = pool->targets.size - 1;
493
0
        }
494
0
        sockets = &pool->targets.entries[target]->_shared.sockets;
495
2.96k
    } else {
496
2.96k
        sockets = &pool->_shared.sockets;
497
2.96k
    }
498
2.96k
    assert(pool->targets.size != 0);
499
500
2.96k
    while (!h2o_linklist_is_empty(sockets)) {
501
0
        if (h2o_socketpool_is_global(pool)) {
502
0
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, target_link, sockets->next);
503
0
        } else {
504
0
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, sockets->next);
505
0
        }
506
0
        h2o_linklist_unlink(&entry->all_link);
507
0
        h2o_linklist_unlink(&entry->target_link);
508
0
        pthread_mutex_unlock(&pool->_shared.mutex);
509
510
0
        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
511
512
        /* test if the connection is still alive */
513
0
        char buf[1];
514
0
        ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK);
515
0
        if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
516
            /* yes! return it */
517
0
            size_t entry_target = entry->target;
518
0
            h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo);
519
0
            free(entry);
520
0
            close_data = h2o_mem_alloc(sizeof(*close_data));
521
0
            close_data->pool = pool;
522
0
            close_data->target = entry_target;
523
0
            sock->on_close.cb = on_close;
524
0
            sock->on_close.data = close_data;
525
0
            __sync_add_and_fetch(&pool->targets.entries[entry_target]->_shared.leased_count, 1);
526
0
            cb(sock, NULL, data, &pool->targets.entries[entry_target]->url);
527
0
            return;
528
0
        }
529
530
        /* connection is dead, report, close, and retry */
531
0
        if (rret <= 0) {
532
0
            static long counter = 0;
533
0
            if (__sync_fetch_and_add(&counter, 1) == 0)
534
0
                h2o_error_printf("[WARN] detected close by upstream before the expected timeout (see issue #679)\n");
535
0
        } else {
536
0
            static long counter = 0;
537
0
            if (__sync_fetch_and_add(&counter, 1) == 0)
538
0
                h2o_error_printf("[WARN] unexpectedly received data to a pooled socket (see issue #679)\n");
539
0
        }
540
0
        destroy_detached(entry);
541
0
        pthread_mutex_lock(&pool->_shared.mutex);
542
0
    }
543
2.96k
    pthread_mutex_unlock(&pool->_shared.mutex);
544
545
2.96k
SkipLookup:
546
    /* FIXME respect `capacity` */
547
2.96k
    __sync_add_and_fetch(&pool->_shared.count, 1);
548
549
    /* prepare request object */
550
2.96k
    h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req));
551
2.96k
    *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop};
552
553
2.96k
    if (_req != NULL)
554
2.96k
        *_req = req;
555
2.96k
    req->getaddr_receiver = getaddr_receiver;
556
2.96k
    req->alpn_protos = alpn_protos;
557
558
2.96k
    req->selected_target = target;
559
2.96k
    if (target == SIZE_MAX) {
560
2.96k
        req->lb.tried = h2o_mem_alloc(sizeof(req->lb.tried[0]) * pool->targets.size);
561
2.96k
        memset(req->lb.tried, 0, sizeof(req->lb.tried[0]) * pool->targets.size);
562
2.96k
        req->remaining_try_count = pool->targets.size;
563
2.96k
    } else {
564
0
        req->remaining_try_count = 1;
565
0
    }
566
2.96k
    try_connect(req);
567
2.96k
}
568
569
/**
 * Cancels an in-flight connect request: cancels a pending name resolution, closes the socket of the current attempt (if any),
 * and frees the request. The user callback is not invoked.
 *
 * NOTE(review): the lease on the selected target is released here only when a tried-map exists, in addition to whatever the
 * socket's `on_close` hook does when the socket is closed above -- confirm the lease accounting is balanced on every path.
 */
void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req)
{
    h2o_hostinfo_getaddr_req_t *pending = req->getaddr_req;

    if (pending != NULL) {
        h2o_hostinfo_getaddr_cancel(pending);
        req->getaddr_req = NULL;
    }

    if (req->sock != NULL)
        h2o_socket_close(req->sock);

    if (req->lb.tried != NULL) {
        free(req->lb.tried);
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
    }

    free(req);
}
583
584
/**
 * Returns a leased socket to the pool so that it can be reused.
 *
 * The lease on the target is released and the `on_close` hook is removed; the socket is then exported into a new pool entry that
 * is linked to both the pool-wide list and the list of its target.
 *
 * @param pool  the pool the socket was leased from (asserted)
 * @param sock  the socket; must not be used by the caller after a successful return
 * @return 0 on success, -1 if the socket could not be exported (the pool's socket count is decremented in that case)
 */
int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock)
{
    struct on_close_data_t *close_data = sock->on_close.data;
    size_t target = close_data->target;
    /* Capture the loop up front so that nothing is read from `sock` once it has been handed to h2o_socket_export (what export
     * does to the socket object is not visible from here). */
    h2o_loop_t *loop = h2o_socket_get_loop(sock);
    struct pool_entry_t *entry;

    /* reset the on_close callback */
    assert(close_data->pool == pool);
    __sync_sub_and_fetch(&pool->targets.entries[target]->_shared.leased_count, 1);
    free(close_data);
    sock->on_close.cb = NULL;
    sock->on_close.data = NULL;

    entry = h2o_mem_alloc(sizeof(*entry));
    if (h2o_socket_export(sock, &entry->sockinfo) != 0) {
        free(entry);
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        return -1;
    }
    memset(&entry->all_link, 0, sizeof(entry->all_link));
    memset(&entry->target_link, 0, sizeof(entry->target_link));
    entry->added_at = h2o_now(loop);
    entry->target = target;

    __sync_add_and_fetch(&pool->_shared.pooled_count, 1);

    pthread_mutex_lock(&pool->_shared.mutex);
    h2o_linklist_insert(&pool->_shared.sockets, &entry->all_link);
    h2o_linklist_insert(&pool->targets.entries[target]->_shared.sockets, &entry->target_link);
    check_pool_expired_locked(pool, loop);
    pthread_mutex_unlock(&pool->_shared.mutex);
    return 0;
}
619
620
/**
 * Detaches a leased socket from the pool: the lease on the target is released, the pool's socket count is decremented, and the
 * `on_close` hook together with its context is removed. The socket itself is left untouched.
 */
void h2o_socketpool_detach(h2o_socketpool_t *pool, h2o_socket_t *sock)
{
    struct on_close_data_t *ctx = sock->on_close.data;
    assert(ctx->pool == pool);

    size_t target = ctx->target;
    __sync_sub_and_fetch(&pool->targets.entries[target]->_shared.leased_count, 1);
    __sync_sub_and_fetch(&pool->_shared.count, 1);

    free(ctx);
    sock->on_close.cb = NULL;
    sock->on_close.data = NULL;
}
632
633
/**
 * Returns 1 if the pool's timeout is greater than zero, 0 otherwise.
 */
int h2o_socketpool_can_keepalive(h2o_socketpool_t *pool)
{
    return pool->timeout > 0 ? 1 : 0;
}