/src/mozilla-central/netwerk/base/ThrottleQueue.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
2 | | /* vim: set ts=8 sts=2 et sw=2 tw=80: */ |
3 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
4 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
5 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
6 | | |
7 | | #include "ThrottleQueue.h" |
8 | | #include "nsISeekableStream.h" |
9 | | #include "nsIAsyncInputStream.h" |
10 | | #include "nsStreamUtils.h" |
11 | | #include "nsNetUtil.h" |
12 | | |
13 | | namespace mozilla { |
14 | | namespace net { |
15 | | |
16 | | //----------------------------------------------------------------------------- |
17 | | |
18 | | class ThrottleInputStream final |
19 | | : public nsIAsyncInputStream |
20 | | , public nsISeekableStream |
21 | | { |
22 | | public: |
23 | | |
24 | | ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue); |
25 | | |
26 | | NS_DECL_THREADSAFE_ISUPPORTS |
27 | | NS_DECL_NSIINPUTSTREAM |
28 | | NS_DECL_NSISEEKABLESTREAM |
29 | | NS_DECL_NSIASYNCINPUTSTREAM |
30 | | |
31 | | void AllowInput(); |
32 | | |
33 | | private: |
34 | | |
35 | | ~ThrottleInputStream(); |
36 | | |
37 | | nsCOMPtr<nsIInputStream> mStream; |
38 | | RefPtr<ThrottleQueue> mQueue; |
39 | | nsresult mClosedStatus; |
40 | | |
41 | | nsCOMPtr<nsIInputStreamCallback> mCallback; |
42 | | nsCOMPtr<nsIEventTarget> mEventTarget; |
43 | | }; |
44 | | |
45 | | NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream) |
46 | | |
47 | | ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue) |
48 | | : mStream(aStream) |
49 | | , mQueue(aQueue) |
50 | | , mClosedStatus(NS_OK) |
51 | 0 | { |
52 | 0 | MOZ_ASSERT(aQueue != nullptr); |
53 | 0 | } |
54 | | |
55 | | ThrottleInputStream::~ThrottleInputStream() |
56 | 0 | { |
57 | 0 | Close(); |
58 | 0 | } |
59 | | |
60 | | NS_IMETHODIMP |
61 | | ThrottleInputStream::Close() |
62 | 0 | { |
63 | 0 | if (NS_FAILED(mClosedStatus)) { |
64 | 0 | return mClosedStatus; |
65 | 0 | } |
66 | 0 | |
67 | 0 | if (mQueue) { |
68 | 0 | mQueue->DequeueStream(this); |
69 | 0 | mQueue = nullptr; |
70 | 0 | mClosedStatus = NS_BASE_STREAM_CLOSED; |
71 | 0 | } |
72 | 0 | return mStream->Close(); |
73 | 0 | } |
74 | | |
75 | | NS_IMETHODIMP |
76 | | ThrottleInputStream::Available(uint64_t* aResult) |
77 | 0 | { |
78 | 0 | if (NS_FAILED(mClosedStatus)) { |
79 | 0 | return mClosedStatus; |
80 | 0 | } |
81 | 0 | |
82 | 0 | return mStream->Available(aResult); |
83 | 0 | } |
84 | | |
85 | | NS_IMETHODIMP |
86 | | ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) |
87 | 0 | { |
88 | 0 | if (NS_FAILED(mClosedStatus)) { |
89 | 0 | return mClosedStatus; |
90 | 0 | } |
91 | 0 | |
92 | 0 | uint32_t realCount; |
93 | 0 | nsresult rv = mQueue->Available(aCount, &realCount); |
94 | 0 | if (NS_FAILED(rv)) { |
95 | 0 | return rv; |
96 | 0 | } |
97 | 0 | |
98 | 0 | if (realCount == 0) { |
99 | 0 | return NS_BASE_STREAM_WOULD_BLOCK; |
100 | 0 | } |
101 | 0 | |
102 | 0 | rv = mStream->Read(aBuf, realCount, aResult); |
103 | 0 | if (NS_SUCCEEDED(rv) && *aResult > 0) { |
104 | 0 | mQueue->RecordRead(*aResult); |
105 | 0 | } |
106 | 0 | return rv; |
107 | 0 | } |
108 | | |
109 | | NS_IMETHODIMP |
110 | | ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, |
111 | | uint32_t aCount, uint32_t* aResult) |
112 | 0 | { |
113 | 0 | if (NS_FAILED(mClosedStatus)) { |
114 | 0 | return mClosedStatus; |
115 | 0 | } |
116 | 0 | |
117 | 0 | uint32_t realCount; |
118 | 0 | nsresult rv = mQueue->Available(aCount, &realCount); |
119 | 0 | if (NS_FAILED(rv)) { |
120 | 0 | return rv; |
121 | 0 | } |
122 | 0 | |
123 | 0 | if (realCount == 0) { |
124 | 0 | return NS_BASE_STREAM_WOULD_BLOCK; |
125 | 0 | } |
126 | 0 | |
127 | 0 | rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult); |
128 | 0 | if (NS_SUCCEEDED(rv) && *aResult > 0) { |
129 | 0 | mQueue->RecordRead(*aResult); |
130 | 0 | } |
131 | 0 | return rv; |
132 | 0 | } |
133 | | |
134 | | NS_IMETHODIMP |
135 | | ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) |
136 | 0 | { |
137 | 0 | *aNonBlocking = true; |
138 | 0 | return NS_OK; |
139 | 0 | } |
140 | | |
141 | | NS_IMETHODIMP |
142 | | ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) |
143 | 0 | { |
144 | 0 | if (NS_FAILED(mClosedStatus)) { |
145 | 0 | return mClosedStatus; |
146 | 0 | } |
147 | 0 | |
148 | 0 | nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream); |
149 | 0 | if (!sstream) { |
150 | 0 | return NS_ERROR_FAILURE; |
151 | 0 | } |
152 | 0 | |
153 | 0 | return sstream->Seek(aWhence, aOffset); |
154 | 0 | } |
155 | | |
156 | | NS_IMETHODIMP |
157 | | ThrottleInputStream::Tell(int64_t* aResult) |
158 | 0 | { |
159 | 0 | if (NS_FAILED(mClosedStatus)) { |
160 | 0 | return mClosedStatus; |
161 | 0 | } |
162 | 0 | |
163 | 0 | nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream); |
164 | 0 | if (!sstream) { |
165 | 0 | return NS_ERROR_FAILURE; |
166 | 0 | } |
167 | 0 | |
168 | 0 | return sstream->Tell(aResult); |
169 | 0 | } |
170 | | |
171 | | NS_IMETHODIMP |
172 | | ThrottleInputStream::SetEOF() |
173 | 0 | { |
174 | 0 | if (NS_FAILED(mClosedStatus)) { |
175 | 0 | return mClosedStatus; |
176 | 0 | } |
177 | 0 | |
178 | 0 | nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream); |
179 | 0 | if (!sstream) { |
180 | 0 | return NS_ERROR_FAILURE; |
181 | 0 | } |
182 | 0 | |
183 | 0 | return sstream->SetEOF(); |
184 | 0 | } |
185 | | |
186 | | NS_IMETHODIMP |
187 | | ThrottleInputStream::CloseWithStatus(nsresult aStatus) |
188 | 0 | { |
189 | 0 | if (NS_FAILED(mClosedStatus)) { |
190 | 0 | // Already closed, ignore. |
191 | 0 | return NS_OK; |
192 | 0 | } |
193 | 0 | if (NS_SUCCEEDED(aStatus)) { |
194 | 0 | aStatus = NS_BASE_STREAM_CLOSED; |
195 | 0 | } |
196 | 0 |
|
197 | 0 | mClosedStatus = Close(); |
198 | 0 | if (NS_SUCCEEDED(mClosedStatus)) { |
199 | 0 | mClosedStatus = aStatus; |
200 | 0 | } |
201 | 0 | return NS_OK; |
202 | 0 | } |
203 | | |
204 | | NS_IMETHODIMP |
205 | | ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback, |
206 | | uint32_t aFlags, |
207 | | uint32_t aRequestedCount, |
208 | | nsIEventTarget *aEventTarget) |
209 | 0 | { |
210 | 0 | if (aFlags != 0) { |
211 | 0 | return NS_ERROR_ILLEGAL_VALUE; |
212 | 0 | } |
213 | 0 | |
214 | 0 | mCallback = aCallback; |
215 | 0 | mEventTarget = aEventTarget; |
216 | 0 | if (mCallback) { |
217 | 0 | mQueue->QueueStream(this); |
218 | 0 | } else { |
219 | 0 | mQueue->DequeueStream(this); |
220 | 0 | } |
221 | 0 | return NS_OK; |
222 | 0 | } |
223 | | |
224 | | void |
225 | | ThrottleInputStream::AllowInput() |
226 | 0 | { |
227 | 0 | MOZ_ASSERT(mCallback); |
228 | 0 | nsCOMPtr<nsIInputStreamCallback> callbackEvent = |
229 | 0 | NS_NewInputStreamReadyEvent("ThrottleInputStream::AllowInput", |
230 | 0 | mCallback, mEventTarget); |
231 | 0 | mCallback = nullptr; |
232 | 0 | mEventTarget = nullptr; |
233 | 0 | callbackEvent->OnInputStreamReady(this); |
234 | 0 | } |
235 | | |
236 | | //----------------------------------------------------------------------------- |
237 | | |
238 | | NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback, nsINamed) |
239 | | |
240 | | ThrottleQueue::ThrottleQueue() |
241 | | : mMeanBytesPerSecond(0) |
242 | | , mMaxBytesPerSecond(0) |
243 | | , mBytesProcessed(0) |
244 | | , mTimerArmed(false) |
245 | 0 | { |
246 | 0 | nsresult rv; |
247 | 0 | nsCOMPtr<nsIEventTarget> sts; |
248 | 0 | nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv); |
249 | 0 | if (NS_SUCCEEDED(rv)) |
250 | 0 | sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); |
251 | 0 | if (NS_SUCCEEDED(rv)) |
252 | 0 | mTimer = NS_NewTimer(sts); |
253 | 0 | } |
254 | | |
255 | | ThrottleQueue::~ThrottleQueue() |
256 | 0 | { |
257 | 0 | if (mTimer && mTimerArmed) { |
258 | 0 | mTimer->Cancel(); |
259 | 0 | } |
260 | 0 | mTimer = nullptr; |
261 | 0 | } |
262 | | |
263 | | NS_IMETHODIMP |
264 | | ThrottleQueue::RecordRead(uint32_t aBytesRead) |
265 | 0 | { |
266 | 0 | MOZ_ASSERT(OnSocketThread(), "not on socket thread"); |
267 | 0 | ThrottleEntry entry; |
268 | 0 | entry.mTime = TimeStamp::Now(); |
269 | 0 | entry.mBytesRead = aBytesRead; |
270 | 0 | mReadEvents.AppendElement(entry); |
271 | 0 | mBytesProcessed += aBytesRead; |
272 | 0 | return NS_OK; |
273 | 0 | } |
274 | | |
275 | | NS_IMETHODIMP |
276 | | ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) |
277 | 0 | { |
278 | 0 | MOZ_ASSERT(OnSocketThread(), "not on socket thread"); |
279 | 0 | TimeStamp now = TimeStamp::Now(); |
280 | 0 | TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1); |
281 | 0 | size_t i; |
282 | 0 |
|
283 | 0 | // Remove all stale events. |
284 | 0 | for (i = 0; i < mReadEvents.Length(); ++i) { |
285 | 0 | if (mReadEvents[i].mTime >= oneSecondAgo) { |
286 | 0 | break; |
287 | 0 | } |
288 | 0 | } |
289 | 0 | mReadEvents.RemoveElementsAt(0, i); |
290 | 0 |
|
291 | 0 | uint32_t totalBytes = 0; |
292 | 0 | for (i = 0; i < mReadEvents.Length(); ++i) { |
293 | 0 | totalBytes += mReadEvents[i].mBytesRead; |
294 | 0 | } |
295 | 0 |
|
296 | 0 | uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond; |
297 | 0 | double prob = static_cast<double>(rand()) / RAND_MAX; |
298 | 0 | uint32_t thisSliceBytes = mMeanBytesPerSecond - spread + |
299 | 0 | static_cast<uint32_t>(2 * spread * prob); |
300 | 0 |
|
301 | 0 | if (totalBytes >= thisSliceBytes) { |
302 | 0 | *aAvailable = 0; |
303 | 0 | } else { |
304 | 0 | *aAvailable = thisSliceBytes; |
305 | 0 | } |
306 | 0 | return NS_OK; |
307 | 0 | } |
308 | | |
309 | | NS_IMETHODIMP |
310 | | ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) |
311 | 0 | { |
312 | 0 | // Can be called on any thread. |
313 | 0 | if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) { |
314 | 0 | return NS_ERROR_ILLEGAL_VALUE; |
315 | 0 | } |
316 | 0 | |
317 | 0 | mMeanBytesPerSecond = aMeanBytesPerSecond; |
318 | 0 | mMaxBytesPerSecond = aMaxBytesPerSecond; |
319 | 0 | return NS_OK; |
320 | 0 | } |
321 | | |
322 | | NS_IMETHODIMP |
323 | | ThrottleQueue::BytesProcessed(uint64_t* aResult) |
324 | 0 | { |
325 | 0 | *aResult = mBytesProcessed; |
326 | 0 | return NS_OK; |
327 | 0 | } |
328 | | |
329 | | NS_IMETHODIMP |
330 | | ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult) |
331 | 0 | { |
332 | 0 | nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this); |
333 | 0 | result.forget(aResult); |
334 | 0 | return NS_OK; |
335 | 0 | } |
336 | | |
337 | | NS_IMETHODIMP |
338 | | ThrottleQueue::Notify(nsITimer* aTimer) |
339 | 0 | { |
340 | 0 | MOZ_ASSERT(OnSocketThread(), "not on socket thread"); |
341 | 0 | // A notified reader may need to push itself back on the queue. |
342 | 0 | // Swap out the list of readers so that this works properly. |
343 | 0 | nsTArray<RefPtr<ThrottleInputStream>> events; |
344 | 0 | events.SwapElements(mAsyncEvents); |
345 | 0 |
|
346 | 0 | // Optimistically notify all the waiting readers, and then let them |
347 | 0 | // requeue if there isn't enough bandwidth. |
348 | 0 | for (size_t i = 0; i < events.Length(); ++i) { |
349 | 0 | events[i]->AllowInput(); |
350 | 0 | } |
351 | 0 |
|
352 | 0 | mTimerArmed = false; |
353 | 0 | return NS_OK; |
354 | 0 | } |
355 | | |
356 | | NS_IMETHODIMP |
357 | | ThrottleQueue::GetName(nsACString& aName) |
358 | 0 | { |
359 | 0 | aName.AssignLiteral("net::ThrottleQueue"); |
360 | 0 | return NS_OK; |
361 | 0 | } |
362 | | |
363 | | void |
364 | | ThrottleQueue::QueueStream(ThrottleInputStream* aStream) |
365 | 0 | { |
366 | 0 | MOZ_ASSERT(OnSocketThread(), "not on socket thread"); |
367 | 0 | if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) { |
368 | 0 | mAsyncEvents.AppendElement(aStream); |
369 | 0 |
|
370 | 0 | if (!mTimerArmed) { |
371 | 0 | uint32_t ms = 1000; |
372 | 0 | if (mReadEvents.Length() > 0) { |
373 | 0 | TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1); |
374 | 0 | TimeStamp now = TimeStamp::Now(); |
375 | 0 |
|
376 | 0 | if (t > now) { |
377 | 0 | ms = static_cast<uint32_t>((t - now).ToMilliseconds()); |
378 | 0 | } else { |
379 | 0 | ms = 1; |
380 | 0 | } |
381 | 0 | } |
382 | 0 |
|
383 | 0 | if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) { |
384 | 0 | mTimerArmed = true; |
385 | 0 | } |
386 | 0 | } |
387 | 0 | } |
388 | 0 | } |
389 | | |
390 | | void |
391 | | ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) |
392 | 0 | { |
393 | 0 | MOZ_ASSERT(OnSocketThread(), "not on socket thread"); |
394 | 0 | mAsyncEvents.RemoveElement(aStream); |
395 | 0 | } |
396 | | |
397 | | } |
398 | | } |