/src/h2o/lib/handler/throttle_resp.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2016 Justin Zhu |
3 | | * |
4 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | | * of this software and associated documentation files (the "Software"), to |
6 | | * deal in the Software without restriction, including without limitation the |
7 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
8 | | * sell copies of the Software, and to permit persons to whom the Software is |
9 | | * furnished to do so, subject to the following conditions: |
10 | | * |
11 | | * The above copyright notice and this permission notice shall be included in |
12 | | * all copies or substantial portions of the Software. |
13 | | * |
14 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
19 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
20 | | * IN THE SOFTWARE. |
21 | | */ |
22 | | #include <stdlib.h> |
23 | | #include "h2o.h" |
24 | | |
25 | | typedef struct st_throttle_resp_t { |
26 | | h2o_ostream_t super; |
27 | | h2o_timer_t timeout_entry; |
28 | | struct { |
29 | | uint64_t at; |
30 | | ssize_t bytes_left; |
31 | | } window; |
32 | | h2o_req_t *req; |
33 | | size_t bytes_per_sec; |
34 | | struct { |
35 | | H2O_VECTOR(h2o_sendvec_t) bufs; |
36 | | h2o_send_state_t stream_state; |
37 | | } state; |
38 | | } throttle_resp_t; |
39 | | |
/**
 * Computes how long to wait until a send deficit is paid off.
 *
 * @param bytes_left     current window value; expected to be negative (i.e. a deficit)
 * @param bytes_per_sec  replenishment rate; used as the divisor, so it must not be zero
 * @return number of milliseconds, rounded up, after which the deficit becomes non-negative at the given rate
 */
static uint64_t calc_delay(ssize_t bytes_left, size_t bytes_per_sec)
{
    /* deficit scaled by 1000 so that the quotient below comes out in milliseconds */
    uint64_t scaled_deficit = (uint64_t)(-bytes_left) * 1000;
    uint64_t rate = bytes_per_sec;

    /* adding `rate - 1` before dividing rounds the quotient up */
    return (scaled_deficit + rate - 1) / rate;
}
48 | | |
49 | | static void real_send(throttle_resp_t *self) |
50 | 0 | { |
51 | 0 | uint64_t now = h2o_now(self->req->conn->ctx->loop); |
52 | | |
53 | | /* if time has changed since previous invocation, update window */ |
54 | 0 | if (self->window.at < now) { |
55 | | /* burst rate (after upstream remains silent) is limited to 1-second worth of data */ |
56 | 0 | uint64_t addition = (self->bytes_per_sec * (now - self->window.at)) / 1000; |
57 | 0 | if (addition > self->bytes_per_sec) |
58 | 0 | addition = self->bytes_per_sec; |
59 | 0 | self->window.bytes_left += addition; |
60 | 0 | self->window.at = now; |
61 | 0 | } |
62 | | |
63 | | /* schedule the timer for delayed invocation, if window is negative at this moment */ |
64 | 0 | if (self->window.bytes_left < 0) { |
65 | 0 | uint64_t delay = calc_delay(self->window.bytes_left, self->bytes_per_sec); |
66 | 0 | assert(delay > 0); |
67 | 0 | h2o_timer_link(self->req->conn->ctx->loop, delay, &self->timeout_entry); |
68 | 0 | return; |
69 | 0 | } |
70 | | |
71 | | /* adjust window and send */ |
72 | 0 | for (size_t i = 0; i < self->state.bufs.size; i++) |
73 | 0 | self->window.bytes_left -= self->state.bufs.entries[i].len; |
74 | 0 | h2o_ostream_send_next(&self->super, self->req, self->state.bufs.entries, self->state.bufs.size, self->state.stream_state); |
75 | 0 | } |
76 | | |
77 | | static void on_timer(h2o_timer_t *entry) |
78 | 0 | { |
79 | 0 | throttle_resp_t *self = H2O_STRUCT_FROM_MEMBER(throttle_resp_t, timeout_entry, entry); |
80 | 0 | real_send(self); |
81 | 0 | } |
82 | | |
83 | | static void on_send(h2o_ostream_t *_self, h2o_req_t *req, h2o_sendvec_t *inbufs, size_t inbufcnt, h2o_send_state_t state) |
84 | 0 | { |
85 | 0 | throttle_resp_t *self = (void *)_self; |
86 | |
|
87 | 0 | assert(!h2o_timer_is_linked(&self->timeout_entry)); |
88 | | |
89 | | /* save state */ |
90 | 0 | h2o_vector_reserve(&req->pool, &self->state.bufs, inbufcnt); |
91 | 0 | for (size_t i = 0; i < inbufcnt; ++i) { |
92 | 0 | self->state.bufs.entries[i] = inbufs[i]; |
93 | 0 | } |
94 | 0 | self->state.bufs.size = inbufcnt; |
95 | 0 | self->state.stream_state = state; |
96 | |
|
97 | 0 | real_send(self); |
98 | 0 | } |
99 | | |
100 | | static void on_stop(h2o_ostream_t *_self, h2o_req_t *req) |
101 | 0 | { |
102 | 0 | throttle_resp_t *self = (void *)_self; |
103 | 0 | if (h2o_timer_is_linked(&self->timeout_entry)) |
104 | 0 | h2o_timer_unlink(&self->timeout_entry); |
105 | 0 | } |
106 | | |
107 | | static void on_setup_ostream(h2o_filter_t *self, h2o_req_t *req, h2o_ostream_t **slot) |
108 | 0 | { |
109 | 0 | throttle_resp_t *throttle; |
110 | 0 | size_t bytes_per_sec; |
111 | | |
112 | | /* only handle 200 OK with content */ |
113 | 0 | if (req->res.status != 200) |
114 | 0 | goto Next; |
115 | 0 | if (h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("HEAD"))) |
116 | 0 | goto Next; |
117 | | |
118 | 0 | { /* obtain the rate from X-Traffic header field and delete the header field, or skip */ |
119 | 0 | ssize_t xt_index; |
120 | 0 | if ((xt_index = h2o_find_header(&req->res.headers, H2O_TOKEN_X_TRAFFIC, -1)) == -1) |
121 | 0 | goto Next; |
122 | 0 | char *buf = req->res.headers.entries[xt_index].value.base; |
123 | 0 | if (H2O_UNLIKELY((bytes_per_sec = h2o_strtosizefwd(&buf, req->res.headers.entries[xt_index].value.len)) == SIZE_MAX)) |
124 | 0 | goto Next; |
125 | 0 | h2o_delete_header(&req->res.headers, xt_index); |
126 | 0 | } |
127 | | |
128 | | /* instantiate the ostream filter */ |
129 | 0 | throttle = (void *)h2o_add_ostream(req, H2O_ALIGNOF(*throttle), sizeof(*throttle), slot); |
130 | 0 | throttle->super.do_send = on_send; |
131 | 0 | throttle->super.stop = on_stop; |
132 | 0 | h2o_timer_init(&throttle->timeout_entry, on_timer); |
133 | 0 | throttle->window.at = h2o_now(req->conn->ctx->loop); |
134 | 0 | throttle->window.bytes_left = 0; |
135 | 0 | throttle->req = req; |
136 | 0 | throttle->bytes_per_sec = bytes_per_sec; |
137 | 0 | memset(&throttle->state.bufs, 0, sizeof(throttle->state.bufs)); |
138 | 0 | throttle->state.stream_state = H2O_SEND_STATE_IN_PROGRESS; |
139 | |
|
140 | 0 | { /* reduce `preferred_chunk_size` so that we'd be sending one chunk every 100ms */ |
141 | 0 | size_t chunk_size = bytes_per_sec / 10; |
142 | 0 | if (chunk_size < 4096) |
143 | 0 | chunk_size = 4096; |
144 | 0 | if (req->preferred_chunk_size > chunk_size) |
145 | 0 | req->preferred_chunk_size = chunk_size; |
146 | 0 | } |
147 | |
|
148 | 0 | slot = &throttle->super.next; |
149 | |
|
150 | 0 | Next: |
151 | 0 | h2o_setup_next_ostream(req, slot); |
152 | 0 | } |
153 | | |
154 | | void h2o_throttle_resp_register(h2o_pathconf_t *pathconf) |
155 | 0 | { |
156 | 0 | h2o_filter_t *self = h2o_create_filter(pathconf, sizeof(*self)); |
157 | 0 | self->on_setup_ostream = on_setup_ostream; |
158 | 0 | } |