Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/netwerk/base/ThrottleQueue.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "ThrottleQueue.h"
8
#include "nsISeekableStream.h"
9
#include "nsIAsyncInputStream.h"
10
#include "nsStreamUtils.h"
11
#include "nsNetUtil.h"
12
13
namespace mozilla {
14
namespace net {
15
16
//-----------------------------------------------------------------------------
17
18
class ThrottleInputStream final
19
  : public nsIAsyncInputStream
20
  , public nsISeekableStream
21
{
22
public:
23
24
  ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
25
26
  NS_DECL_THREADSAFE_ISUPPORTS
27
  NS_DECL_NSIINPUTSTREAM
28
  NS_DECL_NSISEEKABLESTREAM
29
  NS_DECL_NSIASYNCINPUTSTREAM
30
31
  void AllowInput();
32
33
private:
34
35
  ~ThrottleInputStream();
36
37
  nsCOMPtr<nsIInputStream> mStream;
38
  RefPtr<ThrottleQueue> mQueue;
39
  nsresult mClosedStatus;
40
41
  nsCOMPtr<nsIInputStreamCallback> mCallback;
42
  nsCOMPtr<nsIEventTarget> mEventTarget;
43
};
44
45
NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
46
47
ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
48
  : mStream(aStream)
49
  , mQueue(aQueue)
50
  , mClosedStatus(NS_OK)
51
0
{
52
0
  MOZ_ASSERT(aQueue != nullptr);
53
0
}
54
55
ThrottleInputStream::~ThrottleInputStream()
56
0
{
57
0
  Close();
58
0
}
59
60
NS_IMETHODIMP
61
ThrottleInputStream::Close()
62
0
{
63
0
  if (NS_FAILED(mClosedStatus)) {
64
0
    return mClosedStatus;
65
0
  }
66
0
67
0
  if (mQueue) {
68
0
    mQueue->DequeueStream(this);
69
0
    mQueue = nullptr;
70
0
    mClosedStatus = NS_BASE_STREAM_CLOSED;
71
0
  }
72
0
  return mStream->Close();
73
0
}
74
75
NS_IMETHODIMP
76
ThrottleInputStream::Available(uint64_t* aResult)
77
0
{
78
0
  if (NS_FAILED(mClosedStatus)) {
79
0
    return mClosedStatus;
80
0
  }
81
0
82
0
  return mStream->Available(aResult);
83
0
}
84
85
NS_IMETHODIMP
86
ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
87
0
{
88
0
  if (NS_FAILED(mClosedStatus)) {
89
0
    return mClosedStatus;
90
0
  }
91
0
92
0
  uint32_t realCount;
93
0
  nsresult rv = mQueue->Available(aCount, &realCount);
94
0
  if (NS_FAILED(rv)) {
95
0
    return rv;
96
0
  }
97
0
98
0
  if (realCount == 0) {
99
0
    return NS_BASE_STREAM_WOULD_BLOCK;
100
0
  }
101
0
102
0
  rv = mStream->Read(aBuf, realCount, aResult);
103
0
  if (NS_SUCCEEDED(rv) && *aResult > 0) {
104
0
    mQueue->RecordRead(*aResult);
105
0
  }
106
0
  return rv;
107
0
}
108
109
NS_IMETHODIMP
110
ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
111
                                  uint32_t aCount, uint32_t* aResult)
112
0
{
113
0
  if (NS_FAILED(mClosedStatus)) {
114
0
    return mClosedStatus;
115
0
  }
116
0
117
0
  uint32_t realCount;
118
0
  nsresult rv = mQueue->Available(aCount, &realCount);
119
0
  if (NS_FAILED(rv)) {
120
0
    return rv;
121
0
  }
122
0
123
0
  if (realCount == 0) {
124
0
    return NS_BASE_STREAM_WOULD_BLOCK;
125
0
  }
126
0
127
0
  rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
128
0
  if (NS_SUCCEEDED(rv) && *aResult > 0) {
129
0
    mQueue->RecordRead(*aResult);
130
0
  }
131
0
  return rv;
132
0
}
133
134
NS_IMETHODIMP
135
ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
136
0
{
137
0
  *aNonBlocking = true;
138
0
  return NS_OK;
139
0
}
140
141
NS_IMETHODIMP
142
ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
143
0
{
144
0
  if (NS_FAILED(mClosedStatus)) {
145
0
    return mClosedStatus;
146
0
  }
147
0
148
0
  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
149
0
  if (!sstream) {
150
0
    return NS_ERROR_FAILURE;
151
0
  }
152
0
153
0
  return sstream->Seek(aWhence, aOffset);
154
0
}
155
156
NS_IMETHODIMP
157
ThrottleInputStream::Tell(int64_t* aResult)
158
0
{
159
0
  if (NS_FAILED(mClosedStatus)) {
160
0
    return mClosedStatus;
161
0
  }
162
0
163
0
  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
164
0
  if (!sstream) {
165
0
    return NS_ERROR_FAILURE;
166
0
  }
167
0
168
0
  return sstream->Tell(aResult);
169
0
}
170
171
NS_IMETHODIMP
172
ThrottleInputStream::SetEOF()
173
0
{
174
0
  if (NS_FAILED(mClosedStatus)) {
175
0
    return mClosedStatus;
176
0
  }
177
0
178
0
  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
179
0
  if (!sstream) {
180
0
    return NS_ERROR_FAILURE;
181
0
  }
182
0
183
0
  return sstream->SetEOF();
184
0
}
185
186
NS_IMETHODIMP
187
ThrottleInputStream::CloseWithStatus(nsresult aStatus)
188
0
{
189
0
  if (NS_FAILED(mClosedStatus)) {
190
0
    // Already closed, ignore.
191
0
    return NS_OK;
192
0
  }
193
0
  if (NS_SUCCEEDED(aStatus)) {
194
0
    aStatus = NS_BASE_STREAM_CLOSED;
195
0
  }
196
0
197
0
  mClosedStatus = Close();
198
0
  if (NS_SUCCEEDED(mClosedStatus)) {
199
0
    mClosedStatus = aStatus;
200
0
  }
201
0
  return NS_OK;
202
0
}
203
204
NS_IMETHODIMP
205
ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
206
                               uint32_t aFlags,
207
                               uint32_t aRequestedCount,
208
                               nsIEventTarget *aEventTarget)
209
0
{
210
0
  if (aFlags != 0) {
211
0
    return NS_ERROR_ILLEGAL_VALUE;
212
0
  }
213
0
214
0
  mCallback = aCallback;
215
0
  mEventTarget = aEventTarget;
216
0
  if (mCallback) {
217
0
    mQueue->QueueStream(this);
218
0
  } else {
219
0
    mQueue->DequeueStream(this);
220
0
  }
221
0
  return NS_OK;
222
0
}
223
224
void
225
ThrottleInputStream::AllowInput()
226
0
{
227
0
  MOZ_ASSERT(mCallback);
228
0
  nsCOMPtr<nsIInputStreamCallback> callbackEvent =
229
0
    NS_NewInputStreamReadyEvent("ThrottleInputStream::AllowInput",
230
0
                                mCallback, mEventTarget);
231
0
  mCallback = nullptr;
232
0
  mEventTarget = nullptr;
233
0
  callbackEvent->OnInputStreamReady(this);
234
0
}
235
236
//-----------------------------------------------------------------------------
237
238
NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback, nsINamed)
239
240
ThrottleQueue::ThrottleQueue()
241
  : mMeanBytesPerSecond(0)
242
  , mMaxBytesPerSecond(0)
243
  , mBytesProcessed(0)
244
  , mTimerArmed(false)
245
0
{
246
0
  nsresult rv;
247
0
  nsCOMPtr<nsIEventTarget> sts;
248
0
  nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
249
0
  if (NS_SUCCEEDED(rv))
250
0
    sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
251
0
  if (NS_SUCCEEDED(rv))
252
0
    mTimer = NS_NewTimer(sts);
253
0
}
254
255
ThrottleQueue::~ThrottleQueue()
256
0
{
257
0
  if (mTimer && mTimerArmed) {
258
0
    mTimer->Cancel();
259
0
  }
260
0
  mTimer = nullptr;
261
0
}
262
263
NS_IMETHODIMP
264
ThrottleQueue::RecordRead(uint32_t aBytesRead)
265
0
{
266
0
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
267
0
  ThrottleEntry entry;
268
0
  entry.mTime = TimeStamp::Now();
269
0
  entry.mBytesRead = aBytesRead;
270
0
  mReadEvents.AppendElement(entry);
271
0
  mBytesProcessed += aBytesRead;
272
0
  return NS_OK;
273
0
}
274
275
NS_IMETHODIMP
276
ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
277
0
{
278
0
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
279
0
  TimeStamp now = TimeStamp::Now();
280
0
  TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
281
0
  size_t i;
282
0
283
0
  // Remove all stale events.
284
0
  for (i = 0; i < mReadEvents.Length(); ++i) {
285
0
    if (mReadEvents[i].mTime >= oneSecondAgo) {
286
0
      break;
287
0
    }
288
0
  }
289
0
  mReadEvents.RemoveElementsAt(0, i);
290
0
291
0
  uint32_t totalBytes = 0;
292
0
  for (i = 0; i < mReadEvents.Length(); ++i) {
293
0
    totalBytes += mReadEvents[i].mBytesRead;
294
0
  }
295
0
296
0
  uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
297
0
  double prob = static_cast<double>(rand()) / RAND_MAX;
298
0
  uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
299
0
    static_cast<uint32_t>(2 * spread * prob);
300
0
301
0
  if (totalBytes >= thisSliceBytes) {
302
0
    *aAvailable = 0;
303
0
  } else {
304
0
    *aAvailable = thisSliceBytes;
305
0
  }
306
0
  return NS_OK;
307
0
}
308
309
NS_IMETHODIMP
310
ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
311
0
{
312
0
  // Can be called on any thread.
313
0
  if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
314
0
    return NS_ERROR_ILLEGAL_VALUE;
315
0
  }
316
0
317
0
  mMeanBytesPerSecond = aMeanBytesPerSecond;
318
0
  mMaxBytesPerSecond = aMaxBytesPerSecond;
319
0
  return NS_OK;
320
0
}
321
322
NS_IMETHODIMP
323
ThrottleQueue::BytesProcessed(uint64_t* aResult)
324
0
{
325
0
  *aResult = mBytesProcessed;
326
0
  return NS_OK;
327
0
}
328
329
NS_IMETHODIMP
330
ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
331
0
{
332
0
  nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
333
0
  result.forget(aResult);
334
0
  return NS_OK;
335
0
}
336
337
NS_IMETHODIMP
338
ThrottleQueue::Notify(nsITimer* aTimer)
339
0
{
340
0
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
341
0
  // A notified reader may need to push itself back on the queue.
342
0
  // Swap out the list of readers so that this works properly.
343
0
  nsTArray<RefPtr<ThrottleInputStream>> events;
344
0
  events.SwapElements(mAsyncEvents);
345
0
346
0
  // Optimistically notify all the waiting readers, and then let them
347
0
  // requeue if there isn't enough bandwidth.
348
0
  for (size_t i = 0; i < events.Length(); ++i) {
349
0
    events[i]->AllowInput();
350
0
  }
351
0
352
0
  mTimerArmed = false;
353
0
  return NS_OK;
354
0
}
355
356
NS_IMETHODIMP
357
ThrottleQueue::GetName(nsACString& aName)
358
0
{
359
0
  aName.AssignLiteral("net::ThrottleQueue");
360
0
  return NS_OK;
361
0
}
362
363
void
364
ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
365
0
{
366
0
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
367
0
  if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
368
0
    mAsyncEvents.AppendElement(aStream);
369
0
370
0
    if (!mTimerArmed) {
371
0
      uint32_t ms = 1000;
372
0
      if (mReadEvents.Length() > 0) {
373
0
        TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
374
0
        TimeStamp now = TimeStamp::Now();
375
0
376
0
        if (t > now) {
377
0
          ms = static_cast<uint32_t>((t - now).ToMilliseconds());
378
0
        } else {
379
0
          ms = 1;
380
0
        }
381
0
      }
382
0
383
0
      if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
384
0
        mTimerArmed = true;
385
0
      }
386
0
    }
387
0
  }
388
0
}
389
390
void
391
ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
392
0
{
393
0
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
394
0
  mAsyncEvents.RemoveElement(aStream);
395
0
}
396
397
}
398
}