Coverage Report

Created: 2025-07-11 06:57

/src/libevent/bufferevent_ratelim.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4
 * All rights reserved.
5
 *
6
 * Redistribution and use in source and binary forms, with or without
7
 * modification, are permitted provided that the following conditions
8
 * are met:
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in the
13
 *    documentation and/or other materials provided with the distribution.
14
 * 3. The name of the author may not be used to endorse or promote products
15
 *    derived from this software without specific prior written permission.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
 */
28
#include "evconfig-private.h"
29
30
#include <sys/types.h>
31
#include <limits.h>
32
#include <string.h>
33
#include <stdlib.h>
34
35
#include "event2/event.h"
36
#include "event2/event_struct.h"
37
#include "event2/util.h"
38
#include "event2/bufferevent.h"
39
#include "event2/bufferevent_struct.h"
40
#include "event2/buffer.h"
41
42
#include "ratelim-internal.h"
43
44
#include "bufferevent-internal.h"
45
#include "mm-internal.h"
46
#include "util-internal.h"
47
#include "event-internal.h"
48
49
int
50
ev_token_bucket_init_(struct ev_token_bucket *bucket,
51
    const struct ev_token_bucket_cfg *cfg,
52
    ev_uint32_t current_tick,
53
    int reinitialize)
54
0
{
55
0
  if (reinitialize) {
56
    /* on reinitialization, we only clip downwards, since we've
57
       already used who-knows-how-much bandwidth this tick.  We
58
       leave "last_updated" as it is; the next update will add the
59
       appropriate amount of bandwidth to the bucket.
60
    */
61
0
    if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62
0
      bucket->read_limit = cfg->read_maximum;
63
0
    if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64
0
      bucket->write_limit = cfg->write_maximum;
65
0
  } else {
66
0
    bucket->read_limit = cfg->read_rate;
67
0
    bucket->write_limit = cfg->write_rate;
68
0
    bucket->last_updated = current_tick;
69
0
  }
70
0
  return 0;
71
0
}
72
73
int
74
ev_token_bucket_update_(struct ev_token_bucket *bucket,
75
    const struct ev_token_bucket_cfg *cfg,
76
    ev_uint32_t current_tick)
77
0
{
78
  /* It's okay if the tick number overflows, since we'll just
79
   * wrap around when we do the unsigned subtraction. */
80
0
  unsigned n_ticks = current_tick - bucket->last_updated;
81
82
  /* Make sure some ticks actually happened, and that time didn't
83
   * roll back. */
84
0
  if (n_ticks == 0 || n_ticks > INT_MAX)
85
0
    return 0;
86
87
  /* Naively, we would say
88
    bucket->limit += n_ticks * cfg->rate;
89
90
    if (bucket->limit > cfg->maximum)
91
      bucket->limit = cfg->maximum;
92
93
     But we're worried about overflow, so we do it like this:
94
  */
95
96
0
  if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97
0
    bucket->read_limit = cfg->read_maximum;
98
0
  else
99
0
    bucket->read_limit += n_ticks * cfg->read_rate;
100
101
102
0
  if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103
0
    bucket->write_limit = cfg->write_maximum;
104
0
  else
105
0
    bucket->write_limit += n_ticks * cfg->write_rate;
106
107
108
0
  bucket->last_updated = current_tick;
109
110
0
  return 1;
111
0
}
112
113
static inline void
114
bufferevent_update_buckets(struct bufferevent_private *bev)
115
0
{
116
  /* Must hold lock on bev. */
117
0
  struct timeval now;
118
0
  unsigned tick;
119
0
  event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120
0
  tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121
0
  if (tick != bev->rate_limiting->limit.last_updated)
122
0
    ev_token_bucket_update_(&bev->rate_limiting->limit,
123
0
        bev->rate_limiting->cfg, tick);
124
0
}
125
126
ev_uint32_t
127
ev_token_bucket_get_tick_(const struct timeval *tv,
128
    const struct ev_token_bucket_cfg *cfg)
129
0
{
130
  /* This computation uses two multiplies and a divide.  We could do
131
   * fewer if we knew that the tick length was an integer number of
132
   * seconds, or if we knew it divided evenly into a second.  We should
133
   * investigate that more.
134
   */
135
136
  /* We cast to an ev_uint64_t first, since we don't want to overflow
137
   * before we do the final divide. */
138
0
  ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139
0
  return (unsigned)(msec / cfg->msec_per_tick);
140
0
}
141
142
struct ev_token_bucket_cfg *
143
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144
    size_t write_rate, size_t write_burst,
145
    const struct timeval *tick_len)
146
0
{
147
0
  struct ev_token_bucket_cfg *r;
148
0
  struct timeval g;
149
0
  unsigned msec_per_tick;
150
151
0
  if (! tick_len) {
152
0
    g.tv_sec = 1;
153
0
    g.tv_usec = 0;
154
0
    tick_len = &g;
155
0
  }
156
157
  /* Avoid possible overflow.
158
   * - there is no point in accepting values larger then INT_MAX/1000 anyway
159
   * - on windows tv_sec (tv_usec) is long, which is int, which has upper value limit INT_MAX
160
   * - and also negative values does not make any sense
161
   */
162
0
  if (tick_len->tv_sec < 0 || tick_len->tv_sec > INT_MAX/1000)
163
0
    return NULL;
164
165
  /* Note, overflow with tv_usec is not possible since tv_sec is limited to
166
   * INT_MAX/1000 anyway */
167
0
  msec_per_tick = (unsigned)(tick_len->tv_sec * 1000) +
168
0
      (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
169
0
  if (!msec_per_tick)
170
0
    return NULL;
171
172
0
  if (read_rate > read_burst || write_rate > write_burst ||
173
0
      read_rate < 1 || write_rate < 1)
174
0
    return NULL;
175
0
  if (read_rate > EV_RATE_LIMIT_MAX ||
176
0
      write_rate > EV_RATE_LIMIT_MAX ||
177
0
      read_burst > EV_RATE_LIMIT_MAX ||
178
0
      write_burst > EV_RATE_LIMIT_MAX)
179
0
    return NULL;
180
0
  r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
181
0
  if (!r)
182
0
    return NULL;
183
0
  r->read_rate = read_rate;
184
0
  r->write_rate = write_rate;
185
0
  r->read_maximum = read_burst;
186
0
  r->write_maximum = write_burst;
187
0
  memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
188
0
  r->msec_per_tick = msec_per_tick;
189
0
  return r;
190
0
}
191
192
void
193
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
194
0
{
195
0
  mm_free(cfg);
196
0
}
197
198
/* Default values for max_single_read & max_single_write variables. */
199
0
#define MAX_SINGLE_READ_DEFAULT 16384
200
0
#define MAX_SINGLE_WRITE_DEFAULT 16384
201
202
0
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
203
0
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
204
205
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
206
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
207
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
208
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
209
210
/** Helper: figure out the maximum amount we should write if is_write, or
211
    the maximum amount we should read if is_read.  Return that maximum, or
212
    0 if our bucket is wholly exhausted.
213
 */
214
static inline ev_ssize_t
215
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
216
0
{
217
  /* needs lock on bev. */
218
0
  ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
219
220
0
#define LIM(x)            \
221
0
  (is_write ? (x).write_limit : (x).read_limit)
222
223
0
#define GROUP_SUSPENDED(g)      \
224
0
  (is_write ? (g)->write_suspended : (g)->read_suspended)
225
226
  /* Sets max_so_far to MIN(x, max_so_far) */
227
0
#define CLAMPTO(x)        \
228
0
  do {         \
229
0
    if (max_so_far > (x))   \
230
0
      max_so_far = (x); \
231
0
  } while (0);
232
233
0
  if (!bev->rate_limiting)
234
0
    return max_so_far;
235
236
  /* If rate-limiting is enabled at all, update the appropriate
237
     bucket, and take the smaller of our rate limit and the group
238
     rate limit.
239
   */
240
241
0
  if (bev->rate_limiting->cfg) {
242
0
    bufferevent_update_buckets(bev);
243
0
    max_so_far = LIM(bev->rate_limiting->limit);
244
0
  }
245
0
  if (bev->rate_limiting->group) {
246
0
    struct bufferevent_rate_limit_group *g =
247
0
        bev->rate_limiting->group;
248
0
    ev_ssize_t share;
249
0
    LOCK_GROUP(g);
250
0
    if (GROUP_SUSPENDED(g)) {
251
      /* We can get here if we failed to lock this
252
       * particular bufferevent while suspending the whole
253
       * group. */
254
0
      if (is_write)
255
0
        bufferevent_suspend_write_(&bev->bev,
256
0
            BEV_SUSPEND_BW_GROUP);
257
0
      else
258
0
        bufferevent_suspend_read_(&bev->bev,
259
0
            BEV_SUSPEND_BW_GROUP);
260
0
      share = 0;
261
0
    } else {
262
      /* XXXX probably we should divide among the active
263
       * members, not the total members. */
264
0
      share = LIM(g->rate_limit) / g->n_members;
265
0
      if (share < g->min_share)
266
0
        share = g->min_share;
267
0
    }
268
0
    UNLOCK_GROUP(g);
269
0
    CLAMPTO(share);
270
0
  }
271
272
0
  if (max_so_far < 0)
273
0
    max_so_far = 0;
274
0
  return max_so_far;
275
0
}
276
277
ev_ssize_t
278
bufferevent_get_read_max_(struct bufferevent_private *bev)
279
0
{
280
0
  return bufferevent_get_rlim_max_(bev, 0);
281
0
}
282
283
ev_ssize_t
284
bufferevent_get_write_max_(struct bufferevent_private *bev)
285
0
{
286
0
  return bufferevent_get_rlim_max_(bev, 1);
287
0
}
288
289
int
290
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
291
0
{
292
  /* XXXXX Make sure all users of this function check its return value */
293
0
  int r = 0;
294
  /* need to hold lock on bev */
295
0
  if (!bev->rate_limiting)
296
0
    return 0;
297
298
0
  if (bev->rate_limiting->cfg) {
299
0
    bev->rate_limiting->limit.read_limit -= bytes;
300
0
    if (bev->rate_limiting->limit.read_limit <= 0) {
301
0
      bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
302
0
      if (event_add(&bev->rate_limiting->refill_bucket_event,
303
0
        &bev->rate_limiting->cfg->tick_timeout) < 0)
304
0
        r = -1;
305
0
    } else if (bev->read_suspended & BEV_SUSPEND_BW) {
306
0
      if (!(bev->write_suspended & BEV_SUSPEND_BW))
307
0
        event_del(&bev->rate_limiting->refill_bucket_event);
308
0
      bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
309
0
    }
310
0
  }
311
312
0
  if (bev->rate_limiting->group) {
313
0
    LOCK_GROUP(bev->rate_limiting->group);
314
0
    bev->rate_limiting->group->rate_limit.read_limit -= bytes;
315
0
    bev->rate_limiting->group->total_read += bytes;
316
0
    if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
317
0
      bev_group_suspend_reading_(bev->rate_limiting->group);
318
0
    } else if (bev->rate_limiting->group->read_suspended) {
319
0
      bev_group_unsuspend_reading_(bev->rate_limiting->group);
320
0
    }
321
0
    UNLOCK_GROUP(bev->rate_limiting->group);
322
0
  }
323
324
0
  return r;
325
0
}
326
327
int
328
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
329
0
{
330
  /* XXXXX Make sure all users of this function check its return value */
331
0
  int r = 0;
332
  /* need to hold lock */
333
0
  if (!bev->rate_limiting)
334
0
    return 0;
335
336
0
  if (bev->rate_limiting->cfg) {
337
0
    bev->rate_limiting->limit.write_limit -= bytes;
338
0
    if (bev->rate_limiting->limit.write_limit <= 0) {
339
0
      bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
340
0
      if (event_add(&bev->rate_limiting->refill_bucket_event,
341
0
        &bev->rate_limiting->cfg->tick_timeout) < 0)
342
0
        r = -1;
343
0
    } else if (bev->write_suspended & BEV_SUSPEND_BW) {
344
0
      if (!(bev->read_suspended & BEV_SUSPEND_BW))
345
0
        event_del(&bev->rate_limiting->refill_bucket_event);
346
0
      bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
347
0
    }
348
0
  }
349
350
0
  if (bev->rate_limiting->group) {
351
0
    LOCK_GROUP(bev->rate_limiting->group);
352
0
    bev->rate_limiting->group->rate_limit.write_limit -= bytes;
353
0
    bev->rate_limiting->group->total_written += bytes;
354
0
    if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
355
0
      bev_group_suspend_writing_(bev->rate_limiting->group);
356
0
    } else if (bev->rate_limiting->group->write_suspended) {
357
0
      bev_group_unsuspend_writing_(bev->rate_limiting->group);
358
0
    }
359
0
    UNLOCK_GROUP(bev->rate_limiting->group);
360
0
  }
361
362
0
  return r;
363
0
}
364
365
/** Stop reading on every bufferevent in <b>g</b> */
366
static int
367
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
368
0
{
369
  /* Needs group lock */
370
0
  struct bufferevent_private *bev;
371
0
  g->read_suspended = 1;
372
0
  g->pending_unsuspend_read = 0;
373
374
  /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
375
     to prevent a deadlock.  (Ordinarily, the group lock nests inside
376
     the bufferevent locks.  If we are unable to lock any individual
377
     bufferevent, it will find out later when it looks at its limit
378
     and sees that its group is suspended.)
379
  */
380
0
  LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
381
0
    if (EVLOCK_TRY_LOCK_(bev->lock)) {
382
0
      bufferevent_suspend_read_(&bev->bev,
383
0
          BEV_SUSPEND_BW_GROUP);
384
0
      EVLOCK_UNLOCK(bev->lock, 0);
385
0
    }
386
0
  }
387
0
  return 0;
388
0
}
389
390
/** Stop writing on every bufferevent in <b>g</b> */
391
static int
392
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
393
0
{
394
  /* Needs group lock */
395
0
  struct bufferevent_private *bev;
396
0
  g->write_suspended = 1;
397
0
  g->pending_unsuspend_write = 0;
398
0
  LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
399
0
    if (EVLOCK_TRY_LOCK_(bev->lock)) {
400
0
      bufferevent_suspend_write_(&bev->bev,
401
0
          BEV_SUSPEND_BW_GROUP);
402
0
      EVLOCK_UNLOCK(bev->lock, 0);
403
0
    }
404
0
  }
405
0
  return 0;
406
0
}
407
408
/** Timer callback invoked on a single bufferevent with one or more exhausted
409
    buckets when they are ready to refill. */
410
static void
411
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
412
0
{
413
0
  unsigned tick;
414
0
  struct timeval now;
415
0
  struct bufferevent_private *bev = arg;
416
0
  int again = 0;
417
0
  BEV_LOCK(&bev->bev);
418
0
  if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
419
0
    BEV_UNLOCK(&bev->bev);
420
0
    return;
421
0
  }
422
423
  /* First, update the bucket */
424
0
  event_base_gettimeofday_cached(bev->bev.ev_base, &now);
425
0
  tick = ev_token_bucket_get_tick_(&now,
426
0
      bev->rate_limiting->cfg);
427
0
  ev_token_bucket_update_(&bev->rate_limiting->limit,
428
0
      bev->rate_limiting->cfg,
429
0
      tick);
430
431
  /* Now unsuspend any read/write operations as appropriate. */
432
0
  if ((bev->read_suspended & BEV_SUSPEND_BW)) {
433
0
    if (bev->rate_limiting->limit.read_limit > 0)
434
0
      bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
435
0
    else
436
0
      again = 1;
437
0
  }
438
0
  if ((bev->write_suspended & BEV_SUSPEND_BW)) {
439
0
    if (bev->rate_limiting->limit.write_limit > 0)
440
0
      bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
441
0
    else
442
0
      again = 1;
443
0
  }
444
0
  if (again) {
445
    /* One or more of the buckets may need another refill if they
446
       started negative.
447
448
       XXXX if we need to be quiet for more ticks, we should
449
       maybe figure out what timeout we really want.
450
    */
451
    /* XXXX Handle event_add failure somehow */
452
0
    event_add(&bev->rate_limiting->refill_bucket_event,
453
0
        &bev->rate_limiting->cfg->tick_timeout);
454
0
  }
455
0
  BEV_UNLOCK(&bev->bev);
456
0
}
457
458
/** Helper: grab a random element from a bufferevent group.
459
 *
460
 * Requires that we hold the lock on the group.
461
 */
462
static struct bufferevent_private *
463
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
464
0
{
465
0
  int which;
466
0
  struct bufferevent_private *bev;
467
468
  /* requires group lock */
469
470
0
  if (!group->n_members)
471
0
    return NULL;
472
473
0
  EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
474
475
0
  which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
476
477
0
  bev = LIST_FIRST(&group->members);
478
0
  while (which--)
479
0
    bev = LIST_NEXT(bev, rate_limiting->next_in_group);
480
481
0
  return bev;
482
0
}
483
484
/** Iterate over the elements of a rate-limiting group 'g' with a random
485
    starting point, assigning each to the variable 'bev', and executing the
486
    block 'block'.
487
488
    We do this in a half-baked effort to get fairness among group members.
489
    XXX Round-robin or some kind of priority queue would be even more fair.
490
 */
491
#define FOREACH_RANDOM_ORDER(block)     \
492
0
  do {           \
493
0
    first = bev_group_random_element_(g); \
494
0
    for (bev = first; bev != LIST_END(&g->members); \
495
0
        bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
496
0
      block ;           \
497
0
    }            \
498
0
    for (bev = LIST_FIRST(&g->members); bev && bev != first; \
499
0
        bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
500
0
      block ;            \
501
0
    }             \
502
0
  } while (0)
503
504
static void
505
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
506
0
{
507
0
  int again = 0;
508
0
  struct bufferevent_private *bev, *first;
509
510
0
  g->read_suspended = 0;
511
0
  FOREACH_RANDOM_ORDER({
512
0
    if (EVLOCK_TRY_LOCK_(bev->lock)) {
513
0
      bufferevent_unsuspend_read_(&bev->bev,
514
0
          BEV_SUSPEND_BW_GROUP);
515
0
      EVLOCK_UNLOCK(bev->lock, 0);
516
0
    } else {
517
0
      again = 1;
518
0
    }
519
0
  });
520
0
  g->pending_unsuspend_read = again;
521
0
}
522
523
static void
524
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
525
0
{
526
0
  int again = 0;
527
0
  struct bufferevent_private *bev, *first;
528
0
  g->write_suspended = 0;
529
530
0
  FOREACH_RANDOM_ORDER({
531
0
    if (EVLOCK_TRY_LOCK_(bev->lock)) {
532
0
      bufferevent_unsuspend_write_(&bev->bev,
533
0
          BEV_SUSPEND_BW_GROUP);
534
0
      EVLOCK_UNLOCK(bev->lock, 0);
535
0
    } else {
536
0
      again = 1;
537
0
    }
538
0
  });
539
0
  g->pending_unsuspend_write = again;
540
0
}
541
542
/** Callback invoked every tick to add more elements to the group bucket
543
    and unsuspend group members as needed.
544
 */
545
static void
546
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
547
0
{
548
0
  struct bufferevent_rate_limit_group *g = arg;
549
0
  unsigned tick;
550
0
  struct timeval now;
551
552
0
  event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
553
554
0
  LOCK_GROUP(g);
555
556
0
  tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
557
0
  ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
558
559
0
  if (g->pending_unsuspend_read ||
560
0
      (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
561
0
    bev_group_unsuspend_reading_(g);
562
0
  }
563
0
  if (g->pending_unsuspend_write ||
564
0
      (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
565
0
    bev_group_unsuspend_writing_(g);
566
0
  }
567
568
  /* XXXX Rather than waiting to the next tick to unsuspend stuff
569
   * with pending_unsuspend_write/read, we should do it on the
570
   * next iteration of the mainloop.
571
   */
572
573
0
  UNLOCK_GROUP(g);
574
0
}
575
576
int
577
bufferevent_set_rate_limit(struct bufferevent *bev,
578
    struct ev_token_bucket_cfg *cfg)
579
0
{
580
0
  struct bufferevent_private *bevp = BEV_UPCAST(bev);
581
0
  int r = -1;
582
0
  struct bufferevent_rate_limit *rlim;
583
0
  struct timeval now;
584
0
  ev_uint32_t tick;
585
0
  int reinit = 0, suspended = 0;
586
  /* XXX reference-count cfg */
587
588
0
  BEV_LOCK(bev);
589
590
0
  if (cfg == NULL) {
591
0
    if (bevp->rate_limiting) {
592
0
      rlim = bevp->rate_limiting;
593
0
      rlim->cfg = NULL;
594
0
      bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
595
0
      bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
596
0
      if (event_initialized(&rlim->refill_bucket_event))
597
0
        event_del(&rlim->refill_bucket_event);
598
0
    }
599
0
    r = 0;
600
0
    goto done;
601
0
  }
602
603
0
  event_base_gettimeofday_cached(bev->ev_base, &now);
604
0
  tick = ev_token_bucket_get_tick_(&now, cfg);
605
606
0
  if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
607
    /* no-op */
608
0
    r = 0;
609
0
    goto done;
610
0
  }
611
0
  if (bevp->rate_limiting == NULL) {
612
0
    rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
613
0
    if (!rlim)
614
0
      goto done;
615
0
    bevp->rate_limiting = rlim;
616
0
  } else {
617
0
    rlim = bevp->rate_limiting;
618
0
  }
619
0
  reinit = rlim->cfg != NULL;
620
621
0
  rlim->cfg = cfg;
622
0
  ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
623
624
0
  if (reinit) {
625
0
    EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
626
0
    event_del(&rlim->refill_bucket_event);
627
0
  }
628
0
  event_assign(&rlim->refill_bucket_event, bev->ev_base,
629
0
      -1, EV_FINALIZE, bev_refill_callback_, bevp);
630
631
0
  if (rlim->limit.read_limit > 0) {
632
0
    bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
633
0
  } else {
634
0
    bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
635
0
    suspended=1;
636
0
  }
637
0
  if (rlim->limit.write_limit > 0) {
638
0
    bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
639
0
  } else {
640
0
    bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
641
0
    suspended = 1;
642
0
  }
643
644
0
  if (suspended)
645
0
    event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
646
647
0
  r = 0;
648
649
0
done:
650
0
  BEV_UNLOCK(bev);
651
0
  return r;
652
0
}
653
654
struct bufferevent_rate_limit_group *
655
bufferevent_rate_limit_group_new(struct event_base *base,
656
    const struct ev_token_bucket_cfg *cfg)
657
0
{
658
0
  struct bufferevent_rate_limit_group *g;
659
0
  struct timeval now;
660
0
  ev_uint32_t tick;
661
662
0
  event_base_gettimeofday_cached(base, &now);
663
0
  tick = ev_token_bucket_get_tick_(&now, cfg);
664
665
0
  g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
666
0
  if (!g)
667
0
    return NULL;
668
0
  memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
669
0
  LIST_INIT(&g->members);
670
671
0
  ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
672
673
0
  event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
674
0
      bev_group_refill_callback_, g);
675
  /*XXXX handle event_add failure */
676
0
  event_add(&g->master_refill_event, &cfg->tick_timeout);
677
678
0
  EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
679
680
0
  bufferevent_rate_limit_group_set_min_share(g, 64);
681
682
0
  evutil_weakrand_seed_(&g->weakrand_seed,
683
0
      (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
684
685
0
  return g;
686
0
}
687
688
int
689
bufferevent_rate_limit_group_set_cfg(
690
  struct bufferevent_rate_limit_group *g,
691
  const struct ev_token_bucket_cfg *cfg)
692
0
{
693
0
  int same_tick;
694
0
  if (!g || !cfg)
695
0
    return -1;
696
697
0
  LOCK_GROUP(g);
698
0
  same_tick = evutil_timercmp(
699
0
    &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
700
0
  memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
701
702
0
  if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
703
0
    g->rate_limit.read_limit = cfg->read_maximum;
704
0
  if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
705
0
    g->rate_limit.write_limit = cfg->write_maximum;
706
707
0
  if (!same_tick) {
708
    /* This can cause a hiccup in the schedule */
709
0
    event_add(&g->master_refill_event, &cfg->tick_timeout);
710
0
  }
711
712
  /* The new limits might force us to adjust min_share differently. */
713
0
  bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
714
715
0
  UNLOCK_GROUP(g);
716
0
  return 0;
717
0
}
718
719
int
720
bufferevent_rate_limit_group_set_min_share(
721
  struct bufferevent_rate_limit_group *g,
722
  size_t share)
723
0
{
724
0
  if (share > EV_SSIZE_MAX)
725
0
    return -1;
726
727
0
  g->configured_min_share = share;
728
729
  /* Can't set share to less than the one-tick maximum.  IOW, at steady
730
   * state, at least one connection can go per tick. */
731
0
  if (share > g->rate_limit_cfg.read_rate)
732
0
    share = g->rate_limit_cfg.read_rate;
733
0
  if (share > g->rate_limit_cfg.write_rate)
734
0
    share = g->rate_limit_cfg.write_rate;
735
736
0
  g->min_share = share;
737
0
  return 0;
738
0
}
739
740
void
741
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
742
0
{
743
0
  LOCK_GROUP(g);
744
0
  EVUTIL_ASSERT(0 == g->n_members);
745
0
  event_del(&g->master_refill_event);
746
0
  UNLOCK_GROUP(g);
747
0
  EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
748
0
  mm_free(g);
749
0
}
750
751
int
752
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
753
    struct bufferevent_rate_limit_group *g)
754
0
{
755
0
  int wsuspend, rsuspend;
756
0
  struct bufferevent_private *bevp = BEV_UPCAST(bev);
757
0
  BEV_LOCK(bev);
758
759
0
  if (!bevp->rate_limiting) {
760
0
    struct bufferevent_rate_limit *rlim;
761
0
    rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
762
0
    if (!rlim) {
763
0
      BEV_UNLOCK(bev);
764
0
      return -1;
765
0
    }
766
0
    event_assign(&rlim->refill_bucket_event, bev->ev_base,
767
0
        -1, EV_FINALIZE, bev_refill_callback_, bevp);
768
0
    bevp->rate_limiting = rlim;
769
0
  }
770
771
0
  if (bevp->rate_limiting->group == g) {
772
0
    BEV_UNLOCK(bev);
773
0
    return 0;
774
0
  }
775
0
  if (bevp->rate_limiting->group)
776
0
    bufferevent_remove_from_rate_limit_group(bev);
777
778
0
  LOCK_GROUP(g);
779
0
  bevp->rate_limiting->group = g;
780
0
  ++g->n_members;
781
0
  LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
782
783
0
  rsuspend = g->read_suspended;
784
0
  wsuspend = g->write_suspended;
785
786
0
  UNLOCK_GROUP(g);
787
788
0
  if (rsuspend)
789
0
    bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
790
0
  if (wsuspend)
791
0
    bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
792
793
0
  BEV_UNLOCK(bev);
794
0
  return 0;
795
0
}
796
797
int
798
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
799
0
{
800
0
  return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
801
0
}
802
803
int
804
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
805
    int unsuspend)
806
0
{
807
0
  struct bufferevent_private *bevp = BEV_UPCAST(bev);
808
0
  BEV_LOCK(bev);
809
0
  if (bevp->rate_limiting && bevp->rate_limiting->group) {
810
0
    struct bufferevent_rate_limit_group *g =
811
0
        bevp->rate_limiting->group;
812
0
    LOCK_GROUP(g);
813
0
    bevp->rate_limiting->group = NULL;
814
0
    --g->n_members;
815
0
    LIST_REMOVE(bevp, rate_limiting->next_in_group);
816
0
    UNLOCK_GROUP(g);
817
0
  }
818
0
  if (unsuspend) {
819
0
    bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
820
0
    bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
821
0
  }
822
0
  BEV_UNLOCK(bev);
823
0
  return 0;
824
0
}
825
826
/* ===
827
 * API functions to expose rate limits.
828
 *
829
 * Don't use these from inside Libevent; they're meant to be for use by
830
 * the program.
831
 * === */
832
833
/* Mostly you don't want to use this function from inside libevent;
834
 * bufferevent_get_read_max_() is more likely what you want*/
835
ev_ssize_t
836
bufferevent_get_read_limit(struct bufferevent *bev)
837
0
{
838
0
  ev_ssize_t r;
839
0
  struct bufferevent_private *bevp;
840
0
  BEV_LOCK(bev);
841
0
  bevp = BEV_UPCAST(bev);
842
0
  if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
843
0
    bufferevent_update_buckets(bevp);
844
0
    r = bevp->rate_limiting->limit.read_limit;
845
0
  } else {
846
0
    r = EV_SSIZE_MAX;
847
0
  }
848
0
  BEV_UNLOCK(bev);
849
0
  return r;
850
0
}
851
852
/* Mostly you don't want to use this function from inside libevent;
853
 * bufferevent_get_write_max_() is more likely what you want*/
854
ev_ssize_t
855
bufferevent_get_write_limit(struct bufferevent *bev)
856
0
{
857
0
  ev_ssize_t r;
858
0
  struct bufferevent_private *bevp;
859
0
  BEV_LOCK(bev);
860
0
  bevp = BEV_UPCAST(bev);
861
0
  if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
862
0
    bufferevent_update_buckets(bevp);
863
0
    r = bevp->rate_limiting->limit.write_limit;
864
0
  } else {
865
0
    r = EV_SSIZE_MAX;
866
0
  }
867
0
  BEV_UNLOCK(bev);
868
0
  return r;
869
0
}
870
871
int
872
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
873
0
{
874
0
  struct bufferevent_private *bevp;
875
0
  int ret = 0;
876
0
  BEV_LOCK(bev);
877
0
  bevp = BEV_UPCAST(bev);
878
0
  if (size == 0 || size > EV_SSIZE_MAX)
879
0
    bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
880
0
  else
881
0
    bevp->max_single_read = size;
882
0
  ret = evbuffer_set_max_read(bev->input, bevp->max_single_read);
883
0
  BEV_UNLOCK(bev);
884
0
  return ret;
885
0
}
886
887
int
888
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
889
0
{
890
0
  struct bufferevent_private *bevp;
891
0
  BEV_LOCK(bev);
892
0
  bevp = BEV_UPCAST(bev);
893
0
  if (size == 0 || size > EV_SSIZE_MAX)
894
0
    bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
895
0
  else
896
0
    bevp->max_single_write = size;
897
0
  BEV_UNLOCK(bev);
898
0
  return 0;
899
0
}
900
901
ev_ssize_t
902
bufferevent_get_max_single_read(struct bufferevent *bev)
903
0
{
904
0
  ev_ssize_t r;
905
906
0
  BEV_LOCK(bev);
907
0
  r = BEV_UPCAST(bev)->max_single_read;
908
0
  BEV_UNLOCK(bev);
909
0
  return r;
910
0
}
911
912
ev_ssize_t
913
bufferevent_get_max_single_write(struct bufferevent *bev)
914
0
{
915
0
  ev_ssize_t r;
916
917
0
  BEV_LOCK(bev);
918
0
  r = BEV_UPCAST(bev)->max_single_write;
919
0
  BEV_UNLOCK(bev);
920
0
  return r;
921
0
}
922
923
ev_ssize_t
924
bufferevent_get_max_to_read(struct bufferevent *bev)
925
0
{
926
0
  ev_ssize_t r;
927
0
  BEV_LOCK(bev);
928
0
  r = bufferevent_get_read_max_(BEV_UPCAST(bev));
929
0
  BEV_UNLOCK(bev);
930
0
  return r;
931
0
}
932
933
ev_ssize_t
934
bufferevent_get_max_to_write(struct bufferevent *bev)
935
0
{
936
0
  ev_ssize_t r;
937
0
  BEV_LOCK(bev);
938
0
  r = bufferevent_get_write_max_(BEV_UPCAST(bev));
939
0
  BEV_UNLOCK(bev);
940
0
  return r;
941
0
}
942
943
const struct ev_token_bucket_cfg *
944
0
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
945
0
  struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
946
0
  struct ev_token_bucket_cfg *cfg;
947
948
0
  BEV_LOCK(bev);
949
950
0
  if (bufev_private->rate_limiting) {
951
0
    cfg = bufev_private->rate_limiting->cfg;
952
0
  } else {
953
0
    cfg = NULL;
954
0
  }
955
956
0
  BEV_UNLOCK(bev);
957
958
0
  return cfg;
959
0
}
960
961
/* Mostly you don't want to use this function from inside libevent;
962
 * bufferevent_get_read_max_() is more likely what you want*/
963
ev_ssize_t
964
bufferevent_rate_limit_group_get_read_limit(
965
  struct bufferevent_rate_limit_group *grp)
966
0
{
967
0
  ev_ssize_t r;
968
0
  LOCK_GROUP(grp);
969
0
  r = grp->rate_limit.read_limit;
970
0
  UNLOCK_GROUP(grp);
971
0
  return r;
972
0
}
973
974
/* Mostly you don't want to use this function from inside libevent;
975
 * bufferevent_get_write_max_() is more likely what you want. */
976
ev_ssize_t
977
bufferevent_rate_limit_group_get_write_limit(
978
  struct bufferevent_rate_limit_group *grp)
979
0
{
980
0
  ev_ssize_t r;
981
0
  LOCK_GROUP(grp);
982
0
  r = grp->rate_limit.write_limit;
983
0
  UNLOCK_GROUP(grp);
984
0
  return r;
985
0
}
986
987
/** Manually drain 'decr' bytes from this bufferevent's read bucket
 * (a negative 'decr' refills it). Requires that a per-bufferevent
 * rate limit is installed. Returns 0 on success, -1 if scheduling
 * the refill timer failed. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	/* Caller contract: only valid with bufferevent_set_rate_limit()
	 * in effect. */
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop reading and arm the refill
		 * timer so the bucket gets topped up next tick. */
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket just became positive again. The refill event is
		 * shared by the read and write directions, so only cancel
		 * it if the write side is not still waiting on refills. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
1013
1014
/** Manually drain 'decr' bytes from this bufferevent's write bucket
 * (a negative 'decr' refills it). Requires that a per-bufferevent
 * rate limit is installed. Returns 0 on success, -1 if scheduling
 * the refill timer failed. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	/* Caller contract: only valid with bufferevent_set_rate_limit()
	 * in effect. */
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Bucket just ran dry: stop writing and arm the refill
		 * timer so the bucket gets topped up next tick. */
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Bucket just became positive again. The refill event is
		 * shared by the read and write directions, so only cancel
		 * it if the read side is not still waiting on refills. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
1042
1043
int
1044
bufferevent_rate_limit_group_decrement_read(
1045
  struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1046
0
{
1047
0
  int r = 0;
1048
0
  ev_ssize_t old_limit, new_limit;
1049
0
  LOCK_GROUP(grp);
1050
0
  old_limit = grp->rate_limit.read_limit;
1051
0
  new_limit = (grp->rate_limit.read_limit -= decr);
1052
1053
0
  if (old_limit > 0 && new_limit <= 0) {
1054
0
    bev_group_suspend_reading_(grp);
1055
0
  } else if (old_limit <= 0 && new_limit > 0) {
1056
0
    bev_group_unsuspend_reading_(grp);
1057
0
  }
1058
1059
0
  UNLOCK_GROUP(grp);
1060
0
  return r;
1061
0
}
1062
1063
int
1064
bufferevent_rate_limit_group_decrement_write(
1065
  struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1066
0
{
1067
0
  int r = 0;
1068
0
  ev_ssize_t old_limit, new_limit;
1069
0
  LOCK_GROUP(grp);
1070
0
  old_limit = grp->rate_limit.write_limit;
1071
0
  new_limit = (grp->rate_limit.write_limit -= decr);
1072
1073
0
  if (old_limit > 0 && new_limit <= 0) {
1074
0
    bev_group_suspend_writing_(grp);
1075
0
  } else if (old_limit <= 0 && new_limit > 0) {
1076
0
    bev_group_unsuspend_writing_(grp);
1077
0
  }
1078
1079
0
  UNLOCK_GROUP(grp);
1080
0
  return r;
1081
0
}
1082
1083
void
1084
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1085
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1086
0
{
1087
0
  EVUTIL_ASSERT(grp != NULL);
1088
0
  if (total_read_out)
1089
0
    *total_read_out = grp->total_read;
1090
0
  if (total_written_out)
1091
0
    *total_written_out = grp->total_written;
1092
0
}
1093
1094
void
1095
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1096
0
{
1097
0
  grp->total_read = grp->total_written = 0;
1098
0
}
1099
1100
int
1101
bufferevent_ratelim_init_(struct bufferevent_private *bev)
1102
0
{
1103
0
  bev->rate_limiting = NULL;
1104
0
  bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1105
0
  bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1106
1107
0
  if (evbuffer_set_max_read(bev->bev.input, bev->max_single_read))
1108
0
    return -1;
1109
1110
0
  return 0;
1111
0
}