Coverage Report

Created: 2025-06-22 06:18

/src/h2o/lib/common/socketpool.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <assert.h>
23
#include <errno.h>
24
#include <netdb.h>
25
#include <stdlib.h>
26
#include <sys/socket.h>
27
#include <sys/types.h>
28
#include <sys/un.h>
29
#include <netinet/in.h>
30
#include "h2o/hostinfo.h"
31
#include "h2o/linklist.h"
32
#include "h2o/socketpool.h"
33
#include "h2o/string_.h"
34
#include "h2o/socket.h"
35
#include "h2o/balancer.h"
36
37
/**
38
 * timeout will be set to this value when calculated less than this value
39
 */
40
2
#define CHECK_EXPIRATION_MIN_INTERVAL 1000
41
42
struct pool_entry_t {
43
    h2o_socket_export_t sockinfo;
44
    size_t target;
45
    h2o_linklist_t all_link;
46
    h2o_linklist_t target_link;
47
    uint64_t added_at;
48
};
49
50
struct st_h2o_socketpool_connect_request_t {
51
    void *data;
52
    h2o_socketpool_connect_cb cb;
53
    h2o_socketpool_t *pool;
54
    h2o_loop_t *loop;
55
    h2o_hostinfo_getaddr_req_t *getaddr_req;
56
    h2o_socket_t *sock;
57
    h2o_multithread_receiver_t *getaddr_receiver;
58
    size_t selected_target;
59
    size_t remaining_try_count;
60
    struct {
61
        char *tried;
62
    } lb;
63
    h2o_iovec_t alpn_protos;
64
};
65
66
struct on_close_data_t {
67
    h2o_socketpool_t *pool;
68
    size_t target;
69
};
70
71
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen);
72
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req);
73
74
static void destroy_detached(struct pool_entry_t *entry)
75
0
{
76
0
    h2o_socket_dispose_export(&entry->sockinfo);
77
0
    free(entry);
78
0
}
79
80
static void destroy_attached(struct pool_entry_t *entry)
81
0
{
82
0
    h2o_linklist_unlink(&entry->all_link);
83
0
    h2o_linklist_unlink(&entry->target_link);
84
0
    destroy_detached(entry);
85
0
}
86
87
/* caller should lock the mutex */
88
static uint64_t destroy_expired_locked(h2o_socketpool_t *pool)
89
829
{
90
829
    if (pool->_interval_cb.loop != NULL) {
91
829
        uint64_t now_ms = h2o_now(pool->_interval_cb.loop);
92
829
        uint64_t expire_before = now_ms - pool->timeout;
93
829
        while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
94
0
            struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
95
0
            if (entry->added_at > expire_before) {
96
0
                return entry->added_at + pool->timeout - now_ms;
97
0
            }
98
0
            destroy_attached(entry);
99
0
            __sync_sub_and_fetch(&pool->_shared.count, 1);
100
0
            __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
101
0
        }
102
829
    }
103
829
    return UINT64_MAX;
104
829
}
105
106
/* caller should lock the mutex */
107
static void check_pool_expired_locked(h2o_socketpool_t *pool, h2o_loop_t *this_loop)
108
829
{
109
829
    uint64_t next_expired = destroy_expired_locked(pool);
110
829
    if (next_expired != UINT64_MAX) {
111
0
        if (this_loop == pool->_interval_cb.loop && !h2o_timer_is_linked(&pool->_interval_cb.timeout)) {
112
0
            if (next_expired < CHECK_EXPIRATION_MIN_INTERVAL)
113
0
                next_expired = CHECK_EXPIRATION_MIN_INTERVAL;
114
0
            h2o_timer_link(pool->_interval_cb.loop, next_expired, &pool->_interval_cb.timeout);
115
0
        }
116
0
    }
117
829
}
118
119
static void on_timeout(h2o_timer_t *timeout)
120
2
{
121
    /* decrease the frequency of this function being called; the expiration
122
     * check can be (should be) performed in the `connect` function as well
123
     */
124
2
    h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.timeout, timeout);
125
126
2
    if (pthread_mutex_trylock(&pool->_shared.mutex) == 0) {
127
2
        check_pool_expired_locked(pool, pool->_interval_cb.loop);
128
2
        pthread_mutex_unlock(&pool->_shared.mutex);
129
2
    }
130
2
}
131
132
static void common_init(h2o_socketpool_t *pool, h2o_socketpool_target_t **targets, size_t num_targets, size_t capacity,
133
                        h2o_balancer_t *balancer)
134
2
{
135
2
    memset(pool, 0, sizeof(*pool));
136
137
2
    pool->capacity = capacity;
138
2
    pool->timeout = 2000;
139
140
2
    pthread_mutex_init(&pool->_shared.mutex, NULL);
141
2
    h2o_linklist_init_anchor(&pool->_shared.sockets);
142
143
2
    h2o_vector_reserve(NULL, &pool->targets, num_targets);
144
3
    for (; pool->targets.size < num_targets; ++pool->targets.size)
145
1
        pool->targets.entries[pool->targets.size] = targets[pool->targets.size];
146
147
2
    pool->balancer = balancer;
148
2
}
149
150
h2o_socketpool_target_type_t detect_target_type(h2o_url_t *url, struct sockaddr_storage *sa, socklen_t *salen)
151
1
{
152
1
    memset(sa, 0, sizeof(*sa));
153
1
    const char *to_sun_err = h2o_url_host_to_sun(url->host, (struct sockaddr_un *)sa);
154
1
    if (to_sun_err == h2o_url_host_to_sun_err_is_not_unix_socket) {
155
0
        sa->ss_family = AF_INET;
156
0
        struct sockaddr_in *sin = (struct sockaddr_in *)sa;
157
0
        *salen = sizeof(*sin);
158
159
0
        if (h2o_hostinfo_aton(url->host, &sin->sin_addr) == 0) {
160
0
            sin->sin_port = htons(h2o_url_get_port(url));
161
0
            return H2O_SOCKETPOOL_TYPE_SOCKADDR;
162
0
        } else {
163
0
            return H2O_SOCKETPOOL_TYPE_NAMED;
164
0
        }
165
1
    } else {
166
1
        assert(to_sun_err == NULL);
167
1
        *salen = sizeof(struct sockaddr_un);
168
1
        return H2O_SOCKETPOOL_TYPE_SOCKADDR;
169
1
    }
170
1
}
171
172
h2o_socketpool_target_t *h2o_socketpool_create_target(h2o_url_t *origin, h2o_socketpool_target_conf_t *lb_target_conf)
173
1
{
174
1
    struct sockaddr_storage sa;
175
1
    socklen_t salen;
176
177
1
    h2o_socketpool_target_t *target = h2o_mem_alloc(sizeof(*target));
178
1
    h2o_url_copy(NULL, &target->url, origin);
179
1
    assert(target->url.host.base[target->url.host.len] == '\0'); /* needs to be null-terminated in order to be used in SNI */
180
1
    target->type = detect_target_type(origin, &sa, &salen);
181
1
    if (!(target->type == H2O_SOCKETPOOL_TYPE_SOCKADDR && sa.ss_family == AF_UNIX)) {
182
0
        h2o_strtolower(target->url.authority.base, target->url.authority.len);
183
0
        h2o_strtolower(target->url.host.base, target->url.host.len);
184
0
    }
185
186
1
    switch (target->type) {
187
0
    case H2O_SOCKETPOOL_TYPE_NAMED:
188
0
        target->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR));
189
0
        target->peer.named_serv.len = sprintf(target->peer.named_serv.base, "%u", (unsigned)h2o_url_get_port(&target->url));
190
0
        break;
191
1
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
192
1
        assert(salen <= sizeof(target->peer.sockaddr.bytes));
193
1
        memcpy(&target->peer.sockaddr.bytes, &sa, salen);
194
1
        target->peer.sockaddr.len = salen;
195
1
        break;
196
1
    }
197
1
    target->_shared.leased_count = 0;
198
1
    if (lb_target_conf != NULL)
199
0
        target->conf.weight_m1 = lb_target_conf->weight_m1;
200
1
    else {
201
1
        target->conf.weight_m1 = 0;
202
1
    }
203
204
1
    h2o_linklist_init_anchor(&target->_shared.sockets);
205
1
    return target;
206
1
}
207
208
void h2o_socketpool_init_specific(h2o_socketpool_t *pool, size_t capacity, h2o_socketpool_target_t **targets, size_t num_targets,
209
                                  h2o_balancer_t *balancer)
210
1
{
211
1
    if (balancer == NULL)
212
1
        balancer = h2o_balancer_create_rr();
213
1
    common_init(pool, targets, num_targets, capacity, balancer);
214
1
}
215
216
int h2o_socketpool_is_global(h2o_socketpool_t *pool)
217
827
{
218
827
    return pool->balancer == NULL;
219
827
}
220
221
void h2o_socketpool_init_global(h2o_socketpool_t *pool, size_t capacity)
222
1
{
223
1
    common_init(pool, NULL, 0, capacity, NULL);
224
1
}
225
226
void h2o_socketpool_destroy_target(h2o_socketpool_target_t *target)
227
0
{
228
0
    switch (target->type) {
229
0
    case H2O_SOCKETPOOL_TYPE_NAMED:
230
0
        free(target->peer.named_serv.base);
231
0
        break;
232
0
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
233
0
        break;
234
0
    }
235
0
    free(target->url.authority.base);
236
0
    free(target->url.host.base);
237
0
    free(target->url.path.base);
238
0
    free(target);
239
0
}
240
241
void h2o_socketpool_dispose(h2o_socketpool_t *pool)
242
0
{
243
0
    size_t i;
244
245
0
    pthread_mutex_lock(&pool->_shared.mutex);
246
0
    while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
247
0
        struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
248
0
        destroy_attached(entry);
249
0
        __sync_sub_and_fetch(&pool->_shared.count, 1);
250
0
        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
251
0
    }
252
0
    pthread_mutex_unlock(&pool->_shared.mutex);
253
0
    pthread_mutex_destroy(&pool->_shared.mutex);
254
255
0
    if (pool->balancer != NULL) {
256
0
        pool->balancer->callbacks->destroy(pool->balancer);
257
0
    }
258
259
0
    if (pool->_ssl_ctx != NULL)
260
0
        SSL_CTX_free(pool->_ssl_ctx);
261
262
0
    if (pool->_interval_cb.loop != NULL)
263
0
        h2o_socketpool_unregister_loop(pool, pool->_interval_cb.loop);
264
265
0
    for (i = 0; i < pool->targets.size; i++) {
266
0
        h2o_socketpool_destroy_target(pool->targets.entries[i]);
267
0
    }
268
0
    free(pool->targets.entries);
269
0
}
270
271
void h2o_socketpool_set_ssl_ctx(h2o_socketpool_t *pool, SSL_CTX *ssl_ctx)
272
1
{
273
1
    if (pool->_ssl_ctx != NULL)
274
0
        SSL_CTX_free(pool->_ssl_ctx);
275
1
    if (ssl_ctx != NULL)
276
0
        SSL_CTX_up_ref(ssl_ctx);
277
1
    pool->_ssl_ctx = ssl_ctx;
278
1
}
279
280
void h2o_socketpool_register_loop(h2o_socketpool_t *pool, h2o_loop_t *loop)
281
2
{
282
2
    if (pool->_interval_cb.loop != NULL)
283
0
        return;
284
285
2
    pool->_interval_cb.loop = loop;
286
2
    h2o_timer_init(&pool->_interval_cb.timeout, on_timeout);
287
2
    h2o_timer_link(loop, CHECK_EXPIRATION_MIN_INTERVAL, &pool->_interval_cb.timeout);
288
2
}
289
290
void h2o_socketpool_unregister_loop(h2o_socketpool_t *pool, h2o_loop_t *loop)
291
0
{
292
0
    if (pool->_interval_cb.loop != loop)
293
0
        return;
294
0
    h2o_timer_unlink(&pool->_interval_cb.timeout);
295
0
    pool->_interval_cb.loop = NULL;
296
0
}
297
298
static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr)
299
780
{
300
780
    h2o_socketpool_connect_cb cb = req->cb;
301
780
    h2o_socket_t *sock = req->sock;
302
780
    void *data = req->data;
303
780
    h2o_socketpool_target_t *selected_target = req->pool->targets.entries[req->selected_target];
304
305
780
    if (req->lb.tried != NULL) {
306
780
        free(req->lb.tried);
307
780
    }
308
309
780
    free(req);
310
311
780
    if (sock != NULL)
312
780
        sock->data = NULL;
313
780
    cb(sock, errstr, data, &selected_target->url);
314
780
}
315
316
static void try_connect(h2o_socketpool_connect_request_t *req)
317
827
{
318
827
    h2o_socketpool_target_t *target;
319
320
827
    req->remaining_try_count--;
321
322
827
    if (req->lb.tried != NULL) {
323
827
        if (req->pool->targets.size > 1) {
324
0
            req->selected_target = req->pool->balancer->callbacks->select_(req->pool->balancer, &req->pool->targets, req->lb.tried);
325
0
            assert(!req->lb.tried[req->selected_target]);
326
0
            req->lb.tried[req->selected_target] = 1;
327
827
        } else {
328
827
            req->selected_target = 0;
329
827
        }
330
827
    }
331
827
    target = req->pool->targets.entries[req->selected_target];
332
827
    __sync_add_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
333
334
827
    switch (target->type) {
335
0
    case H2O_SOCKETPOOL_TYPE_NAMED:
336
        /* resolve the name, and connect */
337
0
        req->getaddr_req = h2o_hostinfo_getaddr(req->getaddr_receiver, target->url.host, target->peer.named_serv, AF_UNSPEC,
338
0
                                                SOCK_STREAM, IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req);
339
0
        break;
340
827
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
341
        /* connect (using sockaddr_in) */
342
827
        start_connect(req, (void *)&target->peer.sockaddr.bytes, target->peer.sockaddr.len);
343
827
        break;
344
827
    }
345
827
}
346
347
static void on_handshake_complete(h2o_socket_t *sock, const char *err)
348
0
{
349
0
    h2o_socketpool_connect_request_t *req = sock->data;
350
351
0
    assert(req->sock == sock);
352
353
0
    if (err == h2o_socket_error_ssl_cert_name_mismatch && (SSL_CTX_get_verify_mode(req->pool->_ssl_ctx) & SSL_VERIFY_PEER) == 0) {
354
        /* ignore CN mismatch if we are not verifying peer */
355
0
    } else if (err != NULL) {
356
0
        h2o_socket_close(sock);
357
0
        req->sock = NULL;
358
0
    }
359
360
0
    call_connect_cb(req, err);
361
0
}
362
363
static void on_connect(h2o_socket_t *sock, const char *err)
364
780
{
365
780
    h2o_socketpool_connect_request_t *req = sock->data;
366
367
780
    assert(req->sock == sock);
368
369
780
    if (err != NULL) {
370
0
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
371
0
        h2o_socket_close(sock);
372
0
        if (req->remaining_try_count > 0) {
373
0
            try_connect(req);
374
0
            return;
375
0
        }
376
0
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
377
0
        req->sock = NULL;
378
780
    } else {
379
780
        h2o_url_t *target_url = &req->pool->targets.entries[req->selected_target]->url;
380
780
        if (target_url->scheme->is_ssl) {
381
0
            assert(req->pool->_ssl_ctx != NULL && "h2o_socketpool_set_ssl_ctx must be called for a pool that contains SSL target");
382
0
            h2o_socket_ssl_handshake(sock, req->pool->_ssl_ctx, target_url->host.base, req->alpn_protos, on_handshake_complete);
383
0
            return;
384
0
        }
385
780
    }
386
387
780
    call_connect_cb(req, err);
388
780
}
389
390
static void on_close(void *data)
391
827
{
392
827
    struct on_close_data_t *close_data = data;
393
827
    h2o_socketpool_t *pool = close_data->pool;
394
827
    __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1);
395
827
    free(close_data);
396
827
    __sync_sub_and_fetch(&pool->_shared.count, 1);
397
827
}
398
399
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen)
400
827
{
401
827
    struct on_close_data_t *close_data;
402
403
827
    req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect, NULL);
404
827
    if (req->sock == NULL) {
405
0
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
406
0
        if (req->remaining_try_count > 0) {
407
0
            try_connect(req);
408
0
            return;
409
0
        }
410
0
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
411
0
        call_connect_cb(req, h2o_socket_error_conn_fail);
412
0
        return;
413
0
    }
414
827
    close_data = h2o_mem_alloc(sizeof(*close_data));
415
827
    close_data->pool = req->pool;
416
827
    close_data->target = req->selected_target;
417
827
    req->sock->data = req;
418
827
    req->sock->on_close.cb = on_close;
419
827
    req->sock->on_close.data = close_data;
420
827
}
421
422
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req)
423
0
{
424
0
    h2o_socketpool_connect_request_t *req = _req;
425
426
0
    assert(getaddr_req == req->getaddr_req);
427
0
    req->getaddr_req = NULL;
428
429
0
    if (errstr != NULL) {
430
0
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
431
0
        if (req->remaining_try_count > 0) {
432
0
            try_connect(req);
433
0
            return;
434
0
        }
435
0
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
436
0
        call_connect_cb(req, errstr);
437
0
        return;
438
0
    }
439
440
0
    struct addrinfo *selected = h2o_hostinfo_select_one(res);
441
0
    start_connect(req, selected->ai_addr, selected->ai_addrlen);
442
0
}
443
444
static size_t lookup_target(h2o_socketpool_t *pool, h2o_url_t *url)
445
0
{
446
0
    uint16_t port = h2o_url_get_port(url);
447
0
    size_t i = 0;
448
0
    for (; i != pool->targets.size; ++i) {
449
0
        h2o_socketpool_target_t *target = pool->targets.entries[i];
450
0
        if (target->url.scheme != url->scheme)
451
0
            continue;
452
0
        if (h2o_url_get_port(&target->url) != port)
453
0
            continue;
454
0
        if (!h2o_url_hosts_are_equal(&target->url, url))
455
0
            continue;
456
0
        return i;
457
0
    }
458
0
    return SIZE_MAX;
459
0
}
460
461
void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_url_t *url, h2o_loop_t *loop,
462
                            h2o_multithread_receiver_t *getaddr_receiver, h2o_iovec_t alpn_protos, h2o_socketpool_connect_cb cb,
463
                            void *data)
464
827
{
465
827
    struct pool_entry_t *entry = NULL;
466
827
    struct on_close_data_t *close_data;
467
468
827
    if (_req != NULL)
469
827
        *_req = NULL;
470
471
827
    size_t target = SIZE_MAX;
472
827
    h2o_linklist_t *sockets = NULL;
473
474
    /* fetch an entry and return it */
475
827
    pthread_mutex_lock(&pool->_shared.mutex);
476
827
    check_pool_expired_locked(pool, loop);
477
478
    /* TODO lookup outside this critical section */
479
827
    if (h2o_socketpool_is_global(pool)) {
480
0
        target = lookup_target(pool, url);
481
0
        if (target == SIZE_MAX) {
482
0
            h2o_vector_reserve(NULL, &pool->targets, pool->targets.size + 1);
483
0
            pool->targets.entries[pool->targets.size++] = h2o_socketpool_create_target(url, NULL);
484
0
            target = pool->targets.size - 1;
485
0
        }
486
0
        sockets = &pool->targets.entries[target]->_shared.sockets;
487
827
    } else {
488
827
        sockets = &pool->_shared.sockets;
489
827
    }
490
827
    assert(pool->targets.size != 0);
491
492
827
    while (!h2o_linklist_is_empty(sockets)) {
493
0
        if (h2o_socketpool_is_global(pool)) {
494
0
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, target_link, sockets->next);
495
0
        } else {
496
0
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, sockets->next);
497
0
        }
498
0
        h2o_linklist_unlink(&entry->all_link);
499
0
        h2o_linklist_unlink(&entry->target_link);
500
0
        pthread_mutex_unlock(&pool->_shared.mutex);
501
502
0
        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
503
504
        /* test if the connection is still alive */
505
0
        char buf[1];
506
0
        ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK);
507
0
        if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
508
            /* yes! return it */
509
0
            size_t entry_target = entry->target;
510
0
            h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo);
511
0
            free(entry);
512
0
            close_data = h2o_mem_alloc(sizeof(*close_data));
513
0
            close_data->pool = pool;
514
0
            close_data->target = entry_target;
515
0
            sock->on_close.cb = on_close;
516
0
            sock->on_close.data = close_data;
517
0
            __sync_add_and_fetch(&pool->targets.entries[entry_target]->_shared.leased_count, 1);
518
0
            cb(sock, NULL, data, &pool->targets.entries[entry_target]->url);
519
0
            return;
520
0
        }
521
522
        /* connection is dead, report, close, and retry */
523
0
        if (rret <= 0) {
524
0
            static long counter = 0;
525
0
            if (__sync_fetch_and_add(&counter, 1) == 0)
526
0
                h2o_error_printf("[WARN] detected close by upstream before the expected timeout (see issue #679)\n");
527
0
        } else {
528
0
            static long counter = 0;
529
0
            if (__sync_fetch_and_add(&counter, 1) == 0)
530
0
                h2o_error_printf("[WARN] unexpectedly received data to a pooled socket (see issue #679)\n");
531
0
        }
532
0
        destroy_detached(entry);
533
0
        pthread_mutex_lock(&pool->_shared.mutex);
534
0
    }
535
827
    pthread_mutex_unlock(&pool->_shared.mutex);
536
537
    /* FIXME repsect `capacity` */
538
827
    __sync_add_and_fetch(&pool->_shared.count, 1);
539
540
    /* prepare request object */
541
827
    h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req));
542
827
    *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop};
543
544
827
    if (_req != NULL)
545
827
        *_req = req;
546
827
    req->getaddr_receiver = getaddr_receiver;
547
827
    req->alpn_protos = alpn_protos;
548
549
827
    req->selected_target = target;
550
827
    if (target == SIZE_MAX) {
551
827
        req->lb.tried = h2o_mem_alloc(sizeof(req->lb.tried[0]) * pool->targets.size);
552
827
        memset(req->lb.tried, 0, sizeof(req->lb.tried[0]) * pool->targets.size);
553
827
        req->remaining_try_count = pool->targets.size;
554
827
    } else {
555
0
        req->remaining_try_count = 1;
556
0
    }
557
827
    try_connect(req);
558
827
}
559
560
void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req)
561
47
{
562
47
    if (req->getaddr_req != NULL) {
563
0
        h2o_hostinfo_getaddr_cancel(req->getaddr_req);
564
0
        req->getaddr_req = NULL;
565
0
    }
566
47
    if (req->sock != NULL)
567
47
        h2o_socket_close(req->sock);
568
47
    if (req->lb.tried != NULL) {
569
47
        free(req->lb.tried);
570
47
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
571
47
    }
572
47
    free(req);
573
47
}
574
575
int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock)
576
0
{
577
0
    struct pool_entry_t *entry;
578
0
    struct on_close_data_t *close_data;
579
0
    size_t target;
580
581
0
    close_data = sock->on_close.data;
582
0
    target = close_data->target;
583
    /* reset the on_close callback */
584
0
    assert(close_data->pool == pool);
585
0
    __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1);
586
0
    free(close_data);
587
0
    sock->on_close.cb = NULL;
588
0
    sock->on_close.data = NULL;
589
590
0
    entry = h2o_mem_alloc(sizeof(*entry));
591
0
    if (h2o_socket_export(sock, &entry->sockinfo) != 0) {
592
0
        free(entry);
593
0
        __sync_sub_and_fetch(&pool->_shared.count, 1);
594
0
        return -1;
595
0
    }
596
0
    memset(&entry->all_link, 0, sizeof(entry->all_link));
597
0
    memset(&entry->target_link, 0, sizeof(entry->target_link));
598
0
    entry->added_at = h2o_now(h2o_socket_get_loop(sock));
599
0
    entry->target = target;
600
601
0
    __sync_add_and_fetch(&pool->_shared.pooled_count, 1);
602
603
0
    pthread_mutex_lock(&pool->_shared.mutex);
604
0
    h2o_linklist_insert(&pool->_shared.sockets, &entry->all_link);
605
0
    h2o_linklist_insert(&pool->targets.entries[target]->_shared.sockets, &entry->target_link);
606
0
    check_pool_expired_locked(pool, h2o_socket_get_loop(sock));
607
0
    pthread_mutex_unlock(&pool->_shared.mutex);
608
0
    return 0;
609
0
}
610
611
void h2o_socketpool_detach(h2o_socketpool_t *pool, h2o_socket_t *sock)
612
0
{
613
0
    struct on_close_data_t *close_data = sock->on_close.data;
614
0
    assert(close_data->pool == pool);
615
616
0
    __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1);
617
0
    __sync_sub_and_fetch(&pool->_shared.count, 1);
618
619
0
    sock->on_close.cb = NULL;
620
0
    sock->on_close.data = NULL;
621
0
    free(close_data);
622
0
}
623
624
int h2o_socketpool_can_keepalive(h2o_socketpool_t *pool)
625
780
{
626
780
    return pool->timeout > 0;
627
780
}