/src/h2o/lib/common/socketpool.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku |
3 | | * |
4 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | | * of this software and associated documentation files (the "Software"), to |
6 | | * deal in the Software without restriction, including without limitation the |
7 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
8 | | * sell copies of the Software, and to permit persons to whom the Software is |
9 | | * furnished to do so, subject to the following conditions: |
10 | | * |
11 | | * The above copyright notice and this permission notice shall be included in |
12 | | * all copies or substantial portions of the Software. |
13 | | * |
14 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
19 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
20 | | * IN THE SOFTWARE. |
21 | | */ |
22 | | #include <assert.h> |
23 | | #include <errno.h> |
24 | | #include <netdb.h> |
25 | | #include <stdlib.h> |
26 | | #include <sys/socket.h> |
27 | | #include <sys/types.h> |
28 | | #include <sys/un.h> |
29 | | #include <netinet/in.h> |
30 | | #include "h2o/hostinfo.h" |
31 | | #include "h2o/linklist.h" |
32 | | #include "h2o/socketpool.h" |
33 | | #include "h2o/string_.h" |
34 | | #include "h2o/socket.h" |
35 | | #include "h2o/balancer.h" |
36 | | |
/**
 * minimum interval (in milliseconds) for the expiration check; a computed
 * timeout shorter than this is rounded up to this value
 */
40 | 2 | #define CHECK_EXPIRATION_MIN_INTERVAL 1000 |
41 | | |
/**
 * An idle connection held by the pool. The socket is kept in exported form
 * (h2o_socket_export_t) so it can later be imported into whichever loop
 * leases it next.
 */
struct pool_entry_t {
    h2o_socket_export_t sockinfo;
    size_t target;              /* index into h2o_socketpool_t::targets */
    h2o_linklist_t all_link;    /* node in pool->_shared.sockets */
    h2o_linklist_t target_link; /* node in target->_shared.sockets */
    uint64_t added_at;          /* msec timestamp when pooled; drives expiration */
};
49 | | |
/**
 * State of one in-flight connect attempt; freed by call_connect_cb or by
 * h2o_socketpool_cancel_connect.
 */
struct st_h2o_socketpool_connect_request_t {
    void *data;                   /* opaque pointer handed back to `cb` */
    h2o_socketpool_connect_cb cb; /* user callback invoked on completion */
    h2o_socketpool_t *pool;
    h2o_loop_t *loop;
    h2o_hostinfo_getaddr_req_t *getaddr_req; /* non-NULL while async name resolution is in flight */
    h2o_socket_t *sock;                      /* socket being connected, once created */
    h2o_multithread_receiver_t *getaddr_receiver;
    size_t selected_target;     /* index of the target currently being tried */
    size_t remaining_try_count; /* connect attempts left (one per target when load-balancing) */
    struct {
        char *tried; /* per-target flags marking targets already attempted; NULL when the target is preselected */
    } lb;
    h2o_iovec_t alpn_protos;
};
65 | | |
/**
 * Payload attached to a leased socket's on_close hook so that the pool's
 * counters can be adjusted when the socket gets closed.
 */
struct on_close_data_t {
    h2o_socketpool_t *pool;
    size_t target; /* index into pool->targets */
};
70 | | |
71 | | static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen); |
72 | | static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req); |
73 | | |
74 | | static void destroy_detached(struct pool_entry_t *entry) |
75 | 0 | { |
76 | 0 | h2o_socket_dispose_export(&entry->sockinfo); |
77 | 0 | free(entry); |
78 | 0 | } |
79 | | |
80 | | static void destroy_attached(struct pool_entry_t *entry) |
81 | 0 | { |
82 | 0 | h2o_linklist_unlink(&entry->all_link); |
83 | 0 | h2o_linklist_unlink(&entry->target_link); |
84 | 0 | destroy_detached(entry); |
85 | 0 | } |
86 | | |
/* caller should lock the mutex */
/**
 * Closes pooled sockets whose idle time has exceeded `pool->timeout`.
 * `pool->_shared.sockets` is ordered by insertion time, so the scan stops at
 * the first entry that has not yet expired.
 * @return milliseconds until the oldest remaining entry expires, or
 *         UINT64_MAX when nothing is left to expire (or when no loop has
 *         been registered, in which case the current time cannot be read)
 */
static uint64_t destroy_expired_locked(h2o_socketpool_t *pool)
{
    if (pool->_interval_cb.loop != NULL) {
        uint64_t now_ms = h2o_now(pool->_interval_cb.loop);
        uint64_t expire_before = now_ms - pool->timeout;
        while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
            struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
            if (entry->added_at > expire_before) {
                /* not expired yet; report when it will be */
                return entry->added_at + pool->timeout - now_ms;
            }
            destroy_attached(entry);
            __sync_sub_and_fetch(&pool->_shared.count, 1);
            __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
        }
    }
    return UINT64_MAX;
}
105 | | |
/* caller should lock the mutex */
/**
 * Runs the expiration sweep and, when sockets remain pooled, (re)arms the
 * expiration timer. The timer is armed only from the loop that owns it
 * (`pool->_interval_cb.loop`).
 */
static void check_pool_expired_locked(h2o_socketpool_t *pool, h2o_loop_t *this_loop)
{
    uint64_t next_expired = destroy_expired_locked(pool);
    if (next_expired != UINT64_MAX) {
        if (this_loop == pool->_interval_cb.loop && !h2o_timer_is_linked(&pool->_interval_cb.timeout)) {
            /* clamp to a minimum interval to limit wakeup frequency */
            if (next_expired < CHECK_EXPIRATION_MIN_INTERVAL)
                next_expired = CHECK_EXPIRATION_MIN_INTERVAL;
            h2o_timer_link(pool->_interval_cb.loop, next_expired, &pool->_interval_cb.timeout);
        }
    }
}
118 | | |
/**
 * Periodic timer callback that expires idle pooled sockets.
 */
static void on_timeout(h2o_timer_t *timeout)
{
    /* decrease the frequency of this function being called; the expiration
     * check can be (should be) performed in the `connect` function as well
     */
    h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.timeout, timeout);

    /* trylock: if the mutex is contended, skip this round instead of blocking
     * the event loop; the holder runs the expiration check itself */
    if (pthread_mutex_trylock(&pool->_shared.mutex) == 0) {
        check_pool_expired_locked(pool, pool->_interval_cb.loop);
        pthread_mutex_unlock(&pool->_shared.mutex);
    }
}
131 | | |
132 | | static void common_init(h2o_socketpool_t *pool, h2o_socketpool_target_t **targets, size_t num_targets, size_t capacity, |
133 | | h2o_balancer_t *balancer) |
134 | 2 | { |
135 | 2 | memset(pool, 0, sizeof(*pool)); |
136 | | |
137 | 2 | pool->capacity = capacity; |
138 | 2 | pool->timeout = 2000; |
139 | | |
140 | 2 | pthread_mutex_init(&pool->_shared.mutex, NULL); |
141 | 2 | h2o_linklist_init_anchor(&pool->_shared.sockets); |
142 | | |
143 | 2 | h2o_vector_reserve(NULL, &pool->targets, num_targets); |
144 | 3 | for (; pool->targets.size < num_targets; ++pool->targets.size) |
145 | 1 | pool->targets.entries[pool->targets.size] = targets[pool->targets.size]; |
146 | | |
147 | 2 | pool->balancer = balancer; |
148 | 2 | } |
149 | | |
/**
 * Determines how a target URL should be connected to, filling `sa`/`salen`
 * when the address can be resolved without DNS:
 * - unix socket path     -> H2O_SOCKETPOOL_TYPE_SOCKADDR (sockaddr_un)
 * - numeric IPv4 address -> H2O_SOCKETPOOL_TYPE_SOCKADDR (sockaddr_in)
 * - anything else        -> H2O_SOCKETPOOL_TYPE_NAMED (async resolution)
 */
h2o_socketpool_target_type_t detect_target_type(h2o_url_t *url, struct sockaddr_storage *sa, socklen_t *salen)
{
    memset(sa, 0, sizeof(*sa));
    const char *to_sun_err = h2o_url_host_to_sun(url->host, (struct sockaddr_un *)sa);
    if (to_sun_err == h2o_url_host_to_sun_err_is_not_unix_socket) {
        sa->ss_family = AF_INET;
        struct sockaddr_in *sin = (struct sockaddr_in *)sa;
        *salen = sizeof(*sin);

        if (h2o_hostinfo_aton(url->host, &sin->sin_addr) == 0) {
            sin->sin_port = htons(h2o_url_get_port(url));
            return H2O_SOCKETPOOL_TYPE_SOCKADDR;
        } else {
            return H2O_SOCKETPOOL_TYPE_NAMED;
        }
    } else {
        /* the only other possible result is NULL, i.e. the host was a valid unix socket path */
        assert(to_sun_err == NULL);
        *salen = sizeof(struct sockaddr_un);
        return H2O_SOCKETPOOL_TYPE_SOCKADDR;
    }
}
171 | | |
/**
 * Creates a socketpool target from `origin`, copying the URL and resolving
 * the peer address eagerly when possible (unix path or numeric IPv4).
 * `lb_target_conf` optionally supplies the load-balancing weight.
 * Ownership of the returned target passes to the caller
 * (see h2o_socketpool_destroy_target).
 */
h2o_socketpool_target_t *h2o_socketpool_create_target(h2o_url_t *origin, h2o_socketpool_target_conf_t *lb_target_conf)
{
    struct sockaddr_storage sa;
    socklen_t salen;

    h2o_socketpool_target_t *target = h2o_mem_alloc(sizeof(*target));
    h2o_url_copy(NULL, &target->url, origin);
    assert(target->url.host.base[target->url.host.len] == '\0'); /* needs to be null-terminated in order to be used in SNI */
    target->type = detect_target_type(origin, &sa, &salen);
    if (!(target->type == H2O_SOCKETPOOL_TYPE_SOCKADDR && sa.ss_family == AF_UNIX)) {
        /* hostnames are case-insensitive; normalize (unix paths are case-sensitive, hence excluded) */
        h2o_strtolower(target->url.authority.base, target->url.authority.len);
        h2o_strtolower(target->url.host.base, target->url.host.len);
    }

    switch (target->type) {
    case H2O_SOCKETPOOL_TYPE_NAMED:
        /* store the port as a decimal string for getaddrinfo-style resolution */
        target->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR));
        target->peer.named_serv.len = sprintf(target->peer.named_serv.base, "%u", (unsigned)h2o_url_get_port(&target->url));
        break;
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
        assert(salen <= sizeof(target->peer.sockaddr.bytes));
        memcpy(&target->peer.sockaddr.bytes, &sa, salen);
        target->peer.sockaddr.len = salen;
        break;
    }
    target->_shared.leased_count = 0;
    if (lb_target_conf != NULL)
        target->conf.weight_m1 = lb_target_conf->weight_m1;
    else {
        /* default weight (weight_m1 of 0 corresponds to a weight of 1) */
        target->conf.weight_m1 = 0;
    }

    h2o_linklist_init_anchor(&target->_shared.sockets);
    return target;
}
207 | | |
208 | | void h2o_socketpool_init_specific(h2o_socketpool_t *pool, size_t capacity, h2o_socketpool_target_t **targets, size_t num_targets, |
209 | | h2o_balancer_t *balancer) |
210 | 1 | { |
211 | 1 | if (balancer == NULL) |
212 | 1 | balancer = h2o_balancer_create_rr(); |
213 | 1 | common_init(pool, targets, num_targets, capacity, balancer); |
214 | 1 | } |
215 | | |
216 | | int h2o_socketpool_is_global(h2o_socketpool_t *pool) |
217 | 827 | { |
218 | 827 | return pool->balancer == NULL; |
219 | 827 | } |
220 | | |
221 | | void h2o_socketpool_init_global(h2o_socketpool_t *pool, size_t capacity) |
222 | 1 | { |
223 | 1 | common_init(pool, NULL, 0, capacity, NULL); |
224 | 1 | } |
225 | | |
226 | | void h2o_socketpool_destroy_target(h2o_socketpool_target_t *target) |
227 | 0 | { |
228 | 0 | switch (target->type) { |
229 | 0 | case H2O_SOCKETPOOL_TYPE_NAMED: |
230 | 0 | free(target->peer.named_serv.base); |
231 | 0 | break; |
232 | 0 | case H2O_SOCKETPOOL_TYPE_SOCKADDR: |
233 | 0 | break; |
234 | 0 | } |
235 | 0 | free(target->url.authority.base); |
236 | 0 | free(target->url.host.base); |
237 | 0 | free(target->url.path.base); |
238 | 0 | free(target); |
239 | 0 | } |
240 | | |
/**
 * Releases all resources held by the pool: pooled sockets, the mutex, the
 * balancer, the SSL context, the expiration timer, and the target list.
 * Caller must guarantee that no other thread is using the pool.
 */
void h2o_socketpool_dispose(h2o_socketpool_t *pool)
{
    size_t i;

    pthread_mutex_lock(&pool->_shared.mutex);
    while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
        struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
        destroy_attached(entry);
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);
    pthread_mutex_destroy(&pool->_shared.mutex);

    if (pool->balancer != NULL) {
        pool->balancer->callbacks->destroy(pool->balancer);
    }

    if (pool->_ssl_ctx != NULL)
        SSL_CTX_free(pool->_ssl_ctx);

    /* stop the expiration timer before the pool memory goes away */
    if (pool->_interval_cb.loop != NULL)
        h2o_socketpool_unregister_loop(pool, pool->_interval_cb.loop);

    for (i = 0; i < pool->targets.size; i++) {
        h2o_socketpool_destroy_target(pool->targets.entries[i]);
    }
    free(pool->targets.entries);
}
270 | | |
271 | | void h2o_socketpool_set_ssl_ctx(h2o_socketpool_t *pool, SSL_CTX *ssl_ctx) |
272 | 1 | { |
273 | 1 | if (pool->_ssl_ctx != NULL) |
274 | 0 | SSL_CTX_free(pool->_ssl_ctx); |
275 | 1 | if (ssl_ctx != NULL) |
276 | 0 | SSL_CTX_up_ref(ssl_ctx); |
277 | 1 | pool->_ssl_ctx = ssl_ctx; |
278 | 1 | } |
279 | | |
280 | | void h2o_socketpool_register_loop(h2o_socketpool_t *pool, h2o_loop_t *loop) |
281 | 2 | { |
282 | 2 | if (pool->_interval_cb.loop != NULL) |
283 | 0 | return; |
284 | | |
285 | 2 | pool->_interval_cb.loop = loop; |
286 | 2 | h2o_timer_init(&pool->_interval_cb.timeout, on_timeout); |
287 | 2 | h2o_timer_link(loop, CHECK_EXPIRATION_MIN_INTERVAL, &pool->_interval_cb.timeout); |
288 | 2 | } |
289 | | |
290 | | void h2o_socketpool_unregister_loop(h2o_socketpool_t *pool, h2o_loop_t *loop) |
291 | 0 | { |
292 | 0 | if (pool->_interval_cb.loop != loop) |
293 | 0 | return; |
294 | 0 | h2o_timer_unlink(&pool->_interval_cb.timeout); |
295 | 0 | pool->_interval_cb.loop = NULL; |
296 | 0 | } |
297 | | |
298 | | static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr) |
299 | 780 | { |
300 | 780 | h2o_socketpool_connect_cb cb = req->cb; |
301 | 780 | h2o_socket_t *sock = req->sock; |
302 | 780 | void *data = req->data; |
303 | 780 | h2o_socketpool_target_t *selected_target = req->pool->targets.entries[req->selected_target]; |
304 | | |
305 | 780 | if (req->lb.tried != NULL) { |
306 | 780 | free(req->lb.tried); |
307 | 780 | } |
308 | | |
309 | 780 | free(req); |
310 | | |
311 | 780 | if (sock != NULL) |
312 | 780 | sock->data = NULL; |
313 | 780 | cb(sock, errstr, data, &selected_target->url); |
314 | 780 | } |
315 | | |
/**
 * Picks a target (via the balancer when multiple targets are configured and
 * the target was not preselected) and starts one connection attempt toward
 * it. Consumes one unit of `req->remaining_try_count` and takes a lease on
 * the selected target.
 */
static void try_connect(h2o_socketpool_connect_request_t *req)
{
    h2o_socketpool_target_t *target;

    req->remaining_try_count--;

    if (req->lb.tried != NULL) {
        if (req->pool->targets.size > 1) {
            /* let the balancer pick a target that has not been tried yet */
            req->selected_target = req->pool->balancer->callbacks->select_(req->pool->balancer, &req->pool->targets, req->lb.tried);
            assert(!req->lb.tried[req->selected_target]);
            req->lb.tried[req->selected_target] = 1;
        } else {
            req->selected_target = 0;
        }
    }
    target = req->pool->targets.entries[req->selected_target];
    __sync_add_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);

    switch (target->type) {
    case H2O_SOCKETPOOL_TYPE_NAMED:
        /* resolve the name, and connect */
        req->getaddr_req = h2o_hostinfo_getaddr(req->getaddr_receiver, target->url.host, target->peer.named_serv, AF_UNSPEC,
                                                SOCK_STREAM, IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req);
        break;
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
        /* connect (using sockaddr_in) */
        start_connect(req, (void *)&target->peer.sockaddr.bytes, target->peer.sockaddr.len);
        break;
    }
}
346 | | |
/**
 * SSL handshake completion callback; forwards the result to the user.
 * A certificate-name mismatch is tolerated when peer verification is off.
 */
static void on_handshake_complete(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;

    assert(req->sock == sock);

    if (err == h2o_socket_error_ssl_cert_name_mismatch && (SSL_CTX_get_verify_mode(req->pool->_ssl_ctx) & SSL_VERIFY_PEER) == 0) {
        /* ignore CN mismatch if we are not verifying peer */
    } else if (err != NULL) {
        /* close on failure; the socket's on_close hook adjusts the counters */
        h2o_socket_close(sock);
        req->sock = NULL;
    }

    call_connect_cb(req, err);
}
362 | | |
/**
 * TCP-level connect completion callback. On failure, retries against
 * another target while attempts remain; on success, starts the SSL
 * handshake for SSL targets or reports the socket to the user directly.
 */
static void on_connect(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;

    assert(req->sock == sock);

    if (err != NULL) {
        /* release the lease taken in try_connect before retrying */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        h2o_socket_close(sock);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        /* out of retries: give back the pool slot taken in h2o_socketpool_connect */
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        req->sock = NULL;
    } else {
        h2o_url_t *target_url = &req->pool->targets.entries[req->selected_target]->url;
        if (target_url->scheme->is_ssl) {
            assert(req->pool->_ssl_ctx != NULL && "h2o_socketpool_set_ssl_ctx must be called for a pool that contains SSL target");
            h2o_socket_ssl_handshake(sock, req->pool->_ssl_ctx, target_url->host.base, req->alpn_protos, on_handshake_complete);
            return;
        }
    }

    call_connect_cb(req, err);
}
389 | | |
390 | | static void on_close(void *data) |
391 | 827 | { |
392 | 827 | struct on_close_data_t *close_data = data; |
393 | 827 | h2o_socketpool_t *pool = close_data->pool; |
394 | 827 | __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1); |
395 | 827 | free(close_data); |
396 | 827 | __sync_sub_and_fetch(&pool->_shared.count, 1); |
397 | 827 | } |
398 | | |
/**
 * Initiates a non-blocking connect to `addr` and registers the on_close
 * hook used for lease accounting. Falls back to another target (or reports
 * failure) when the connect cannot even be started.
 */
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen)
{
    struct on_close_data_t *close_data;

    req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect, NULL);
    if (req->sock == NULL) {
        /* release the lease taken in try_connect before retrying/failing */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        call_connect_cb(req, h2o_socket_error_conn_fail);
        return;
    }
    close_data = h2o_mem_alloc(sizeof(*close_data));
    close_data->pool = req->pool;
    close_data->target = req->selected_target;
    req->sock->data = req;
    req->sock->on_close.cb = on_close;
    req->sock->on_close.data = close_data;
}
421 | | |
/**
 * Name-resolution completion callback: picks one of the resolved addresses
 * and proceeds to connect, or retries against another target / reports the
 * error when resolution failed.
 */
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req)
{
    h2o_socketpool_connect_request_t *req = _req;

    assert(getaddr_req == req->getaddr_req);
    req->getaddr_req = NULL;

    if (errstr != NULL) {
        /* release the lease taken in try_connect before retrying/failing */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        call_connect_cb(req, errstr);
        return;
    }

    struct addrinfo *selected = h2o_hostinfo_select_one(res);
    start_connect(req, selected->ai_addr, selected->ai_addrlen);
}
443 | | |
444 | | static size_t lookup_target(h2o_socketpool_t *pool, h2o_url_t *url) |
445 | 0 | { |
446 | 0 | uint16_t port = h2o_url_get_port(url); |
447 | 0 | size_t i = 0; |
448 | 0 | for (; i != pool->targets.size; ++i) { |
449 | 0 | h2o_socketpool_target_t *target = pool->targets.entries[i]; |
450 | 0 | if (target->url.scheme != url->scheme) |
451 | 0 | continue; |
452 | 0 | if (h2o_url_get_port(&target->url) != port) |
453 | 0 | continue; |
454 | 0 | if (!h2o_url_hosts_are_equal(&target->url, url)) |
455 | 0 | continue; |
456 | 0 | return i; |
457 | 0 | } |
458 | 0 | return SIZE_MAX; |
459 | 0 | } |
460 | | |
/**
 * Obtains a connection: for a global pool, to the target matching `url`
 * (created lazily when absent); for a specific pool, to a balancer-selected
 * target. A live pooled socket is reused when available; otherwise a new
 * connection is established asynchronously and `cb` is invoked on
 * completion. `*_req` receives a handle usable with
 * h2o_socketpool_cancel_connect while the attempt is in flight (it is set
 * to NULL when the callback is invoked synchronously from a pooled socket).
 */
void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_url_t *url, h2o_loop_t *loop,
                            h2o_multithread_receiver_t *getaddr_receiver, h2o_iovec_t alpn_protos, h2o_socketpool_connect_cb cb,
                            void *data)
{
    struct pool_entry_t *entry = NULL;
    struct on_close_data_t *close_data;

    if (_req != NULL)
        *_req = NULL;

    size_t target = SIZE_MAX;
    h2o_linklist_t *sockets = NULL;

    /* fetch an entry and return it */
    pthread_mutex_lock(&pool->_shared.mutex);
    check_pool_expired_locked(pool, loop);

    /* TODO lookup outside this critical section */
    if (h2o_socketpool_is_global(pool)) {
        /* global pool: find (or lazily create) the target for this URL */
        target = lookup_target(pool, url);
        if (target == SIZE_MAX) {
            h2o_vector_reserve(NULL, &pool->targets, pool->targets.size + 1);
            pool->targets.entries[pool->targets.size++] = h2o_socketpool_create_target(url, NULL);
            target = pool->targets.size - 1;
        }
        sockets = &pool->targets.entries[target]->_shared.sockets;
    } else {
        sockets = &pool->_shared.sockets;
    }
    assert(pool->targets.size != 0);

    /* the mutex is released while probing each candidate and re-acquired
     * before the next iteration */
    while (!h2o_linklist_is_empty(sockets)) {
        if (h2o_socketpool_is_global(pool)) {
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, target_link, sockets->next);
        } else {
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, sockets->next);
        }
        h2o_linklist_unlink(&entry->all_link);
        h2o_linklist_unlink(&entry->target_link);
        pthread_mutex_unlock(&pool->_shared.mutex);

        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);

        /* test if the connection is still alive */
        char buf[1];
        ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK);
        if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            /* yes! return it */
            size_t entry_target = entry->target;
            h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo);
            free(entry);
            close_data = h2o_mem_alloc(sizeof(*close_data));
            close_data->pool = pool;
            close_data->target = entry_target;
            sock->on_close.cb = on_close;
            sock->on_close.data = close_data;
            __sync_add_and_fetch(&pool->targets.entries[entry_target]->_shared.leased_count, 1);
            cb(sock, NULL, data, &pool->targets.entries[entry_target]->url);
            return;
        }

        /* connection is dead, report, close, and retry */
        if (rret <= 0) {
            static long counter = 0;
            if (__sync_fetch_and_add(&counter, 1) == 0)
                h2o_error_printf("[WARN] detected close by upstream before the expected timeout (see issue #679)\n");
        } else {
            static long counter = 0;
            if (__sync_fetch_and_add(&counter, 1) == 0)
                h2o_error_printf("[WARN] unexpectedly received data to a pooled socket (see issue #679)\n");
        }
        destroy_detached(entry);
        pthread_mutex_lock(&pool->_shared.mutex);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);

    /* FIXME respect `capacity` */
    __sync_add_and_fetch(&pool->_shared.count, 1);

    /* prepare request object */
    h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req));
    *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop};

    if (_req != NULL)
        *_req = req;
    req->getaddr_receiver = getaddr_receiver;
    req->alpn_protos = alpn_protos;

    req->selected_target = target;
    if (target == SIZE_MAX) {
        /* target not preselected: allocate the tried-flags so try_connect can
         * consult the balancer, attempting each target at most once */
        req->lb.tried = h2o_mem_alloc(sizeof(req->lb.tried[0]) * pool->targets.size);
        memset(req->lb.tried, 0, sizeof(req->lb.tried[0]) * pool->targets.size);
        req->remaining_try_count = pool->targets.size;
    } else {
        req->remaining_try_count = 1;
    }
    try_connect(req);
}
559 | | |
/**
 * Cancels an in-flight connect request: aborts pending name resolution,
 * closes the socket being connected, and frees the request object. The
 * user callback is NOT invoked.
 */
void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req)
{
    if (req->getaddr_req != NULL) {
        h2o_hostinfo_getaddr_cancel(req->getaddr_req);
        req->getaddr_req = NULL;
    }
    if (req->sock != NULL)
        h2o_socket_close(req->sock);
    if (req->lb.tried != NULL) {
        free(req->lb.tried);
        /* NOTE(review): when req->sock != NULL, the h2o_socket_close above
         * fires the on_close hook which also decrements leased_count for the
         * same target — verify this branch does not double-release the
         * lease in that path */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
    }
    free(req);
}
574 | | |
/**
 * Returns a leased socket to the pool for later reuse. The socket is
 * exported (detached from its loop) and queued with a timestamp used for
 * expiration.
 * @return 0 on success, -1 when the socket could not be exported (in which
 *         case the pool slot has been released)
 */
int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock)
{
    struct pool_entry_t *entry;
    struct on_close_data_t *close_data;
    size_t target;

    close_data = sock->on_close.data;
    target = close_data->target;
    /* reset the on_close callback */
    assert(close_data->pool == pool);
    __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1);
    free(close_data);
    sock->on_close.cb = NULL;
    sock->on_close.data = NULL;

    entry = h2o_mem_alloc(sizeof(*entry));
    if (h2o_socket_export(sock, &entry->sockinfo) != 0) {
        free(entry);
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        return -1;
    }
    memset(&entry->all_link, 0, sizeof(entry->all_link));
    memset(&entry->target_link, 0, sizeof(entry->target_link));
    entry->added_at = h2o_now(h2o_socket_get_loop(sock));
    entry->target = target;

    __sync_add_and_fetch(&pool->_shared.pooled_count, 1);

    pthread_mutex_lock(&pool->_shared.mutex);
    h2o_linklist_insert(&pool->_shared.sockets, &entry->all_link);
    h2o_linklist_insert(&pool->targets.entries[target]->_shared.sockets, &entry->target_link);
    /* (re)arm the expiration timer now that the pool is non-empty */
    check_pool_expired_locked(pool, h2o_socket_get_loop(sock));
    pthread_mutex_unlock(&pool->_shared.mutex);
    return 0;
}
610 | | |
611 | | void h2o_socketpool_detach(h2o_socketpool_t *pool, h2o_socket_t *sock) |
612 | 0 | { |
613 | 0 | struct on_close_data_t *close_data = sock->on_close.data; |
614 | 0 | assert(close_data->pool == pool); |
615 | | |
616 | 0 | __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1); |
617 | 0 | __sync_sub_and_fetch(&pool->_shared.count, 1); |
618 | |
|
619 | 0 | sock->on_close.cb = NULL; |
620 | 0 | sock->on_close.data = NULL; |
621 | 0 | free(close_data); |
622 | 0 | } |
623 | | |
624 | | int h2o_socketpool_can_keepalive(h2o_socketpool_t *pool) |
625 | 780 | { |
626 | 780 | return pool->timeout > 0; |
627 | 780 | } |