Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/xpcom/tests/gtest/TestPipes.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include <algorithm>
8
#include "gtest/gtest.h"
9
#include "Helpers.h"
10
#include "mozilla/ReentrantMonitor.h"
11
#include "mozilla/Printf.h"
12
#include "nsCOMPtr.h"
13
#include "nsCRT.h"
14
#include "nsIAsyncInputStream.h"
15
#include "nsIAsyncOutputStream.h"
16
#include "nsIBufferedStreams.h"
17
#include "nsIClassInfo.h"
18
#include "nsICloneableInputStream.h"
19
#include "nsIInputStream.h"
20
#include "nsIOutputStream.h"
21
#include "nsIPipe.h"
22
#include "nsISeekableStream.h"
23
#include "nsIThread.h"
24
#include "nsIRunnable.h"
25
#include "nsStreamUtils.h"
26
#include "nsString.h"
27
#include "nsThreadUtils.h"
28
#include "prinrval.h"
29
30
using namespace mozilla;
31
32
0
#define ITERATIONS      33333
33
char kTestPattern[] = "My hovercraft is full of eels.\n";
34
35
bool gTrace = false;
36
37
static nsresult
38
WriteAll(nsIOutputStream *os, const char *buf, uint32_t bufLen, uint32_t *lenWritten)
39
0
{
40
0
    const char *p = buf;
41
0
    *lenWritten = 0;
42
0
    while (bufLen) {
43
0
        uint32_t n;
44
0
        nsresult rv = os->Write(p, bufLen, &n);
45
0
        if (NS_FAILED(rv)) return rv;
46
0
        p += n;
47
0
        bufLen -= n;
48
0
        *lenWritten += n;
49
0
    }
50
0
    return NS_OK;
51
0
}
52
53
class nsReceiver final : public nsIRunnable {
54
public:
55
    NS_DECL_THREADSAFE_ISUPPORTS
56
57
0
    NS_IMETHOD Run() override {
58
0
        nsresult rv;
59
0
        char buf[101];
60
0
        uint32_t count;
61
0
        PRIntervalTime start = PR_IntervalNow();
62
0
        while (true) {
63
0
            rv = mIn->Read(buf, 100, &count);
64
0
            if (NS_FAILED(rv)) {
65
0
                printf("read failed\n");
66
0
                break;
67
0
            }
68
0
            if (count == 0) {
69
0
//                printf("EOF count = %d\n", mCount);
70
0
                break;
71
0
            }
72
0
73
0
            if (gTrace) {
74
0
                buf[count] = '\0';
75
0
                printf("read: %s\n", buf);
76
0
            }
77
0
            mCount += count;
78
0
        }
79
0
        PRIntervalTime end = PR_IntervalNow();
80
0
        printf("read  %d bytes, time = %dms\n", mCount,
81
0
               PR_IntervalToMilliseconds(end - start));
82
0
        return rv;
83
0
    }
84
85
0
    explicit nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) {
86
0
    }
87
88
0
    uint32_t GetBytesRead() { return mCount; }
89
90
private:
91
0
    ~nsReceiver() {}
92
93
protected:
94
    nsCOMPtr<nsIInputStream> mIn;
95
    uint32_t            mCount;
96
};
97
98
NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable)
99
100
nsresult
101
TestPipe(nsIInputStream* in, nsIOutputStream* out)
102
0
{
103
0
    RefPtr<nsReceiver> receiver = new nsReceiver(in);
104
0
    if (!receiver)
105
0
        return NS_ERROR_OUT_OF_MEMORY;
106
0
107
0
    nsresult rv;
108
0
109
0
    nsCOMPtr<nsIThread> thread;
110
0
    rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
111
0
    if (NS_FAILED(rv)) return rv;
112
0
113
0
    uint32_t total = 0;
114
0
    PRIntervalTime start = PR_IntervalNow();
115
0
    for (uint32_t i = 0; i < ITERATIONS; i++) {
116
0
        uint32_t writeCount;
117
0
        SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
118
0
        uint32_t len = strlen(buf.get());
119
0
        rv = WriteAll(out, buf.get(), len, &writeCount);
120
0
        if (gTrace) {
121
0
            printf("wrote: ");
122
0
            for (uint32_t j = 0; j < writeCount; j++) {
123
0
              putc(buf.get()[j], stdout);
124
0
            }
125
0
            printf("\n");
126
0
        }
127
0
        if (NS_FAILED(rv)) return rv;
128
0
        total += writeCount;
129
0
    }
130
0
    rv = out->Close();
131
0
    if (NS_FAILED(rv)) return rv;
132
0
133
0
    PRIntervalTime end = PR_IntervalNow();
134
0
135
0
    thread->Shutdown();
136
0
137
0
    printf("wrote %d bytes, time = %dms\n", total,
138
0
           PR_IntervalToMilliseconds(end - start));
139
0
    EXPECT_EQ(receiver->GetBytesRead(), total);
140
0
141
0
    return NS_OK;
142
0
}
143
144
////////////////////////////////////////////////////////////////////////////////
145
146
class nsShortReader final : public nsIRunnable {
147
public:
148
    NS_DECL_THREADSAFE_ISUPPORTS
149
150
0
    NS_IMETHOD Run() override {
151
0
        nsresult rv;
152
0
        char buf[101];
153
0
        uint32_t count;
154
0
        uint32_t total = 0;
155
0
        while (true) {
156
0
            //if (gTrace)
157
0
            //    printf("calling Read\n");
158
0
            rv = mIn->Read(buf, 100, &count);
159
0
            if (NS_FAILED(rv)) {
160
0
                printf("read failed\n");
161
0
                break;
162
0
            }
163
0
            if (count == 0) {
164
0
                break;
165
0
            }
166
0
167
0
            if (gTrace) {
168
0
                // For next |printf()| call and possible others elsewhere.
169
0
                buf[count] = '\0';
170
0
171
0
                printf("read %d bytes: %s\n", count, buf);
172
0
            }
173
0
174
0
            Received(count);
175
0
            total += count;
176
0
        }
177
0
        printf("read  %d bytes\n", total);
178
0
        return rv;
179
0
    }
180
181
0
    explicit nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) {
182
0
        mMon = new ReentrantMonitor("nsShortReader");
183
0
    }
184
185
0
    void Received(uint32_t count) {
186
0
        ReentrantMonitorAutoEnter mon(*mMon);
187
0
        mReceived += count;
188
0
        mon.Notify();
189
0
    }
190
191
0
    uint32_t WaitForReceipt(const uint32_t aWriteCount) {
192
0
        ReentrantMonitorAutoEnter mon(*mMon);
193
0
        uint32_t result = mReceived;
194
0
195
0
        while (result < aWriteCount) {
196
0
            mon.Wait();
197
0
198
0
            EXPECT_TRUE(mReceived > result);
199
0
            result = mReceived;
200
0
        }
201
0
202
0
        mReceived = 0;
203
0
        return result;
204
0
    }
205
206
private:
207
0
    ~nsShortReader() {}
208
209
protected:
210
    nsCOMPtr<nsIInputStream> mIn;
211
    uint32_t                 mReceived;
212
    ReentrantMonitor*        mMon;
213
};
214
215
NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable)
216
217
nsresult
218
TestShortWrites(nsIInputStream* in, nsIOutputStream* out)
219
0
{
220
0
    RefPtr<nsShortReader> receiver = new nsShortReader(in);
221
0
    if (!receiver)
222
0
        return NS_ERROR_OUT_OF_MEMORY;
223
0
224
0
    nsresult rv;
225
0
226
0
    nsCOMPtr<nsIThread> thread;
227
0
    rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread),
228
0
                           receiver);
229
0
    if (NS_FAILED(rv)) return rv;
230
0
231
0
    uint32_t total = 0;
232
0
    for (uint32_t i = 0; i < ITERATIONS; i++) {
233
0
        uint32_t writeCount;
234
0
        SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
235
0
        uint32_t len = strlen(buf.get());
236
0
        len = len * rand() / RAND_MAX;
237
0
        len = std::min(1u, len);
238
0
        rv = WriteAll(out, buf.get(), len, &writeCount);
239
0
        if (NS_FAILED(rv)) return rv;
240
0
        EXPECT_EQ(writeCount, len);
241
0
        total += writeCount;
242
0
243
0
        if (gTrace)
244
0
          printf("wrote %d bytes: %s\n", writeCount, buf.get());
245
0
        //printf("calling Flush\n");
246
0
        out->Flush();
247
0
        //printf("calling WaitForReceipt\n");
248
0
249
#ifdef DEBUG
250
        const uint32_t received =
251
          receiver->WaitForReceipt(writeCount);
252
        EXPECT_EQ(received, writeCount);
253
#endif
254
    }
255
0
    rv = out->Close();
256
0
    if (NS_FAILED(rv)) return rv;
257
0
258
0
    thread->Shutdown();
259
0
260
0
    printf("wrote %d bytes\n", total);
261
0
262
0
    return NS_OK;
263
0
}
264
265
////////////////////////////////////////////////////////////////////////////////
266
267
class nsPump final : public nsIRunnable
268
{
269
public:
270
    NS_DECL_THREADSAFE_ISUPPORTS
271
272
0
    NS_IMETHOD Run() override {
273
0
        nsresult rv;
274
0
        uint32_t count;
275
0
        while (true) {
276
0
            rv = mOut->WriteFrom(mIn, ~0U, &count);
277
0
            if (NS_FAILED(rv)) {
278
0
                printf("Write failed\n");
279
0
                break;
280
0
            }
281
0
            if (count == 0) {
282
0
                printf("EOF count = %d\n", mCount);
283
0
                break;
284
0
            }
285
0
286
0
            if (gTrace) {
287
0
                printf("Wrote: %d\n", count);
288
0
            }
289
0
            mCount += count;
290
0
        }
291
0
        mOut->Close();
292
0
        return rv;
293
0
    }
294
295
    nsPump(nsIInputStream* in,
296
           nsIOutputStream* out)
297
0
        : mIn(in), mOut(out), mCount(0) {
298
0
    }
299
300
private:
301
0
    ~nsPump() {}
302
303
protected:
304
    nsCOMPtr<nsIInputStream>      mIn;
305
    nsCOMPtr<nsIOutputStream>     mOut;
306
    uint32_t                            mCount;
307
};
308
309
NS_IMPL_ISUPPORTS(nsPump, nsIRunnable)
310
311
TEST(Pipes, ChainedPipes)
312
0
{
313
0
    nsresult rv;
314
0
    if (gTrace) {
315
0
        printf("TestChainedPipes\n");
316
0
    }
317
0
318
0
    nsCOMPtr<nsIInputStream> in1;
319
0
    nsCOMPtr<nsIOutputStream> out1;
320
0
    rv = NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
321
0
    if (NS_FAILED(rv)) return;
322
0
323
0
    nsCOMPtr<nsIInputStream> in2;
324
0
    nsCOMPtr<nsIOutputStream> out2;
325
0
    rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
326
0
    if (NS_FAILED(rv)) return;
327
0
328
0
    RefPtr<nsPump> pump = new nsPump(in1, out2);
329
0
    if (pump == nullptr) return;
330
0
331
0
    nsCOMPtr<nsIThread> thread;
332
0
    rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
333
0
    if (NS_FAILED(rv)) return;
334
0
335
0
    RefPtr<nsReceiver> receiver = new nsReceiver(in2);
336
0
    if (receiver == nullptr) return;
337
0
338
0
    nsCOMPtr<nsIThread> receiverThread;
339
0
    rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
340
0
                           receiver);
341
0
    if (NS_FAILED(rv)) return;
342
0
343
0
    uint32_t total = 0;
344
0
    for (uint32_t i = 0; i < ITERATIONS; i++) {
345
0
        uint32_t writeCount;
346
0
        SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
347
0
        uint32_t len = strlen(buf.get());
348
0
        len = len * rand() / RAND_MAX;
349
0
        len = std::max(1u, len);
350
0
        rv = WriteAll(out1, buf.get(), len, &writeCount);
351
0
        if (NS_FAILED(rv)) return;
352
0
        EXPECT_EQ(writeCount, len);
353
0
        total += writeCount;
354
0
355
0
        if (gTrace)
356
0
            printf("wrote %d bytes: %s\n", writeCount, buf.get());
357
0
    }
358
0
    if (gTrace) {
359
0
        printf("wrote total of %d bytes\n", total);
360
0
    }
361
0
    rv = out1->Close();
362
0
    if (NS_FAILED(rv)) return;
363
0
364
0
    thread->Shutdown();
365
0
    receiverThread->Shutdown();
366
0
}
367
368
////////////////////////////////////////////////////////////////////////////////
369
370
void
371
RunTests(uint32_t segSize, uint32_t segCount)
372
0
{
373
0
    nsresult rv;
374
0
    nsCOMPtr<nsIInputStream> in;
375
0
    nsCOMPtr<nsIOutputStream> out;
376
0
    uint32_t bufSize = segSize * segCount;
377
0
    if (gTrace) {
378
0
        printf("Testing New Pipes: segment size %d buffer size %d\n", segSize, bufSize);
379
0
        printf("Testing long writes...\n");
380
0
    }
381
0
    rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
382
0
    EXPECT_TRUE(NS_SUCCEEDED(rv));
383
0
    rv = TestPipe(in, out);
384
0
    EXPECT_TRUE(NS_SUCCEEDED(rv));
385
0
386
0
    if (gTrace) {
387
0
        printf("Testing short writes...\n");
388
0
    }
389
0
    rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
390
0
    EXPECT_TRUE(NS_SUCCEEDED(rv));
391
0
    rv = TestShortWrites(in, out);
392
0
    EXPECT_TRUE(NS_SUCCEEDED(rv));
393
0
}
394
395
TEST(Pipes, Main)
396
0
{
397
0
    RunTests(16, 1);
398
0
    RunTests(4096, 16);
399
0
}
400
401
////////////////////////////////////////////////////////////////////////////////
402
403
namespace {
404
405
static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
406
407
// An alternate pipe testing routing that uses NS_ConsumeStream() instead of
408
// manual read loop.
409
static void TestPipe2(uint32_t aNumBytes,
410
                      uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE)
411
0
{
412
0
  nsCOMPtr<nsIInputStream> reader;
413
0
  nsCOMPtr<nsIOutputStream> writer;
414
0
415
0
  uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
416
0
417
0
  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
418
0
                           aSegmentSize, maxSize);
419
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
420
0
421
0
  nsTArray<char> inputData;
422
0
  testing::CreateData(aNumBytes, inputData);
423
0
  testing::WriteAllAndClose(writer, inputData);
424
0
  testing::ConsumeAndValidateStream(reader, inputData);
425
0
}
426
427
} // namespace
428
429
TEST(Pipes, Blocking_32k)
430
0
{
431
0
  TestPipe2(32 * 1024);
432
0
}
433
434
TEST(Pipes, Blocking_64k)
435
0
{
436
0
  TestPipe2(64 * 1024);
437
0
}
438
439
TEST(Pipes, Blocking_128k)
440
0
{
441
0
  TestPipe2(128 * 1024);
442
0
}
443
444
////////////////////////////////////////////////////////////////////////////////
445
446
namespace {
447
448
// Utility routine to validate pipe clone before.  There are many knobs.
449
//
450
// aTotalBytes              Total number of bytes to write to the pipe.
451
// aNumWrites               How many separate write calls should be made.  Bytes
452
//                          are evenly distributed over these write calls.
453
// aNumInitialClones        How many clones of the pipe input stream should be
454
//                          made before writing begins.
455
// aNumToCloseAfterWrite    How many streams should be closed after each write.
456
//                          One stream is always kept open.  This verifies that
457
//                          closing one stream does not effect other open
458
//                          streams.
459
// aNumToCloneAfterWrite    How many clones to create after each write.  Occurs
460
//                          after closing any streams.  This tests cloning
461
//                          active streams on a pipe that is being written to.
462
// aNumStreamToReadPerWrite How many streams to read fully after each write.
463
//                          This tests reading cloned streams at different rates
464
//                          while the pipe is being written to.
465
static void TestPipeClone(uint32_t aTotalBytes,
466
                          uint32_t aNumWrites,
467
                          uint32_t aNumInitialClones,
468
                          uint32_t aNumToCloseAfterWrite,
469
                          uint32_t aNumToCloneAfterWrite,
470
                          uint32_t aNumStreamsToReadPerWrite,
471
                          uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE)
472
0
{
473
0
  nsCOMPtr<nsIInputStream> reader;
474
0
  nsCOMPtr<nsIOutputStream> writer;
475
0
476
0
  uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
477
0
478
0
  // Use async input streams so we can NS_ConsumeStream() the current data
479
0
  // while the pipe is still being written to.
480
0
  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
481
0
                           aSegmentSize, maxSize,
482
0
                           true, false); // non-blocking - reader, writer
483
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
484
0
485
0
  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
486
0
  ASSERT_TRUE(cloneable);
487
0
  ASSERT_TRUE(cloneable->GetCloneable());
488
0
489
0
  nsTArray<nsCString> outputDataList;
490
0
491
0
  nsTArray<nsCOMPtr<nsIInputStream>> streamList;
492
0
493
0
  // first stream is our original reader from the pipe
494
0
  streamList.AppendElement(reader);
495
0
  outputDataList.AppendElement();
496
0
497
0
  // Clone the initial input stream the specified number of times
498
0
  // before performing any writes.
499
0
  for (uint32_t i = 0; i < aNumInitialClones; ++i) {
500
0
    nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
501
0
    rv = cloneable->Clone(getter_AddRefs(*clone));
502
0
    ASSERT_TRUE(NS_SUCCEEDED(rv));
503
0
    ASSERT_TRUE(*clone);
504
0
505
0
    outputDataList.AppendElement();
506
0
  }
507
0
508
0
  nsTArray<char> inputData;
509
0
  testing::CreateData(aTotalBytes, inputData);
510
0
511
0
  const uint32_t bytesPerWrite = ((aTotalBytes - 1)/ aNumWrites) + 1;
512
0
  uint32_t offset = 0;
513
0
  uint32_t remaining = aTotalBytes;
514
0
  uint32_t nextStreamToRead = 0;
515
0
516
0
  while (remaining) {
517
0
    uint32_t numToWrite = std::min(bytesPerWrite, remaining);
518
0
    testing::Write(writer, inputData, offset, numToWrite);
519
0
    offset += numToWrite;
520
0
    remaining -= numToWrite;
521
0
522
0
    // Close the specified number of streams.  This allows us to
523
0
    // test that one closed clone does not break other open clones.
524
0
    for (uint32_t i = 0; i < aNumToCloseAfterWrite &&
525
0
                         streamList.Length() > 1; ++i) {
526
0
527
0
      uint32_t lastIndex = streamList.Length() - 1;
528
0
      streamList[lastIndex]->Close();
529
0
      streamList.RemoveElementAt(lastIndex);
530
0
      outputDataList.RemoveElementAt(lastIndex);
531
0
532
0
      if (nextStreamToRead >= streamList.Length()) {
533
0
        nextStreamToRead = 0;
534
0
      }
535
0
    }
536
0
537
0
    // Create the specified number of clones.  This lets us verify
538
0
    // that we can create clones in the middle of pipe reading and
539
0
    // writing.
540
0
    for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
541
0
      nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
542
0
      rv = cloneable->Clone(getter_AddRefs(*clone));
543
0
      ASSERT_TRUE(NS_SUCCEEDED(rv));
544
0
      ASSERT_TRUE(*clone);
545
0
546
0
      // Initialize the new output data to make whats been read to data for
547
0
      // the original stream.  First stream is always the original stream.
548
0
      nsCString* outputData = outputDataList.AppendElement();
549
0
      *outputData = outputDataList[0];
550
0
    }
551
0
552
0
    // Read the specified number of streams.  This lets us verify that we
553
0
    // can read from the clones at different rates while the pipe is being
554
0
    // written to.
555
0
    for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
556
0
      nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
557
0
      nsCString& outputData = outputDataList[nextStreamToRead];
558
0
559
0
      // Can't use ConsumeAndValidateStream() here because we're not
560
0
      // guaranteed the exact amount read.  It should just be at least
561
0
      // as many as numToWrite.
562
0
      nsAutoCString tmpOutputData;
563
0
      rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
564
0
      ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
565
0
      ASSERT_GE(tmpOutputData.Length(), numToWrite);
566
0
567
0
      outputData += tmpOutputData;
568
0
569
0
      nextStreamToRead += 1;
570
0
      if (nextStreamToRead >= streamList.Length()) {
571
0
        // Note: When we wrap around on the streams being read, its possible
572
0
        //       we will trigger a segment to be deleted from the pipe.  It
573
0
        //       would be nice to validate this here, but we don't have any
574
0
        //       QI'able interface that would let us check easily.
575
0
576
0
        nextStreamToRead = 0;
577
0
      }
578
0
    }
579
0
  }
580
0
581
0
  rv = writer->Close();
582
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
583
0
584
0
  nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
585
0
586
0
  // Finally, read the remaining bytes from each stream.  This may be
587
0
  // different amounts of data depending on how much reading we did while
588
0
  // writing.  Verify that the end result matches the input data.
589
0
  for (uint32_t i = 0; i < streamList.Length(); ++i) {
590
0
    nsCOMPtr<nsIInputStream>& stream = streamList[i];
591
0
    nsCString& outputData = outputDataList[i];
592
0
593
0
    nsAutoCString tmpOutputData;
594
0
    rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
595
0
    ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
596
0
    stream->Close();
597
0
598
0
    // Append to total amount read from the stream
599
0
    outputData += tmpOutputData;
600
0
601
0
    ASSERT_EQ(inputString.Length(), outputData.Length());
602
0
    ASSERT_TRUE(inputString.Equals(outputData));
603
0
  }
604
0
}
605
606
} // namespace
607
608
TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
609
0
{
610
0
  TestPipeClone(32 * 1024, // total bytes
611
0
                16,        // num writes
612
0
                3,         // num initial clones
613
0
                0,         // num streams to close after each write
614
0
                0,         // num clones to add after each write
615
0
                0);        // num streams to read after each write
616
0
}
617
618
TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
619
0
{
620
0
  // Since this reads all streams on every write, it should trigger the
621
0
  // pipe cursor roll back optimization.  Currently we can only verify
622
0
  // this with logging.
623
0
624
0
  TestPipeClone(32 * 1024, // total bytes
625
0
                16,        // num writes
626
0
                3,         // num initial clones
627
0
                0,         // num streams to close after each write
628
0
                0,         // num clones to add after each write
629
0
                4);        // num streams to read after each write
630
0
}
631
632
TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
633
0
{
634
0
  TestPipeClone(32 * 1024, // total bytes
635
0
                16,        // num writes
636
0
                0,         // num initial clones
637
0
                0,         // num streams to close after each write
638
0
                1,         // num clones to add after each write
639
0
                0);        // num streams to read after each write
640
0
}
641
642
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
643
0
{
644
0
  TestPipeClone(32 * 1024, // total bytes
645
0
                16,        // num writes
646
0
                0,         // num initial clones
647
0
                0,         // num streams to close after each write
648
0
                1,         // num clones to add after each write
649
0
                1);        // num streams to read after each write
650
0
}
651
652
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
653
0
{
654
0
  // Since this reads streams faster than we clone new ones, it should
655
0
  // trigger pipe segment deletion periodically.  Currently we can
656
0
  // only verify this with logging.
657
0
658
0
  TestPipeClone(32 * 1024, // total bytes
659
0
                16,        // num writes
660
0
                1,         // num initial clones
661
0
                1,         // num streams to close after each write
662
0
                2,         // num clones to add after each write
663
0
                3);        // num streams to read after each write
664
0
}
665
666
TEST(Pipes, Write_AsyncWait)
667
0
{
668
0
  nsCOMPtr<nsIAsyncInputStream> reader;
669
0
  nsCOMPtr<nsIAsyncOutputStream> writer;
670
0
671
0
  const uint32_t segmentSize = 1024;
672
0
  const uint32_t numSegments = 1;
673
0
674
0
  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
675
0
                            true, true,  // non-blocking - reader, writer
676
0
                            segmentSize, numSegments);
677
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
678
0
679
0
  nsTArray<char> inputData;
680
0
  testing::CreateData(segmentSize, inputData);
681
0
682
0
  uint32_t numWritten = 0;
683
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
684
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
685
0
686
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
687
0
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
688
0
689
0
  RefPtr<testing::OutputStreamCallback> cb =
690
0
    new testing::OutputStreamCallback();
691
0
692
0
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
693
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
694
0
695
0
  ASSERT_FALSE(cb->Called());
696
0
697
0
  testing::ConsumeAndValidateStream(reader, inputData);
698
0
699
0
  ASSERT_TRUE(cb->Called());
700
0
}
701
702
TEST(Pipes, Write_AsyncWait_Clone)
703
0
{
704
0
  nsCOMPtr<nsIAsyncInputStream> reader;
705
0
  nsCOMPtr<nsIAsyncOutputStream> writer;
706
0
707
0
  const uint32_t segmentSize = 1024;
708
0
  const uint32_t numSegments = 1;
709
0
710
0
  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
711
0
                            true, true,  // non-blocking - reader, writer
712
0
                            segmentSize, numSegments);
713
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
714
0
715
0
  nsCOMPtr<nsIInputStream> clone;
716
0
  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
717
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
718
0
719
0
  nsTArray<char> inputData;
720
0
  testing::CreateData(segmentSize, inputData);
721
0
722
0
  uint32_t numWritten = 0;
723
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
724
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
725
0
726
0
  // This attempts to write data beyond the original pipe size limit.  It
727
0
  // should fail since neither side of the clone has been read yet.
728
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
729
0
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
730
0
731
0
  RefPtr<testing::OutputStreamCallback> cb =
732
0
    new testing::OutputStreamCallback();
733
0
734
0
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
735
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
736
0
737
0
  ASSERT_FALSE(cb->Called());
738
0
739
0
  // Consume data on the original stream, but the clone still has not been read.
740
0
  testing::ConsumeAndValidateStream(reader, inputData);
741
0
742
0
  // A clone that is not being read should not stall the other input stream
743
0
  // reader.  Therefore the writer callback should trigger when the fastest
744
0
  // reader drains the other input stream.
745
0
  ASSERT_TRUE(cb->Called());
746
0
747
0
  // Attempt to write data.  This will buffer data beyond the pipe size limit in
748
0
  // order for the clone stream to still work.  This is allowed because the
749
0
  // other input stream has drained its buffered segments and is ready for more
750
0
  // data.
751
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
752
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
753
0
754
0
  // Again, this should fail since the origin stream has not been read again.
755
0
  // The pipe size should still restrict how far ahead we can buffer even
756
0
  // when there is a cloned stream not being read.
757
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
758
0
  ASSERT_TRUE(NS_FAILED(rv));
759
0
760
0
  cb = new testing::OutputStreamCallback();
761
0
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
762
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
763
0
764
0
  // The write should again be blocked since we have written data and the
765
0
  // main reader is at its maximum advance buffer.
766
0
  ASSERT_FALSE(cb->Called());
767
0
768
0
  nsTArray<char> expectedCloneData;
769
0
  expectedCloneData.AppendElements(inputData);
770
0
  expectedCloneData.AppendElements(inputData);
771
0
772
0
  // We should now be able to consume the entire backlog of buffered data on
773
0
  // the cloned stream.
774
0
  testing::ConsumeAndValidateStream(clone, expectedCloneData);
775
0
776
0
  // Draining the clone side should also trigger the AsyncWait() writer
777
0
  // callback
778
0
  ASSERT_TRUE(cb->Called());
779
0
780
0
  // Finally, we should be able to consume the remaining data on the original
781
0
  // reader.
782
0
  testing::ConsumeAndValidateStream(reader, inputData);
783
0
}
784
785
TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
786
0
{
787
0
  nsCOMPtr<nsIAsyncInputStream> reader;
788
0
  nsCOMPtr<nsIAsyncOutputStream> writer;
789
0
790
0
  const uint32_t segmentSize = 1024;
791
0
  const uint32_t numSegments = 1;
792
0
793
0
  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
794
0
                            true, true,  // non-blocking - reader, writer
795
0
                            segmentSize, numSegments);
796
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
797
0
798
0
  nsCOMPtr<nsIInputStream> clone;
799
0
  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
800
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
801
0
802
0
  nsTArray<char> inputData;
803
0
  testing::CreateData(segmentSize, inputData);
804
0
805
0
  uint32_t numWritten = 0;
806
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
807
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
808
0
809
0
  // This attempts to write data beyond the original pipe size limit.  It
810
0
  // should fail since neither side of the clone has been read yet.
811
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
812
0
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
813
0
814
0
  RefPtr<testing::OutputStreamCallback> cb =
815
0
    new testing::OutputStreamCallback();
816
0
817
0
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
818
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
819
0
820
0
  ASSERT_FALSE(cb->Called());
821
0
822
0
  // Consume data on the original stream, but the clone still has not been read.
823
0
  testing::ConsumeAndValidateStream(reader, inputData);
824
0
825
0
  // A clone that is not being read should not stall the other input stream
826
0
  // reader.  Therefore the writer callback should trigger when the fastest
827
0
  // reader drains the other input stream.
828
0
  ASSERT_TRUE(cb->Called());
829
0
830
0
  // Attempt to write data.  This will buffer data beyond the pipe size limit in
831
0
  // order for the clone stream to still work.  This is allowed because the
832
0
  // other input stream has drained its buffered segments and is ready for more
833
0
  // data.
834
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
835
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
836
0
837
0
  // Again, this should fail since the origin stream has not been read again.
838
0
  // The pipe size should still restrict how far ahead we can buffer even
839
0
  // when there is a cloned stream not being read.
840
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
841
0
  ASSERT_TRUE(NS_FAILED(rv));
842
0
843
0
  cb = new testing::OutputStreamCallback();
844
0
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
845
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
846
0
847
0
  // The write should again be blocked since we have written data and the
848
0
  // main reader is at its maximum advance buffer.
849
0
  ASSERT_FALSE(cb->Called());
850
0
851
0
  // Close the original reader input stream.  This was the fastest reader,
852
0
  // so we should have a single stream that is buffered beyond our nominal
853
0
  // limit.
854
0
  reader->Close();
855
0
856
0
  // Because the clone stream is still buffered the writable callback should
857
0
  // not be fired.
858
0
  ASSERT_FALSE(cb->Called());
859
0
860
0
  // And we should not be able to perform a write.
861
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
862
0
  ASSERT_TRUE(NS_FAILED(rv));
863
0
864
0
  // Create another clone stream.  Now we have two streams that exceed our
865
0
  // maximum size limit
866
0
  nsCOMPtr<nsIInputStream> clone2;
867
0
  rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
868
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
869
0
870
0
  nsTArray<char> expectedCloneData;
871
0
  expectedCloneData.AppendElements(inputData);
872
0
  expectedCloneData.AppendElements(inputData);
873
0
874
0
  // We should now be able to consume the entire backlog of buffered data on
875
0
  // the cloned stream.
876
0
  testing::ConsumeAndValidateStream(clone, expectedCloneData);
877
0
878
0
  // The pipe should now be writable because we have two open streams, one of which
879
0
  // is completely drained.
880
0
  ASSERT_TRUE(cb->Called());
881
0
882
0
  // Write again to reach our limit again.
883
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
884
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
885
0
886
0
  // The stream is again non-writeable.
887
0
  cb = new testing::OutputStreamCallback();
888
0
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
889
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
890
0
  ASSERT_FALSE(cb->Called());
891
0
892
0
  // Close the empty stream.  This is different from our previous close since
893
0
  // before we were closing a stream with some data still buffered.
894
0
  clone->Close();
895
0
896
0
  // The pipe should not be writable.  The second clone is still fully buffered
897
0
  // over our limit.
898
0
  ASSERT_FALSE(cb->Called());
899
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
900
0
  ASSERT_TRUE(NS_FAILED(rv));
901
0
902
0
  // Finally consume all of the buffered data on the second clone.
903
0
  expectedCloneData.AppendElements(inputData);
904
0
  testing::ConsumeAndValidateStream(clone2, expectedCloneData);
905
0
906
0
  // Draining the final clone should make the pipe writable again.
907
0
  ASSERT_TRUE(cb->Called());
908
0
}
909
910
TEST(Pipes, Read_AsyncWait)
911
0
{
912
0
  nsCOMPtr<nsIAsyncInputStream> reader;
913
0
  nsCOMPtr<nsIAsyncOutputStream> writer;
914
0
915
0
  const uint32_t segmentSize = 1024;
916
0
  const uint32_t numSegments = 1;
917
0
918
0
  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
919
0
                            true, true,  // non-blocking - reader, writer
920
0
                            segmentSize, numSegments);
921
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
922
0
923
0
  nsTArray<char> inputData;
924
0
  testing::CreateData(segmentSize, inputData);
925
0
926
0
  RefPtr<testing::InputStreamCallback> cb =
927
0
    new testing::InputStreamCallback();
928
0
929
0
  rv = reader->AsyncWait(cb, 0, 0, nullptr);
930
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
931
0
932
0
  ASSERT_FALSE(cb->Called());
933
0
934
0
  uint32_t numWritten = 0;
935
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
936
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
937
0
938
0
  ASSERT_TRUE(cb->Called());
939
0
940
0
  testing::ConsumeAndValidateStream(reader, inputData);
941
0
}
942
943
TEST(Pipes, Read_AsyncWait_Clone)
944
0
{
945
0
  nsCOMPtr<nsIAsyncInputStream> reader;
946
0
  nsCOMPtr<nsIAsyncOutputStream> writer;
947
0
948
0
  const uint32_t segmentSize = 1024;
949
0
  const uint32_t numSegments = 1;
950
0
951
0
  nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
952
0
                            true, true,  // non-blocking - reader, writer
953
0
                            segmentSize, numSegments);
954
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
955
0
956
0
  nsCOMPtr<nsIInputStream> clone;
957
0
  rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
958
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
959
0
960
0
  nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
961
0
  ASSERT_TRUE(asyncClone);
962
0
963
0
  nsTArray<char> inputData;
964
0
  testing::CreateData(segmentSize, inputData);
965
0
966
0
  RefPtr<testing::InputStreamCallback> cb =
967
0
    new testing::InputStreamCallback();
968
0
969
0
  RefPtr<testing::InputStreamCallback> cb2 =
970
0
    new testing::InputStreamCallback();
971
0
972
0
  rv = reader->AsyncWait(cb, 0, 0, nullptr);
973
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
974
0
975
0
  ASSERT_FALSE(cb->Called());
976
0
977
0
  rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
978
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
979
0
980
0
  ASSERT_FALSE(cb2->Called());
981
0
982
0
  uint32_t numWritten = 0;
983
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
984
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
985
0
986
0
  ASSERT_TRUE(cb->Called());
987
0
  ASSERT_TRUE(cb2->Called());
988
0
989
0
  testing::ConsumeAndValidateStream(reader, inputData);
990
0
}
991
992
namespace {
993
994
nsresult
995
CloseDuringReadFunc(nsIInputStream *aReader,
996
                    void* aClosure,
997
                    const char* aFromSegment,
998
                    uint32_t aToOffset,
999
                    uint32_t aCount,
1000
                    uint32_t* aWriteCountOut)
1001
0
{
1002
0
  MOZ_RELEASE_ASSERT(aReader);
1003
0
  MOZ_RELEASE_ASSERT(aClosure);
1004
0
  MOZ_RELEASE_ASSERT(aFromSegment);
1005
0
  MOZ_RELEASE_ASSERT(aWriteCountOut);
1006
0
  MOZ_RELEASE_ASSERT(aToOffset == 0);
1007
0
1008
0
  // This is insanity and you probably should not do this under normal
1009
0
  // conditions.  We want to simulate the case where the pipe is closed
1010
0
  // (possibly from other end on another thread) simultaneously with the
1011
0
  // read.  This is the easiest way to do trigger this case in a synchronous
1012
0
  // gtest.
1013
0
  MOZ_ALWAYS_SUCCEEDS(aReader->Close());
1014
0
1015
0
  nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
1016
0
  buffer->AppendElements(aFromSegment, aCount);
1017
0
1018
0
  *aWriteCountOut = aCount;
1019
0
1020
0
  return NS_OK;
1021
0
}
1022
1023
void
1024
TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize)
1025
0
{
1026
0
  nsCOMPtr<nsIInputStream> reader;
1027
0
  nsCOMPtr<nsIOutputStream> writer;
1028
0
1029
0
  const uint32_t maxSize = aSegmentSize;
1030
0
1031
0
  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
1032
0
                           aSegmentSize, maxSize);
1033
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
1034
0
1035
0
  nsTArray<char> inputData;
1036
0
1037
0
  testing::CreateData(aDataSize, inputData);
1038
0
1039
0
  uint32_t numWritten = 0;
1040
0
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
1041
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
1042
0
1043
0
  nsTArray<char> outputData;
1044
0
1045
0
  uint32_t numRead = 0;
1046
0
  rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
1047
0
                            inputData.Length(), &numRead);
1048
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
1049
0
  ASSERT_EQ(inputData.Length(), numRead);
1050
0
1051
0
  ASSERT_EQ(inputData, outputData);
1052
0
1053
0
  uint64_t available;
1054
0
  rv = reader->Available(&available);
1055
0
  ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
1056
0
}
1057
1058
} // namespace
1059
1060
TEST(Pipes, Close_During_Read_Partial_Segment)
1061
0
{
1062
0
  TestCloseDuringRead(1024, 512);
1063
0
}
1064
1065
TEST(Pipes, Close_During_Read_Full_Segment)
1066
0
{
1067
0
  TestCloseDuringRead(1024, 1024);
1068
0
}
1069
1070
TEST(Pipes, Interfaces)
1071
0
{
1072
0
  nsCOMPtr<nsIInputStream> reader;
1073
0
  nsCOMPtr<nsIOutputStream> writer;
1074
0
1075
0
  nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
1076
0
  ASSERT_TRUE(NS_SUCCEEDED(rv));
1077
0
1078
0
  nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
1079
0
  ASSERT_TRUE(readerType1);
1080
0
1081
0
  nsCOMPtr<nsISeekableStream> readerType2 = do_QueryInterface(reader);
1082
0
  ASSERT_TRUE(readerType2);
1083
0
1084
0
  nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
1085
0
  ASSERT_TRUE(readerType3);
1086
0
1087
0
  nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
1088
0
  ASSERT_TRUE(readerType4);
1089
0
1090
0
  nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
1091
0
  ASSERT_TRUE(readerType5);
1092
0
1093
0
  nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
1094
0
  ASSERT_TRUE(readerType6);
1095
0
}