Coverage Report

Created: 2026-03-31 06:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/freeradius-server/src/lib/server/pool.c
Line
Count
Source
1
/*
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the Free Software
14
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15
 */
16
17
/**
18
 * @file pool.c
19
 * @brief Handle pools of connections (threads, sockets, etc.)
20
 * @note This API must be used by all modules in the public distribution that
21
 * maintain pools of connections.
22
 *
23
 * @copyright 2012 The FreeRADIUS server project
24
 * @copyright 2012 Alan DeKok (aland@deployingradius.com)
25
 */
26
RCSID("$Id: 185e43b6f392295097c6327b91955835522520ad $")
27
28
0
#define LOG_PREFIX pool->log_prefix
29
30
#include <freeradius-devel/server/main_config.h>
31
#include <freeradius-devel/server/modpriv.h>
32
#include <freeradius-devel/server/trigger.h>
33
34
#include <freeradius-devel/util/debug.h>
35
36
#include <freeradius-devel/util/misc.h>
37
38
39
typedef struct fr_pool_connection_s fr_pool_connection_t;
40
41
static int connection_check(fr_pool_t *pool, request_t *request);
42
static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule);
43
44
/** An individual connection within the connection pool
45
 *
46
 * Defines connection counters, timestamps, and holds a pointer to the
47
 * connection handle itself.
48
 *
49
 * @see fr_pool_t
50
 */
51
struct fr_pool_connection_s {
52
  fr_pool_connection_t  *prev;      //!< Previous connection in list.
53
  fr_pool_connection_t  *next;      //!< Next connection in list.
54
  fr_heap_index_t heap_id;      //!< For the next connection heap.
55
56
  fr_time_t created;    //!< Time connection was created.
57
  fr_time_t last_reserved;    //!< Last time the connection was reserved.
58
59
  fr_time_t last_released;    //!< Time the connection was released.
60
61
  uint32_t  num_uses;   //!< Number of times the connection has been reserved.
62
  uint64_t  number;     //!< Unique ID assigned when the connection is created,
63
            //!< these will monotonically increase over the
64
            //!< lifetime of the connection pool.
65
  void    *connection;    //!< Pointer to whatever the module uses for a connection
66
            //!< handle.
67
  bool    in_use;     //!< Whether the connection is currently reserved.
68
69
  bool    needs_reconnecting; //!< Reconnect this connection before use.
70
71
#ifdef PTHREAD_DEBUG
72
  pthread_t pthread_id;   //!< When 'in_use == true'.
73
#endif
74
};
75
76
/** A connection pool
77
 *
78
 * Defines the configuration of the connection pool, all the counters and
79
 * timestamps related to the connection pool, the mutex that stops multiple
80
 * threads leaving the pool in an inconsistent state, and the callbacks
81
 * required to open, close and check the status of connections within the pool.
82
 *
83
 * @see connection
84
 */
85
struct fr_pool_s {
86
  int   ref;      //!< Reference counter to prevent connection
87
            //!< pool being freed multiple times.
88
  uint32_t  start;      //!< Number of initial connections.
89
  uint32_t  min;      //!< Minimum number of concurrent connections to keep open.
90
  uint32_t  max;      //!< Maximum number of concurrent connections to allow.
91
  uint32_t  max_pending;    //!< Max number of pending connections to allow.
92
  uint32_t  spare;      //!< Number of spare connections to try.
93
  uint64_t  max_uses;   //!< Maximum number of times a connection can be used
94
            //!< before being closed.
95
  uint32_t  pending_window;   //!< Sliding window of pending connections.
96
97
  fr_time_delta_t retry_delay;    //!< seconds to delay re-open after a failed open.
98
  fr_time_delta_t cleanup_interval;   //!< Initial timer for how often we sweep the pool
99
            //!< for free connections. (0 is infinite).
100
  fr_time_delta_t delay_interval;   //!< When we next do a cleanup.  Initialized to
101
            //!< cleanup_interval, and increase from there based
102
            //!< on the delay.
103
  fr_time_delta_t lifetime;   //!< How long a connection can be open before being
104
            //!< closed (irrespective of whether it's idle or not).
105
  fr_time_delta_t idle_timeout;   //!< How long a connection can be idle before
106
            //!< being closed.
107
  fr_time_delta_t connect_timeout;  //!< New connection timeout, enforced by the create
108
            //!< callback.
109
110
  bool    spread;     //!< If true we spread requests over the connections,
111
            //!< using the connection released longest ago, first.
112
113
  fr_heap_t *heap;      //!< For the next connection heap
114
115
  fr_pool_connection_t  *head;    //!< Start of the connection list.
116
  fr_pool_connection_t  *tail;    //!< End of the connection list.
117
118
  pthread_mutex_t mutex;      //!< Mutex used to keep consistent state when making
119
            //!< modifications in threaded mode.
120
  pthread_cond_t  done_spawn;   //!< Threads that need to ensure no spawning is in progress,
121
            //!< should block on this condition if pending != 0.
122
  pthread_cond_t  done_reconnecting;  //!< Before calling the create callback, threads should
123
            //!< block on this condition if reconnecting == true.
124
125
  CONF_SECTION const *cs;     //!< Configuration section holding the section of parsed
126
            //!< config file that relates to this pool.
127
  void    *opaque;    //!< Pointer to context data that will be passed to callbacks.
128
129
  char const  *log_prefix;    //!< Log prefix to prepend to all log messages created
130
            //!< by the connection pool code.
131
132
  bool    triggers_enabled; //!< Whether we call the trigger functions.
133
134
  char const  *trigger_prefix;  //!< Prefix to prepend to names of all triggers
135
            //!< fired by the connection pool code.
136
  fr_pair_list_t  trigger_args;   //!< Arguments to make available in connection pool triggers.
137
138
  fr_time_delta_t held_trigger_min; //!< If a connection is held for less than the specified
139
            //!< period, fire a trigger.
140
  fr_time_delta_t held_trigger_max; //!< If a connection is held for longer than the specified
141
            //!< period, fire a trigger.
142
143
  fr_pool_connection_create_t create; //!< Function used to create new connections.
144
  fr_pool_connection_alive_t  alive;  //!< Function used to check status of connections.
145
146
  fr_pool_reconnect_t reconnect;  //!< Called during connection pool reconnect.
147
148
  fr_pool_state_t state;      //!< Stats and state of the connection pool.
149
};
150
151
static const conf_parser_t pool_config[] = {
152
  { FR_CONF_OFFSET("start", fr_pool_t, start), .dflt = "0" },
153
  { FR_CONF_OFFSET("min", fr_pool_t, min), .dflt = "0" },
154
  { FR_CONF_OFFSET("max", fr_pool_t, max), .dflt_func = max_dflt },
155
  { FR_CONF_OFFSET("max_pending", fr_pool_t, max_pending), .dflt = "0" },
156
  { FR_CONF_OFFSET("spare", fr_pool_t, spare), .dflt = "3" },
157
  { FR_CONF_OFFSET("uses", fr_pool_t, max_uses), .dflt = "0" },
158
  { FR_CONF_OFFSET("lifetime", fr_pool_t, lifetime), .dflt = "0" },
159
  { FR_CONF_OFFSET("cleanup_interval", fr_pool_t, cleanup_interval), .dflt = "30" },
160
  { FR_CONF_OFFSET("idle_timeout", fr_pool_t, idle_timeout), .dflt = "60" },
161
  { FR_CONF_OFFSET("connect_timeout", fr_pool_t, connect_timeout), .dflt = "3.0" },
162
  { FR_CONF_OFFSET("held_trigger_min", fr_pool_t, held_trigger_min), .dflt = "0.0" },
163
  { FR_CONF_OFFSET("held_trigger_max", fr_pool_t, held_trigger_max), .dflt = "0.5" },
164
  { FR_CONF_OFFSET("retry_delay", fr_pool_t, retry_delay), .dflt = "1" },
165
  { FR_CONF_OFFSET("spread", fr_pool_t, spread), .dflt = "no" },
166
  CONF_PARSER_TERMINATOR
167
};
168
169
static int max_dflt(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
170
0
{
171
0
  char    *strvalue;
172
173
0
  strvalue = talloc_asprintf(NULL, "%u", main_config->max_workers);
174
0
  *out = cf_pair_alloc(cs, rule->name1, strvalue, T_OP_EQ, T_BARE_WORD, quote);
175
0
  talloc_free(strvalue);
176
177
0
  return 0;
178
0
}
179
180
/** Order connections by reserved most recently
181
 */
182
static int8_t last_reserved_cmp(void const *one, void const *two)
183
0
{
184
0
  fr_pool_connection_t const *a = one, *b = two;
185
186
0
  return fr_time_cmp(a->last_reserved, b->last_reserved);
187
0
}
188
189
/** Order connections by released longest ago
190
 */
191
static int8_t last_released_cmp(void const *one, void const *two)
192
0
{
193
0
  fr_pool_connection_t const *a = one, *b = two;
194
195
0
  return fr_time_cmp(a->last_released, b->last_released);
196
0
}
197
198
/** Removes a connection from the connection list
199
 *
200
 * @note Must be called with the mutex held.
201
 *
202
 * @param[in] pool  to modify.
203
 * @param[in] this  Connection to delete.
204
 */
205
static void connection_unlink(fr_pool_t *pool, fr_pool_connection_t *this)
206
0
{
207
0
  if (this->prev) {
208
0
    fr_assert(pool->head != this);
209
0
    this->prev->next = this->next;
210
0
  } else {
211
0
    fr_assert(pool->head == this);
212
0
    pool->head = this->next;
213
0
  }
214
0
  if (this->next) {
215
0
    fr_assert(pool->tail != this);
216
0
    this->next->prev = this->prev;
217
0
  } else {
218
0
    fr_assert(pool->tail == this);
219
0
    pool->tail = this->prev;
220
0
  }
221
222
0
  this->prev = this->next = NULL;
223
0
}
224
225
/** Adds a connection to the head of the connection list
226
 *
227
 * @note Must be called with the mutex held.
228
 *
229
 * @param[in] pool  to modify.
230
 * @param[in] this  Connection to add.
231
 */
232
static void connection_link_head(fr_pool_t *pool, fr_pool_connection_t *this)
233
0
{
234
0
  fr_assert(pool != NULL);
235
0
  fr_assert(this != NULL);
236
0
  fr_assert(pool->head != this);
237
0
  fr_assert(pool->tail != this);
238
239
0
  if (pool->head) {
240
0
    pool->head->prev = this;
241
0
  }
242
243
0
  this->next = pool->head;
244
0
  this->prev = NULL;
245
0
  pool->head = this;
246
0
  if (!pool->tail) {
247
0
    fr_assert(this->next == NULL);
248
0
    pool->tail = this;
249
0
  } else {
250
0
    fr_assert(this->next != NULL);
251
0
  }
252
0
}
253
254
/** Send a connection pool trigger.
255
 *
256
 * @param[in] pool  to send trigger for.
257
 * @param[in] event trigger name suffix.
258
 */
259
static inline void fr_pool_trigger(fr_pool_t *pool, char const *event)
260
0
{
261
0
  char  name[128];
262
263
0
  fr_assert(pool != NULL);
264
0
  fr_assert(event != NULL);
265
266
0
  if (!pool->triggers_enabled) return;
267
268
0
  snprintf(name, sizeof(name), "%s.%s", pool->trigger_prefix, event);
269
0
  trigger(unlang_interpret_get_thread_default(), pool->cs, NULL, name, true, &pool->trigger_args);
270
0
}
271
272
/** Find a connection handle in the connection list
273
 *
274
 * Walks over the list of connections searching for a specified connection
275
 * handle and returns the first connection that contains that pointer.
276
 *
277
 * @note Will lock mutex and only release mutex if connection handle
278
 * is not found, so will usually return with the mutex held.
279
 * @note Must be called with the mutex free.
280
 *
281
 * @param[in] pool  to search in.
282
 * @param[in] conn  handle to search for.
283
 * @return
284
 *  - Connection containing the specified handle.
285
 *  - NULL if no such connection was found.
286
 */
287
static fr_pool_connection_t *connection_find(fr_pool_t *pool, void *conn)
288
0
{
289
0
  fr_pool_connection_t *this;
290
291
0
  if (!pool || !conn) return NULL;
292
293
0
  pthread_mutex_lock(&pool->mutex);
294
295
  /*
296
   *  FIXME: This loop could be avoided if we passed a 'void
297
   *  **connection' instead.  We could use "offsetof" in
298
   *  order to find top of the parent structure.
299
   */
300
0
  for (this = pool->head; this != NULL; this = this->next) {
301
0
    if (this->connection == conn) {
302
#ifdef PTHREAD_DEBUG
303
      pthread_t pthread_id;
304
305
      pthread_id = pthread_self();
306
      fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
307
#endif
308
309
0
      fr_assert(this->in_use == true);
310
0
      return this;
311
0
    }
312
0
  }
313
314
0
  pthread_mutex_unlock(&pool->mutex);
315
0
  return NULL;
316
0
}
317
318
/** Spawns a new connection
319
 *
320
 * Spawns a new connection using the create callback, and returns it for
321
 * adding to the connection list.
322
 *
323
 * @note Will call the 'open' trigger.
324
 * @note Must be called with the mutex free.
325
 *
326
 * @param[in] pool  to modify.
327
 * @param[in] request The current request.
328
 * @param[in] now Current time.
329
 * @param[in] in_use  whether the new connection should be "in_use" or not
330
 * @param[in] unlock  whether we should unlock the mutex before returning
331
 * @return
332
 *  - New connection struct.
333
 *  - NULL on error.
334
 */
335
static fr_pool_connection_t *connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock)
336
0
{
337
0
  uint64_t    number;
338
0
  uint32_t    pending_window;
339
0
  TALLOC_CTX    *ctx;
340
341
0
  fr_pool_connection_t  *this;
342
0
  void      *conn;
343
344
0
  fr_assert(pool != NULL);
345
346
  /*
347
   *  If we have NO connections, and we've previously failed
348
   *  opening connections, don't open multiple connections until
349
   *  we successfully open at least one.
350
   */
351
0
  if ((pool->state.num == 0) &&
352
0
      pool->state.pending &&
353
0
      fr_time_gt(pool->state.last_failed, fr_time_wrap(0))) return NULL;
354
355
0
  pthread_mutex_lock(&pool->mutex);
356
0
  fr_assert(pool->state.num <= pool->max);
357
358
  /*
359
   *  Don't spawn too many connections at the same time.
360
   */
361
0
  if ((pool->state.num + pool->state.pending) >= pool->max) {
362
0
    pthread_mutex_unlock(&pool->mutex);
363
364
0
    ERROR("Cannot open new connection, already at max");
365
0
    return NULL;
366
0
  }
367
368
  /*
369
   *  If the last attempt failed, wait a bit before
370
   *  retrying.
371
   */
372
0
  if (fr_time_gt(pool->state.last_failed, fr_time_wrap(0)) &&
373
0
      fr_time_gt(fr_time_add(pool->state.last_failed, pool->retry_delay), now)) {
374
0
    bool complain = false;
375
376
0
    if (fr_time_delta_gteq(fr_time_sub(now, pool->state.last_throttled), fr_time_delta_from_sec(1))) {
377
0
      complain = true;
378
379
0
      pool->state.last_throttled = now;
380
0
    }
381
382
0
    pthread_mutex_unlock(&pool->mutex);
383
384
0
    if (!fr_rate_limit_enabled() || complain) {
385
0
      ERROR("Last connection attempt failed, waiting %pV seconds before retrying",
386
0
            fr_box_time_delta(fr_time_sub(fr_time_add(pool->state.last_failed, pool->retry_delay), now)));
387
0
    }
388
389
0
    return NULL;
390
0
  }
391
392
  /*
393
   *  We limit the rate of new connections after a failed attempt.
394
   */
395
0
  if (pool->state.pending > pool->pending_window) {
396
0
    pthread_mutex_unlock(&pool->mutex);
397
398
0
    RATE_LIMIT_GLOBAL_ROPTIONAL(RWARN, WARN,
399
0
              "Cannot open a new connection due to rate limit after failure");
400
401
0
    return NULL;
402
0
  }
403
404
0
  pool->state.pending++;
405
0
  number = pool->state.count++;
406
407
  /*
408
   *  Don't starve out the thread trying to reconnect
409
   *  the pool, by continuously opening new connections.
410
   */
411
0
  while (pool->state.reconnecting) pthread_cond_wait(&pool->done_reconnecting, &pool->mutex);
412
413
  /*
414
   *  The true value for pending_window is the smaller of
415
   *  free connection slots, or pool->pending_window.
416
   */
417
0
  pending_window = (pool->max - pool->state.num);
418
0
  if (pool->pending_window < pending_window) pending_window = pool->pending_window;
419
0
  ROPTIONAL(RDEBUG2, DEBUG2, "Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
420
0
      number, pool->state.pending, pending_window);
421
422
  /*
423
   *  Unlock the mutex while we try to open a new
424
   *  connection.  If there are issues with the back-end,
425
   *  opening a new connection may take a LONG time.  In
426
   *  that case, we want the other connections to continue
427
   *  to be used.
428
   */
429
0
  pthread_mutex_unlock(&pool->mutex);
430
431
  /*
432
   *  Allocate a new top level ctx for the create callback
433
   *  to hang its memory off of.
434
   */
435
0
  MEM(ctx = talloc_init("connection_ctx"));
436
437
  /*
438
   *  This may take a long time, which prevents other
439
   *  threads from releasing connections.  We don't care
440
   *  about other threads opening new connections, as we
441
   *  already have no free connections.
442
   */
443
0
  conn = pool->create(ctx, pool->opaque, pool->connect_timeout);
444
0
  if (!conn) {
445
0
    ERROR("Opening connection failed (%" PRIu64 ")", number);
446
447
0
    pthread_mutex_lock(&pool->mutex);
448
0
    pool->state.last_failed = now;
449
0
    pool->pending_window = 1;
450
0
    pool->state.pending--;
451
452
    /*
453
     *  Must be done inside the mutex, reconnect callback
454
     *  may modify args.
455
     */
456
0
    fr_pool_trigger(pool, "fail");
457
0
    pthread_cond_broadcast(&pool->done_spawn);
458
0
    pthread_mutex_unlock(&pool->mutex);
459
460
0
    talloc_free(ctx);
461
462
0
    return NULL;
463
0
  }
464
465
  /*
466
   *  And lock the mutex again while we link the new
467
   *  connection back into the pool.
468
   */
469
0
  pthread_mutex_lock(&pool->mutex);
470
471
0
  this = talloc_zero(pool, fr_pool_connection_t);
472
0
  if (!this) {
473
0
    pool->state.last_failed = now;
474
0
    pool->state.pending--;
475
476
0
    pthread_cond_broadcast(&pool->done_spawn);
477
0
    pthread_mutex_unlock(&pool->mutex);
478
479
0
    ERROR("Memory allocation failed for new connection (%" PRIu64 ")", number);
480
481
0
    talloc_free(ctx);
482
483
0
    return NULL;
484
0
  }
485
0
  MEM(talloc_link_ctx(this, ctx) >= 0);
486
487
0
  this->created = now;
488
0
  this->connection = conn;
489
0
  this->in_use = in_use;
490
491
0
  this->number = number;
492
0
  this->last_reserved = fr_time();
493
0
  this->last_released = this->last_reserved;
494
495
  /*
496
   *  The connection pool is starting up.  Insert the
497
   *  connection into the heap.
498
   */
499
0
  if (!in_use) fr_heap_insert(&pool->heap, this);
500
501
0
  connection_link_head(pool, this);
502
503
  /*
504
   *  Do NOT insert the connection into the heap.  That's
505
   *  done when the connection is released.
506
   */
507
508
0
  pool->state.num++;
509
510
0
  fr_assert(pool->state.pending > 0);
511
0
  pool->state.pending--;
512
513
  /*
514
   *  We've successfully opened one more connection.  Allow
515
   *  more connections to open in parallel.
516
   */
517
0
  if ((pool->pending_window < pool->max) &&
518
0
      ((pool->max_pending == 0) || (pool->pending_window < pool->max_pending))) {
519
0
    pool->pending_window++;
520
0
  }
521
522
0
  pool->state.last_spawned = fr_time();
523
0
  pool->delay_interval = pool->cleanup_interval;
524
0
  pool->state.next_delay = pool->cleanup_interval;
525
0
  pool->state.last_failed = fr_time_wrap(0);
526
527
  /*
528
   *  Must be done inside the mutex, reconnect callback
529
   *  may modify args.
530
   */
531
0
  fr_pool_trigger(pool, "open");
532
533
0
  pthread_cond_broadcast(&pool->done_spawn);
534
0
  if (unlock) pthread_mutex_unlock(&pool->mutex);
535
536
  /* coverity[missing_unlock] */
537
0
  return this;
538
0
}
539
540
/** Close an existing connection.
541
 *
542
 * Removes the connection from the list, calls the delete callback to close
543
 * the connection, then frees memory allocated to the connection.
544
 *
545
 * @note Will call the 'close' trigger.
546
 * @note Must be called with the mutex held.
547
 *
548
 * @param[in] pool  to modify.
549
 * @param[in] this  Connection to delete.
550
 */
551
static void connection_close_internal(fr_pool_t *pool, fr_pool_connection_t *this)
552
0
{
553
  /*
554
   *  If it's in use, release it.
555
   */
556
0
  if (this->in_use) {
557
#ifdef PTHREAD_DEBUG
558
    pthread_t pthread_id = pthread_self();
559
    fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
560
#endif
561
562
0
    this->in_use = false;
563
564
0
    fr_assert(pool->state.active != 0);
565
0
    pool->state.active--;
566
567
0
  } else {
568
    /*
569
     *  Connection isn't used, remove it from the heap.
570
     */
571
0
    fr_heap_extract(&pool->heap, this);
572
0
  }
573
574
0
  fr_pool_trigger(pool, "close");
575
576
0
  connection_unlink(pool, this);
577
578
0
  fr_assert(pool->state.num > 0);
579
0
  pool->state.num--;
580
0
  talloc_free(this);
581
0
}
582
583
/** Check whether a connection needs to be removed from the pool
584
 *
585
 * Will verify that the connection is within idle_timeout, max_uses, and
586
 * lifetime values. If it is not, the connection will be closed.
587
 *
588
 * @note Will only close connections not in use.
589
 * @note Must be called with the mutex held.
590
 *
591
 * @param[in] pool  to modify.
592
 * @param[in] request The current request.
593
 * @param[in] this  Connection to manage.
594
 * @param[in] now Current time.
595
 * @return
596
 *  - 0 if connection was closed.
597
 *  - 1 if connection handle was left open.
598
 */
599
static int connection_manage(fr_pool_t *pool, request_t *request, fr_pool_connection_t *this, fr_time_t now)
600
0
{
601
0
  fr_assert(pool != NULL);
602
0
  fr_assert(this != NULL);
603
604
  /*
605
   *  Don't terminate in-use connections
606
   */
607
0
  if (this->in_use) return 1;
608
609
0
  if (this->needs_reconnecting) {
610
0
    ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Needs reconnecting",
611
0
        this->number);
612
0
  do_delete:
613
0
    if (pool->state.num <= pool->min) {
614
0
      ROPTIONAL(RDEBUG2, DEBUG2, "You probably need to lower \"min\"");
615
0
    }
616
0
    connection_close_internal(pool, this);
617
0
    return 0;
618
0
  }
619
620
0
  if ((pool->max_uses > 0) &&
621
0
      (this->num_uses >= pool->max_uses)) {
622
0
    ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit max_uses limit",
623
0
        this->number);
624
0
    goto do_delete;
625
0
  }
626
627
0
  if (fr_time_delta_ispos(pool->lifetime) &&
628
0
      (fr_time_lt(fr_time_add(this->created, pool->lifetime), now))) {
629
0
    ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit lifetime limit",
630
0
        this->number);
631
0
    goto do_delete;
632
0
  }
633
634
0
  if (fr_time_delta_ispos(pool->idle_timeout) &&
635
0
      (fr_time_lt(fr_time_add(this->last_released, pool->idle_timeout), now))) {
636
0
    ROPTIONAL(RINFO, INFO, "Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %pVs",
637
0
            this->number, fr_box_time_delta(fr_time_sub(now, this->last_released)));
638
0
    goto do_delete;
639
0
  }
640
641
0
  return 1;
642
0
}
643
644
645
/** Check whether any connections need to be removed from the pool
646
 *
647
 * Maintains the number of connections in the pool as per the configuration
648
 * parameters for the connection pool.
649
 *
650
 * @note Will only run checks the first time it's called in a given second,
651
 * to throttle connection spawning/closing.
652
 * @note Will only close connections not in use.
653
 * @note Must be called with the mutex held, will release mutex before returning.
654
 *
655
 * @param[in] pool  to manage.
656
 * @param[in] request The current request.
657
 * @return 1
658
 */
659
static int connection_check(fr_pool_t *pool, request_t *request)
660
0
{
661
0
  uint32_t    num, spare;
662
0
  fr_time_t   now = fr_time();
663
0
  fr_pool_connection_t  *this, *next;
664
665
0
  if (fr_time_delta_lt(fr_time_sub(now, pool->state.last_checked), fr_time_delta_from_sec(1))) {
666
0
    pthread_mutex_unlock(&pool->mutex);
667
0
    return 1;
668
0
  }
669
670
  /*
671
   *  Get "real" number of connections, and count pending
672
   *  connections as spare.
673
   */
674
0
  num = pool->state.num + pool->state.pending;
675
0
  spare = pool->state.pending + (pool->state.num - pool->state.active);
676
677
  /*
678
   *  The other end can close connections.  If so, we'll
679
   *  have fewer than "min".  When that happens, open more
680
   *  connections to enforce "min".
681
   *
682
   *  The code for spawning connections enforces that
683
   *  num + pending <= max.
684
   */
685
0
  if (num < pool->min) {
686
0
    ROPTIONAL(RINFO, INFO, "Need %i more connections to reach min connections (%i)", pool->min - num, pool->min);
687
0
    goto add_connection;
688
0
  }
689
690
  /*
691
   *  On the odd chance that we've opened too many
692
   *  connections, take care of that.
693
   */
694
0
  if (num > pool->max) {
695
    /*
696
     *  Pending connections don't get closed as "spare".
697
     */
698
0
    if (pool->state.pending > 0) goto manage_connections;
699
700
    /*
701
     *  Otherwise close one of the connections to
702
     *  bring us down to "max".
703
     */
704
0
    goto close_connection;
705
0
  }
706
707
  /*
708
   *  Now that we've enforced min/max connections, try to
709
   *  keep the "spare" connections at the correct number.
710
   */
711
712
  /*
713
   *  Nothing to do?  Go check all of the connections for
714
   *  timeouts, etc.
715
   */
716
0
  if (spare == pool->spare) goto manage_connections;
717
718
  /*
719
   *  Too many spare connections, delete some.
720
   */
721
0
  if (spare > pool->spare) {
722
0
    fr_pool_connection_t *found;
723
724
    /*
725
     *  Pending connections don't get closed as "spare".
726
     */
727
0
    if (pool->state.pending > 0) goto manage_connections;
728
729
    /*
730
     *  Don't close too many connections, even if they
731
     *  are spare.
732
     */
733
0
    if (num <= pool->min) goto manage_connections;
734
735
    /*
736
     *  Too many spares, go close one.
737
     */
738
739
0
  close_connection:
740
    /*
741
     *  Don't close connections too often, in order to
742
     *  prevent flapping. Coverity doesn't notice that
743
     *  all callers have the lock, so we annotate the issue.
744
     */
745
    /* coverity[missing_lock] */
746
0
    if (fr_time_lt(now, fr_time_add(pool->state.last_spawned, pool->delay_interval))) goto manage_connections;
747
748
    /*
749
     *  Find a connection to close.
750
     */
751
0
    found = NULL;
752
0
    for (this = pool->tail; this != NULL; this = this->prev) {
753
0
      if (this->in_use) continue;
754
755
0
      if (!found || (fr_time_lt(this->last_reserved, found->last_reserved))) found = this;
756
0
    }
757
758
0
    if (!fr_cond_assert(found)) goto done;
759
760
0
    ROPTIONAL(RDEBUG2, DEBUG2, "Closing connection (%" PRIu64 ") as we have too many unused connections",
761
0
        found->number);
762
0
    connection_close_internal(pool, found);
763
764
    /*
765
     *  Decrease the delay for the next time we clean
766
     *  up.
767
     */
768
0
    pool->state.next_delay = fr_time_delta_wrap(fr_time_delta_unwrap(pool->state.next_delay) >> 1);
769
0
    if (!fr_time_delta_ispos(pool->state.next_delay)) pool->state.next_delay = fr_time_delta_wrap(1);
770
0
    pool->delay_interval = fr_time_delta_add(pool->delay_interval, pool->state.next_delay);
771
772
0
    goto manage_connections;
773
0
  }
774
775
  /*
776
   *  Too few connections, open some more.
777
   */
778
0
  if (spare < pool->spare) {
779
    /*
780
     *  Don't open too many pending connections.
781
     *  Again, coverity doesn't realize all callers have the lock,
782
     *  so we must annotate here as well.
783
     */
784
    /* coverity[missing_lock] */
785
0
    if (pool->state.pending >= pool->pending_window) goto manage_connections;
786
787
    /*
788
     *  Don't open too many connections, even if we
789
     *  need more spares.
790
     */
791
0
    if (num >= pool->max) goto manage_connections;
792
793
    /*
794
     *  Too few spares, go add one.
795
     */
796
0
    ROPTIONAL(RINFO, INFO, "Need %i more connections to reach %i spares", pool->spare - spare, pool->spare);
797
798
0
  add_connection:
799
    /*
800
     *  Only try to open spares if we're not already attempting to open
801
     *  a connection. Avoids spurious log messages.
802
     */
803
0
    pthread_mutex_unlock(&pool->mutex);
804
0
    (void) connection_spawn(pool, request, now, false, true);
805
0
    pthread_mutex_lock(&pool->mutex);
806
0
    goto manage_connections;
807
0
  }
808
809
  /*
810
   *  Pass over all of the connections in the pool, limiting
811
   *  lifetime, idle time, max requests, etc.
812
   */
813
0
manage_connections:
814
0
  for (this = pool->head; this != NULL; this = next) {
815
0
    next = this->next;
816
0
    connection_manage(pool, request, this, now);
817
0
  }
818
819
0
  pool->state.last_checked = now;
820
821
0
done:
822
0
  pthread_mutex_unlock(&pool->mutex);
823
824
0
  return 1;
825
0
}
826
827
/** Get a connection from the connection pool
828
 *
829
 * @note Must be called with the mutex free.
830
 *
831
 * @param[in] pool  to reserve the connection from.
832
 * @param[in] request The current request.
833
 * @param[in] spawn whether to spawn a new connection
834
 * @return
835
 *  - A pointer to the connection handle.
836
 *  - NULL on error.
837
 */
838
static void *connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn)
839
0
{
840
0
  fr_time_t now;
841
0
  fr_pool_connection_t *this;
842
843
0
  if (!pool) return NULL;
844
845
0
  pthread_mutex_lock(&pool->mutex);
846
847
0
  now = fr_time();
848
849
  /*
850
   *  Grab the link with the lowest latency, and check it
851
   *  for limits.  If "connection manage" says the link is
852
   *  no longer usable, go grab another one.
853
   */
854
0
  do {
855
0
    this = fr_heap_peek(pool->heap);
856
0
    if (!this) break;
857
0
  } while (!connection_manage(pool, request, this, now));
858
859
  /*
860
   *  We have a working connection.  Extract it from the
861
   *  heap and use it.
862
   */
863
0
  if (this) {
864
0
    fr_heap_extract(&pool->heap, this);
865
0
    goto do_return;
866
0
  }
867
868
0
  if (pool->state.num == pool->max) {
869
0
    bool complain = false;
870
871
    /*
872
     *  Rate-limit complaints.
873
     */
874
0
    if (fr_time_delta_gt(fr_time_sub(now, pool->state.last_at_max), fr_time_delta_from_sec(1))) {
875
0
      complain = true;
876
0
      pool->state.last_at_max = now;
877
0
    }
878
879
0
    if (!fr_rate_limit_enabled() || complain) {
880
0
      ERROR("No connections available and at max connection limit");
881
882
      /*
883
       *  Must be done inside the mutex, reconnect callback
884
       *  may modify args.
885
       */
886
0
      fr_pool_trigger(pool, "none");
887
0
    }
888
0
    pthread_mutex_unlock(&pool->mutex);
889
890
0
    return NULL;
891
0
  }
892
893
0
  pthread_mutex_unlock(&pool->mutex);
894
895
0
  if (!spawn) return NULL;
896
897
0
  ROPTIONAL(RDEBUG2, DEBUG2, "%i of %u connections in use.  You may need to increase \"spare\"",
898
0
         pool->state.active, pool->state.num);
899
900
  /*
901
   *  Returns unlocked on failure, or locked on success
902
   */
903
0
  this = connection_spawn(pool, request, now, true, false);
904
0
  if (!this) return NULL;
905
906
0
do_return:
907
0
  pool->state.active++;
908
0
  this->num_uses++;
909
0
  this->last_reserved = fr_time();
910
0
  this->in_use = true;
911
912
#ifdef PTHREAD_DEBUG
913
  this->pthread_id = pthread_self();
914
#endif
915
0
  pthread_mutex_unlock(&pool->mutex);
916
917
0
  ROPTIONAL(RDEBUG2, DEBUG2, "Reserved connection (%" PRIu64 ")", this->number);
918
919
0
  return this->connection;
920
0
}
921
922
/** Enable triggers for a connection pool
923
 *
924
 * @param[in] pool    to enable triggers for.
925
 * @param[in] trigger_prefix  prefix to prepend to all trigger names.  Usually a path
926
 *        to the module's trigger configuration e.g.
927
 *            @verbatim modules.<name>.pool @endverbatim
928
 *        @verbatim <trigger name> @endverbatim is appended to form
929
 *        the complete path.
930
 * @param[in] trigger_args  to make available in any triggers executed by the connection pool.
931
 *        These will usually be fr_pair_t (s) describing the host
932
 *        associated with the pool.
933
 *        Trigger args will be copied, input trigger_args should be freed
934
 *        if necessary.
935
 */
936
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
937
{
938
  pool->triggers_enabled = true;
939
940
  talloc_const_free(pool->trigger_prefix);
941
  MEM(pool->trigger_prefix = trigger_prefix ? talloc_strdup(pool, trigger_prefix) : "");
942
943
  fr_pair_list_free(&pool->trigger_args);
944
945
  if (!trigger_args) return;
946
947
  MEM(fr_pair_list_copy(pool, &pool->trigger_args, trigger_args) >= 0);
948
}
949
950
/** Create a new connection pool
951
 *
952
 * Allocates structures used by the connection pool, initialises the various
953
 * configuration options and counters, and sets the callback functions.
954
 *
955
 * Will also spawn the number of connections specified by the 'start' configuration
956
 * option.
957
 *
958
 * @note Will call the 'start' trigger.
959
 *
960
 * @param[in] ctx   Context to link pool's destruction to.
961
 * @param[in] cs    pool section.
962
 * @param[in] opaque data pointer to pass to callbacks.
963
 * @param[in] c     Callback to create new connections.
964
 * @param[in] a     Callback to check the status of connections.
965
 * @param[in] log_prefix  prefix to prepend to all log messages.
966
 * @return
967
 *  - New connection pool.
968
 *  - NULL on error.
969
 */
970
fr_pool_t *fr_pool_init(TALLOC_CTX *ctx,
			CONF_SECTION const *cs,
			void *opaque,
			fr_pool_connection_create_t c, fr_pool_connection_alive_t a,
			char const *log_prefix)
{
	fr_pool_t	*pool = NULL;

	/* The create callback is mandatory; the alive callback (a) may be NULL. */
	if (!cs || !opaque || !c) return NULL;

	/*
	 *	Pool is allocated in the NULL context as
	 *	threads are likely to allocate memory
	 *	beneath the pool.
	 */
	MEM(pool = talloc_zero(NULL, fr_pool_t));
	fr_pair_list_init(&pool->trigger_args);

	/*
	 *	Ensure the pool is freed at the same time
	 *	as its parent.
	 */
	if (ctx && (talloc_link_ctx(ctx, pool) < 0)) {
		PERROR("%s: Failed linking pool ctx", __FUNCTION__);
		talloc_free(pool);

		return NULL;
	}

	pool->cs = cs;
	pool->opaque = opaque;
	pool->create = c;
	pool->alive = a;

	/* Empty doubly-linked list of connections. */
	pool->head = pool->tail = NULL;

	/*
	 *	We keep a heap of connections, sorted by the last time
	 *	we STARTED using them.  Newly opened connections
	 *	aren't in the heap.  They're only inserted in the list
	 *	once they're released.
	 *
	 *	We do "most recently started" instead of "most
	 *	recently used", because MRU is done as most recently
	 *	*released*.  We want to order connections by
	 *	responsiveness, and MRU prioritizes high latency
	 *	connections.
	 *
	 *	We want most recently *started*, which gives
	 *	preference to low latency links, and pushes high
	 *	latency links down in the priority heap.
	 *
	 *	https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
	 */
	if (!pool->spread) {
		pool->heap = fr_heap_talloc_alloc(pool, last_reserved_cmp, fr_pool_connection_t, heap_id, 0);
	/*
	 *	For some types of connections we need to used a different
	 *	algorithm, because load balancing benefits are secondary
	 *	to maintaining a cache of open connections.
	 *
	 *	With libcurl's multihandle, connections can only be reused
	 *	if all handles that make up the multihandle are done processing
	 *	their requests.
	 *
	 *	We can't tell when that's happened using libcurl, and even
	 *	if we could, blocking until all servers had responded
	 *	would have huge cost.
	 *
	 *	The solution is to order the heap so that the connection that
	 *	was released longest ago is at the top.
	 *
	 *	That way we maximise time between connection use.
	 */
	} else {
		pool->heap = fr_heap_talloc_alloc(pool, last_released_cmp, fr_pool_connection_t, heap_id, 0);
	}
	if (!pool->heap) {
		ERROR("%s: Failed creating connection heap", __FUNCTION__);
	error:
		fr_pool_free(pool);
		return NULL;
	}

	/*
	 *	NOTE(review): unlike trigger_prefix, log_prefix is never
	 *	explicitly freed, so storing the literal "core" here is
	 *	safe (talloc_free(pool) only frees talloc children).
	 */
	pool->log_prefix = log_prefix ? talloc_strdup(pool, log_prefix) : "core";
	pthread_mutex_init(&pool->mutex, NULL);
	pthread_cond_init(&pool->done_spawn, NULL);
	pthread_cond_init(&pool->done_reconnecting, NULL);

	DEBUG2("Initialising connection pool");

	/* Parse the pool {} configuration section into the pool struct. */
	if (cf_section_rules_push(UNCONST(CONF_SECTION *, cs), pool_config) < 0) goto error;
	if (cf_section_parse(pool, pool, UNCONST(CONF_SECTION *, cs)) < 0) {
		PERROR("Configuration parsing failed");
		goto error;
	}

	/*
	 *	Some simple limits
	 */
	if (pool->max == 0) {
		cf_log_err(cs, "Cannot set 'max' to zero");
		goto error;
	}

	/*
	 *	Coverity notices that other uses of max_pending are protected with a mutex,
	 *	and thus thinks it should be locked/unlocked here...but coverity does not
	 *	consider that until this function returns a pointer to a pool, nobody can
	 *	use the pool, so there's no point to doing so.
	 */
	/* coverity[missing_lock] */
	pool->pending_window = (pool->max_pending > 0) ? pool->max_pending : pool->max;

	if (pool->min > pool->max) {
		cf_log_err(cs, "Cannot set 'min' to more than 'max'");
		goto error;
	}

	/* Clamp out-of-range values rather than erroring. */
	FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
	FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
	FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));

	/* An idle timeout longer than the lifetime would never fire. */
	if (fr_time_delta_ispos(pool->lifetime)) {
		FR_TIME_DELTA_COND_CHECK("idle_timeout", pool->idle_timeout,
					 fr_time_delta_lteq(pool->idle_timeout, pool->lifetime), fr_time_delta_wrap(0));
	}

	/* Cleanup must run at least as often as the idle timeout. */
	if (fr_time_delta_ispos(pool->idle_timeout)) {
		FR_TIME_DELTA_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
	}

	/*
	 *	Some libraries treat 0.0 as infinite timeout, others treat it
	 *	as instantaneous timeout.  Solve the inconsistency by making
	 *	the smallest allowable timeout 100ms.
	 */
	FR_TIME_DELTA_BOUND_CHECK("connect_timeout", pool->connect_timeout, >=, fr_time_delta_from_msec(100));

	/*
	 *	Don't open any connections.  Instead, force the limits
	 *	to only 1 connection.
	 *
	 */
	if (check_config) pool->start = pool->min = pool->max = 1;

	return pool;
}
1118
1119
/** Open the initial set of connections for a pool
 *
 * Spawns 'start' connections (no-op when only checking configuration),
 * then fires the "start" trigger.
 *
 * @param[in] pool to start.
 * @return
 *	- 0 on success.
 *	- -1 if an initial connection could not be spawned.
 */
int fr_pool_start(fr_pool_t *pool)
{
	uint32_t spawned;

	/*
	 *	Configuration check only - don't spawn anything.
	 */
	if (check_config) return 0;

	/*
	 *	Open the connections one at a time.  A fresh timestamp
	 *	is taken for every attempt, as each spawn may block for
	 *	a significant period.
	 */
	for (spawned = 0; spawned < pool->start; spawned++) {
		if (!connection_spawn(pool, NULL, fr_time(), false, true)) {
			ERROR("Failed spawning initial connections");
			return -1;
		}
	}

	fr_pool_trigger(pool, "start");

	return 0;
}
1149
1150
/** Allocate a new pool using an existing one as a template
1151
 *
1152
 * @param[in] ctx to allocate new pool in.
1153
 * @param[in] pool  to copy.
1154
 * @param[in] opaque  data to pass to connection function.
1155
 * @return
1156
 *  - New connection pool.
1157
 *  - NULL on error.
1158
 */
1159
fr_pool_t *fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque)
1160
0
{
1161
0
  fr_pool_t *copy;
1162
1163
0
  copy = fr_pool_init(ctx, pool->cs, opaque, pool->create, pool->alive, pool->log_prefix);
1164
0
  if (!copy) return NULL;
1165
1166
0
  if (pool->trigger_prefix) fr_pool_enable_triggers(copy, pool->trigger_prefix, &pool->trigger_args);
1167
1168
0
  return copy;
1169
0
}
1170
1171
/** Get a read-only view of the pool's internal state
 *
 * The state structure holds the pool's counters and timestamps
 * (e.g. number of connections, active connections).
 *
 * @param[in] pool to return the state of.
 * @return pointer to the pool's state structure (const, owned by the pool).
 */
fr_pool_state_t const *fr_pool_state(fr_pool_t *pool)
{
	return &pool->state;
}
1180
1181
/** Connection pool get timeout
 *
 * Returns the 'connect_timeout' value parsed from the pool's
 * configuration section.
 *
 * @param[in] pool to get connection timeout for.
 * @return the connection timeout configured for the pool.
 */
fr_time_delta_t fr_pool_timeout(fr_pool_t *pool)
{
	return pool->connect_timeout;
}
1190
1191
/** Connection pool get start
 *
 * Returns the 'start' value parsed from the pool's configuration
 * section, i.e. how many connections are opened when the pool starts.
 *
 * @param[in] pool to get connection start for.
 * @return the connection start value configured for the pool.
 */
int fr_pool_start_num(fr_pool_t *pool)
{
	return pool->start;
}
1200
1201
/** Return the opaque data associated with a connection pool
 *
 * This is the pointer that was passed to fr_pool_init(), and which is
 * handed to the connection create/alive callbacks.
 *
 * @param pool to return data for.
 * @return opaque data associated with pool.
 */
void const *fr_pool_opaque(fr_pool_t *pool)
{
	return pool->opaque;
}
1210
1211
/** Increment pool reference by one.
 *
 * Each reference delays destruction: fr_pool_free() decrements the
 * count and only frees the pool once it reaches zero.
 *
 * NOTE(review): the increment is not atomic and is done without taking
 * pool->mutex — presumably callers serialise ref/free; confirm.
 *
 * @param[in] pool to increment reference counter for.
 */
void fr_pool_ref(fr_pool_t *pool)
{
	pool->ref++;
}
1219
1220
/** Set a reconnection callback for the connection pool
 *
 * This can be called at any time during the pool's lifecycle.
 * Replaces any previously set callback.
 *
 * @param[in] pool to set reconnect callback for.
 * @param reconnect callback to call when reconnecting pool's connections.
 */
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
{
	pool->reconnect = reconnect;
}
1231
1232
/** Mark connections for reconnection, and spawn at least 'start' connections
1233
 *
1234
 * @note This call may block whilst waiting for pending connection attempts to complete.
1235
 *
1236
 * This intended to be called on a connection pool that's in use, to have it reflect
1237
 * a configuration change, or because the administrator knows that all connections
1238
 * in the pool are inviable and need to be reconnected.
1239
 *
1240
 * @param[in] pool  to reconnect.
1241
 * @param[in] request The current request.
1242
 * @return
1243
 *  -  0 On success.
1244
 *  - -1 If we couldn't create start connections, this may be ignored
1245
 *       depending on the context in which this function is being called.
1246
 */
1247
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
{
	uint32_t		i;
	fr_pool_connection_t	*this;
	fr_time_t		now;

	pthread_mutex_lock(&pool->mutex);

	/*
	 *	Pause new spawn attempts (we release the mutex
	 *	during our cond wait).
	 */
	pool->state.reconnecting = true;

	/*
	 *	When the loop exits, we'll hold the lock for the pool,
	 *	and we're guaranteed the connection create callback
	 *	will not be using the opaque data.
	 */
	while (pool->state.pending) pthread_cond_wait(&pool->done_spawn, &pool->mutex);

	/*
	 *	We want to ensure at least 'start' connections
	 *	have been reconnected. We can't call reconnect
	 *	because, we might get the same connection each
	 *	time we reserve one, so we close 'start'
	 *	connections, and then attempt to spawn them again.
	 */
	for (i = 0; i < pool->start; i++) {
		this = fr_heap_peek(pool->heap);
		if (!this) break;	/* There wasn't 'start' connections available */

		connection_close_internal(pool, this);
	}

	/*
	 *	Mark all remaining connections in the pool as
	 *	requiring reconnection.
	 */
	for (this = pool->head; this; this = this->next) this->needs_reconnecting = true;

	/*
	 *	Call the reconnect callback (if one's set)
	 *	This may modify the opaque data associated
	 *	with the pool.
	 */
	if (pool->reconnect) pool->reconnect(pool, pool->opaque);

	/*
	 *	Must be done inside the mutex, reconnect callback
	 *	may modify args.
	 */
	fr_pool_trigger(pool, "reconnect");

	/*
	 *	Allow new spawn attempts, and wakeup any threads
	 *	waiting to spawn new connections.
	 */
	pool->state.reconnecting = false;
	pthread_cond_broadcast(&pool->done_reconnecting);
	pthread_mutex_unlock(&pool->mutex);

	/* Single timestamp for the whole respawn batch (mutex released). */
	now = fr_time();

	/*
	 *	Now attempt to spawn 'start' connections.
	 */
	for (i = 0; i < pool->start; i++) {
		this = connection_spawn(pool, request, now, false, true);
		if (!this) return -1;
	}

	return 0;
}
1321
1322
/** Delete a connection pool
1323
 *
1324
 * Closes, unlinks and frees all connections in the connection pool, then frees
1325
 * all memory used by the connection pool.
1326
 *
1327
 * @note Will call the 'stop' trigger.
1328
 * @note Must be called with the mutex free.
1329
 *
1330
 * @param[in,out] pool to delete.
1331
 */
1332
void fr_pool_free(fr_pool_t *pool)
{
	fr_pool_connection_t *this;

	if (!pool) return;

	/*
	 *	More modules hold a reference to this pool, don't free
	 *	it yet.
	 */
	if (pool->ref > 0) {
		pool->ref--;
		return;
	}

	DEBUG2("Removing connection pool");

	pthread_mutex_lock(&pool->mutex);

	/*
	 *	Don't loop over the list.  Just keep removing the head
	 *	until they're all gone.
	 */
	while ((this = pool->head) != NULL) {
		INFO("Closing connection (%" PRIu64 ")", this->number);

		connection_close_internal(pool, this);
	}

	fr_pool_trigger(pool, "stop");

	/* Everything must have been unlinked by connection_close_internal(). */
	fr_assert(pool->head == NULL);
	fr_assert(pool->tail == NULL);
	fr_assert(pool->state.num == 0);

	/* Mutex must be unlocked before it can be destroyed. */
	pthread_mutex_unlock(&pool->mutex);
	pthread_mutex_destroy(&pool->mutex);
	pthread_cond_destroy(&pool->done_spawn);
	pthread_cond_destroy(&pool->done_reconnecting);

	talloc_free(pool);
}
1374
1375
/** Reserve a connection in the connection pool
1376
 *
1377
 * Will attempt to find an unused connection in the connection pool, if one is
1378
 * found, will mark it as in use, and increment the number of active connections
1379
 * and return the connection handle.
1380
 *
1381
 * If no free connections are found will attempt to spawn a new one, conditional
1382
 * on a connection spawning not already being in progress, and not being at the
1383
 * 'max' connection limit.
1384
 *
1385
 * @note fr_pool_connection_release must be called once the caller has finished
1386
 * using the connection.
1387
 *
1388
 * @see fr_pool_connection_release
1389
 * @param[in] pool  to reserve the connection from.
1390
 * @param[in] request The current request.
1391
 * @return
1392
 *  - A pointer to the connection handle.
1393
 *  - NULL on error.
1394
 */
1395
void *fr_pool_connection_get(fr_pool_t *pool, request_t *request)
{
	/* Thin wrapper: 'true' allows spawning a new connection if none are free. */
	return connection_get_internal(pool, request, true);
}
1399
1400
/** Release a connection
1401
 *
1402
 * Will mark a connection as unused and decrement the number of active
1403
 * connections.
1404
 *
1405
 * @see fr_pool_connection_get
1406
 * @param[in] pool  to release the connection in.
1407
 * @param[in] request The current request.
1408
 * @param[in] conn  to release.
1409
 */
1410
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
{
	fr_pool_connection_t	*this;
	fr_time_delta_t		held;
	bool			trigger_min = false, trigger_max = false;

	/*
	 *	connection_find() returns with the pool locked on success;
	 *	connection_check() below releases the lock.
	 */
	this = connection_find(pool, conn);
	if (!this) return;

	this->in_use = false;

	/*
	 *	Record when the connection was last released
	 */
	this->last_released = fr_time();
	pool->state.last_released = this->last_released;

	/*
	 *	This is done inside the mutex to ensure
	 *	updates are atomic.
	 */
	held = fr_time_sub(this->last_released, this->last_reserved);

	/*
	 *	Check we've not exceeded out trigger limits
	 *
	 *      These should only fire once per second.
	 */
	if (fr_time_delta_ispos(pool->held_trigger_min) &&
	    (fr_time_delta_lt(held, pool->held_trigger_min)) &&
	    (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_min), fr_time_delta_from_sec(1)))) {
	    	trigger_min = true;
	    	pool->state.last_held_min = this->last_released;
	}

	if (fr_time_delta_ispos(pool->held_trigger_max) &&
	    (fr_time_delta_gt(held, pool->held_trigger_max)) &&
	    (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_max), fr_time_delta_from_sec(1)))) {
	    	trigger_max = true;
	    	pool->state.last_held_max = this->last_released;
	}

	/*
	 *	Insert the connection in the heap.
	 *
	 *	This will either be based on when we *started* using it
	 *	(allowing fast links to be re-used, and slow links to be
	 *	gradually expired), or when we released it (allowing
	 *	the maximum amount of time between connection use).
	 */
	fr_heap_insert(&pool->heap, this);

	fr_assert(pool->state.active != 0);
	pool->state.active--;

	ROPTIONAL(RDEBUG2, DEBUG2, "Released connection (%" PRIu64 ")", this->number);

	/*
	 *	We mirror the "spawn on get" functionality by having
	 *	"delete on release".  If there are too many spare
	 *	connections, go manage the pool && clean some up.
	 */
	connection_check(pool, request);

	/* Fire triggers after the lock has been released by connection_check(). */
	if (trigger_min) fr_pool_trigger(pool, "min");
	if (trigger_max) fr_pool_trigger(pool, "max");
}
1477
1478
/** Reconnect a suspected inviable connection
1479
 *
1480
 * This should be called by the module if it suspects that a connection is
1481
 * not viable (e.g. the server has closed it).
1482
 *
1483
 * When implementing a module that uses the connection pool API, it is advisable
1484
 * to pass a pointer to the pointer to the handle (void **conn)
1485
 * to all functions which may call reconnect. This is so that if a new handle
1486
 * is created and returned, the handle pointer can be updated up the callstack,
1487
 * and a function higher up the stack doesn't attempt to use a now invalid
1488
 * connection handle.
1489
 *
1490
 * @note Will free any talloced memory hung off the context of the connection,
1491
 *  being reconnected.
1492
 *
1493
 * @warning After calling reconnect the caller *MUST NOT* attempt to use
1494
 *  the old handle in any other operations, as its memory will have been
1495
 *  freed.
1496
 *
1497
 * @see fr_pool_connection_get
1498
 * @param[in] pool  to reconnect the connection in.
1499
 * @param[in] request The current request.
1500
 * @param[in] conn  to reconnect.
1501
 * @return new connection handle if successful else NULL.
1502
 */
1503
void *fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn)
{
	fr_pool_connection_t *found;

	if (!pool) return NULL;
	if (!conn) return NULL;

	/*
	 *	If connection_find is successful the pool is now locked
	 */
	found = connection_find(pool, conn);
	if (!found) return NULL;

	ROPTIONAL(RINFO, INFO, "Deleting inviable connection (%" PRIu64 ")", found->number);

	/*
	 *	Close the dead connection, then run the pool management
	 *	checks while we still hold the lock (connection_check
	 *	releases it).
	 */
	connection_close_internal(pool, found);
	connection_check(pool, request);

	/*
	 *	Hand back an existing connection, or spawn a replacement.
	 */
	return connection_get_internal(pool, request, true);
}
1525
1526
/** Delete a connection from the connection pool.
1527
 *
1528
 * Resolves the connection handle to a connection, then (if found)
1529
 * closes, unlinks and frees that connection.
1530
 *
1531
 * @note Must be called with the mutex free.
1532
 *
1533
 * @param[in] pool  Connection pool to modify.
1534
 * @param[in] request The current request.
1535
 * @param[in] conn  to delete.
1536
 * @return
1537
 *  - 0 If the connection could not be found.
1538
 *  - 1 if the connection was deleted.
1539
 */
1540
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
{
	fr_pool_connection_t *found = connection_find(pool, conn);

	if (!found) return 0;

	/*
	 *	Record the last time a connection was closed
	 */
	pool->state.last_closed = fr_time();

	ROPTIONAL(RINFO, INFO, "Deleting connection (%" PRIu64 ")", found->number);

	/*
	 *	Close and unlink, then let connection_check() rebalance
	 *	the pool (and release the lock taken by connection_find).
	 */
	connection_close_internal(pool, found);
	connection_check(pool, request);

	return 1;
}