/src/lldpd/libevent/bufferevent_ratelim.c
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}
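
/* Illustrative sketch (not part of libevent; values are hypothetical): how
 * the reinitialize path behaves when a new, smaller cfg is applied mid-tick.
 *
 *	struct ev_token_bucket b = { .read_limit = 8000, .write_limit = 3000 };
 *	// new cfg with read_maximum = 5000, write_maximum = 5000
 *	ev_token_bucket_init_(&b, cfg, tick, 1);
 *	// b.read_limit is clipped down to 5000; b.write_limit stays 3000,
 *	// since reinitialization only removes excess tokens, never adds any.
 */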

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
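
/* Worked example (hypothetical numbers): suppose cfg->read_rate = 1000
 * tokens per tick, cfg->read_maximum = 5000, the bucket holds
 * read_limit = 4500, and n_ticks = 3.  The naive update would compute
 * 4500 + 3*1000 = 7500, above the maximum.  The guard instead computes
 * (5000 - 4500) / 3 = 166, which is < 1000, so the bucket is clipped to
 * the 5000 maximum without ever forming the possibly-overflowing product. */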

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}
125 | | |
126 | | ev_uint32_t |
127 | | ev_token_bucket_get_tick_(const struct timeval *tv, |
128 | | const struct ev_token_bucket_cfg *cfg) |
129 | 0 | { |
130 | | /* This computation uses two multiplies and a divide. We could do |
131 | | * fewer if we knew that the tick length was an integer number of |
132 | | * seconds, or if we knew it divided evenly into a second. We should |
133 | | * investigate that more. |
134 | | */ |
135 | | |
136 | | /* We cast to an ev_uint64_t first, since we don't want to overflow |
137 | | * before we do the final divide. */ |
138 | 0 | ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; |
139 | 0 | return (unsigned)(msec / cfg->msec_per_tick); |
140 | 0 | } |
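
/* Worked example (hypothetical values): with cfg->msec_per_tick = 250 and
 * *tv = { .tv_sec = 2, .tv_usec = 500000 }, msec = 2*1000 + 500000/1000
 * = 2500, so the function returns tick 2500/250 = 10.  Two timevals map to
 * the same tick exactly when they fall within the same 250 msec window. */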
141 | | |
142 | | struct ev_token_bucket_cfg * |
143 | | ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, |
144 | | size_t write_rate, size_t write_burst, |
145 | | const struct timeval *tick_len) |
146 | 0 | { |
147 | 0 | struct ev_token_bucket_cfg *r; |
148 | 0 | struct timeval g; |
149 | 0 | if (! tick_len) { |
150 | 0 | g.tv_sec = 1; |
151 | 0 | g.tv_usec = 0; |
152 | 0 | tick_len = &g; |
153 | 0 | } |
154 | 0 | if (read_rate > read_burst || write_rate > write_burst || |
155 | 0 | read_rate < 1 || write_rate < 1) |
156 | 0 | return NULL; |
157 | 0 | if (read_rate > EV_RATE_LIMIT_MAX || |
158 | 0 | write_rate > EV_RATE_LIMIT_MAX || |
159 | 0 | read_burst > EV_RATE_LIMIT_MAX || |
160 | 0 | write_burst > EV_RATE_LIMIT_MAX) |
161 | 0 | return NULL; |
162 | 0 | r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); |
163 | 0 | if (!r) |
164 | 0 | return NULL; |
165 | 0 | r->read_rate = read_rate; |
166 | 0 | r->write_rate = write_rate; |
167 | 0 | r->read_maximum = read_burst; |
168 | 0 | r->write_maximum = write_burst; |
169 | 0 | memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); |
170 | 0 | r->msec_per_tick = (tick_len->tv_sec * 1000) + |
171 | 0 | (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; |
172 | 0 | return r; |
173 | 0 | } |
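
/* Usage sketch (illustrative, not part of this file): cap a bufferevent at
 * roughly 4 KiB/s with 16 KiB bursts, refilling four times a second.  Rates
 * are per tick, so a 250 msec tick with rate 1024 gives 4096 bytes/second.
 *
 *	struct timeval quarter_sec = { 0, 250000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1024, 16384, 1024, 16384, &quarter_sec);
 *	if (cfg) {
 *		bufferevent_set_rate_limit(bev, cfg);
 *		// The bufferevent keeps a pointer to cfg, so free it with
 *		// ev_token_bucket_cfg_free() only once nothing uses it.
 *	}
 */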

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if not.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write ?
	    bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)				\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)					\
	do {						\
		if (max_so_far > (x))			\
			max_so_far = (x);		\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
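
/* Worked example (hypothetical numbers): for a reader whose own bucket
 * holds 2000 tokens and whose group has read_limit = 12000 split across
 * 10 members, the group share is 12000/10 = 1200 (at least min_share), and
 * the helper returns the smaller of 2000 and 1200, i.e. 1200.  Note that a
 * per-bufferevent cfg replaces max_single_read as the starting point rather
 * than being clamped against it. */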

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
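
/* Illustrative walkthrough (hypothetical numbers): the single
 * refill_bucket_event serves both directions.  If a 1000-token read bucket
 * is decremented by 1500, read_limit goes to -500: reading is suspended and
 * the refill timer is scheduled for the next tick.  Conversely, when the
 * bucket comes back above zero while reading was suspended, the timer is
 * deleted only if the write side is not also waiting on it. */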

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		 */
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)				\
	do {							\
		first = bev_group_random_element_(g);		\
		for (bev = first; bev != LIST_END(&g->members);	\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
	} while (0)
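
/* Illustrative expansion (hypothetical member list A -> B -> C -> D): if the
 * random starting element is C, the first loop visits C and D, and the
 * second loop visits A and B, stopping before C.  Every member is therefore
 * visited exactly once, with a freshly chosen starting point per call. */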

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
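
/* Usage sketch (illustrative, not part of this file): share one 64 KiB/s
 * budget across many connections.  With the default one-second tick, rate
 * and burst are both effectively bytes per second here.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 *	// The group copies cfg into rate_limit_cfg, so cfg may be freed
 *	// once every group using it has been created.
 */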

int
bufferevent_rate_limit_group_set_cfg(
    struct bufferevent_rate_limit_group *g,
    const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
	    &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
    struct bufferevent_rate_limit_group *g,
    size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set the share to more than the one-tick maximum.  IOW, at
	 * steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
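
/* Worked example (hypothetical numbers): a group refilled with 10000 read
 * tokens per tick and 1000 members yields a raw share of 10000/1000 = 10
 * bytes.  With min_share = 64, each member may still read up to 64 bytes,
 * trading strict fairness for fewer tiny reads.  A min_share above the
 * per-tick rate is clipped, since no single member should be promised more
 * than one tick's refill at steady state. */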

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}
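
/* Usage note (illustrative): passing 0, or any value above EV_SSIZE_MAX,
 * restores the 16384-byte default:
 *
 *	bufferevent_set_max_single_read(bev, 4096); // at most 4 KiB per read
 *	bufferevent_set_max_single_read(bev, 0);    // back to the default
 */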

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}