Coverage Report

Created: 2025-07-12 06:16

/src/lldpd/libevent/bufferevent_ratelim.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions
8
 * are met:
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in the
13
 *    documentation and/or other materials provided with the distribution.
14
 * 3. The name of the author may not be used to endorse or promote products
15
 *    derived from this software without specific prior written permission.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
 */
28
#include "evconfig-private.h"
29
30
#include <sys/types.h>
31
#include <limits.h>
32
#include <string.h>
33
#include <stdlib.h>
34
35
#include "event2/event.h"
36
#include "event2/event_struct.h"
37
#include "event2/util.h"
38
#include "event2/bufferevent.h"
39
#include "event2/bufferevent_struct.h"
40
#include "event2/buffer.h"
41
42
#include "ratelim-internal.h"
43
44
#include "bufferevent-internal.h"
45
#include "mm-internal.h"
46
#include "util-internal.h"
47
#include "event-internal.h"
48
49
int
50
ev_token_bucket_init_(struct ev_token_bucket *bucket,
51
    const struct ev_token_bucket_cfg *cfg,
52
    ev_uint32_t current_tick,
53
    int reinitialize)
54
0
{
55
0
  if (reinitialize) {
56
    /* on reinitialization, we only clip downwards, since we've
57
       already used who-knows-how-much bandwidth this tick.  We
58
       leave "last_updated" as it is; the next update will add the
59
       appropriate amount of bandwidth to the bucket.
60
    */
61
0
    if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62
0
      bucket->read_limit = cfg->read_maximum;
63
0
    if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64
0
      bucket->write_limit = cfg->write_maximum;
65
0
  } else {
66
0
    bucket->read_limit = cfg->read_rate;
67
0
    bucket->write_limit = cfg->write_rate;
68
0
    bucket->last_updated = current_tick;
69
0
  }
70
0
  return 0;
71
0
}
72
73
/* Add n_ticks' worth of rate to the read and write buckets, clamping each
 * at its configured maximum.  Returns 1 if the bucket changed, 0 if no
 * whole tick has elapsed (or time appears to have moved backwards). */
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back.  (A wrapped subtraction after rollback yields a huge
	 * value, caught by the INT_MAX check.) */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	   if adding n_ticks*rate would meet or exceed the maximum
	   (detected by division, which cannot overflow), just saturate. */

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
112
113
static inline void
114
bufferevent_update_buckets(struct bufferevent_private *bev)
115
0
{
116
  /* Must hold lock on bev. */
117
0
  struct timeval now;
118
0
  unsigned tick;
119
0
  event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120
0
  tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121
0
  if (tick != bev->rate_limiting->limit.last_updated)
122
0
    ev_token_bucket_update_(&bev->rate_limiting->limit,
123
0
        bev->rate_limiting->cfg, tick);
124
0
}
125
126
ev_uint32_t
127
ev_token_bucket_get_tick_(const struct timeval *tv,
128
    const struct ev_token_bucket_cfg *cfg)
129
0
{
130
  /* This computation uses two multiplies and a divide.  We could do
131
   * fewer if we knew that the tick length was an integer number of
132
   * seconds, or if we knew it divided evenly into a second.  We should
133
   * investigate that more.
134
   */
135
136
  /* We cast to an ev_uint64_t first, since we don't want to overflow
137
   * before we do the final divide. */
138
0
  ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139
0
  return (unsigned)(msec / cfg->msec_per_tick);
140
0
}
141
142
struct ev_token_bucket_cfg *
143
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144
    size_t write_rate, size_t write_burst,
145
    const struct timeval *tick_len)
146
0
{
147
0
  struct ev_token_bucket_cfg *r;
148
0
  struct timeval g;
149
0
  if (! tick_len) {
150
0
    g.tv_sec = 1;
151
0
    g.tv_usec = 0;
152
0
    tick_len = &g;
153
0
  }
154
0
  if (read_rate > read_burst || write_rate > write_burst ||
155
0
      read_rate < 1 || write_rate < 1)
156
0
    return NULL;
157
0
  if (read_rate > EV_RATE_LIMIT_MAX ||
158
0
      write_rate > EV_RATE_LIMIT_MAX ||
159
0
      read_burst > EV_RATE_LIMIT_MAX ||
160
0
      write_burst > EV_RATE_LIMIT_MAX)
161
0
    return NULL;
162
0
  r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163
0
  if (!r)
164
0
    return NULL;
165
0
  r->read_rate = read_rate;
166
0
  r->write_rate = write_rate;
167
0
  r->read_maximum = read_burst;
168
0
  r->write_maximum = write_burst;
169
0
  memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170
0
  r->msec_per_tick = (tick_len->tv_sec * 1000) +
171
0
      (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172
0
  return r;
173
0
}
174
175
/* Release a configuration allocated by ev_token_bucket_cfg_new().
 * The struct holds no internal pointers, so a single free suffices. */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
180
181
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Convenience wrappers for taking and releasing a rate-limit group's lock. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: suspend or unsuspend I/O on every member of a
 * rate-limit group.  All require the group lock to be held. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
193
/** Helper: figure out the maximum amount we should write if is_write, or
194
    the maximum amount we should read if is_read.  Return that maximum, or
195
    0 if our bucket is wholly exhausted.
196
 */
197
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	/* Start from the per-operation cap; rate limits can only shrink it. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

	/* Select the write or read field of a token bucket, per is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

	/* Whether the group is suspended in the direction we care about. */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket can go negative after a large burst; report that as 0. */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
259
260
/* Maximum number of bytes we may read right now.  Caller must hold the
 * lock on bev. */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}
265
266
/* Maximum number of bytes we may write right now.  Caller must hold the
 * lock on bev. */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}
271
272
/* Charge 'bytes' of reading against bev's private bucket and its group's
 * bucket, suspending or unsuspending reads as the buckets empty or refill.
 * Returns 0 on success, -1 if the refill timer could not be scheduled. */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading and arm the refill
			 * timer so we wake up next tick. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket refilled (bytes may be negative): resume.
			 * Only cancel the timer if writing no longer needs
			 * it either. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
309
310
/* Charge 'bytes' of writing against bev's private bucket and its group's
 * bucket, suspending or unsuspending writes as the buckets empty or refill.
 * Mirror image of bufferevent_decrement_read_buckets_().  Returns 0 on
 * success, -1 if the refill timer could not be scheduled. */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket exhausted: stop writing and arm the refill
			 * timer so we wake up next tick. */
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Bucket refilled (bytes may be negative): resume.
			 * Only cancel the timer if reading no longer needs
			 * it either. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
347
348
/** Stop reading on every bufferevent in <b>g</b> */
349
/* Suspend reading on every member of group <b>g</b>.  Always returns 0. */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
372
373
/** Stop writing on every bufferevent in <b>g</b> */
374
/* Suspend writing on every member of group <b>g</b>.  Always returns 0.
 * Uses try-lock for the same deadlock-avoidance reason as
 * bev_group_suspend_reading_(); a member we fail to lock discovers the
 * suspension later via bufferevent_get_rlim_max_(). */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
390
391
/** Timer callback invoked on a single bufferevent with one or more exhausted
392
    buckets when they are ready to refill. */
393
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* Rate limiting may have been torn down while this timer was
	 * pending; in that case there is nothing to refill. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
440
441
/** Helper: grab a random element from a bufferevent group.
442
 *
443
 * Requires that we hold the lock on the group.
444
 */
445
static struct bufferevent_private *
446
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447
0
{
448
0
  int which;
449
0
  struct bufferevent_private *bev;
450
451
  /* requires group lock */
452
453
0
  if (!group->n_members)
454
0
    return NULL;
455
456
0
  EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457
458
0
  which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459
460
0
  bev = LIST_FIRST(&group->members);
461
0
  while (which--)
462
0
    bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463
464
0
  return bev;
465
0
}
466
467
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.

    NOTE: this macro assumes that variables named 'first', 'bev', and 'g'
    are declared in the enclosing scope, and (since it walks the member
    list) that the caller holds the group lock.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				  \
		}					  \
	} while (0)
486
487
/* Resume reading on every member of 'g'.  Members we cannot try-lock are
 * recorded in pending_unsuspend_read so a later tick retries them.
 * Requires the group lock. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
505
506
/* Resume writing on every member of 'g'.  Members we cannot try-lock are
 * recorded in pending_unsuspend_write so a later tick retries them.
 * Requires the group lock. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
524
525
/** Callback invoked every tick to add more elements to the group bucket
526
    and unsuspend group members as needed.
527
 */
528
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	/* Add this tick's bandwidth to the group bucket. */
	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend members if we previously failed to lock some of them
	 * (pending_unsuspend_*) or if the bucket now holds at least one
	 * minimum share. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
558
559
/* Attach (or with cfg==NULL, detach) a per-bufferevent rate limit.
 * Returns 0 on success, -1 on allocation failure.  The cfg must outlive
 * the bufferevent's use of it (no reference counting yet, see XXX). */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: lift any bandwidth suspension and
		 * cancel the refill timer.  The rlim struct itself is kept
		 * (it may still carry group membership). */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	/* If a cfg was already installed, init_ must clip rather than
	 * reset the bucket. */
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Apply the initial bucket state: suspend whichever directions are
	 * already out of budget, resume the others. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
636
637
/* Create a rate-limit group on 'base' with a private copy of 'cfg'.
 * Returns NULL on allocation failure.  The group's master refill event
 * fires every tick to top up the shared bucket. */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	/* Copy the cfg so the caller may free theirs. */
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Default minimum per-member share is 64 bytes. */
	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
670
671
/* Replace the configuration of an existing group.  Clips the current
 * bucket down to the new maxima and reschedules the refill timer if the
 * tick length changed.  Returns 0 on success, -1 on bad arguments. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	/* Only clip downward; the bucket keeps any unused budget that still
	 * fits under the new maxima. */
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
701
702
int
703
bufferevent_rate_limit_group_set_min_share(
704
  struct bufferevent_rate_limit_group *g,
705
  size_t share)
706
0
{
707
0
  if (share > EV_SSIZE_MAX)
708
0
    return -1;
709
710
0
  g->configured_min_share = share;
711
712
  /* Can't set share to less than the one-tick maximum.  IOW, at steady
713
   * state, at least one connection can go per tick. */
714
0
  if (share > g->rate_limit_cfg.read_rate)
715
0
    share = g->rate_limit_cfg.read_rate;
716
0
  if (share > g->rate_limit_cfg.write_rate)
717
0
    share = g->rate_limit_cfg.write_rate;
718
719
0
  g->min_share = share;
720
0
  return 0;
721
0
}
722
723
/* Destroy a group.  Every member must already have been removed; freeing
 * a non-empty group is a caller bug (asserted). */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
733
734
/* Add 'bev' to rate-limit group 'g', allocating its rate_limiting state if
 * needed and moving it out of any previous group.  If the group is
 * currently suspended, the new member is suspended too.  Returns 0 on
 * success, -1 on allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		/* Already a member of this group: nothing to do. */
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the suspension flags while holding the group lock... */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	/* ...then apply them under the bev lock only (group lock released
	 * first, preserving the lock-ordering rule). */
	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
779
780
/* Public removal entry point: detach from the group and also lift any
 * group-based read/write suspension. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
785
786
/* Detach 'bev' from its group (if any).  When 'unsuspend' is nonzero, also
 * clear any BEV_SUSPEND_BW_GROUP suspension (callers tearing the
 * bufferevent down pass 0).  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
808
809
/* ===
810
 * API functions to expose rate limits.
811
 *
812
 * Don't use these from inside Libevent; they're meant to be for use by
813
 * the program.
814
 * === */
815
816
/* Mostly you don't want to use this function from inside libevent;
817
 * bufferevent_get_read_max_() is more likely what you want*/
818
ev_ssize_t
819
bufferevent_get_read_limit(struct bufferevent *bev)
820
0
{
821
0
  ev_ssize_t r;
822
0
  struct bufferevent_private *bevp;
823
0
  BEV_LOCK(bev);
824
0
  bevp = BEV_UPCAST(bev);
825
0
  if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
826
0
    bufferevent_update_buckets(bevp);
827
0
    r = bevp->rate_limiting->limit.read_limit;
828
0
  } else {
829
0
    r = EV_SSIZE_MAX;
830
0
  }
831
0
  BEV_UNLOCK(bev);
832
0
  return r;
833
0
}
834
835
/* Mostly you don't want to use this function from inside libevent;
836
 * bufferevent_get_write_max_() is more likely what you want*/
837
ev_ssize_t
838
bufferevent_get_write_limit(struct bufferevent *bev)
839
0
{
840
0
  ev_ssize_t r;
841
0
  struct bufferevent_private *bevp;
842
0
  BEV_LOCK(bev);
843
0
  bevp = BEV_UPCAST(bev);
844
0
  if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
845
0
    bufferevent_update_buckets(bevp);
846
0
    r = bevp->rate_limiting->limit.write_limit;
847
0
  } else {
848
0
    r = EV_SSIZE_MAX;
849
0
  }
850
0
  BEV_UNLOCK(bev);
851
0
  return r;
852
0
}
853
854
int
855
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
856
0
{
857
0
  struct bufferevent_private *bevp;
858
0
  BEV_LOCK(bev);
859
0
  bevp = BEV_UPCAST(bev);
860
0
  if (size == 0 || size > EV_SSIZE_MAX)
861
0
    bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
862
0
  else
863
0
    bevp->max_single_read = size;
864
0
  BEV_UNLOCK(bev);
865
0
  return 0;
866
0
}
867
868
int
869
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
870
0
{
871
0
  struct bufferevent_private *bevp;
872
0
  BEV_LOCK(bev);
873
0
  bevp = BEV_UPCAST(bev);
874
0
  if (size == 0 || size > EV_SSIZE_MAX)
875
0
    bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
876
0
  else
877
0
    bevp->max_single_write = size;
878
0
  BEV_UNLOCK(bev);
879
0
  return 0;
880
0
}
881
882
ev_ssize_t
883
bufferevent_get_max_single_read(struct bufferevent *bev)
884
0
{
885
0
  ev_ssize_t r;
886
887
0
  BEV_LOCK(bev);
888
0
  r = BEV_UPCAST(bev)->max_single_read;
889
0
  BEV_UNLOCK(bev);
890
0
  return r;
891
0
}
892
893
ev_ssize_t
894
bufferevent_get_max_single_write(struct bufferevent *bev)
895
0
{
896
0
  ev_ssize_t r;
897
898
0
  BEV_LOCK(bev);
899
0
  r = BEV_UPCAST(bev)->max_single_write;
900
0
  BEV_UNLOCK(bev);
901
0
  return r;
902
0
}
903
904
ev_ssize_t
905
bufferevent_get_max_to_read(struct bufferevent *bev)
906
0
{
907
0
  ev_ssize_t r;
908
0
  BEV_LOCK(bev);
909
0
  r = bufferevent_get_read_max_(BEV_UPCAST(bev));
910
0
  BEV_UNLOCK(bev);
911
0
  return r;
912
0
}
913
914
ev_ssize_t
915
bufferevent_get_max_to_write(struct bufferevent *bev)
916
0
{
917
0
  ev_ssize_t r;
918
0
  BEV_LOCK(bev);
919
0
  r = bufferevent_get_write_max_(BEV_UPCAST(bev));
920
0
  BEV_UNLOCK(bev);
921
0
  return r;
922
0
}
923
924
const struct ev_token_bucket_cfg *
925
0
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
926
0
  struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
927
0
  struct ev_token_bucket_cfg *cfg;
928
929
0
  BEV_LOCK(bev);
930
931
0
  if (bufev_private->rate_limiting) {
932
0
    cfg = bufev_private->rate_limiting->cfg;
933
0
  } else {
934
0
    cfg = NULL;
935
0
  }
936
937
0
  BEV_UNLOCK(bev);
938
939
0
  return cfg;
940
0
}
941
942
/* Mostly you don't want to use this function from inside libevent;
943
 * bufferevent_get_read_max_() is more likely what you want*/
944
ev_ssize_t
945
bufferevent_rate_limit_group_get_read_limit(
946
  struct bufferevent_rate_limit_group *grp)
947
0
{
948
0
  ev_ssize_t r;
949
0
  LOCK_GROUP(grp);
950
0
  r = grp->rate_limit.read_limit;
951
0
  UNLOCK_GROUP(grp);
952
0
  return r;
953
0
}
954
955
/* Mostly you don't want to use this function from inside libevent;
956
 * bufferevent_get_write_max_() is more likely what you want. */
957
ev_ssize_t
958
bufferevent_rate_limit_group_get_write_limit(
959
  struct bufferevent_rate_limit_group *grp)
960
0
{
961
0
  ev_ssize_t r;
962
0
  LOCK_GROUP(grp);
963
0
  r = grp->rate_limit.write_limit;
964
0
  UNLOCK_GROUP(grp);
965
0
  return r;
966
0
}
967
968
/* Manually subtract 'decr' tokens from this bufferevent's read bucket,
 * suspending or resuming reads as the balance crosses zero.  A negative
 * 'decr' adds tokens.  Returns 0 on success, -1 if scheduling the refill
 * event failed.  Caller must have rate limiting configured (asserted). */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop reading and arm the refill
		 * timer so tokens come back on the next tick. */
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket refilled externally.  The refill event is shared
		 * with the write direction, so only cancel it if writes
		 * are not also waiting on bandwidth. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
994
995
/* Manually subtract 'decr' tokens from this bufferevent's write bucket,
 * suspending or resuming writes as the balance crosses zero.  A negative
 * 'decr' adds tokens.  Returns 0 on success, -1 if scheduling the refill
 * event failed.  Caller must have rate limiting configured (asserted). */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop writing and arm the refill
		 * timer so tokens come back on the next tick. */
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket refilled externally.  The refill event is shared
		 * with the read direction, so only cancel it if reads are
		 * not also waiting on bandwidth. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
1023
1024
int
1025
bufferevent_rate_limit_group_decrement_read(
1026
  struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1027
0
{
1028
0
  int r = 0;
1029
0
  ev_ssize_t old_limit, new_limit;
1030
0
  LOCK_GROUP(grp);
1031
0
  old_limit = grp->rate_limit.read_limit;
1032
0
  new_limit = (grp->rate_limit.read_limit -= decr);
1033
1034
0
  if (old_limit > 0 && new_limit <= 0) {
1035
0
    bev_group_suspend_reading_(grp);
1036
0
  } else if (old_limit <= 0 && new_limit > 0) {
1037
0
    bev_group_unsuspend_reading_(grp);
1038
0
  }
1039
1040
0
  UNLOCK_GROUP(grp);
1041
0
  return r;
1042
0
}
1043
1044
int
1045
bufferevent_rate_limit_group_decrement_write(
1046
  struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1047
0
{
1048
0
  int r = 0;
1049
0
  ev_ssize_t old_limit, new_limit;
1050
0
  LOCK_GROUP(grp);
1051
0
  old_limit = grp->rate_limit.write_limit;
1052
0
  new_limit = (grp->rate_limit.write_limit -= decr);
1053
1054
0
  if (old_limit > 0 && new_limit <= 0) {
1055
0
    bev_group_suspend_writing_(grp);
1056
0
  } else if (old_limit <= 0 && new_limit > 0) {
1057
0
    bev_group_unsuspend_writing_(grp);
1058
0
  }
1059
1060
0
  UNLOCK_GROUP(grp);
1061
0
  return r;
1062
0
}
1063
1064
/* Report the cumulative bytes read and written by members of 'grp'.
 * Either output pointer may be NULL to skip that counter.
 * NOTE(review): the counters are read without taking the group lock;
 * presumably callers tolerate a torn/stale snapshot — confirm upstream. */
void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}
1074
1075
/* Zero the group's cumulative read/write byte counters.
 * NOTE(review): like the getter above, this writes without the group
 * lock — confirm callers serialize access themselves. */
void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}
1080
1081
/* Initialize the rate-limiting fields of a freshly created bufferevent:
 * no token bucket attached, and default per-pass read/write caps.
 * Always returns 0. */
int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}