/src/libevent/bufferevent_ratelim.c
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add
		   the appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
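
/* To illustrate the overflow-safe branch above, a small worked example
 * (illustrative numbers, not from any real configuration): say
 * read_maximum = 100, read_limit = 40, n_ticks = 3, read_rate = 25.
 * The naive refill 40 + 3*25 = 115 would overshoot the maximum, and with
 * large rates the product n_ticks * rate could itself overflow.  The
 * division-based test (100 - 40) / 3 = 20 < 25 detects this and clamps
 * straight to the maximum without ever computing the product.  For
 * nonnegative limits the test is exact, since in integer arithmetic
 * (max - limit) / n < rate holds precisely when limit + n * rate > max.
 */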

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
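
/* For example (illustrative values only): with msec_per_tick = 250 and
 * tv = { 12, 600000 }, msec = 12*1000 + 600000/1000 = 12600, so the tick
 * number is 12600 / 250 = 50.  The 64-bit cast matters because
 * tv_sec * 1000 would overflow a 32-bit integer once tv_sec exceeds
 * INT_MAX/1000 (about 24.8 days' worth of seconds).
 */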

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	unsigned msec_per_tick;

	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}

	/* Avoid possible overflow:
	 * - there is no point in accepting values larger than INT_MAX/1000 anyway
	 * - on Windows tv_sec (and tv_usec) is long, which is int, so its upper limit is INT_MAX
	 * - negative values do not make any sense
	 */
	if (tick_len->tv_sec < 0 || tick_len->tv_sec > INT_MAX/1000)
		return NULL;

	/* Note: overflow from tv_usec is not possible, since tv_sec is
	 * limited to INT_MAX/1000 anyway. */
	msec_per_tick = (unsigned)(tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	if (!msec_per_tick)
		return NULL;

	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = msec_per_tick;
	return r;
}
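
/* A minimal usage sketch (illustrative values, error handling mostly
 * elided; "bev" stands for some already-created bufferevent):
 *
 *	struct timeval tick = { 0, 100000 };	// 100 msec per tick
 *	struct ev_token_bucket_cfg *cfg = ev_token_bucket_cfg_new(
 *	    1024, 4096,		// read: rate and burst, in bytes per tick
 *	    1024, 4096,		// write: rate and burst, in bytes per tick
 *	    &tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * Passing NULL for tick_len gives one-second ticks.  Note the "XXX
 * reference-count cfg" comment in bufferevent_set_rate_limit() below:
 * the cfg pointer is stored, not copied, so it must outlive every
 * bufferevent that uses it.
 */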

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for the max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read otherwise.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x) \
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g) \
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x) \
	do { \
		if (max_so_far > (x)) \
			max_so_far = (x); \
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
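
/* Worked example of the clamping above (illustrative numbers): suppose a
 * bufferevent allows max_single_write = 16384, its own bucket holds
 * write_limit = 6000, and it belongs to a 4-member group whose bucket
 * holds 8000 with min_share = 64.  Then max_so_far becomes 6000 from the
 * per-connection bucket, the group share is 8000/4 = 2000 (above
 * min_share, so it stands), and CLAMPTO(2000) yields a final answer of
 * 2000 bytes.
 */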

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
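
/* Note that the buckets above are allowed to go negative: a single read
 * or write can overshoot the remaining allowance, and the deficit is paid
 * back over subsequent ticks.  For instance (illustrative numbers), if
 * write_limit is 4096 and a 10000-byte write completes, the bucket drops
 * to -5904, writing is suspended, and with a refill rate of 2000 bytes
 * per tick the refill callback brings it to -3904, -1904, then +96, at
 * which point writing is unsuspended.
 */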

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block) \
	do { \
		first = bev_group_random_element_(g); \
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
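
/* To drop a per-bufferevent limit again, pass NULL (this exercises the
 * cfg == NULL branch above; "bev" is a hypothetical bufferevent):
 *
 *	bufferevent_set_rate_limit(bev, NULL);
 *
 * Any BEV_SUSPEND_BW suspension is lifted and the refill timer is
 * cancelled; the cfg itself is not freed, since other bufferevents may
 * still point at it.
 */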

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
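
/* Group usage sketch (illustrative; "base", "bev1", and "bev2" are
 * assumed to exist, and errors are unchecked).  Unlike a per-bufferevent
 * limit, the group copies the cfg (see the memcpy above), so it may be
 * freed right away:
 *
 *	struct timeval sec = { 1, 0 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, &sec);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	ev_token_bucket_cfg_free(cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 *
 * The two bufferevents now share a combined budget of 64 KiB per second
 * in each direction.
 */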

int
bufferevent_rate_limit_group_set_cfg(
    struct bufferevent_rate_limit_group *g,
    const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
	    &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
    struct bufferevent_rate_limit_group *g,
    size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't allow the share to exceed the one-tick refill rate.  IOW, at
	 * steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
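
/* Example of the min_share clamp (illustrative numbers): with
 * configured_min_share = 1024 but read_rate = 512 and write_rate = 768,
 * the effective min_share becomes 512, the smaller of the two per-tick
 * rates.  Each member's share in bufferevent_get_rlim_max_() is then
 * floored at 512 bytes, so a large group cannot starve every member by
 * dividing the bucket into uselessly small slices.
 */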

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	int ret = 0;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	ret = evbuffer_set_max_read(bev->input, bevp->max_single_read);
	BEV_UNLOCK(bev);
	return ret;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
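
/* These decrement functions let an application charge traffic against a
 * bucket by hand, e.g. for bytes it sent outside the bufferevent.  A
 * hypothetical sketch (assumes "bev" already has a rate limit configured,
 * since the functions assert that; "handle_error" is made up):
 *
 *	// We pushed 512 bytes through a side channel; bill them to the
 *	// write bucket.  A negative decrement would add tokens instead.
 *	if (bufferevent_decrement_write_limit(bev, 512) < 0)
 *		handle_error();
 *
 * Crossing zero in either direction suspends or unsuspends the
 * bufferevent and schedules or cancels the refill timer, just as the
 * internal bookkeeping above does.
 */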

int
bufferevent_rate_limit_group_decrement_read(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	if (evbuffer_set_max_read(bev->bev.input, bev->max_single_read))
		return -1;

	return 0;
}