/src/freeradius-server/src/lib/server/pool.c
Line | Count | Source |
1 | | /* |
2 | | * This program is free software; you can redistribute it and/or modify |
3 | | * it under the terms of the GNU General Public License as published by |
4 | | * the Free Software Foundation; either version 2 of the License, or |
5 | | * (at your option) any later version. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA |
15 | | */ |
16 | | |
17 | | /** |
18 | | * @file pool.c |
19 | | * @brief Handle pools of connections (threads, sockets, etc.) |
20 | | * @note This API must be used by all modules in the public distribution that |
21 | | * maintain pools of connections. |
22 | | * |
23 | | * @copyright 2012 The FreeRADIUS server project |
24 | | * @copyright 2012 Alan DeKok (aland@deployingradius.com) |
25 | | */ |
26 | | RCSID("$Id: 185e43b6f392295097c6327b91955835522520ad $") |
27 | | |
28 | 0 | #define LOG_PREFIX pool->log_prefix |
29 | | |
30 | | #include <freeradius-devel/server/main_config.h> |
31 | | #include <freeradius-devel/server/modpriv.h> |
32 | | #include <freeradius-devel/server/trigger.h> |
33 | | |
34 | | #include <freeradius-devel/util/debug.h> |
35 | | |
36 | | #include <freeradius-devel/util/misc.h> |
37 | | |
38 | | |
/* Per-connection record; the structure itself is defined immediately below. */
typedef struct fr_pool_connection_s fr_pool_connection_t;

/* Prototypes for file-local functions that are defined further down in this file. */
static int connection_check(fr_pool_t *pool, request_t *request);
/* Default-value callback referenced by the "max" entry of pool_config. */
static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule);
43 | | |
/** An individual connection within the connection pool
 *
 * Defines connection counters, timestamps, and holds a pointer to the
 * connection handle itself.  Records are chained into the pool's doubly
 * linked list through #prev and #next.
 *
 * @see fr_pool_t
 */
struct fr_pool_connection_s {
	fr_pool_connection_t	*prev;			//!< Previous connection in list.
	fr_pool_connection_t	*next;			//!< Next connection in list.
	fr_heap_index_t		heap_id;		//!< For the next connection heap.

	fr_time_t		created;		//!< Time connection was created.
	fr_time_t		last_reserved;		//!< Last time the connection was reserved.

	fr_time_t		last_released;		//!< Time the connection was released.
							//!< Starts out equal to last_reserved when
							//!< the connection is spawned.

	uint32_t		num_uses;		//!< Number of times the connection has been reserved.
							//!< NOTE(review): 32 bits wide, but compared against
							//!< the 64-bit fr_pool_t::max_uses.
	uint64_t		number;			//!< Unique ID assigned when the connection is created,
							//!< these will monotonically increase over the
							//!< lifetime of the connection pool.
	void			*connection;		//!< Pointer to whatever the module uses for a connection
							//!< handle.
	bool			in_use;			//!< Whether the connection is currently reserved.

	bool			needs_reconnecting;	//!< Reconnect this connection before use.

#ifdef PTHREAD_DEBUG
	pthread_t		pthread_id;		//!< When 'in_use == true'.
#endif
};
75 | | |
/** A connection pool
 *
 * Defines the configuration of the connection pool, all the counters and
 * timestamps related to the connection pool, the mutex that stops multiple
 * threads leaving the pool in an inconsistent state, and the callbacks
 * required to open, close and check the status of connections within the pool.
 *
 * @see fr_pool_connection_t
 */
struct fr_pool_s {
	int			ref;			//!< Reference counter to prevent connection
							//!< pool being freed multiple times.
	uint32_t		start;			//!< Number of initial connections.
	uint32_t		min;			//!< Minimum number of concurrent connections to keep open.
	uint32_t		max;			//!< Maximum number of concurrent connections to allow.
	uint32_t		max_pending;		//!< Max number of pending connections to allow.
	uint32_t		spare;			//!< Number of spare connections to try.
	uint64_t		max_uses;		//!< Maximum number of times a connection can be used
							//!< before being closed.
	uint32_t		pending_window;		//!< Sliding window of pending connections.  Reset to 1
							//!< when a connection attempt fails, and incremented
							//!< after each successful spawn (bounded by max and,
							//!< when non-zero, max_pending).

	fr_time_delta_t		retry_delay;		//!< How long to delay re-open after a failed open.
	fr_time_delta_t		cleanup_interval;	//!< Initial timer for how often we sweep the pool
							//!< for free connections. (0 is infinite).
	fr_time_delta_t		delay_interval;		//!< When we next do a cleanup.  Initialized to
							//!< cleanup_interval, and increase from there based
							//!< on the delay.
	fr_time_delta_t		lifetime;		//!< How long a connection can be open before being
							//!< closed (irrespective of whether it's idle or not).
	fr_time_delta_t		idle_timeout;		//!< How long a connection can be idle before
							//!< being closed.
	fr_time_delta_t		connect_timeout;	//!< New connection timeout, enforced by the create
							//!< callback.

	bool			spread;			//!< If true we spread requests over the connections,
							//!< using the connection released longest ago, first.

	fr_heap_t		*heap;			//!< For the next connection heap

	fr_pool_connection_t	*head;			//!< Start of the connection list.
	fr_pool_connection_t	*tail;			//!< End of the connection list.

	pthread_mutex_t		mutex;			//!< Mutex used to keep consistent state when making
							//!< modifications in threaded mode.
	pthread_cond_t		done_spawn;		//!< Threads that need to ensure no spawning is in progress,
							//!< should block on this condition if pending != 0.
	pthread_cond_t		done_reconnecting;	//!< Before calling the create callback, threads should
							//!< block on this condition if reconnecting == true.

	CONF_SECTION const	*cs;			//!< Configuration section holding the section of parsed
							//!< config file that relates to this pool.
	void			*opaque;		//!< Pointer to context data that will be passed to callbacks.

	char const		*log_prefix;		//!< Log prefix to prepend to all log messages created
							//!< by the connection pool code.

	bool			triggers_enabled;	//!< Whether we call the trigger functions.

	char const		*trigger_prefix;	//!< Prefix to prepend to names of all triggers
							//!< fired by the connection pool code.
	fr_pair_list_t		trigger_args;		//!< Arguments to make available in connection pool triggers.

	fr_time_delta_t		held_trigger_min;	//!< If a connection is held for less than the specified
							//!< period, fire a trigger.
	fr_time_delta_t		held_trigger_max;	//!< If a connection is held for longer than the specified
							//!< period, fire a trigger.

	fr_pool_connection_create_t	create;		//!< Function used to create new connections.
	fr_pool_connection_alive_t	alive;		//!< Function used to check status of connections.

	fr_pool_reconnect_t	reconnect;		//!< Called during connection pool reconnect.

	fr_pool_state_t		state;			//!< Stats and state of the connection pool.
};
150 | | |
/*
 *	Configuration items for a connection pool, each one mapped onto a
 *	field of fr_pool_t.
 *
 *	"max" is the only item without a literal default: its default is
 *	produced at parse time by max_dflt().  Note that the config item
 *	named "uses" is stored in fr_pool_t::max_uses.
 */
static const conf_parser_t pool_config[] = {
	{ FR_CONF_OFFSET("start", fr_pool_t, start), .dflt = "0" },
	{ FR_CONF_OFFSET("min", fr_pool_t, min), .dflt = "0" },
	{ FR_CONF_OFFSET("max", fr_pool_t, max), .dflt_func = max_dflt },
	{ FR_CONF_OFFSET("max_pending", fr_pool_t, max_pending), .dflt = "0" },
	{ FR_CONF_OFFSET("spare", fr_pool_t, spare), .dflt = "3" },
	{ FR_CONF_OFFSET("uses", fr_pool_t, max_uses), .dflt = "0" },
	{ FR_CONF_OFFSET("lifetime", fr_pool_t, lifetime), .dflt = "0" },
	{ FR_CONF_OFFSET("cleanup_interval", fr_pool_t, cleanup_interval), .dflt = "30" },
	{ FR_CONF_OFFSET("idle_timeout", fr_pool_t, idle_timeout), .dflt = "60" },
	{ FR_CONF_OFFSET("connect_timeout", fr_pool_t, connect_timeout), .dflt = "3.0" },
	{ FR_CONF_OFFSET("held_trigger_min", fr_pool_t, held_trigger_min), .dflt = "0.0" },
	{ FR_CONF_OFFSET("held_trigger_max", fr_pool_t, held_trigger_max), .dflt = "0.5" },
	{ FR_CONF_OFFSET("retry_delay", fr_pool_t, retry_delay), .dflt = "1" },
	{ FR_CONF_OFFSET("spread", fr_pool_t, spread), .dflt = "no" },
	CONF_PARSER_TERMINATOR
};
168 | | |
169 | | static int max_dflt(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule) |
170 | 0 | { |
171 | 0 | char *strvalue; |
172 | |
|
173 | 0 | strvalue = talloc_asprintf(NULL, "%u", main_config->max_workers); |
174 | 0 | *out = cf_pair_alloc(cs, rule->name1, strvalue, T_OP_EQ, T_BARE_WORD, quote); |
175 | 0 | talloc_free(strvalue); |
176 | |
|
177 | 0 | return 0; |
178 | 0 | } |
179 | | |
180 | | /** Order connections by reserved most recently |
181 | | */ |
182 | | static int8_t last_reserved_cmp(void const *one, void const *two) |
183 | 0 | { |
184 | 0 | fr_pool_connection_t const *a = one, *b = two; |
185 | |
|
186 | 0 | return fr_time_cmp(a->last_reserved, b->last_reserved); |
187 | 0 | } |
188 | | |
189 | | /** Order connections by released longest ago |
190 | | */ |
191 | | static int8_t last_released_cmp(void const *one, void const *two) |
192 | 0 | { |
193 | 0 | fr_pool_connection_t const *a = one, *b = two; |
194 | |
|
195 | 0 | return fr_time_cmp(a->last_released, b->last_released); |
196 | 0 | } |
197 | | |
198 | | /** Removes a connection from the connection list |
199 | | * |
200 | | * @note Must be called with the mutex held. |
201 | | * |
202 | | * @param[in] pool to modify. |
203 | | * @param[in] this Connection to delete. |
204 | | */ |
205 | | static void connection_unlink(fr_pool_t *pool, fr_pool_connection_t *this) |
206 | 0 | { |
207 | 0 | if (this->prev) { |
208 | 0 | fr_assert(pool->head != this); |
209 | 0 | this->prev->next = this->next; |
210 | 0 | } else { |
211 | 0 | fr_assert(pool->head == this); |
212 | 0 | pool->head = this->next; |
213 | 0 | } |
214 | 0 | if (this->next) { |
215 | 0 | fr_assert(pool->tail != this); |
216 | 0 | this->next->prev = this->prev; |
217 | 0 | } else { |
218 | 0 | fr_assert(pool->tail == this); |
219 | 0 | pool->tail = this->prev; |
220 | 0 | } |
221 | |
|
222 | 0 | this->prev = this->next = NULL; |
223 | 0 | } |
224 | | |
225 | | /** Adds a connection to the head of the connection list |
226 | | * |
227 | | * @note Must be called with the mutex held. |
228 | | * |
229 | | * @param[in] pool to modify. |
230 | | * @param[in] this Connection to add. |
231 | | */ |
232 | | static void connection_link_head(fr_pool_t *pool, fr_pool_connection_t *this) |
233 | 0 | { |
234 | 0 | fr_assert(pool != NULL); |
235 | 0 | fr_assert(this != NULL); |
236 | 0 | fr_assert(pool->head != this); |
237 | 0 | fr_assert(pool->tail != this); |
238 | |
|
239 | 0 | if (pool->head) { |
240 | 0 | pool->head->prev = this; |
241 | 0 | } |
242 | |
|
243 | 0 | this->next = pool->head; |
244 | 0 | this->prev = NULL; |
245 | 0 | pool->head = this; |
246 | 0 | if (!pool->tail) { |
247 | 0 | fr_assert(this->next == NULL); |
248 | 0 | pool->tail = this; |
249 | 0 | } else { |
250 | 0 | fr_assert(this->next != NULL); |
251 | 0 | } |
252 | 0 | } |
253 | | |
254 | | /** Send a connection pool trigger. |
255 | | * |
256 | | * @param[in] pool to send trigger for. |
257 | | * @param[in] event trigger name suffix. |
258 | | */ |
259 | | static inline void fr_pool_trigger(fr_pool_t *pool, char const *event) |
260 | 0 | { |
261 | 0 | char name[128]; |
262 | |
|
263 | 0 | fr_assert(pool != NULL); |
264 | 0 | fr_assert(event != NULL); |
265 | |
|
266 | 0 | if (!pool->triggers_enabled) return; |
267 | | |
268 | 0 | snprintf(name, sizeof(name), "%s.%s", pool->trigger_prefix, event); |
269 | 0 | trigger(unlang_interpret_get_thread_default(), pool->cs, NULL, name, true, &pool->trigger_args); |
270 | 0 | } |
271 | | |
272 | | /** Find a connection handle in the connection list |
273 | | * |
274 | | * Walks over the list of connections searching for a specified connection |
275 | | * handle and returns the first connection that contains that pointer. |
276 | | * |
277 | | * @note Will lock mutex and only release mutex if connection handle |
278 | | * is not found, so will usually return will mutex held. |
279 | | * @note Must be called with the mutex free. |
280 | | * |
281 | | * @param[in] pool to search in. |
282 | | * @param[in] conn handle to search for. |
283 | | * @return |
284 | | * - Connection containing the specified handle. |
285 | | * - NULL if no such connection was found. |
286 | | */ |
287 | | static fr_pool_connection_t *connection_find(fr_pool_t *pool, void *conn) |
288 | 0 | { |
289 | 0 | fr_pool_connection_t *this; |
290 | |
|
291 | 0 | if (!pool || !conn) return NULL; |
292 | | |
293 | 0 | pthread_mutex_lock(&pool->mutex); |
294 | | |
295 | | /* |
296 | | * FIXME: This loop could be avoided if we passed a 'void |
297 | | * **connection' instead. We could use "offsetof" in |
298 | | * order to find top of the parent structure. |
299 | | */ |
300 | 0 | for (this = pool->head; this != NULL; this = this->next) { |
301 | 0 | if (this->connection == conn) { |
302 | | #ifdef PTHREAD_DEBUG |
303 | | pthread_t pthread_id; |
304 | | |
305 | | pthread_id = pthread_self(); |
306 | | fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0); |
307 | | #endif |
308 | |
|
309 | 0 | fr_assert(this->in_use == true); |
310 | 0 | return this; |
311 | 0 | } |
312 | 0 | } |
313 | | |
314 | 0 | pthread_mutex_unlock(&pool->mutex); |
315 | 0 | return NULL; |
316 | 0 | } |
317 | | |
318 | | /** Spawns a new connection |
319 | | * |
320 | | * Spawns a new connection using the create callback, and returns it for |
321 | | * adding to the connection list. |
322 | | * |
323 | | * @note Will call the 'open' trigger. |
324 | | * @note Must be called with the mutex free. |
325 | | * |
326 | | * @param[in] pool to modify. |
327 | | * @param[in] request The current request. |
328 | | * @param[in] now Current time. |
329 | | * @param[in] in_use whether the new connection should be "in_use" or not |
330 | | * @param[in] unlock whether we should unlock the mutex before returning |
331 | | * @return |
332 | | * - New connection struct. |
333 | | * - NULL on error. |
334 | | */ |
335 | | static fr_pool_connection_t *connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock) |
336 | 0 | { |
337 | 0 | uint64_t number; |
338 | 0 | uint32_t pending_window; |
339 | 0 | TALLOC_CTX *ctx; |
340 | |
|
341 | 0 | fr_pool_connection_t *this; |
342 | 0 | void *conn; |
343 | |
|
344 | 0 | fr_assert(pool != NULL); |
345 | | |
346 | | /* |
347 | | * If we have NO connections, and we've previously failed |
348 | | * opening connections, don't open multiple connections until |
349 | | * we successfully open at least one. |
350 | | */ |
351 | 0 | if ((pool->state.num == 0) && |
352 | 0 | pool->state.pending && |
353 | 0 | fr_time_gt(pool->state.last_failed, fr_time_wrap(0))) return NULL; |
354 | | |
355 | 0 | pthread_mutex_lock(&pool->mutex); |
356 | 0 | fr_assert(pool->state.num <= pool->max); |
357 | | |
358 | | /* |
359 | | * Don't spawn too many connections at the same time. |
360 | | */ |
361 | 0 | if ((pool->state.num + pool->state.pending) >= pool->max) { |
362 | 0 | pthread_mutex_unlock(&pool->mutex); |
363 | |
|
364 | 0 | ERROR("Cannot open new connection, already at max"); |
365 | 0 | return NULL; |
366 | 0 | } |
367 | | |
368 | | /* |
369 | | * If the last attempt failed, wait a bit before |
370 | | * retrying. |
371 | | */ |
372 | 0 | if (fr_time_gt(pool->state.last_failed, fr_time_wrap(0)) && |
373 | 0 | fr_time_gt(fr_time_add(pool->state.last_failed, pool->retry_delay), now)) { |
374 | 0 | bool complain = false; |
375 | |
|
376 | 0 | if (fr_time_delta_gteq(fr_time_sub(now, pool->state.last_throttled), fr_time_delta_from_sec(1))) { |
377 | 0 | complain = true; |
378 | |
|
379 | 0 | pool->state.last_throttled = now; |
380 | 0 | } |
381 | |
|
382 | 0 | pthread_mutex_unlock(&pool->mutex); |
383 | |
|
384 | 0 | if (!fr_rate_limit_enabled() || complain) { |
385 | 0 | ERROR("Last connection attempt failed, waiting %pV seconds before retrying", |
386 | 0 | fr_box_time_delta(fr_time_sub(fr_time_add(pool->state.last_failed, pool->retry_delay), now))); |
387 | 0 | } |
388 | |
|
389 | 0 | return NULL; |
390 | 0 | } |
391 | | |
392 | | /* |
393 | | * We limit the rate of new connections after a failed attempt. |
394 | | */ |
395 | 0 | if (pool->state.pending > pool->pending_window) { |
396 | 0 | pthread_mutex_unlock(&pool->mutex); |
397 | |
|
398 | 0 | RATE_LIMIT_GLOBAL_ROPTIONAL(RWARN, WARN, |
399 | 0 | "Cannot open a new connection due to rate limit after failure"); |
400 | |
|
401 | 0 | return NULL; |
402 | 0 | } |
403 | | |
404 | 0 | pool->state.pending++; |
405 | 0 | number = pool->state.count++; |
406 | | |
407 | | /* |
408 | | * Don't starve out the thread trying to reconnect |
409 | | * the pool, by continuously opening new connections. |
410 | | */ |
411 | 0 | while (pool->state.reconnecting) pthread_cond_wait(&pool->done_reconnecting, &pool->mutex); |
412 | | |
413 | | /* |
414 | | * The true value for pending_window is the smaller of |
415 | | * free connection slots, or pool->pending_window. |
416 | | */ |
417 | 0 | pending_window = (pool->max - pool->state.num); |
418 | 0 | if (pool->pending_window < pending_window) pending_window = pool->pending_window; |
419 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Opening additional connection (%" PRIu64 "), %u of %u pending slots used", |
420 | 0 | number, pool->state.pending, pending_window); |
421 | | |
422 | | /* |
423 | | * Unlock the mutex while we try to open a new |
424 | | * connection. If there are issues with the back-end, |
425 | | * opening a new connection may take a LONG time. In |
426 | | * that case, we want the other connections to continue |
427 | | * to be used. |
428 | | */ |
429 | 0 | pthread_mutex_unlock(&pool->mutex); |
430 | | |
431 | | /* |
432 | | * Allocate a new top level ctx for the create callback |
433 | | * to hang its memory off of. |
434 | | */ |
435 | 0 | MEM(ctx = talloc_init("connection_ctx")); |
436 | | |
437 | | /* |
438 | | * This may take a long time, which prevents other |
439 | | * threads from releasing connections. We don't care |
440 | | * about other threads opening new connections, as we |
441 | | * already have no free connections. |
442 | | */ |
443 | 0 | conn = pool->create(ctx, pool->opaque, pool->connect_timeout); |
444 | 0 | if (!conn) { |
445 | 0 | ERROR("Opening connection failed (%" PRIu64 ")", number); |
446 | |
|
447 | 0 | pthread_mutex_lock(&pool->mutex); |
448 | 0 | pool->state.last_failed = now; |
449 | 0 | pool->pending_window = 1; |
450 | 0 | pool->state.pending--; |
451 | | |
452 | | /* |
453 | | * Must be done inside the mutex, reconnect callback |
454 | | * may modify args. |
455 | | */ |
456 | 0 | fr_pool_trigger(pool, "fail"); |
457 | 0 | pthread_cond_broadcast(&pool->done_spawn); |
458 | 0 | pthread_mutex_unlock(&pool->mutex); |
459 | |
|
460 | 0 | talloc_free(ctx); |
461 | |
|
462 | 0 | return NULL; |
463 | 0 | } |
464 | | |
465 | | /* |
466 | | * And lock the mutex again while we link the new |
467 | | * connection back into the pool. |
468 | | */ |
469 | 0 | pthread_mutex_lock(&pool->mutex); |
470 | |
|
471 | 0 | this = talloc_zero(pool, fr_pool_connection_t); |
472 | 0 | if (!this) { |
473 | 0 | pool->state.last_failed = now; |
474 | 0 | pool->state.pending--; |
475 | |
|
476 | 0 | pthread_cond_broadcast(&pool->done_spawn); |
477 | 0 | pthread_mutex_unlock(&pool->mutex); |
478 | |
|
479 | 0 | ERROR("Memory allocation failed for new connection (%" PRIu64 ")", number); |
480 | |
|
481 | 0 | talloc_free(ctx); |
482 | |
|
483 | 0 | return NULL; |
484 | 0 | } |
485 | 0 | MEM(talloc_link_ctx(this, ctx) >= 0); |
486 | |
|
487 | 0 | this->created = now; |
488 | 0 | this->connection = conn; |
489 | 0 | this->in_use = in_use; |
490 | |
|
491 | 0 | this->number = number; |
492 | 0 | this->last_reserved = fr_time(); |
493 | 0 | this->last_released = this->last_reserved; |
494 | | |
495 | | /* |
496 | | * The connection pool is starting up. Insert the |
497 | | * connection into the heap. |
498 | | */ |
499 | 0 | if (!in_use) fr_heap_insert(&pool->heap, this); |
500 | |
|
501 | 0 | connection_link_head(pool, this); |
502 | | |
503 | | /* |
504 | | * Do NOT insert the connection into the heap. That's |
505 | | * done when the connection is released. |
506 | | */ |
507 | |
|
508 | 0 | pool->state.num++; |
509 | |
|
510 | 0 | fr_assert(pool->state.pending > 0); |
511 | 0 | pool->state.pending--; |
512 | | |
513 | | /* |
514 | | * We've successfully opened one more connection. Allow |
515 | | * more connections to open in parallel. |
516 | | */ |
517 | 0 | if ((pool->pending_window < pool->max) && |
518 | 0 | ((pool->max_pending == 0) || (pool->pending_window < pool->max_pending))) { |
519 | 0 | pool->pending_window++; |
520 | 0 | } |
521 | |
|
522 | 0 | pool->state.last_spawned = fr_time(); |
523 | 0 | pool->delay_interval = pool->cleanup_interval; |
524 | 0 | pool->state.next_delay = pool->cleanup_interval; |
525 | 0 | pool->state.last_failed = fr_time_wrap(0); |
526 | | |
527 | | /* |
528 | | * Must be done inside the mutex, reconnect callback |
529 | | * may modify args. |
530 | | */ |
531 | 0 | fr_pool_trigger(pool, "open"); |
532 | |
|
533 | 0 | pthread_cond_broadcast(&pool->done_spawn); |
534 | 0 | if (unlock) pthread_mutex_unlock(&pool->mutex); |
535 | | |
536 | | /* coverity[missing_unlock] */ |
537 | 0 | return this; |
538 | 0 | } |
539 | | |
540 | | /** Close an existing connection. |
541 | | * |
542 | | * Removes the connection from the list, calls the delete callback to close |
543 | | * the connection, then frees memory allocated to the connection. |
544 | | * |
545 | | * @note Will call the 'close' trigger. |
546 | | * @note Must be called with the mutex held. |
547 | | * |
548 | | * @param[in] pool to modify. |
549 | | * @param[in] this Connection to delete. |
550 | | */ |
551 | | static void connection_close_internal(fr_pool_t *pool, fr_pool_connection_t *this) |
552 | 0 | { |
553 | | /* |
554 | | * If it's in use, release it. |
555 | | */ |
556 | 0 | if (this->in_use) { |
557 | | #ifdef PTHREAD_DEBUG |
558 | | pthread_t pthread_id = pthread_self(); |
559 | | fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0); |
560 | | #endif |
561 | |
|
562 | 0 | this->in_use = false; |
563 | |
|
564 | 0 | fr_assert(pool->state.active != 0); |
565 | 0 | pool->state.active--; |
566 | |
|
567 | 0 | } else { |
568 | | /* |
569 | | * Connection isn't used, remove it from the heap. |
570 | | */ |
571 | 0 | fr_heap_extract(&pool->heap, this); |
572 | 0 | } |
573 | |
|
574 | 0 | fr_pool_trigger(pool, "close"); |
575 | |
|
576 | 0 | connection_unlink(pool, this); |
577 | |
|
578 | 0 | fr_assert(pool->state.num > 0); |
579 | 0 | pool->state.num--; |
580 | 0 | talloc_free(this); |
581 | 0 | } |
582 | | |
583 | | /** Check whether a connection needs to be removed from the pool |
584 | | * |
585 | | * Will verify that the connection is within idle_timeout, max_uses, and |
586 | | * lifetime values. If it is not, the connection will be closed. |
587 | | * |
588 | | * @note Will only close connections not in use. |
589 | | * @note Must be called with the mutex held. |
590 | | * |
591 | | * @param[in] pool to modify. |
592 | | * @param[in] request The current request. |
593 | | * @param[in] this Connection to manage. |
594 | | * @param[in] now Current time. |
595 | | * @return |
596 | | * - 0 if connection was closed. |
597 | | * - 1 if connection handle was left open. |
598 | | */ |
599 | | static int connection_manage(fr_pool_t *pool, request_t *request, fr_pool_connection_t *this, fr_time_t now) |
600 | 0 | { |
601 | 0 | fr_assert(pool != NULL); |
602 | 0 | fr_assert(this != NULL); |
603 | | |
604 | | /* |
605 | | * Don't terminated in-use connections |
606 | | */ |
607 | 0 | if (this->in_use) return 1; |
608 | | |
609 | 0 | if (this->needs_reconnecting) { |
610 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Needs reconnecting", |
611 | 0 | this->number); |
612 | 0 | do_delete: |
613 | 0 | if (pool->state.num <= pool->min) { |
614 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "You probably need to lower \"min\""); |
615 | 0 | } |
616 | 0 | connection_close_internal(pool, this); |
617 | 0 | return 0; |
618 | 0 | } |
619 | | |
620 | 0 | if ((pool->max_uses > 0) && |
621 | 0 | (this->num_uses >= pool->max_uses)) { |
622 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit max_uses limit", |
623 | 0 | this->number); |
624 | 0 | goto do_delete; |
625 | 0 | } |
626 | | |
627 | 0 | if (fr_time_delta_ispos(pool->lifetime) && |
628 | 0 | (fr_time_lt(fr_time_add(this->created, pool->lifetime), now))) { |
629 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit lifetime limit", |
630 | 0 | this->number); |
631 | 0 | goto do_delete; |
632 | 0 | } |
633 | | |
634 | 0 | if (fr_time_delta_ispos(pool->idle_timeout) && |
635 | 0 | (fr_time_lt(fr_time_add(this->last_released, pool->idle_timeout), now))) { |
636 | 0 | ROPTIONAL(RINFO, INFO, "Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %pVs", |
637 | 0 | this->number, fr_box_time_delta(fr_time_sub(now, this->last_released))); |
638 | 0 | goto do_delete; |
639 | 0 | } |
640 | | |
641 | 0 | return 1; |
642 | 0 | } |
643 | | |
644 | | |
645 | | /** Check whether any connections need to be removed from the pool |
646 | | * |
647 | | * Maintains the number of connections in the pool as per the configuration |
648 | | * parameters for the connection pool. |
649 | | * |
650 | | * @note Will only run checks the first time it's called in a given second, |
651 | | * to throttle connection spawning/closing. |
652 | | * @note Will only close connections not in use. |
653 | | * @note Must be called with the mutex held, will release mutex before returning. |
654 | | * |
655 | | * @param[in] pool to manage. |
656 | | * @param[in] request The current request. |
657 | | * @return 1 |
658 | | */ |
659 | | static int connection_check(fr_pool_t *pool, request_t *request) |
660 | 0 | { |
661 | 0 | uint32_t num, spare; |
662 | 0 | fr_time_t now = fr_time(); |
663 | 0 | fr_pool_connection_t *this, *next; |
664 | |
|
665 | 0 | if (fr_time_delta_lt(fr_time_sub(now, pool->state.last_checked), fr_time_delta_from_sec(1))) { |
666 | 0 | pthread_mutex_unlock(&pool->mutex); |
667 | 0 | return 1; |
668 | 0 | } |
669 | | |
670 | | /* |
671 | | * Get "real" number of connections, and count pending |
672 | | * connections as spare. |
673 | | */ |
674 | 0 | num = pool->state.num + pool->state.pending; |
675 | 0 | spare = pool->state.pending + (pool->state.num - pool->state.active); |
676 | | |
677 | | /* |
678 | | * The other end can close connections. If so, we'll |
679 | | * have fewer than "min". When that happens, open more |
680 | | * connections to enforce "min". |
681 | | * |
682 | | * The code for spawning connections enforces that |
683 | | * num + pending <= max. |
684 | | */ |
685 | 0 | if (num < pool->min) { |
686 | 0 | ROPTIONAL(RINFO, INFO, "Need %i more connections to reach min connections (%i)", pool->min - num, pool->min); |
687 | 0 | goto add_connection; |
688 | 0 | } |
689 | | |
690 | | /* |
691 | | * On the odd chance that we've opened too many |
692 | | * connections, take care of that. |
693 | | */ |
694 | 0 | if (num > pool->max) { |
695 | | /* |
696 | | * Pending connections don't get closed as "spare". |
697 | | */ |
698 | 0 | if (pool->state.pending > 0) goto manage_connections; |
699 | | |
700 | | /* |
701 | | * Otherwise close one of the connections to |
702 | | * bring us down to "max". |
703 | | */ |
704 | 0 | goto close_connection; |
705 | 0 | } |
706 | | |
707 | | /* |
708 | | * Now that we've enforced min/max connections, try to |
709 | | * keep the "spare" connections at the correct number. |
710 | | */ |
711 | | |
712 | | /* |
713 | | * Nothing to do? Go check all of the connections for |
714 | | * timeouts, etc. |
715 | | */ |
716 | 0 | if (spare == pool->spare) goto manage_connections; |
717 | | |
718 | | /* |
719 | | * Too many spare connections, delete some. |
720 | | */ |
721 | 0 | if (spare > pool->spare) { |
722 | 0 | fr_pool_connection_t *found; |
723 | | |
724 | | /* |
725 | | * Pending connections don't get closed as "spare". |
726 | | */ |
727 | 0 | if (pool->state.pending > 0) goto manage_connections; |
728 | | |
729 | | /* |
730 | | * Don't close too many connections, even they |
731 | | * are spare. |
732 | | */ |
733 | 0 | if (num <= pool->min) goto manage_connections; |
734 | | |
735 | | /* |
736 | | * Too many spares, go close one. |
737 | | */ |
738 | | |
739 | 0 | close_connection: |
740 | | /* |
741 | | * Don't close connections too often, in order to |
742 | | * prevent flapping. Coverity doesn't notice that |
743 | | * all callers have the lock, so we annotate the issue. |
744 | | */ |
745 | | /* coverity[missing_lock] */ |
746 | 0 | if (fr_time_lt(now, fr_time_add(pool->state.last_spawned, pool->delay_interval))) goto manage_connections; |
747 | | |
748 | | /* |
749 | | * Find a connection to close. |
750 | | */ |
751 | 0 | found = NULL; |
752 | 0 | for (this = pool->tail; this != NULL; this = this->prev) { |
753 | 0 | if (this->in_use) continue; |
754 | | |
755 | 0 | if (!found || (fr_time_lt(this->last_reserved, found->last_reserved))) found = this; |
756 | 0 | } |
757 | |
|
758 | 0 | if (!fr_cond_assert(found)) goto done; |
759 | | |
760 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Closing connection (%" PRIu64 ") as we have too many unused connections", |
761 | 0 | found->number); |
762 | 0 | connection_close_internal(pool, found); |
763 | | |
764 | | /* |
765 | | * Decrease the delay for the next time we clean |
766 | | * up. |
767 | | */ |
768 | 0 | pool->state.next_delay = fr_time_delta_wrap(fr_time_delta_unwrap(pool->state.next_delay) >> 1); |
769 | 0 | if (!fr_time_delta_ispos(pool->state.next_delay)) pool->state.next_delay = fr_time_delta_wrap(1); |
770 | 0 | pool->delay_interval = fr_time_delta_add(pool->delay_interval, pool->state.next_delay); |
771 | |
|
772 | 0 | goto manage_connections; |
773 | 0 | } |
774 | | |
775 | | /* |
776 | | * Too few connections, open some more. |
777 | | */ |
778 | 0 | if (spare < pool->spare) { |
779 | | /* |
780 | | * Don't open too many pending connections. |
781 | | * Again, coverity doesn't realize all callers have the lock, |
782 | | * so we must annotate here as well. |
783 | | */ |
784 | | /* coverity[missing_lock] */ |
785 | 0 | if (pool->state.pending >= pool->pending_window) goto manage_connections; |
786 | | |
787 | | /* |
788 | | * Don't open too many connections, even if we |
789 | | * need more spares. |
790 | | */ |
791 | 0 | if (num >= pool->max) goto manage_connections; |
792 | | |
793 | | /* |
794 | | * Too few spares, go add one. |
795 | | */ |
796 | 0 | ROPTIONAL(RINFO, INFO, "Need %i more connections to reach %i spares", pool->spare - spare, pool->spare); |
797 | |
|
798 | 0 | add_connection: |
799 | | /* |
800 | | * Only try to open spares if we're not already attempting to open |
801 | | * a connection. Avoids spurious log messages. |
802 | | */ |
803 | 0 | pthread_mutex_unlock(&pool->mutex); |
804 | 0 | (void) connection_spawn(pool, request, now, false, true); |
805 | 0 | pthread_mutex_lock(&pool->mutex); |
806 | 0 | goto manage_connections; |
807 | 0 | } |
808 | | |
809 | | /* |
810 | | * Pass over all of the connections in the pool, limiting |
811 | | * lifetime, idle time, max requests, etc. |
812 | | */ |
813 | 0 | manage_connections: |
814 | 0 | for (this = pool->head; this != NULL; this = next) { |
815 | 0 | next = this->next; |
816 | 0 | connection_manage(pool, request, this, now); |
817 | 0 | } |
818 | |
|
819 | 0 | pool->state.last_checked = now; |
820 | |
|
821 | 0 | done: |
822 | 0 | pthread_mutex_unlock(&pool->mutex); |
823 | |
|
824 | 0 | return 1; |
825 | 0 | } |
826 | | |
827 | | /** Get a connection from the connection pool |
828 | | * |
829 | | * @note Must be called with the mutex free. |
830 | | * |
831 | | * @param[in] pool to reserve the connection from. |
832 | | * @param[in] request The current request. |
833 | | * @param[in] spawn whether to spawn a new connection |
834 | | * @return |
835 | | * - A pointer to the connection handle. |
836 | | * - NULL on error. |
837 | | */ |
838 | | static void *connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn) |
839 | 0 | { |
840 | 0 | fr_time_t now; |
841 | 0 | fr_pool_connection_t *this; |
842 | |
|
843 | 0 | if (!pool) return NULL; |
844 | | |
845 | 0 | pthread_mutex_lock(&pool->mutex); |
846 | |
|
847 | 0 | now = fr_time(); |
848 | | |
849 | | /* |
850 | | * Grab the link with the lowest latency, and check it |
851 | | * for limits. If "connection manage" says the link is |
852 | | * no longer usable, go grab another one. |
853 | | */ |
854 | 0 | do { |
855 | 0 | this = fr_heap_peek(pool->heap); |
856 | 0 | if (!this) break; |
857 | 0 | } while (!connection_manage(pool, request, this, now)); |
858 | | |
859 | | /* |
860 | | * We have a working connection. Extract it from the |
861 | | * heap and use it. |
862 | | */ |
863 | 0 | if (this) { |
864 | 0 | fr_heap_extract(&pool->heap, this); |
865 | 0 | goto do_return; |
866 | 0 | } |
867 | | |
868 | 0 | if (pool->state.num == pool->max) { |
869 | 0 | bool complain = false; |
870 | | |
871 | | /* |
872 | | * Rate-limit complaints. |
873 | | */ |
874 | 0 | if (fr_time_delta_gt(fr_time_sub(now, pool->state.last_at_max), fr_time_delta_from_sec(1))) { |
875 | 0 | complain = true; |
876 | 0 | pool->state.last_at_max = now; |
877 | 0 | } |
878 | |
|
879 | 0 | if (!fr_rate_limit_enabled() || complain) { |
880 | 0 | ERROR("No connections available and at max connection limit"); |
881 | | |
882 | | /* |
883 | | * Must be done inside the mutex, reconnect callback |
884 | | * may modify args. |
885 | | */ |
886 | 0 | fr_pool_trigger(pool, "none"); |
887 | 0 | } |
888 | 0 | pthread_mutex_unlock(&pool->mutex); |
889 | |
|
890 | 0 | return NULL; |
891 | 0 | } |
892 | | |
893 | 0 | pthread_mutex_unlock(&pool->mutex); |
894 | |
|
895 | 0 | if (!spawn) return NULL; |
896 | | |
897 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "%i of %u connections in use. You may need to increase \"spare\"", |
898 | 0 | pool->state.active, pool->state.num); |
899 | | |
900 | | /* |
901 | | * Returns unlocked on failure, or locked on success |
902 | | */ |
903 | 0 | this = connection_spawn(pool, request, now, true, false); |
904 | 0 | if (!this) return NULL; |
905 | | |
906 | 0 | do_return: |
907 | 0 | pool->state.active++; |
908 | 0 | this->num_uses++; |
909 | 0 | this->last_reserved = fr_time(); |
910 | 0 | this->in_use = true; |
911 | |
|
912 | | #ifdef PTHREAD_DEBUG |
913 | | this->pthread_id = pthread_self(); |
914 | | #endif |
915 | 0 | pthread_mutex_unlock(&pool->mutex); |
916 | |
|
917 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Reserved connection (%" PRIu64 ")", this->number); |
918 | |
|
919 | 0 | return this->connection; |
920 | 0 | } |
921 | | |
922 | | /** Enable triggers for a connection pool |
923 | | * |
924 | | * @param[in] pool to enable triggers for. |
925 | | * @param[in] trigger_prefix prefix to prepend to all trigger names. Usually a path |
926 | | * to the module's trigger configuration e.g. |
927 | | * @verbatim modules.<name>.pool @endverbatim |
928 | | * @verbatim <trigger name> @endverbatim is appended to form |
929 | | * the complete path. |
930 | | * @param[in] trigger_args to make available in any triggers executed by the connection pool. |
931 | | * These will usually be fr_pair_t (s) describing the host |
932 | | * associated with the pool. |
933 | | * Trigger args will be copied, input trigger_args should be freed |
934 | | * if necessary. |
935 | | */ |
936 | | void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args) |
937 | | { |
938 | | pool->triggers_enabled = true; |
939 | | |
940 | | talloc_const_free(pool->trigger_prefix); |
941 | | MEM(pool->trigger_prefix = trigger_prefix ? talloc_strdup(pool, trigger_prefix) : ""); |
942 | | |
943 | | fr_pair_list_free(&pool->trigger_args); |
944 | | |
945 | | if (!trigger_args) return; |
946 | | |
947 | | MEM(fr_pair_list_copy(pool, &pool->trigger_args, trigger_args) >= 0); |
948 | | } |
949 | | |
950 | | /** Create a new connection pool |
951 | | * |
952 | | * Allocates structures used by the connection pool, initialises the various |
953 | | * configuration options and counters, and sets the callback functions. |
954 | | * |
955 | | * Will also spawn the number of connections specified by the 'start' configuration |
956 | | * option. |
957 | | * |
958 | | * @note Will call the 'start' trigger. |
959 | | * |
960 | | * @param[in] ctx Context to link pool's destruction to. |
961 | | * @param[in] cs pool section. |
962 | | * @param[in] opaque data pointer to pass to callbacks. |
963 | | * @param[in] c Callback to create new connections. |
964 | | * @param[in] a Callback to check the status of connections. |
965 | | * @param[in] log_prefix prefix to prepend to all log messages. |
966 | | * @return |
967 | | * - New connection pool. |
968 | | * - NULL on error. |
969 | | */ |
970 | | fr_pool_t *fr_pool_init(TALLOC_CTX *ctx, |
971 | | CONF_SECTION const *cs, |
972 | | void *opaque, |
973 | | fr_pool_connection_create_t c, fr_pool_connection_alive_t a, |
974 | | char const *log_prefix) |
975 | 0 | { |
976 | 0 | fr_pool_t *pool = NULL; |
977 | |
|
978 | 0 | if (!cs || !opaque || !c) return NULL; |
979 | | |
980 | | /* |
981 | | * Pool is allocated in the NULL context as |
982 | | * threads are likely to allocate memory |
983 | | * beneath the pool. |
984 | | */ |
985 | 0 | MEM(pool = talloc_zero(NULL, fr_pool_t)); |
986 | 0 | fr_pair_list_init(&pool->trigger_args); |
987 | | |
988 | | /* |
989 | | * Ensure the pool is freed at the same time |
990 | | * as its parent. |
991 | | */ |
992 | 0 | if (ctx && (talloc_link_ctx(ctx, pool) < 0)) { |
993 | 0 | PERROR("%s: Failed linking pool ctx", __FUNCTION__); |
994 | 0 | talloc_free(pool); |
995 | |
|
996 | 0 | return NULL; |
997 | 0 | } |
998 | | |
999 | 0 | pool->cs = cs; |
1000 | 0 | pool->opaque = opaque; |
1001 | 0 | pool->create = c; |
1002 | 0 | pool->alive = a; |
1003 | |
|
1004 | 0 | pool->head = pool->tail = NULL; |
1005 | | |
1006 | | /* |
1007 | | * We keep a heap of connections, sorted by the last time |
1008 | | * we STARTED using them. Newly opened connections |
1009 | | * aren't in the heap. They're only inserted in the list |
1010 | | * once they're released. |
1011 | | * |
1012 | | * We do "most recently started" instead of "most |
1013 | | * recently used", because MRU is done as most recently |
1014 | | * *released*. We want to order connections by |
1015 | | * responsiveness, and MRU prioritizes high latency |
1016 | | * connections. |
1017 | | * |
1018 | | * We want most recently *started*, which gives |
1019 | | * preference to low latency links, and pushes high |
1020 | | * latency links down in the priority heap. |
1021 | | * |
1022 | | * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/ |
1023 | | */ |
1024 | 0 | if (!pool->spread) { |
1025 | 0 | pool->heap = fr_heap_talloc_alloc(pool, last_reserved_cmp, fr_pool_connection_t, heap_id, 0); |
1026 | | /* |
1027 | | * For some types of connections we need to used a different |
1028 | | * algorithm, because load balancing benefits are secondary |
1029 | | * to maintaining a cache of open connections. |
1030 | | * |
1031 | | * With libcurl's multihandle, connections can only be reused |
1032 | | * if all handles that make up the multihandle are done processing |
1033 | | * their requests. |
1034 | | * |
1035 | | * We can't tell when that's happened using libcurl, and even |
1036 | | * if we could, blocking until all servers had responded |
1037 | | * would have huge cost. |
1038 | | * |
1039 | | * The solution is to order the heap so that the connection that |
1040 | | * was released longest ago is at the top. |
1041 | | * |
1042 | | * That way we maximise time between connection use. |
1043 | | */ |
1044 | 0 | } else { |
1045 | 0 | pool->heap = fr_heap_talloc_alloc(pool, last_released_cmp, fr_pool_connection_t, heap_id, 0); |
1046 | 0 | } |
1047 | 0 | if (!pool->heap) { |
1048 | 0 | ERROR("%s: Failed creating connection heap", __FUNCTION__); |
1049 | 0 | error: |
1050 | 0 | fr_pool_free(pool); |
1051 | 0 | return NULL; |
1052 | 0 | } |
1053 | | |
1054 | 0 | pool->log_prefix = log_prefix ? talloc_strdup(pool, log_prefix) : "core"; |
1055 | 0 | pthread_mutex_init(&pool->mutex, NULL); |
1056 | 0 | pthread_cond_init(&pool->done_spawn, NULL); |
1057 | 0 | pthread_cond_init(&pool->done_reconnecting, NULL); |
1058 | |
|
1059 | 0 | DEBUG2("Initialising connection pool"); |
1060 | |
|
1061 | 0 | if (cf_section_rules_push(UNCONST(CONF_SECTION *, cs), pool_config) < 0) goto error; |
1062 | 0 | if (cf_section_parse(pool, pool, UNCONST(CONF_SECTION *, cs)) < 0) { |
1063 | 0 | PERROR("Configuration parsing failed"); |
1064 | 0 | goto error; |
1065 | 0 | } |
1066 | | |
1067 | | /* |
1068 | | * Some simple limits |
1069 | | */ |
1070 | 0 | if (pool->max == 0) { |
1071 | 0 | cf_log_err(cs, "Cannot set 'max' to zero"); |
1072 | 0 | goto error; |
1073 | 0 | } |
1074 | | |
1075 | | /* |
1076 | | * Coverity notices that other uses of max_pending are protected with a mutex, |
1077 | | * and thus thinks it should be locked/unlocked here...but coverity does not |
1078 | | * consider that until this function returns a pointer to a pool, nobody can |
1079 | | * use the pool, so there's no point to doing so. |
1080 | | */ |
1081 | | /* coverity[missing_lock] */ |
1082 | 0 | pool->pending_window = (pool->max_pending > 0) ? pool->max_pending : pool->max; |
1083 | |
|
1084 | 0 | if (pool->min > pool->max) { |
1085 | 0 | cf_log_err(cs, "Cannot set 'min' to more than 'max'"); |
1086 | 0 | goto error; |
1087 | 0 | } |
1088 | | |
1089 | 0 | FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024); |
1090 | 0 | FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max); |
1091 | 0 | FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min)); |
1092 | |
|
1093 | 0 | if (fr_time_delta_ispos(pool->lifetime)) { |
1094 | 0 | FR_TIME_DELTA_COND_CHECK("idle_timeout", pool->idle_timeout, |
1095 | 0 | fr_time_delta_lteq(pool->idle_timeout, pool->lifetime), fr_time_delta_wrap(0)); |
1096 | 0 | } |
1097 | |
|
1098 | 0 | if (fr_time_delta_ispos(pool->idle_timeout)) { |
1099 | 0 | FR_TIME_DELTA_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout); |
1100 | 0 | } |
1101 | | |
1102 | | /* |
1103 | | * Some libraries treat 0.0 as infinite timeout, others treat it |
1104 | | * as instantaneous timeout. Solve the inconsistency by making |
1105 | | * the smallest allowable timeout 100ms. |
1106 | | */ |
1107 | 0 | FR_TIME_DELTA_BOUND_CHECK("connect_timeout", pool->connect_timeout, >=, fr_time_delta_from_msec(100)); |
1108 | | |
1109 | | /* |
1110 | | * Don't open any connections. Instead, force the limits |
1111 | | * to only 1 connection. |
1112 | | * |
1113 | | */ |
1114 | 0 | if (check_config) pool->start = pool->min = pool->max = 1; |
1115 | |
|
1116 | 0 | return pool; |
1117 | 0 | } |
1118 | | |
1119 | | int fr_pool_start(fr_pool_t *pool) |
1120 | 0 | { |
1121 | 0 | uint32_t i; |
1122 | 0 | fr_pool_connection_t *this; |
1123 | | |
1124 | | /* |
1125 | | * Don't spawn any connections |
1126 | | */ |
1127 | 0 | if (check_config) return 0; |
1128 | | |
1129 | | /* |
1130 | | * Create all of the connections, unless the admin says |
1131 | | * not to. |
1132 | | */ |
1133 | 0 | for (i = 0; i < pool->start; i++) { |
1134 | | /* |
1135 | | * Call time() once for each spawn attempt as there |
1136 | | * could be a significant delay. |
1137 | | */ |
1138 | 0 | this = connection_spawn(pool, NULL, fr_time(), false, true); |
1139 | 0 | if (!this) { |
1140 | 0 | ERROR("Failed spawning initial connections"); |
1141 | 0 | return -1; |
1142 | 0 | } |
1143 | 0 | } |
1144 | | |
1145 | 0 | fr_pool_trigger(pool, "start"); |
1146 | |
|
1147 | 0 | return 0; |
1148 | 0 | } |
1149 | | |
1150 | | /** Allocate a new pool using an existing one as a template |
1151 | | * |
1152 | | * @param[in] ctx to allocate new pool in. |
1153 | | * @param[in] pool to copy. |
1154 | | * @param[in] opaque data to pass to connection function. |
1155 | | * @return |
1156 | | * - New connection pool. |
1157 | | * - NULL on error. |
1158 | | */ |
1159 | | fr_pool_t *fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque) |
1160 | 0 | { |
1161 | 0 | fr_pool_t *copy; |
1162 | |
|
1163 | 0 | copy = fr_pool_init(ctx, pool->cs, opaque, pool->create, pool->alive, pool->log_prefix); |
1164 | 0 | if (!copy) return NULL; |
1165 | | |
1166 | 0 | if (pool->trigger_prefix) fr_pool_enable_triggers(copy, pool->trigger_prefix, &pool->trigger_args); |
1167 | |
|
1168 | 0 | return copy; |
1169 | 0 | } |
1170 | | |
1171 | | /** Get the number of connections currently in the pool |
1172 | | * |
1173 | | * @param[in] pool to count connections for. |
1174 | | * @return the number of connections in the pool |
1175 | | */ |
1176 | | fr_pool_state_t const *fr_pool_state(fr_pool_t *pool) |
1177 | 0 | { |
1178 | 0 | return &pool->state; |
1179 | 0 | } |
1180 | | |
1181 | | /** Connection pool get timeout |
1182 | | * |
1183 | | * @param[in] pool to get connection timeout for. |
1184 | | * @return the connection timeout configured for the pool. |
1185 | | */ |
1186 | | fr_time_delta_t fr_pool_timeout(fr_pool_t *pool) |
1187 | 0 | { |
1188 | 0 | return pool->connect_timeout; |
1189 | 0 | } |
1190 | | |
1191 | | /** Connection pool get start |
1192 | | * |
1193 | | * @param[in] pool to get connection start for. |
1194 | | * @return the connection start value configured for the pool. |
1195 | | */ |
1196 | | int fr_pool_start_num(fr_pool_t *pool) |
1197 | 0 | { |
1198 | 0 | return pool->start; |
1199 | 0 | } |
1200 | | |
1201 | | /** Return the opaque data associated with a connection pool |
1202 | | * |
1203 | | * @param pool to return data for. |
1204 | | * @return opaque data associated with pool. |
1205 | | */ |
1206 | | void const *fr_pool_opaque(fr_pool_t *pool) |
1207 | 0 | { |
1208 | 0 | return pool->opaque; |
1209 | 0 | } |
1210 | | |
1211 | | /** Increment pool reference by one. |
1212 | | * |
1213 | | * @param[in] pool to increment reference counter for. |
1214 | | */ |
1215 | | void fr_pool_ref(fr_pool_t *pool) |
1216 | 0 | { |
1217 | 0 | pool->ref++; |
1218 | 0 | } |
1219 | | |
1220 | | /** Set a reconnection callback for the connection pool |
1221 | | * |
1222 | | * This can be called at any time during the pool's lifecycle. |
1223 | | * |
1224 | | * @param[in] pool to set reconnect callback for. |
1225 | | * @param reconnect callback to call when reconnecting pool's connections. |
1226 | | */ |
1227 | | void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect) |
1228 | 0 | { |
1229 | 0 | pool->reconnect = reconnect; |
1230 | 0 | } |
1231 | | |
1232 | | /** Mark connections for reconnection, and spawn at least 'start' connections |
1233 | | * |
1234 | | * @note This call may block whilst waiting for pending connection attempts to complete. |
1235 | | * |
1236 | | * This intended to be called on a connection pool that's in use, to have it reflect |
1237 | | * a configuration change, or because the administrator knows that all connections |
1238 | | * in the pool are inviable and need to be reconnected. |
1239 | | * |
1240 | | * @param[in] pool to reconnect. |
1241 | | * @param[in] request The current request. |
1242 | | * @return |
1243 | | * - 0 On success. |
1244 | | * - -1 If we couldn't create start connections, this may be ignored |
1245 | | * depending on the context in which this function is being called. |
1246 | | */ |
1247 | | int fr_pool_reconnect(fr_pool_t *pool, request_t *request) |
1248 | 0 | { |
1249 | 0 | uint32_t i; |
1250 | 0 | fr_pool_connection_t *this; |
1251 | 0 | fr_time_t now; |
1252 | |
|
1253 | 0 | pthread_mutex_lock(&pool->mutex); |
1254 | | |
1255 | | /* |
1256 | | * Pause new spawn attempts (we release the mutex |
1257 | | * during our cond wait). |
1258 | | */ |
1259 | 0 | pool->state.reconnecting = true; |
1260 | | |
1261 | | /* |
1262 | | * When the loop exits, we'll hold the lock for the pool, |
1263 | | * and we're guaranteed the connection create callback |
1264 | | * will not be using the opaque data. |
1265 | | */ |
1266 | 0 | while (pool->state.pending) pthread_cond_wait(&pool->done_spawn, &pool->mutex); |
1267 | | |
1268 | | /* |
1269 | | * We want to ensure at least 'start' connections |
1270 | | * have been reconnected. We can't call reconnect |
1271 | | * because, we might get the same connection each |
1272 | | * time we reserve one, so we close 'start' |
1273 | | * connections, and then attempt to spawn them again. |
1274 | | */ |
1275 | 0 | for (i = 0; i < pool->start; i++) { |
1276 | 0 | this = fr_heap_peek(pool->heap); |
1277 | 0 | if (!this) break; /* There wasn't 'start' connections available */ |
1278 | | |
1279 | 0 | connection_close_internal(pool, this); |
1280 | 0 | } |
1281 | | |
1282 | | /* |
1283 | | * Mark all remaining connections in the pool as |
1284 | | * requiring reconnection. |
1285 | | */ |
1286 | 0 | for (this = pool->head; this; this = this->next) this->needs_reconnecting = true; |
1287 | | |
1288 | | /* |
1289 | | * Call the reconnect callback (if one's set) |
1290 | | * This may modify the opaque data associated |
1291 | | * with the pool. |
1292 | | */ |
1293 | 0 | if (pool->reconnect) pool->reconnect(pool, pool->opaque); |
1294 | | |
1295 | | /* |
1296 | | * Must be done inside the mutex, reconnect callback |
1297 | | * may modify args. |
1298 | | */ |
1299 | 0 | fr_pool_trigger(pool, "reconnect"); |
1300 | | |
1301 | | /* |
1302 | | * Allow new spawn attempts, and wakeup any threads |
1303 | | * waiting to spawn new connections. |
1304 | | */ |
1305 | 0 | pool->state.reconnecting = false; |
1306 | 0 | pthread_cond_broadcast(&pool->done_reconnecting); |
1307 | 0 | pthread_mutex_unlock(&pool->mutex); |
1308 | |
|
1309 | 0 | now = fr_time(); |
1310 | | |
1311 | | /* |
1312 | | * Now attempt to spawn 'start' connections. |
1313 | | */ |
1314 | 0 | for (i = 0; i < pool->start; i++) { |
1315 | 0 | this = connection_spawn(pool, request, now, false, true); |
1316 | 0 | if (!this) return -1; |
1317 | 0 | } |
1318 | | |
1319 | 0 | return 0; |
1320 | 0 | } |
1321 | | |
1322 | | /** Delete a connection pool |
1323 | | * |
1324 | | * Closes, unlinks and frees all connections in the connection pool, then frees |
1325 | | * all memory used by the connection pool. |
1326 | | * |
1327 | | * @note Will call the 'stop' trigger. |
1328 | | * @note Must be called with the mutex free. |
1329 | | * |
1330 | | * @param[in,out] pool to delete. |
1331 | | */ |
1332 | | void fr_pool_free(fr_pool_t *pool) |
1333 | 0 | { |
1334 | 0 | fr_pool_connection_t *this; |
1335 | |
|
1336 | 0 | if (!pool) return; |
1337 | | |
1338 | | /* |
1339 | | * More modules hold a reference to this pool, don't free |
1340 | | * it yet. |
1341 | | */ |
1342 | 0 | if (pool->ref > 0) { |
1343 | 0 | pool->ref--; |
1344 | 0 | return; |
1345 | 0 | } |
1346 | | |
1347 | 0 | DEBUG2("Removing connection pool"); |
1348 | |
|
1349 | 0 | pthread_mutex_lock(&pool->mutex); |
1350 | | |
1351 | | /* |
1352 | | * Don't loop over the list. Just keep removing the head |
1353 | | * until they're all gone. |
1354 | | */ |
1355 | 0 | while ((this = pool->head) != NULL) { |
1356 | 0 | INFO("Closing connection (%" PRIu64 ")", this->number); |
1357 | |
|
1358 | 0 | connection_close_internal(pool, this); |
1359 | 0 | } |
1360 | |
|
1361 | 0 | fr_pool_trigger(pool, "stop"); |
1362 | |
|
1363 | 0 | fr_assert(pool->head == NULL); |
1364 | 0 | fr_assert(pool->tail == NULL); |
1365 | 0 | fr_assert(pool->state.num == 0); |
1366 | |
|
1367 | 0 | pthread_mutex_unlock(&pool->mutex); |
1368 | 0 | pthread_mutex_destroy(&pool->mutex); |
1369 | 0 | pthread_cond_destroy(&pool->done_spawn); |
1370 | 0 | pthread_cond_destroy(&pool->done_reconnecting); |
1371 | |
|
1372 | 0 | talloc_free(pool); |
1373 | 0 | } |
1374 | | |
1375 | | /** Reserve a connection in the connection pool |
1376 | | * |
1377 | | * Will attempt to find an unused connection in the connection pool, if one is |
1378 | | * found, will mark it as in use, and increment the number of active connections |
1379 | | * and return the connection handle. |
1380 | | * |
1381 | | * If no free connections are found will attempt to spawn a new one, conditional |
1382 | | * on a connection spawning not already being in progress, and not being at the |
1383 | | * 'max' connection limit. |
1384 | | * |
1385 | | * @note fr_pool_connection_release must be called once the caller has finished |
1386 | | * using the connection. |
1387 | | * |
1388 | | * @see fr_pool_connection_release |
1389 | | * @param[in] pool to reserve the connection from. |
1390 | | * @param[in] request The current request. |
1391 | | * @return |
1392 | | * - A pointer to the connection handle. |
1393 | | * - NULL on error. |
1394 | | */ |
1395 | | void *fr_pool_connection_get(fr_pool_t *pool, request_t *request) |
1396 | 0 | { |
1397 | 0 | return connection_get_internal(pool, request, true); |
1398 | 0 | } |
1399 | | |
1400 | | /** Release a connection |
1401 | | * |
1402 | | * Will mark a connection as unused and decrement the number of active |
1403 | | * connections. |
1404 | | * |
1405 | | * @see fr_pool_connection_get |
1406 | | * @param[in] pool to release the connection in. |
1407 | | * @param[in] request The current request. |
1408 | | * @param[in] conn to release. |
1409 | | */ |
1410 | | void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn) |
1411 | 0 | { |
1412 | 0 | fr_pool_connection_t *this; |
1413 | 0 | fr_time_delta_t held; |
1414 | 0 | bool trigger_min = false, trigger_max = false; |
1415 | |
|
1416 | 0 | this = connection_find(pool, conn); |
1417 | 0 | if (!this) return; |
1418 | | |
1419 | 0 | this->in_use = false; |
1420 | | |
1421 | | /* |
1422 | | * Record when the connection was last released |
1423 | | */ |
1424 | 0 | this->last_released = fr_time(); |
1425 | 0 | pool->state.last_released = this->last_released; |
1426 | | |
1427 | | /* |
1428 | | * This is done inside the mutex to ensure |
1429 | | * updates are atomic. |
1430 | | */ |
1431 | 0 | held = fr_time_sub(this->last_released, this->last_reserved); |
1432 | | |
1433 | | /* |
1434 | | * Check we've not exceeded out trigger limits |
1435 | | * |
1436 | | * These should only fire once per second. |
1437 | | */ |
1438 | 0 | if (fr_time_delta_ispos(pool->held_trigger_min) && |
1439 | 0 | (fr_time_delta_lt(held, pool->held_trigger_min)) && |
1440 | 0 | (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_min), fr_time_delta_from_sec(1)))) { |
1441 | 0 | trigger_min = true; |
1442 | 0 | pool->state.last_held_min = this->last_released; |
1443 | 0 | } |
1444 | |
|
1445 | 0 | if (fr_time_delta_ispos(pool->held_trigger_max) && |
1446 | 0 | (fr_time_delta_gt(held, pool->held_trigger_max)) && |
1447 | 0 | (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_max), fr_time_delta_from_sec(1)))) { |
1448 | 0 | trigger_max = true; |
1449 | 0 | pool->state.last_held_max = this->last_released; |
1450 | 0 | } |
1451 | | |
1452 | | /* |
1453 | | * Insert the connection in the heap. |
1454 | | * |
1455 | | * This will either be based on when we *started* using it |
1456 | | * (allowing fast links to be re-used, and slow links to be |
1457 | | * gradually expired), or when we released it (allowing |
1458 | | * the maximum amount of time between connection use). |
1459 | | */ |
1460 | 0 | fr_heap_insert(&pool->heap, this); |
1461 | |
|
1462 | 0 | fr_assert(pool->state.active != 0); |
1463 | 0 | pool->state.active--; |
1464 | |
|
1465 | 0 | ROPTIONAL(RDEBUG2, DEBUG2, "Released connection (%" PRIu64 ")", this->number); |
1466 | | |
1467 | | /* |
1468 | | * We mirror the "spawn on get" functionality by having |
1469 | | * "delete on release". If there are too many spare |
1470 | | * connections, go manage the pool && clean some up. |
1471 | | */ |
1472 | 0 | connection_check(pool, request); |
1473 | |
|
1474 | 0 | if (trigger_min) fr_pool_trigger(pool, "min"); |
1475 | 0 | if (trigger_max) fr_pool_trigger(pool, "max"); |
1476 | 0 | } |
1477 | | |
1478 | | /** Reconnect a suspected inviable connection |
1479 | | * |
1480 | | * This should be called by the module if it suspects that a connection is |
1481 | | * not viable (e.g. the server has closed it). |
1482 | | * |
1483 | | * When implementing a module that uses the connection pool API, it is advisable |
1484 | | * to pass a pointer to the pointer to the handle (void **conn) |
1485 | | * to all functions which may call reconnect. This is so that if a new handle |
1486 | | * is created and returned, the handle pointer can be updated up the callstack, |
1487 | | * and a function higher up the stack doesn't attempt to use a now invalid |
1488 | | * connection handle. |
1489 | | * |
1490 | | * @note Will free any talloced memory hung off the context of the connection, |
1491 | | * being reconnected. |
1492 | | * |
1493 | | * @warning After calling reconnect the caller *MUST NOT* attempt to use |
1494 | | * the old handle in any other operations, as its memory will have been |
1495 | | * freed. |
1496 | | * |
1497 | | * @see fr_pool_connection_get |
1498 | | * @param[in] pool to reconnect the connection in. |
1499 | | * @param[in] request The current request. |
1500 | | * @param[in] conn to reconnect. |
1501 | | * @return new connection handle if successful else NULL. |
1502 | | */ |
1503 | | void *fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn) |
1504 | 0 | { |
1505 | 0 | fr_pool_connection_t *this; |
1506 | |
|
1507 | 0 | if (!pool || !conn) return NULL; |
1508 | | |
1509 | | /* |
1510 | | * If connection_find is successful the pool is now locked |
1511 | | */ |
1512 | 0 | this = connection_find(pool, conn); |
1513 | 0 | if (!this) return NULL; |
1514 | | |
1515 | 0 | ROPTIONAL(RINFO, INFO, "Deleting inviable connection (%" PRIu64 ")", this->number); |
1516 | |
|
1517 | 0 | connection_close_internal(pool, this); |
1518 | 0 | connection_check(pool, request); /* Whilst we still have the lock (will release the lock) */ |
1519 | | |
1520 | | /* |
1521 | | * Return an existing connection or spawn a new one. |
1522 | | */ |
1523 | 0 | return connection_get_internal(pool, request, true); |
1524 | 0 | } |
1525 | | |
1526 | | /** Delete a connection from the connection pool. |
1527 | | * |
1528 | | * Resolves the connection handle to a connection, then (if found) |
1529 | | * closes, unlinks and frees that connection. |
1530 | | * |
1531 | | * @note Must be called with the mutex free. |
1532 | | * |
1533 | | * @param[in] pool Connection pool to modify. |
1534 | | * @param[in] request The current request. |
1535 | | * @param[in] conn to delete. |
1536 | | * @return |
1537 | | * - 0 If the connection could not be found. |
1538 | | * - 1 if the connection was deleted. |
1539 | | */ |
1540 | | int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn) |
1541 | 0 | { |
1542 | 0 | fr_pool_connection_t *this; |
1543 | |
|
1544 | 0 | this = connection_find(pool, conn); |
1545 | 0 | if (!this) return 0; |
1546 | | |
1547 | | /* |
1548 | | * Record the last time a connection was closed |
1549 | | */ |
1550 | 0 | pool->state.last_closed = fr_time(); |
1551 | |
|
1552 | 0 | ROPTIONAL(RINFO, INFO, "Deleting connection (%" PRIu64 ")", this->number); |
1553 | |
|
1554 | 0 | connection_close_internal(pool, this); |
1555 | 0 | connection_check(pool, request); |
1556 | 0 | return 1; |
1557 | 0 | } |