/src/h2o/lib/common/socketpool.c
Line | Count | Source |
1 | | /* |
2 | | * Copyright (c) 2014-2016 DeNA Co., Ltd., Kazuho Oku |
3 | | * |
4 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | | * of this software and associated documentation files (the "Software"), to |
6 | | * deal in the Software without restriction, including without limitation the |
7 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
8 | | * sell copies of the Software, and to permit persons to whom the Software is |
9 | | * furnished to do so, subject to the following conditions: |
10 | | * |
11 | | * The above copyright notice and this permission notice shall be included in |
12 | | * all copies or substantial portions of the Software. |
13 | | * |
14 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
19 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
20 | | * IN THE SOFTWARE. |
21 | | */ |
22 | | #include <assert.h> |
23 | | #include <errno.h> |
24 | | #include <netdb.h> |
25 | | #include <stdlib.h> |
26 | | #include <sys/socket.h> |
27 | | #include <sys/types.h> |
28 | | #include <sys/un.h> |
29 | | #include <netinet/in.h> |
30 | | #include "h2o/hostinfo.h" |
31 | | #include "h2o/linklist.h" |
32 | | #include "h2o/socketpool.h" |
33 | | #include "h2o/string_.h" |
34 | | #include "h2o/socket.h" |
35 | | #include "h2o/balancer.h" |
36 | | |
37 | | /** |
38 | | * timeout will be set to this value when calculated less than this value |
39 | | */ |
40 | 6 | #define CHECK_EXPIRATION_MIN_INTERVAL 1000 |
41 | | |
42 | | struct pool_entry_t { |
43 | | h2o_socket_export_t sockinfo; |
44 | | size_t target; |
45 | | h2o_linklist_t all_link; |
46 | | h2o_linklist_t target_link; |
47 | | uint64_t added_at; |
48 | | }; |
49 | | |
50 | | struct st_h2o_socketpool_connect_request_t { |
51 | | void *data; |
52 | | h2o_socketpool_connect_cb cb; |
53 | | h2o_socketpool_t *pool; |
54 | | h2o_loop_t *loop; |
55 | | h2o_hostinfo_getaddr_req_t *getaddr_req; |
56 | | h2o_socket_t *sock; |
57 | | h2o_multithread_receiver_t *getaddr_receiver; |
58 | | size_t selected_target; |
59 | | size_t remaining_try_count; |
60 | | struct { |
61 | | char *tried; |
62 | | } lb; |
63 | | h2o_iovec_t alpn_protos; |
64 | | }; |
65 | | |
66 | | struct on_close_data_t { |
67 | | h2o_socketpool_t *pool; |
68 | | size_t target; |
69 | | }; |
70 | | |
71 | | static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen); |
72 | | static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req); |
73 | | |
74 | | static void destroy_detached(struct pool_entry_t *entry) |
75 | 0 | { |
76 | 0 | h2o_socket_dispose_export(&entry->sockinfo); |
77 | 0 | free(entry); |
78 | 0 | } |
79 | | |
80 | | static void destroy_attached(struct pool_entry_t *entry) |
81 | 0 | { |
82 | 0 | h2o_linklist_unlink(&entry->all_link); |
83 | 0 | h2o_linklist_unlink(&entry->target_link); |
84 | 0 | destroy_detached(entry); |
85 | 0 | } |
86 | | |
/* Destroys pooled sockets that have sat idle for longer than `pool->timeout`
 * milliseconds. Caller must hold `pool->_shared.mutex`.
 * Returns the delay (ms) until the oldest surviving entry will expire, or
 * UINT64_MAX when nothing remains to expire (or no loop is registered yet). */
static uint64_t destroy_expired_locked(h2o_socketpool_t *pool)
{
    if (pool->_interval_cb.loop != NULL) {
        uint64_t now_ms = h2o_now(pool->_interval_cb.loop);
        uint64_t expire_before = now_ms - pool->timeout;
        while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
            /* `_shared.sockets` is kept in insertion order, so the head is always the oldest entry */
            struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
            if (entry->added_at > expire_before) {
                /* head not yet expired; report when it will be */
                return entry->added_at + pool->timeout - now_ms;
            }
            destroy_attached(entry);
            __sync_sub_and_fetch(&pool->_shared.count, 1);
            __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
        }
    }
    return UINT64_MAX;
}
105 | | |
106 | | /* caller should lock the mutex */ |
107 | | static void check_pool_expired_locked(h2o_socketpool_t *pool, h2o_loop_t *this_loop) |
108 | 2.97k | { |
109 | 2.97k | uint64_t next_expired = destroy_expired_locked(pool); |
110 | 2.97k | if (next_expired != UINT64_MAX) { |
111 | 0 | if (this_loop == pool->_interval_cb.loop && !h2o_timer_is_linked(&pool->_interval_cb.timeout)) { |
112 | 0 | if (next_expired < CHECK_EXPIRATION_MIN_INTERVAL) |
113 | 0 | next_expired = CHECK_EXPIRATION_MIN_INTERVAL; |
114 | 0 | h2o_timer_link(pool->_interval_cb.loop, next_expired, &pool->_interval_cb.timeout); |
115 | 0 | } |
116 | 0 | } |
117 | 2.97k | } |
118 | | |
119 | | static void on_timeout(h2o_timer_t *timeout) |
120 | 6 | { |
121 | | /* decrease the frequency of this function being called; the expiration |
122 | | * check can be (should be) performed in the `connect` function as well |
123 | | */ |
124 | 6 | h2o_socketpool_t *pool = H2O_STRUCT_FROM_MEMBER(h2o_socketpool_t, _interval_cb.timeout, timeout); |
125 | | |
126 | 6 | if (pthread_mutex_trylock(&pool->_shared.mutex) == 0) { |
127 | 6 | check_pool_expired_locked(pool, pool->_interval_cb.loop); |
128 | 6 | pthread_mutex_unlock(&pool->_shared.mutex); |
129 | 6 | } |
130 | 6 | } |
131 | | |
132 | | static void common_init(h2o_socketpool_t *pool, h2o_socketpool_target_t **targets, size_t num_targets, size_t capacity, |
133 | | h2o_balancer_t *balancer) |
134 | 6 | { |
135 | 6 | memset(pool, 0, sizeof(*pool)); |
136 | | |
137 | 6 | pool->capacity = capacity; |
138 | 6 | pool->timeout = 2000; |
139 | | |
140 | 6 | pthread_mutex_init(&pool->_shared.mutex, NULL); |
141 | 6 | h2o_linklist_init_anchor(&pool->_shared.sockets); |
142 | | |
143 | 6 | h2o_vector_reserve(NULL, &pool->targets, num_targets); |
144 | 9 | for (; pool->targets.size < num_targets; ++pool->targets.size) |
145 | 3 | pool->targets.entries[pool->targets.size] = targets[pool->targets.size]; |
146 | | |
147 | 6 | pool->balancer = balancer; |
148 | 6 | pool->address_family = AF_UNSPEC; |
149 | 6 | } |
150 | | |
151 | | h2o_socketpool_target_type_t detect_target_type(h2o_url_t *url, struct sockaddr_storage *sa, socklen_t *salen) |
152 | 3 | { |
153 | 3 | memset(sa, 0, sizeof(*sa)); |
154 | 3 | const char *to_sun_err = h2o_url_host_to_sun(url->host, (struct sockaddr_un *)sa); |
155 | 3 | if (to_sun_err == h2o_url_host_to_sun_err_is_not_unix_socket) { |
156 | 0 | sa->ss_family = AF_INET; |
157 | 0 | struct sockaddr_in *sin = (struct sockaddr_in *)sa; |
158 | 0 | *salen = sizeof(*sin); |
159 | |
|
160 | 0 | if (h2o_hostinfo_aton(url->host, &sin->sin_addr) == 0) { |
161 | 0 | sin->sin_port = htons(h2o_url_get_port(url)); |
162 | 0 | return H2O_SOCKETPOOL_TYPE_SOCKADDR; |
163 | 0 | } else { |
164 | 0 | return H2O_SOCKETPOOL_TYPE_NAMED; |
165 | 0 | } |
166 | 3 | } else { |
167 | 3 | assert(to_sun_err == NULL); |
168 | 3 | *salen = sizeof(struct sockaddr_un); |
169 | 3 | return H2O_SOCKETPOOL_TYPE_SOCKADDR; |
170 | 3 | } |
171 | 3 | } |
172 | | |
/* Allocates and initializes a socketpool target for `origin`.
 * `lb_target_conf` may be NULL, in which case the load-balancing weight
 * defaults to 0 (i.e. weight_m1 == 0 → weight 1). Returned target is owned by
 * the caller (typically handed to a pool, which frees it on dispose). */
h2o_socketpool_target_t *h2o_socketpool_create_target(h2o_url_t *origin, h2o_socketpool_target_conf_t *lb_target_conf)
{
    struct sockaddr_storage sa;
    socklen_t salen;

    h2o_socketpool_target_t *target = h2o_mem_alloc(sizeof(*target));
    h2o_url_copy(NULL, &target->url, origin);
    assert(target->url.host.base[target->url.host.len] == '\0'); /* needs to be null-terminated in order to be used in SNI */
    target->type = detect_target_type(origin, &sa, &salen);
    /* hostnames are case-insensitive, so normalize to lowercase — except for unix
     * socket paths, which are case-sensitive filesystem paths */
    if (!(target->type == H2O_SOCKETPOOL_TYPE_SOCKADDR && sa.ss_family == AF_UNIX)) {
        h2o_strtolower(target->url.authority.base, target->url.authority.len);
        h2o_strtolower(target->url.host.base, target->url.host.len);
    }

    switch (target->type) {
    case H2O_SOCKETPOOL_TYPE_NAMED:
        /* pre-render the port as a string for use as the getaddrinfo service argument */
        target->peer.named_serv.base = h2o_mem_alloc(sizeof(H2O_UINT16_LONGEST_STR));
        target->peer.named_serv.len = sprintf(target->peer.named_serv.base, "%u", (unsigned)h2o_url_get_port(&target->url));
        break;
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
        assert(salen <= sizeof(target->peer.sockaddr.bytes));
        memcpy(&target->peer.sockaddr.bytes, &sa, salen);
        target->peer.sockaddr.len = salen;
        break;
    }
    target->_shared.leased_count = 0;
    if (lb_target_conf != NULL)
        target->conf.weight_m1 = lb_target_conf->weight_m1;
    else {
        target->conf.weight_m1 = 0;
    }

    h2o_linklist_init_anchor(&target->_shared.sockets);
    return target;
}
208 | | |
209 | | void h2o_socketpool_init_specific(h2o_socketpool_t *pool, size_t capacity, h2o_socketpool_target_t **targets, size_t num_targets, |
210 | | h2o_balancer_t *balancer) |
211 | 3 | { |
212 | 3 | if (balancer == NULL) |
213 | 3 | balancer = h2o_balancer_create_rr(); |
214 | 3 | common_init(pool, targets, num_targets, capacity, balancer); |
215 | 3 | } |
216 | | |
217 | | int h2o_socketpool_is_global(h2o_socketpool_t *pool) |
218 | 2.96k | { |
219 | 2.96k | return pool->balancer == NULL; |
220 | 2.96k | } |
221 | | |
222 | | void h2o_socketpool_init_global(h2o_socketpool_t *pool, size_t capacity) |
223 | 3 | { |
224 | 3 | common_init(pool, NULL, 0, capacity, NULL); |
225 | 3 | } |
226 | | |
227 | | void h2o_socketpool_destroy_target(h2o_socketpool_target_t *target) |
228 | 0 | { |
229 | 0 | switch (target->type) { |
230 | 0 | case H2O_SOCKETPOOL_TYPE_NAMED: |
231 | 0 | free(target->peer.named_serv.base); |
232 | 0 | break; |
233 | 0 | case H2O_SOCKETPOOL_TYPE_SOCKADDR: |
234 | 0 | break; |
235 | 0 | } |
236 | 0 | free(target->url.authority.base); |
237 | 0 | free(target->url.host.base); |
238 | 0 | free(target->url.path.base); |
239 | 0 | free(target); |
240 | 0 | } |
241 | | |
/* Tears down a pool: destroys all pooled sockets, the mutex, the balancer,
 * the SSL context reference, unlinks the expiry timer, and frees the targets.
 * Must not be called while connect requests are in flight. */
void h2o_socketpool_dispose(h2o_socketpool_t *pool)
{
    size_t i;

    /* drain the pooled sockets under the lock, mirroring the expiry path */
    pthread_mutex_lock(&pool->_shared.mutex);
    while (!h2o_linklist_is_empty(&pool->_shared.sockets)) {
        struct pool_entry_t *entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, pool->_shared.sockets.next);
        destroy_attached(entry);
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);
    pthread_mutex_destroy(&pool->_shared.mutex);

    if (pool->balancer != NULL) {
        pool->balancer->callbacks->destroy(pool->balancer);
    }

    /* drops the reference taken in h2o_socketpool_set_ssl_ctx */
    if (pool->_ssl_ctx != NULL)
        SSL_CTX_free(pool->_ssl_ctx);

    /* unlink the expiry timer before the pool memory goes away */
    if (pool->_interval_cb.loop != NULL)
        h2o_socketpool_unregister_loop(pool, pool->_interval_cb.loop);

    for (i = 0; i < pool->targets.size; i++) {
        h2o_socketpool_destroy_target(pool->targets.entries[i]);
    }
    free(pool->targets.entries);
}
271 | | |
272 | | void h2o_socketpool_set_ssl_ctx(h2o_socketpool_t *pool, SSL_CTX *ssl_ctx) |
273 | 3 | { |
274 | 3 | if (pool->_ssl_ctx != NULL) |
275 | 0 | SSL_CTX_free(pool->_ssl_ctx); |
276 | 3 | if (ssl_ctx != NULL) |
277 | 0 | SSL_CTX_up_ref(ssl_ctx); |
278 | 3 | pool->_ssl_ctx = ssl_ctx; |
279 | 3 | } |
280 | | |
281 | | void h2o_socketpool_register_loop(h2o_socketpool_t *pool, h2o_loop_t *loop) |
282 | 6 | { |
283 | 6 | if (pool->_interval_cb.loop != NULL) |
284 | 0 | return; |
285 | | |
286 | 6 | pool->_interval_cb.loop = loop; |
287 | 6 | h2o_timer_init(&pool->_interval_cb.timeout, on_timeout); |
288 | 6 | h2o_timer_link(loop, CHECK_EXPIRATION_MIN_INTERVAL, &pool->_interval_cb.timeout); |
289 | 6 | } |
290 | | |
291 | | void h2o_socketpool_unregister_loop(h2o_socketpool_t *pool, h2o_loop_t *loop) |
292 | 0 | { |
293 | 0 | if (pool->_interval_cb.loop != loop) |
294 | 0 | return; |
295 | 0 | h2o_timer_unlink(&pool->_interval_cb.timeout); |
296 | 0 | pool->_interval_cb.loop = NULL; |
297 | 0 | } |
298 | | |
299 | | static void call_connect_cb(h2o_socketpool_connect_request_t *req, const char *errstr) |
300 | 2.87k | { |
301 | 2.87k | h2o_socketpool_connect_cb cb = req->cb; |
302 | 2.87k | h2o_socket_t *sock = req->sock; |
303 | 2.87k | void *data = req->data; |
304 | 2.87k | h2o_socketpool_target_t *selected_target = req->pool->targets.entries[req->selected_target]; |
305 | | |
306 | 2.87k | if (req->lb.tried != NULL) { |
307 | 2.87k | free(req->lb.tried); |
308 | 2.87k | } |
309 | | |
310 | 2.87k | free(req); |
311 | | |
312 | 2.87k | if (sock != NULL) |
313 | 2.87k | sock->data = NULL; |
314 | 2.87k | cb(sock, errstr, data, &selected_target->url); |
315 | 2.87k | } |
316 | | |
/* Picks a target (via the balancer when there are multiple) and initiates the
 * connect attempt, consuming one try from `remaining_try_count`. Increments the
 * target's leased_count; the failure paths are responsible for decrementing it
 * before retrying or reporting the error. */
static void try_connect(h2o_socketpool_connect_request_t *req)
{
    h2o_socketpool_target_t *target;

    req->remaining_try_count--;

    /* `lb.tried` is non-NULL only when the caller did not pin a target up front */
    if (req->lb.tried != NULL) {
        if (req->pool->targets.size > 1) {
            /* let the balancer pick an untried target, and mark it tried */
            req->selected_target = req->pool->balancer->callbacks->select_(req->pool->balancer, &req->pool->targets, req->lb.tried);
            assert(!req->lb.tried[req->selected_target]);
            req->lb.tried[req->selected_target] = 1;
        } else {
            req->selected_target = 0;
        }
    }
    target = req->pool->targets.entries[req->selected_target];
    __sync_add_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);

    switch (target->type) {
    case H2O_SOCKETPOOL_TYPE_NAMED:
        /* resolve the name, and connect */
        req->getaddr_req =
            h2o_hostinfo_getaddr(req->getaddr_receiver, target->url.host, target->peer.named_serv, req->pool->address_family,
                                 SOCK_STREAM, IPPROTO_TCP, AI_ADDRCONFIG | AI_NUMERICSERV, on_getaddr, req);
        break;
    case H2O_SOCKETPOOL_TYPE_SOCKADDR:
        /* connect (using sockaddr_in) */
        start_connect(req, (void *)&target->peer.sockaddr.bytes, target->peer.sockaddr.len);
        break;
    }
}
348 | | |
/* TLS handshake completion callback. A certificate-name mismatch is tolerated
 * (socket kept, error still propagated to the callback) when the pool's SSL
 * context is not configured to verify the peer; any other error closes the
 * socket before reporting. */
static void on_handshake_complete(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;

    assert(req->sock == sock);

    if (err == h2o_socket_error_ssl_cert_name_mismatch && (SSL_CTX_get_verify_mode(req->pool->_ssl_ctx) & SSL_VERIFY_PEER) == 0) {
        /* ignore CN mismatch if we are not verifying peer */
    } else if (err != NULL) {
        h2o_socket_close(sock);
        req->sock = NULL; /* callback will receive a NULL socket */
    }

    call_connect_cb(req, err);
}
364 | | |
/* TCP connect completion callback. On failure, rolls back the target's lease
 * and retries the next target while tries remain; on success, starts the TLS
 * handshake for https targets or reports the plain socket immediately. */
static void on_connect(h2o_socket_t *sock, const char *err)
{
    h2o_socketpool_connect_request_t *req = sock->data;

    assert(req->sock == sock);

    if (err != NULL) {
        /* undo the lease taken in try_connect before retrying or giving up */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        h2o_socket_close(sock);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        /* out of retries: drop the pool-wide count taken in h2o_socketpool_connect */
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        req->sock = NULL;
    } else {
        h2o_url_t *target_url = &req->pool->targets.entries[req->selected_target]->url;
        if (target_url->scheme->is_ssl) {
            assert(req->pool->_ssl_ctx != NULL && "h2o_socketpool_set_ssl_ctx must be called for a pool that contains SSL target");
            /* host.base is NUL-terminated (asserted at target creation) and used as SNI */
            h2o_socket_ssl_handshake(sock, req->pool->_ssl_ctx, target_url->host.base, req->alpn_protos, on_handshake_complete);
            return;
        }
    }

    call_connect_cb(req, err);
}
391 | | |
392 | | static void on_close(void *data) |
393 | 2.96k | { |
394 | 2.96k | struct on_close_data_t *close_data = data; |
395 | 2.96k | h2o_socketpool_t *pool = close_data->pool; |
396 | 2.96k | __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1); |
397 | 2.96k | free(close_data); |
398 | 2.96k | __sync_sub_and_fetch(&pool->_shared.count, 1); |
399 | 2.96k | } |
400 | | |
/* Issues the non-blocking connect to `addr`. On immediate failure, rolls back
 * the target lease and retries remaining targets (or reports failure once all
 * tries are exhausted). On success, attaches the request to the socket and
 * installs the on_close bookkeeping hook. */
static void start_connect(h2o_socketpool_connect_request_t *req, struct sockaddr *addr, socklen_t addrlen)
{
    struct on_close_data_t *close_data;

    req->sock = h2o_socket_connect(req->loop, addr, addrlen, on_connect, NULL);
    if (req->sock == NULL) {
        /* undo the lease taken in try_connect */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        /* no targets left: drop the pool-wide count and report */
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        call_connect_cb(req, h2o_socket_error_conn_fail);
        return;
    }
    close_data = h2o_mem_alloc(sizeof(*close_data));
    close_data->pool = req->pool;
    close_data->target = req->selected_target;
    req->sock->data = req;
    req->sock->on_close.cb = on_close;
    req->sock->on_close.data = close_data;
}
423 | | |
/* Name-resolution completion callback for NAMED targets. On resolver error,
 * rolls back the target lease and retries remaining targets (or reports the
 * error once tries are exhausted); otherwise connects to one of the resolved
 * addresses. */
static void on_getaddr(h2o_hostinfo_getaddr_req_t *getaddr_req, const char *errstr, struct addrinfo *res, void *_req)
{
    h2o_socketpool_connect_request_t *req = _req;

    assert(getaddr_req == req->getaddr_req);
    req->getaddr_req = NULL; /* resolution finished; nothing left to cancel */

    if (errstr != NULL) {
        /* undo the lease taken in try_connect */
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
        if (req->remaining_try_count > 0) {
            try_connect(req);
            return;
        }
        __sync_sub_and_fetch(&req->pool->_shared.count, 1);
        call_connect_cb(req, errstr);
        return;
    }

    struct addrinfo *selected = h2o_hostinfo_select_one(res);
    start_connect(req, selected->ai_addr, selected->ai_addrlen);
}
445 | | |
446 | | static size_t lookup_target(h2o_socketpool_t *pool, h2o_url_t *url) |
447 | 0 | { |
448 | 0 | uint16_t port = h2o_url_get_port(url); |
449 | 0 | size_t i = 0; |
450 | 0 | for (; i != pool->targets.size; ++i) { |
451 | 0 | h2o_socketpool_target_t *target = pool->targets.entries[i]; |
452 | 0 | if (target->url.scheme != url->scheme) |
453 | 0 | continue; |
454 | 0 | if (h2o_url_get_port(&target->url) != port) |
455 | 0 | continue; |
456 | 0 | if (!h2o_url_hosts_are_equal(&target->url, url)) |
457 | 0 | continue; |
458 | 0 | return i; |
459 | 0 | } |
460 | 0 | return SIZE_MAX; |
461 | 0 | } |
462 | | |
/* Obtains a connection for `url`, preferring a live pooled socket over a fresh
 * connect. `cb` is invoked exactly once: synchronously when a pooled socket is
 * reused, otherwise asynchronously once the connect (and possibly TLS
 * handshake) settles. `*_req` receives a cancellable handle only on the
 * asynchronous path; it stays NULL on a synchronous pooled-socket hit. */
void h2o_socketpool_connect(h2o_socketpool_connect_request_t **_req, h2o_socketpool_t *pool, h2o_url_t *url, h2o_loop_t *loop,
                            h2o_multithread_receiver_t *getaddr_receiver, h2o_iovec_t alpn_protos, h2o_socketpool_connect_cb cb,
                            void *data)
{
    struct pool_entry_t *entry = NULL;
    struct on_close_data_t *close_data;

    if (_req != NULL)
        *_req = NULL;

    size_t target = SIZE_MAX;
    h2o_linklist_t *sockets = NULL;

    /* As an optimization, avoid trying to reuse (and obtaining lock for the purpose) when the pool is not going to retain sockets.
     * The optimization is limited to non-global socket pools, as the rest of the logic depends on a valid entry within
     * `pool->targets` to be targeted, that a global pool allocates dynamically. */
    if (pool->timeout == 0 && !h2o_socketpool_is_global(pool))
        goto SkipLookup;

    /* fetch an entry and return it */
    pthread_mutex_lock(&pool->_shared.mutex);
    check_pool_expired_locked(pool, loop);

    /* TODO lookup outside this critical section */
    if (h2o_socketpool_is_global(pool)) {
        /* global pool: find (or lazily register) the target for this origin,
         * and reuse only sockets pooled for that same target */
        target = lookup_target(pool, url);
        if (target == SIZE_MAX) {
            h2o_vector_reserve(NULL, &pool->targets, pool->targets.size + 1);
            pool->targets.entries[pool->targets.size++] = h2o_socketpool_create_target(url, NULL);
            target = pool->targets.size - 1;
        }
        sockets = &pool->targets.entries[target]->_shared.sockets;
    } else {
        /* specific pool: any pooled socket will do; the balancer picked the target when it was pooled */
        sockets = &pool->_shared.sockets;
    }
    assert(pool->targets.size != 0);

    while (!h2o_linklist_is_empty(sockets)) {
        /* the list being walked determines which link member the entry hangs from */
        if (h2o_socketpool_is_global(pool)) {
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, target_link, sockets->next);
        } else {
            entry = H2O_STRUCT_FROM_MEMBER(struct pool_entry_t, all_link, sockets->next);
        }
        h2o_linklist_unlink(&entry->all_link);
        h2o_linklist_unlink(&entry->target_link);
        /* drop the lock while probing the socket; reacquired below if the probe fails */
        pthread_mutex_unlock(&pool->_shared.mutex);

        __sync_sub_and_fetch(&pool->_shared.pooled_count, 1);

        /* test if the connection is still alive */
        char buf[1];
        ssize_t rret = recv(entry->sockinfo.fd, buf, 1, MSG_PEEK);
        if (rret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            /* yes! return it */
            size_t entry_target = entry->target;
            h2o_socket_t *sock = h2o_socket_import(loop, &entry->sockinfo);
            free(entry);
            close_data = h2o_mem_alloc(sizeof(*close_data));
            close_data->pool = pool;
            close_data->target = entry_target;
            sock->on_close.cb = on_close;
            sock->on_close.data = close_data;
            __sync_add_and_fetch(&pool->targets.entries[entry_target]->_shared.leased_count, 1);
            cb(sock, NULL, data, &pool->targets.entries[entry_target]->url);
            return;
        }

        /* connection is dead, report, close, and retry */
        if (rret <= 0) {
            /* warn only once per process to avoid log flooding */
            static long counter = 0;
            if (__sync_fetch_and_add(&counter, 1) == 0)
                h2o_error_printf("[WARN] detected close by upstream before the expected timeout (see issue #679)\n");
        } else {
            static long counter = 0;
            if (__sync_fetch_and_add(&counter, 1) == 0)
                h2o_error_printf("[WARN] unexpectedly received data to a pooled socket (see issue #679)\n");
        }
        destroy_detached(entry);
        pthread_mutex_lock(&pool->_shared.mutex);
    }
    pthread_mutex_unlock(&pool->_shared.mutex);

SkipLookup:
    /* FIXME respect `capacity` */
    __sync_add_and_fetch(&pool->_shared.count, 1);

    /* prepare request object */
    h2o_socketpool_connect_request_t *req = h2o_mem_alloc(sizeof(*req));
    *req = (h2o_socketpool_connect_request_t){data, cb, pool, loop};

    if (_req != NULL)
        *_req = req;
    req->getaddr_receiver = getaddr_receiver;
    req->alpn_protos = alpn_protos;

    req->selected_target = target;
    if (target == SIZE_MAX) {
        /* no pinned target: allocate the tried-bitmap for the balancer and allow one try per target */
        req->lb.tried = h2o_mem_alloc(sizeof(req->lb.tried[0]) * pool->targets.size);
        memset(req->lb.tried, 0, sizeof(req->lb.tried[0]) * pool->targets.size);
        req->remaining_try_count = pool->targets.size;
    } else {
        req->remaining_try_count = 1;
    }
    try_connect(req);
}
568 | | |
/* Aborts an in-flight connect request: cancels pending name resolution, closes
 * the half-connected socket (its on_close hook performs the count rollback),
 * and frees the request. */
void h2o_socketpool_cancel_connect(h2o_socketpool_connect_request_t *req)
{
    if (req->getaddr_req != NULL) {
        h2o_hostinfo_getaddr_cancel(req->getaddr_req);
        req->getaddr_req = NULL;
    }
    if (req->sock != NULL)
        h2o_socket_close(req->sock);
    /* NOTE(review): `lb.tried != NULL` appears to double as the "a target was
     * selected by try_connect" marker — the extra leased_count decrement here
     * presumably compensates for the one that on_close performs via the pool's
     * own bookkeeping; confirm against on_close/try_connect before changing */
    if (req->lb.tried != NULL) {
        free(req->lb.tried);
        __sync_sub_and_fetch(&req->pool->targets.entries[req->selected_target]->_shared.leased_count, 1);
    }
    free(req);
}
583 | | |
/* Returns a leased socket to the pool for reuse. The socket is exported
 * (detached from its loop) and linked into both the pool-wide and per-target
 * idle lists. Returns 0 on success; -1 when the export fails (in which case
 * the socket is gone and the pool's live count is decremented). */
int h2o_socketpool_return(h2o_socketpool_t *pool, h2o_socket_t *sock)
{
    struct pool_entry_t *entry;
    struct on_close_data_t *close_data;
    size_t target;

    close_data = sock->on_close.data;
    target = close_data->target;
    /* reset the on_close callback */
    assert(close_data->pool == pool);
    /* the lease ends here; the idle socket no longer counts against the target */
    __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1);
    free(close_data);
    sock->on_close.cb = NULL;
    sock->on_close.data = NULL;

    entry = h2o_mem_alloc(sizeof(*entry));
    if (h2o_socket_export(sock, &entry->sockinfo) != 0) {
        free(entry);
        __sync_sub_and_fetch(&pool->_shared.count, 1);
        return -1;
    }
    memset(&entry->all_link, 0, sizeof(entry->all_link));
    memset(&entry->target_link, 0, sizeof(entry->target_link));
    entry->added_at = h2o_now(h2o_socket_get_loop(sock)); /* timestamp for idle-timeout expiry */
    entry->target = target;

    __sync_add_and_fetch(&pool->_shared.pooled_count, 1);

    pthread_mutex_lock(&pool->_shared.mutex);
    /* insert at the tail, keeping both lists ordered oldest-first */
    h2o_linklist_insert(&pool->_shared.sockets, &entry->all_link);
    h2o_linklist_insert(&pool->targets.entries[target]->_shared.sockets, &entry->target_link);
    check_pool_expired_locked(pool, h2o_socket_get_loop(sock));
    pthread_mutex_unlock(&pool->_shared.mutex);
    return 0;
}
619 | | |
620 | | void h2o_socketpool_detach(h2o_socketpool_t *pool, h2o_socket_t *sock) |
621 | 0 | { |
622 | 0 | struct on_close_data_t *close_data = sock->on_close.data; |
623 | 0 | assert(close_data->pool == pool); |
624 | | |
625 | 0 | __sync_sub_and_fetch(&pool->targets.entries[close_data->target]->_shared.leased_count, 1); |
626 | 0 | __sync_sub_and_fetch(&pool->_shared.count, 1); |
627 | |
|
628 | 0 | sock->on_close.cb = NULL; |
629 | 0 | sock->on_close.data = NULL; |
630 | 0 | free(close_data); |
631 | 0 | } |
632 | | |
633 | | int h2o_socketpool_can_keepalive(h2o_socketpool_t *pool) |
634 | 2.87k | { |
635 | 2.87k | return pool->timeout > 0; |
636 | 2.87k | } |