/src/mozilla-central/xpcom/tests/gtest/TestPipes.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
2 | | /* vim: set ts=8 sts=2 et sw=2 tw=80: */ |
3 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
4 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
5 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
6 | | |
7 | | #include <algorithm> |
8 | | #include "gtest/gtest.h" |
9 | | #include "Helpers.h" |
10 | | #include "mozilla/ReentrantMonitor.h" |
11 | | #include "mozilla/Printf.h" |
12 | | #include "nsCOMPtr.h" |
13 | | #include "nsCRT.h" |
14 | | #include "nsIAsyncInputStream.h" |
15 | | #include "nsIAsyncOutputStream.h" |
16 | | #include "nsIBufferedStreams.h" |
17 | | #include "nsIClassInfo.h" |
18 | | #include "nsICloneableInputStream.h" |
19 | | #include "nsIInputStream.h" |
20 | | #include "nsIOutputStream.h" |
21 | | #include "nsIPipe.h" |
22 | | #include "nsISeekableStream.h" |
23 | | #include "nsIThread.h" |
24 | | #include "nsIRunnable.h" |
25 | | #include "nsStreamUtils.h" |
26 | | #include "nsString.h" |
27 | | #include "nsThreadUtils.h" |
28 | | #include "prinrval.h" |
29 | | |
30 | | using namespace mozilla; |
31 | | |
32 | 0 | #define ITERATIONS 33333 |
33 | | char kTestPattern[] = "My hovercraft is full of eels.\n"; |
34 | | |
35 | | bool gTrace = false; |
36 | | |
37 | | static nsresult |
38 | | WriteAll(nsIOutputStream *os, const char *buf, uint32_t bufLen, uint32_t *lenWritten) |
39 | 0 | { |
40 | 0 | const char *p = buf; |
41 | 0 | *lenWritten = 0; |
42 | 0 | while (bufLen) { |
43 | 0 | uint32_t n; |
44 | 0 | nsresult rv = os->Write(p, bufLen, &n); |
45 | 0 | if (NS_FAILED(rv)) return rv; |
46 | 0 | p += n; |
47 | 0 | bufLen -= n; |
48 | 0 | *lenWritten += n; |
49 | 0 | } |
50 | 0 | return NS_OK; |
51 | 0 | } |
52 | | |
53 | | class nsReceiver final : public nsIRunnable { |
54 | | public: |
55 | | NS_DECL_THREADSAFE_ISUPPORTS |
56 | | |
57 | 0 | NS_IMETHOD Run() override { |
58 | 0 | nsresult rv; |
59 | 0 | char buf[101]; |
60 | 0 | uint32_t count; |
61 | 0 | PRIntervalTime start = PR_IntervalNow(); |
62 | 0 | while (true) { |
63 | 0 | rv = mIn->Read(buf, 100, &count); |
64 | 0 | if (NS_FAILED(rv)) { |
65 | 0 | printf("read failed\n"); |
66 | 0 | break; |
67 | 0 | } |
68 | 0 | if (count == 0) { |
69 | 0 | // printf("EOF count = %d\n", mCount); |
70 | 0 | break; |
71 | 0 | } |
72 | 0 | |
73 | 0 | if (gTrace) { |
74 | 0 | buf[count] = '\0'; |
75 | 0 | printf("read: %s\n", buf); |
76 | 0 | } |
77 | 0 | mCount += count; |
78 | 0 | } |
79 | 0 | PRIntervalTime end = PR_IntervalNow(); |
80 | 0 | printf("read %d bytes, time = %dms\n", mCount, |
81 | 0 | PR_IntervalToMilliseconds(end - start)); |
82 | 0 | return rv; |
83 | 0 | } |
84 | | |
85 | 0 | explicit nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) { |
86 | 0 | } |
87 | | |
88 | 0 | uint32_t GetBytesRead() { return mCount; } |
89 | | |
90 | | private: |
91 | 0 | ~nsReceiver() {} |
92 | | |
93 | | protected: |
94 | | nsCOMPtr<nsIInputStream> mIn; |
95 | | uint32_t mCount; |
96 | | }; |
97 | | |
98 | | NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable) |
99 | | |
100 | | nsresult |
101 | | TestPipe(nsIInputStream* in, nsIOutputStream* out) |
102 | 0 | { |
103 | 0 | RefPtr<nsReceiver> receiver = new nsReceiver(in); |
104 | 0 | if (!receiver) |
105 | 0 | return NS_ERROR_OUT_OF_MEMORY; |
106 | 0 | |
107 | 0 | nsresult rv; |
108 | 0 |
|
109 | 0 | nsCOMPtr<nsIThread> thread; |
110 | 0 | rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver); |
111 | 0 | if (NS_FAILED(rv)) return rv; |
112 | 0 | |
113 | 0 | uint32_t total = 0; |
114 | 0 | PRIntervalTime start = PR_IntervalNow(); |
115 | 0 | for (uint32_t i = 0; i < ITERATIONS; i++) { |
116 | 0 | uint32_t writeCount; |
117 | 0 | SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern); |
118 | 0 | uint32_t len = strlen(buf.get()); |
119 | 0 | rv = WriteAll(out, buf.get(), len, &writeCount); |
120 | 0 | if (gTrace) { |
121 | 0 | printf("wrote: "); |
122 | 0 | for (uint32_t j = 0; j < writeCount; j++) { |
123 | 0 | putc(buf.get()[j], stdout); |
124 | 0 | } |
125 | 0 | printf("\n"); |
126 | 0 | } |
127 | 0 | if (NS_FAILED(rv)) return rv; |
128 | 0 | total += writeCount; |
129 | 0 | } |
130 | 0 | rv = out->Close(); |
131 | 0 | if (NS_FAILED(rv)) return rv; |
132 | 0 | |
133 | 0 | PRIntervalTime end = PR_IntervalNow(); |
134 | 0 |
|
135 | 0 | thread->Shutdown(); |
136 | 0 |
|
137 | 0 | printf("wrote %d bytes, time = %dms\n", total, |
138 | 0 | PR_IntervalToMilliseconds(end - start)); |
139 | 0 | EXPECT_EQ(receiver->GetBytesRead(), total); |
140 | 0 |
|
141 | 0 | return NS_OK; |
142 | 0 | } |
143 | | |
144 | | //////////////////////////////////////////////////////////////////////////////// |
145 | | |
146 | | class nsShortReader final : public nsIRunnable { |
147 | | public: |
148 | | NS_DECL_THREADSAFE_ISUPPORTS |
149 | | |
150 | 0 | NS_IMETHOD Run() override { |
151 | 0 | nsresult rv; |
152 | 0 | char buf[101]; |
153 | 0 | uint32_t count; |
154 | 0 | uint32_t total = 0; |
155 | 0 | while (true) { |
156 | 0 | //if (gTrace) |
157 | 0 | // printf("calling Read\n"); |
158 | 0 | rv = mIn->Read(buf, 100, &count); |
159 | 0 | if (NS_FAILED(rv)) { |
160 | 0 | printf("read failed\n"); |
161 | 0 | break; |
162 | 0 | } |
163 | 0 | if (count == 0) { |
164 | 0 | break; |
165 | 0 | } |
166 | 0 | |
167 | 0 | if (gTrace) { |
168 | 0 | // For next |printf()| call and possible others elsewhere. |
169 | 0 | buf[count] = '\0'; |
170 | 0 |
|
171 | 0 | printf("read %d bytes: %s\n", count, buf); |
172 | 0 | } |
173 | 0 |
|
174 | 0 | Received(count); |
175 | 0 | total += count; |
176 | 0 | } |
177 | 0 | printf("read %d bytes\n", total); |
178 | 0 | return rv; |
179 | 0 | } |
180 | | |
181 | 0 | explicit nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) { |
182 | 0 | mMon = new ReentrantMonitor("nsShortReader"); |
183 | 0 | } |
184 | | |
185 | 0 | void Received(uint32_t count) { |
186 | 0 | ReentrantMonitorAutoEnter mon(*mMon); |
187 | 0 | mReceived += count; |
188 | 0 | mon.Notify(); |
189 | 0 | } |
190 | | |
191 | 0 | uint32_t WaitForReceipt(const uint32_t aWriteCount) { |
192 | 0 | ReentrantMonitorAutoEnter mon(*mMon); |
193 | 0 | uint32_t result = mReceived; |
194 | 0 |
|
195 | 0 | while (result < aWriteCount) { |
196 | 0 | mon.Wait(); |
197 | 0 |
|
198 | 0 | EXPECT_TRUE(mReceived > result); |
199 | 0 | result = mReceived; |
200 | 0 | } |
201 | 0 |
|
202 | 0 | mReceived = 0; |
203 | 0 | return result; |
204 | 0 | } |
205 | | |
206 | | private: |
207 | 0 | ~nsShortReader() {} |
208 | | |
209 | | protected: |
210 | | nsCOMPtr<nsIInputStream> mIn; |
211 | | uint32_t mReceived; |
212 | | ReentrantMonitor* mMon; |
213 | | }; |
214 | | |
215 | | NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable) |
216 | | |
217 | | nsresult |
218 | | TestShortWrites(nsIInputStream* in, nsIOutputStream* out) |
219 | 0 | { |
220 | 0 | RefPtr<nsShortReader> receiver = new nsShortReader(in); |
221 | 0 | if (!receiver) |
222 | 0 | return NS_ERROR_OUT_OF_MEMORY; |
223 | 0 | |
224 | 0 | nsresult rv; |
225 | 0 |
|
226 | 0 | nsCOMPtr<nsIThread> thread; |
227 | 0 | rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), |
228 | 0 | receiver); |
229 | 0 | if (NS_FAILED(rv)) return rv; |
230 | 0 | |
231 | 0 | uint32_t total = 0; |
232 | 0 | for (uint32_t i = 0; i < ITERATIONS; i++) { |
233 | 0 | uint32_t writeCount; |
234 | 0 | SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern); |
235 | 0 | uint32_t len = strlen(buf.get()); |
236 | 0 | len = len * rand() / RAND_MAX; |
237 | 0 | len = std::min(1u, len); |
238 | 0 | rv = WriteAll(out, buf.get(), len, &writeCount); |
239 | 0 | if (NS_FAILED(rv)) return rv; |
240 | 0 | EXPECT_EQ(writeCount, len); |
241 | 0 | total += writeCount; |
242 | 0 |
|
243 | 0 | if (gTrace) |
244 | 0 | printf("wrote %d bytes: %s\n", writeCount, buf.get()); |
245 | 0 | //printf("calling Flush\n"); |
246 | 0 | out->Flush(); |
247 | 0 | //printf("calling WaitForReceipt\n"); |
248 | 0 |
|
249 | | #ifdef DEBUG |
250 | | const uint32_t received = |
251 | | receiver->WaitForReceipt(writeCount); |
252 | | EXPECT_EQ(received, writeCount); |
253 | | #endif |
254 | | } |
255 | 0 | rv = out->Close(); |
256 | 0 | if (NS_FAILED(rv)) return rv; |
257 | 0 | |
258 | 0 | thread->Shutdown(); |
259 | 0 |
|
260 | 0 | printf("wrote %d bytes\n", total); |
261 | 0 |
|
262 | 0 | return NS_OK; |
263 | 0 | } |
264 | | |
265 | | //////////////////////////////////////////////////////////////////////////////// |
266 | | |
267 | | class nsPump final : public nsIRunnable |
268 | | { |
269 | | public: |
270 | | NS_DECL_THREADSAFE_ISUPPORTS |
271 | | |
272 | 0 | NS_IMETHOD Run() override { |
273 | 0 | nsresult rv; |
274 | 0 | uint32_t count; |
275 | 0 | while (true) { |
276 | 0 | rv = mOut->WriteFrom(mIn, ~0U, &count); |
277 | 0 | if (NS_FAILED(rv)) { |
278 | 0 | printf("Write failed\n"); |
279 | 0 | break; |
280 | 0 | } |
281 | 0 | if (count == 0) { |
282 | 0 | printf("EOF count = %d\n", mCount); |
283 | 0 | break; |
284 | 0 | } |
285 | 0 | |
286 | 0 | if (gTrace) { |
287 | 0 | printf("Wrote: %d\n", count); |
288 | 0 | } |
289 | 0 | mCount += count; |
290 | 0 | } |
291 | 0 | mOut->Close(); |
292 | 0 | return rv; |
293 | 0 | } |
294 | | |
295 | | nsPump(nsIInputStream* in, |
296 | | nsIOutputStream* out) |
297 | 0 | : mIn(in), mOut(out), mCount(0) { |
298 | 0 | } |
299 | | |
300 | | private: |
301 | 0 | ~nsPump() {} |
302 | | |
303 | | protected: |
304 | | nsCOMPtr<nsIInputStream> mIn; |
305 | | nsCOMPtr<nsIOutputStream> mOut; |
306 | | uint32_t mCount; |
307 | | }; |
308 | | |
309 | | NS_IMPL_ISUPPORTS(nsPump, nsIRunnable) |
310 | | |
311 | | TEST(Pipes, ChainedPipes) |
312 | 0 | { |
313 | 0 | nsresult rv; |
314 | 0 | if (gTrace) { |
315 | 0 | printf("TestChainedPipes\n"); |
316 | 0 | } |
317 | 0 |
|
318 | 0 | nsCOMPtr<nsIInputStream> in1; |
319 | 0 | nsCOMPtr<nsIOutputStream> out1; |
320 | 0 | rv = NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999); |
321 | 0 | if (NS_FAILED(rv)) return; |
322 | 0 | |
323 | 0 | nsCOMPtr<nsIInputStream> in2; |
324 | 0 | nsCOMPtr<nsIOutputStream> out2; |
325 | 0 | rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401); |
326 | 0 | if (NS_FAILED(rv)) return; |
327 | 0 | |
328 | 0 | RefPtr<nsPump> pump = new nsPump(in1, out2); |
329 | 0 | if (pump == nullptr) return; |
330 | 0 | |
331 | 0 | nsCOMPtr<nsIThread> thread; |
332 | 0 | rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump); |
333 | 0 | if (NS_FAILED(rv)) return; |
334 | 0 | |
335 | 0 | RefPtr<nsReceiver> receiver = new nsReceiver(in2); |
336 | 0 | if (receiver == nullptr) return; |
337 | 0 | |
338 | 0 | nsCOMPtr<nsIThread> receiverThread; |
339 | 0 | rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread), |
340 | 0 | receiver); |
341 | 0 | if (NS_FAILED(rv)) return; |
342 | 0 | |
343 | 0 | uint32_t total = 0; |
344 | 0 | for (uint32_t i = 0; i < ITERATIONS; i++) { |
345 | 0 | uint32_t writeCount; |
346 | 0 | SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern); |
347 | 0 | uint32_t len = strlen(buf.get()); |
348 | 0 | len = len * rand() / RAND_MAX; |
349 | 0 | len = std::max(1u, len); |
350 | 0 | rv = WriteAll(out1, buf.get(), len, &writeCount); |
351 | 0 | if (NS_FAILED(rv)) return; |
352 | 0 | EXPECT_EQ(writeCount, len); |
353 | 0 | total += writeCount; |
354 | 0 |
|
355 | 0 | if (gTrace) |
356 | 0 | printf("wrote %d bytes: %s\n", writeCount, buf.get()); |
357 | 0 | } |
358 | 0 | if (gTrace) { |
359 | 0 | printf("wrote total of %d bytes\n", total); |
360 | 0 | } |
361 | 0 | rv = out1->Close(); |
362 | 0 | if (NS_FAILED(rv)) return; |
363 | 0 | |
364 | 0 | thread->Shutdown(); |
365 | 0 | receiverThread->Shutdown(); |
366 | 0 | } |
367 | | |
368 | | //////////////////////////////////////////////////////////////////////////////// |
369 | | |
370 | | void |
371 | | RunTests(uint32_t segSize, uint32_t segCount) |
372 | 0 | { |
373 | 0 | nsresult rv; |
374 | 0 | nsCOMPtr<nsIInputStream> in; |
375 | 0 | nsCOMPtr<nsIOutputStream> out; |
376 | 0 | uint32_t bufSize = segSize * segCount; |
377 | 0 | if (gTrace) { |
378 | 0 | printf("Testing New Pipes: segment size %d buffer size %d\n", segSize, bufSize); |
379 | 0 | printf("Testing long writes...\n"); |
380 | 0 | } |
381 | 0 | rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); |
382 | 0 | EXPECT_TRUE(NS_SUCCEEDED(rv)); |
383 | 0 | rv = TestPipe(in, out); |
384 | 0 | EXPECT_TRUE(NS_SUCCEEDED(rv)); |
385 | 0 |
|
386 | 0 | if (gTrace) { |
387 | 0 | printf("Testing short writes...\n"); |
388 | 0 | } |
389 | 0 | rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); |
390 | 0 | EXPECT_TRUE(NS_SUCCEEDED(rv)); |
391 | 0 | rv = TestShortWrites(in, out); |
392 | 0 | EXPECT_TRUE(NS_SUCCEEDED(rv)); |
393 | 0 | } |
394 | | |
395 | | TEST(Pipes, Main) |
396 | 0 | { |
397 | 0 | RunTests(16, 1); |
398 | 0 | RunTests(4096, 16); |
399 | 0 | } |
400 | | |
401 | | //////////////////////////////////////////////////////////////////////////////// |
402 | | |
403 | | namespace { |
404 | | |
405 | | static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024; |
406 | | |
407 | | // An alternate pipe testing routing that uses NS_ConsumeStream() instead of |
408 | | // manual read loop. |
409 | | static void TestPipe2(uint32_t aNumBytes, |
410 | | uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) |
411 | 0 | { |
412 | 0 | nsCOMPtr<nsIInputStream> reader; |
413 | 0 | nsCOMPtr<nsIOutputStream> writer; |
414 | 0 |
|
415 | 0 | uint32_t maxSize = std::max(aNumBytes, aSegmentSize); |
416 | 0 |
|
417 | 0 | nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), |
418 | 0 | aSegmentSize, maxSize); |
419 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
420 | 0 |
|
421 | 0 | nsTArray<char> inputData; |
422 | 0 | testing::CreateData(aNumBytes, inputData); |
423 | 0 | testing::WriteAllAndClose(writer, inputData); |
424 | 0 | testing::ConsumeAndValidateStream(reader, inputData); |
425 | 0 | } |
426 | | |
427 | | } // namespace |
428 | | |
429 | | TEST(Pipes, Blocking_32k) |
430 | 0 | { |
431 | 0 | TestPipe2(32 * 1024); |
432 | 0 | } |
433 | | |
434 | | TEST(Pipes, Blocking_64k) |
435 | 0 | { |
436 | 0 | TestPipe2(64 * 1024); |
437 | 0 | } |
438 | | |
439 | | TEST(Pipes, Blocking_128k) |
440 | 0 | { |
441 | 0 | TestPipe2(128 * 1024); |
442 | 0 | } |
443 | | |
444 | | //////////////////////////////////////////////////////////////////////////////// |
445 | | |
446 | | namespace { |
447 | | |
448 | | // Utility routine to validate pipe clone before. There are many knobs. |
449 | | // |
450 | | // aTotalBytes Total number of bytes to write to the pipe. |
451 | | // aNumWrites How many separate write calls should be made. Bytes |
452 | | // are evenly distributed over these write calls. |
453 | | // aNumInitialClones How many clones of the pipe input stream should be |
454 | | // made before writing begins. |
455 | | // aNumToCloseAfterWrite How many streams should be closed after each write. |
456 | | // One stream is always kept open. This verifies that |
457 | | // closing one stream does not effect other open |
458 | | // streams. |
459 | | // aNumToCloneAfterWrite How many clones to create after each write. Occurs |
460 | | // after closing any streams. This tests cloning |
461 | | // active streams on a pipe that is being written to. |
462 | | // aNumStreamToReadPerWrite How many streams to read fully after each write. |
463 | | // This tests reading cloned streams at different rates |
464 | | // while the pipe is being written to. |
465 | | static void TestPipeClone(uint32_t aTotalBytes, |
466 | | uint32_t aNumWrites, |
467 | | uint32_t aNumInitialClones, |
468 | | uint32_t aNumToCloseAfterWrite, |
469 | | uint32_t aNumToCloneAfterWrite, |
470 | | uint32_t aNumStreamsToReadPerWrite, |
471 | | uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) |
472 | 0 | { |
473 | 0 | nsCOMPtr<nsIInputStream> reader; |
474 | 0 | nsCOMPtr<nsIOutputStream> writer; |
475 | 0 |
|
476 | 0 | uint32_t maxSize = std::max(aTotalBytes, aSegmentSize); |
477 | 0 |
|
478 | 0 | // Use async input streams so we can NS_ConsumeStream() the current data |
479 | 0 | // while the pipe is still being written to. |
480 | 0 | nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), |
481 | 0 | aSegmentSize, maxSize, |
482 | 0 | true, false); // non-blocking - reader, writer |
483 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
484 | 0 |
|
485 | 0 | nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader); |
486 | 0 | ASSERT_TRUE(cloneable); |
487 | 0 | ASSERT_TRUE(cloneable->GetCloneable()); |
488 | 0 |
|
489 | 0 | nsTArray<nsCString> outputDataList; |
490 | 0 |
|
491 | 0 | nsTArray<nsCOMPtr<nsIInputStream>> streamList; |
492 | 0 |
|
493 | 0 | // first stream is our original reader from the pipe |
494 | 0 | streamList.AppendElement(reader); |
495 | 0 | outputDataList.AppendElement(); |
496 | 0 |
|
497 | 0 | // Clone the initial input stream the specified number of times |
498 | 0 | // before performing any writes. |
499 | 0 | for (uint32_t i = 0; i < aNumInitialClones; ++i) { |
500 | 0 | nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement(); |
501 | 0 | rv = cloneable->Clone(getter_AddRefs(*clone)); |
502 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
503 | 0 | ASSERT_TRUE(*clone); |
504 | 0 |
|
505 | 0 | outputDataList.AppendElement(); |
506 | 0 | } |
507 | 0 |
|
508 | 0 | nsTArray<char> inputData; |
509 | 0 | testing::CreateData(aTotalBytes, inputData); |
510 | 0 |
|
511 | 0 | const uint32_t bytesPerWrite = ((aTotalBytes - 1)/ aNumWrites) + 1; |
512 | 0 | uint32_t offset = 0; |
513 | 0 | uint32_t remaining = aTotalBytes; |
514 | 0 | uint32_t nextStreamToRead = 0; |
515 | 0 |
|
516 | 0 | while (remaining) { |
517 | 0 | uint32_t numToWrite = std::min(bytesPerWrite, remaining); |
518 | 0 | testing::Write(writer, inputData, offset, numToWrite); |
519 | 0 | offset += numToWrite; |
520 | 0 | remaining -= numToWrite; |
521 | 0 |
|
522 | 0 | // Close the specified number of streams. This allows us to |
523 | 0 | // test that one closed clone does not break other open clones. |
524 | 0 | for (uint32_t i = 0; i < aNumToCloseAfterWrite && |
525 | 0 | streamList.Length() > 1; ++i) { |
526 | 0 |
|
527 | 0 | uint32_t lastIndex = streamList.Length() - 1; |
528 | 0 | streamList[lastIndex]->Close(); |
529 | 0 | streamList.RemoveElementAt(lastIndex); |
530 | 0 | outputDataList.RemoveElementAt(lastIndex); |
531 | 0 |
|
532 | 0 | if (nextStreamToRead >= streamList.Length()) { |
533 | 0 | nextStreamToRead = 0; |
534 | 0 | } |
535 | 0 | } |
536 | 0 |
|
537 | 0 | // Create the specified number of clones. This lets us verify |
538 | 0 | // that we can create clones in the middle of pipe reading and |
539 | 0 | // writing. |
540 | 0 | for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) { |
541 | 0 | nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement(); |
542 | 0 | rv = cloneable->Clone(getter_AddRefs(*clone)); |
543 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
544 | 0 | ASSERT_TRUE(*clone); |
545 | 0 |
|
546 | 0 | // Initialize the new output data to make whats been read to data for |
547 | 0 | // the original stream. First stream is always the original stream. |
548 | 0 | nsCString* outputData = outputDataList.AppendElement(); |
549 | 0 | *outputData = outputDataList[0]; |
550 | 0 | } |
551 | 0 |
|
552 | 0 | // Read the specified number of streams. This lets us verify that we |
553 | 0 | // can read from the clones at different rates while the pipe is being |
554 | 0 | // written to. |
555 | 0 | for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) { |
556 | 0 | nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead]; |
557 | 0 | nsCString& outputData = outputDataList[nextStreamToRead]; |
558 | 0 |
|
559 | 0 | // Can't use ConsumeAndValidateStream() here because we're not |
560 | 0 | // guaranteed the exact amount read. It should just be at least |
561 | 0 | // as many as numToWrite. |
562 | 0 | nsAutoCString tmpOutputData; |
563 | 0 | rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData); |
564 | 0 | ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv)); |
565 | 0 | ASSERT_GE(tmpOutputData.Length(), numToWrite); |
566 | 0 |
|
567 | 0 | outputData += tmpOutputData; |
568 | 0 |
|
569 | 0 | nextStreamToRead += 1; |
570 | 0 | if (nextStreamToRead >= streamList.Length()) { |
571 | 0 | // Note: When we wrap around on the streams being read, its possible |
572 | 0 | // we will trigger a segment to be deleted from the pipe. It |
573 | 0 | // would be nice to validate this here, but we don't have any |
574 | 0 | // QI'able interface that would let us check easily. |
575 | 0 |
|
576 | 0 | nextStreamToRead = 0; |
577 | 0 | } |
578 | 0 | } |
579 | 0 | } |
580 | 0 |
|
581 | 0 | rv = writer->Close(); |
582 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
583 | 0 |
|
584 | 0 | nsDependentCSubstring inputString(inputData.Elements(), inputData.Length()); |
585 | 0 |
|
586 | 0 | // Finally, read the remaining bytes from each stream. This may be |
587 | 0 | // different amounts of data depending on how much reading we did while |
588 | 0 | // writing. Verify that the end result matches the input data. |
589 | 0 | for (uint32_t i = 0; i < streamList.Length(); ++i) { |
590 | 0 | nsCOMPtr<nsIInputStream>& stream = streamList[i]; |
591 | 0 | nsCString& outputData = outputDataList[i]; |
592 | 0 |
|
593 | 0 | nsAutoCString tmpOutputData; |
594 | 0 | rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData); |
595 | 0 | ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv)); |
596 | 0 | stream->Close(); |
597 | 0 |
|
598 | 0 | // Append to total amount read from the stream |
599 | 0 | outputData += tmpOutputData; |
600 | 0 |
|
601 | 0 | ASSERT_EQ(inputString.Length(), outputData.Length()); |
602 | 0 | ASSERT_TRUE(inputString.Equals(outputData)); |
603 | 0 | } |
604 | 0 | } |
605 | | |
606 | | } // namespace |
607 | | |
608 | | TEST(Pipes, Clone_BeforeWrite_ReadAtEnd) |
609 | 0 | { |
610 | 0 | TestPipeClone(32 * 1024, // total bytes |
611 | 0 | 16, // num writes |
612 | 0 | 3, // num initial clones |
613 | 0 | 0, // num streams to close after each write |
614 | 0 | 0, // num clones to add after each write |
615 | 0 | 0); // num streams to read after each write |
616 | 0 | } |
617 | | |
618 | | TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite) |
619 | 0 | { |
620 | 0 | // Since this reads all streams on every write, it should trigger the |
621 | 0 | // pipe cursor roll back optimization. Currently we can only verify |
622 | 0 | // this with logging. |
623 | 0 |
|
624 | 0 | TestPipeClone(32 * 1024, // total bytes |
625 | 0 | 16, // num writes |
626 | 0 | 3, // num initial clones |
627 | 0 | 0, // num streams to close after each write |
628 | 0 | 0, // num clones to add after each write |
629 | 0 | 4); // num streams to read after each write |
630 | 0 | } |
631 | | |
632 | | TEST(Pipes, Clone_DuringWrite_ReadAtEnd) |
633 | 0 | { |
634 | 0 | TestPipeClone(32 * 1024, // total bytes |
635 | 0 | 16, // num writes |
636 | 0 | 0, // num initial clones |
637 | 0 | 0, // num streams to close after each write |
638 | 0 | 1, // num clones to add after each write |
639 | 0 | 0); // num streams to read after each write |
640 | 0 | } |
641 | | |
642 | | TEST(Pipes, Clone_DuringWrite_ReadDuringWrite) |
643 | 0 | { |
644 | 0 | TestPipeClone(32 * 1024, // total bytes |
645 | 0 | 16, // num writes |
646 | 0 | 0, // num initial clones |
647 | 0 | 0, // num streams to close after each write |
648 | 0 | 1, // num clones to add after each write |
649 | 0 | 1); // num streams to read after each write |
650 | 0 | } |
651 | | |
652 | | TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite) |
653 | 0 | { |
654 | 0 | // Since this reads streams faster than we clone new ones, it should |
655 | 0 | // trigger pipe segment deletion periodically. Currently we can |
656 | 0 | // only verify this with logging. |
657 | 0 |
|
658 | 0 | TestPipeClone(32 * 1024, // total bytes |
659 | 0 | 16, // num writes |
660 | 0 | 1, // num initial clones |
661 | 0 | 1, // num streams to close after each write |
662 | 0 | 2, // num clones to add after each write |
663 | 0 | 3); // num streams to read after each write |
664 | 0 | } |
665 | | |
666 | | TEST(Pipes, Write_AsyncWait) |
667 | 0 | { |
668 | 0 | nsCOMPtr<nsIAsyncInputStream> reader; |
669 | 0 | nsCOMPtr<nsIAsyncOutputStream> writer; |
670 | 0 |
|
671 | 0 | const uint32_t segmentSize = 1024; |
672 | 0 | const uint32_t numSegments = 1; |
673 | 0 |
|
674 | 0 | nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), |
675 | 0 | true, true, // non-blocking - reader, writer |
676 | 0 | segmentSize, numSegments); |
677 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
678 | 0 |
|
679 | 0 | nsTArray<char> inputData; |
680 | 0 | testing::CreateData(segmentSize, inputData); |
681 | 0 |
|
682 | 0 | uint32_t numWritten = 0; |
683 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
684 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
685 | 0 |
|
686 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
687 | 0 | ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); |
688 | 0 |
|
689 | 0 | RefPtr<testing::OutputStreamCallback> cb = |
690 | 0 | new testing::OutputStreamCallback(); |
691 | 0 |
|
692 | 0 | rv = writer->AsyncWait(cb, 0, 0, nullptr); |
693 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
694 | 0 |
|
695 | 0 | ASSERT_FALSE(cb->Called()); |
696 | 0 |
|
697 | 0 | testing::ConsumeAndValidateStream(reader, inputData); |
698 | 0 |
|
699 | 0 | ASSERT_TRUE(cb->Called()); |
700 | 0 | } |
701 | | |
702 | | TEST(Pipes, Write_AsyncWait_Clone) |
703 | 0 | { |
704 | 0 | nsCOMPtr<nsIAsyncInputStream> reader; |
705 | 0 | nsCOMPtr<nsIAsyncOutputStream> writer; |
706 | 0 |
|
707 | 0 | const uint32_t segmentSize = 1024; |
708 | 0 | const uint32_t numSegments = 1; |
709 | 0 |
|
710 | 0 | nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), |
711 | 0 | true, true, // non-blocking - reader, writer |
712 | 0 | segmentSize, numSegments); |
713 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
714 | 0 |
|
715 | 0 | nsCOMPtr<nsIInputStream> clone; |
716 | 0 | rv = NS_CloneInputStream(reader, getter_AddRefs(clone)); |
717 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
718 | 0 |
|
719 | 0 | nsTArray<char> inputData; |
720 | 0 | testing::CreateData(segmentSize, inputData); |
721 | 0 |
|
722 | 0 | uint32_t numWritten = 0; |
723 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
724 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
725 | 0 |
|
726 | 0 | // This attempts to write data beyond the original pipe size limit. It |
727 | 0 | // should fail since neither side of the clone has been read yet. |
728 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
729 | 0 | ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); |
730 | 0 |
|
731 | 0 | RefPtr<testing::OutputStreamCallback> cb = |
732 | 0 | new testing::OutputStreamCallback(); |
733 | 0 |
|
734 | 0 | rv = writer->AsyncWait(cb, 0, 0, nullptr); |
735 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
736 | 0 |
|
737 | 0 | ASSERT_FALSE(cb->Called()); |
738 | 0 |
|
739 | 0 | // Consume data on the original stream, but the clone still has not been read. |
740 | 0 | testing::ConsumeAndValidateStream(reader, inputData); |
741 | 0 |
|
742 | 0 | // A clone that is not being read should not stall the other input stream |
743 | 0 | // reader. Therefore the writer callback should trigger when the fastest |
744 | 0 | // reader drains the other input stream. |
745 | 0 | ASSERT_TRUE(cb->Called()); |
746 | 0 |
|
747 | 0 | // Attempt to write data. This will buffer data beyond the pipe size limit in |
748 | 0 | // order for the clone stream to still work. This is allowed because the |
749 | 0 | // other input stream has drained its buffered segments and is ready for more |
750 | 0 | // data. |
751 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
752 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
753 | 0 |
|
754 | 0 | // Again, this should fail since the origin stream has not been read again. |
755 | 0 | // The pipe size should still restrict how far ahead we can buffer even |
756 | 0 | // when there is a cloned stream not being read. |
757 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
758 | 0 | ASSERT_TRUE(NS_FAILED(rv)); |
759 | 0 |
|
760 | 0 | cb = new testing::OutputStreamCallback(); |
761 | 0 | rv = writer->AsyncWait(cb, 0, 0, nullptr); |
762 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
763 | 0 |
|
764 | 0 | // The write should again be blocked since we have written data and the |
765 | 0 | // main reader is at its maximum advance buffer. |
766 | 0 | ASSERT_FALSE(cb->Called()); |
767 | 0 |
|
768 | 0 | nsTArray<char> expectedCloneData; |
769 | 0 | expectedCloneData.AppendElements(inputData); |
770 | 0 | expectedCloneData.AppendElements(inputData); |
771 | 0 |
|
772 | 0 | // We should now be able to consume the entire backlog of buffered data on |
773 | 0 | // the cloned stream. |
774 | 0 | testing::ConsumeAndValidateStream(clone, expectedCloneData); |
775 | 0 |
|
776 | 0 | // Draining the clone side should also trigger the AsyncWait() writer |
777 | 0 | // callback |
778 | 0 | ASSERT_TRUE(cb->Called()); |
779 | 0 |
|
780 | 0 | // Finally, we should be able to consume the remaining data on the original |
781 | 0 | // reader. |
782 | 0 | testing::ConsumeAndValidateStream(reader, inputData); |
783 | 0 | } |
784 | | |
785 | | TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal) |
786 | 0 | { |
787 | 0 | nsCOMPtr<nsIAsyncInputStream> reader; |
788 | 0 | nsCOMPtr<nsIAsyncOutputStream> writer; |
789 | 0 |
|
790 | 0 | const uint32_t segmentSize = 1024; |
791 | 0 | const uint32_t numSegments = 1; |
792 | 0 |
|
793 | 0 | nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), |
794 | 0 | true, true, // non-blocking - reader, writer |
795 | 0 | segmentSize, numSegments); |
796 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
797 | 0 |
|
798 | 0 | nsCOMPtr<nsIInputStream> clone; |
799 | 0 | rv = NS_CloneInputStream(reader, getter_AddRefs(clone)); |
800 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
801 | 0 |
|
802 | 0 | nsTArray<char> inputData; |
803 | 0 | testing::CreateData(segmentSize, inputData); |
804 | 0 |
|
805 | 0 | uint32_t numWritten = 0; |
806 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
807 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
808 | 0 |
|
809 | 0 | // This attempts to write data beyond the original pipe size limit. It |
810 | 0 | // should fail since neither side of the clone has been read yet. |
811 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
812 | 0 | ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); |
813 | 0 |
|
814 | 0 | RefPtr<testing::OutputStreamCallback> cb = |
815 | 0 | new testing::OutputStreamCallback(); |
816 | 0 |
|
817 | 0 | rv = writer->AsyncWait(cb, 0, 0, nullptr); |
818 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
819 | 0 |
|
820 | 0 | ASSERT_FALSE(cb->Called()); |
821 | 0 |
|
822 | 0 | // Consume data on the original stream, but the clone still has not been read. |
823 | 0 | testing::ConsumeAndValidateStream(reader, inputData); |
824 | 0 |
|
825 | 0 | // A clone that is not being read should not stall the other input stream |
826 | 0 | // reader. Therefore the writer callback should trigger when the fastest |
827 | 0 | // reader drains the other input stream. |
828 | 0 | ASSERT_TRUE(cb->Called()); |
829 | 0 |
|
830 | 0 | // Attempt to write data. This will buffer data beyond the pipe size limit in |
831 | 0 | // order for the clone stream to still work. This is allowed because the |
832 | 0 | // other input stream has drained its buffered segments and is ready for more |
833 | 0 | // data. |
834 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
835 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
836 | 0 |
|
837 | 0 | // Again, this should fail since the origin stream has not been read again. |
838 | 0 | // The pipe size should still restrict how far ahead we can buffer even |
839 | 0 | // when there is a cloned stream not being read. |
840 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
841 | 0 | ASSERT_TRUE(NS_FAILED(rv)); |
842 | 0 |
|
843 | 0 | cb = new testing::OutputStreamCallback(); |
844 | 0 | rv = writer->AsyncWait(cb, 0, 0, nullptr); |
845 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
846 | 0 |
|
847 | 0 | // The write should again be blocked since we have written data and the |
848 | 0 | // main reader is at its maximum advance buffer. |
849 | 0 | ASSERT_FALSE(cb->Called()); |
850 | 0 |
|
851 | 0 | // Close the original reader input stream. This was the fastest reader, |
852 | 0 | // so we should have a single stream that is buffered beyond our nominal |
853 | 0 | // limit. |
854 | 0 | reader->Close(); |
855 | 0 |
|
856 | 0 | // Because the clone stream is still buffered the writable callback should |
857 | 0 | // not be fired. |
858 | 0 | ASSERT_FALSE(cb->Called()); |
859 | 0 |
|
860 | 0 | // And we should not be able to perform a write. |
861 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
862 | 0 | ASSERT_TRUE(NS_FAILED(rv)); |
863 | 0 |
|
864 | 0 | // Create another clone stream. Now we have two streams that exceed our |
865 | 0 | // maximum size limit |
866 | 0 | nsCOMPtr<nsIInputStream> clone2; |
867 | 0 | rv = NS_CloneInputStream(clone, getter_AddRefs(clone2)); |
868 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
869 | 0 |
|
870 | 0 | nsTArray<char> expectedCloneData; |
871 | 0 | expectedCloneData.AppendElements(inputData); |
872 | 0 | expectedCloneData.AppendElements(inputData); |
873 | 0 |
|
874 | 0 | // We should now be able to consume the entire backlog of buffered data on |
875 | 0 | // the cloned stream. |
876 | 0 | testing::ConsumeAndValidateStream(clone, expectedCloneData); |
877 | 0 |
|
878 | 0 | // The pipe should now be writable because we have two open streams, one of which |
879 | 0 | // is completely drained. |
880 | 0 | ASSERT_TRUE(cb->Called()); |
881 | 0 |
|
882 | 0 | // Write again to reach our limit again. |
883 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
884 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
885 | 0 |
|
886 | 0 | // The stream is again non-writeable. |
887 | 0 | cb = new testing::OutputStreamCallback(); |
888 | 0 | rv = writer->AsyncWait(cb, 0, 0, nullptr); |
889 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv)); |
890 | 0 | ASSERT_FALSE(cb->Called()); |
891 | 0 |
|
892 | 0 | // Close the empty stream. This is different from our previous close since |
893 | 0 | // before we were closing a stream with some data still buffered. |
894 | 0 | clone->Close(); |
895 | 0 |
|
896 | 0 | // The pipe should not be writable. The second clone is still fully buffered |
897 | 0 | // over our limit. |
898 | 0 | ASSERT_FALSE(cb->Called()); |
899 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); |
900 | 0 | ASSERT_TRUE(NS_FAILED(rv)); |
901 | 0 |
|
902 | 0 | // Finally consume all of the buffered data on the second clone. |
903 | 0 | expectedCloneData.AppendElements(inputData); |
904 | 0 | testing::ConsumeAndValidateStream(clone2, expectedCloneData); |
905 | 0 |
|
906 | 0 | // Draining the final clone should make the pipe writable again. |
907 | 0 | ASSERT_TRUE(cb->Called()); |
908 | 0 | } |
909 | | |
// Read_AsyncWait: checks that a callback registered with AsyncWait() on the
// read end of a pipe is not invoked while nothing has been written, and has
// been invoked by the time a subsequent Write() on the write end returns.
//
// The pipe is created through NS_NewPipe2() with both ends non-blocking,
// segmentSize = 1024 and numSegments = 1. AsyncWait() is called with zero
// flags, a zero requested count and a null event target.
910 | | TEST(Pipes, Read_AsyncWait)
911 | 0 | {
912 | 0 | nsCOMPtr<nsIAsyncInputStream> reader;
913 | 0 | nsCOMPtr<nsIAsyncOutputStream> writer;
914 | 0 |
|
915 | 0 | const uint32_t segmentSize = 1024;
916 | 0 | const uint32_t numSegments = 1;
917 | 0 |
|
918 | 0 | nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
919 | 0 | true, true, // non-blocking - reader, writer
920 | 0 | segmentSize, numSegments);
921 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
922 | 0 |
|
// NOTE(review): testing::CreateData() is not defined in this file; it
// presumably fills inputData with segmentSize bytes of test data - confirm.
923 | 0 | nsTArray<char> inputData;
924 | 0 | testing::CreateData(segmentSize, inputData);
925 | 0 |
|
926 | 0 | RefPtr<testing::InputStreamCallback> cb =
927 | 0 | new testing::InputStreamCallback();
928 | 0 |
|
// Register the callback before anything has been written to the pipe.
929 | 0 | rv = reader->AsyncWait(cb, 0, 0, nullptr);
930 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
931 | 0 |
|
// Nothing has been written yet, so the callback must still be pending.
932 | 0 | ASSERT_FALSE(cb->Called());
933 | 0 |
|
934 | 0 | uint32_t numWritten = 0;
935 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
936 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
937 | 0 |
|
// The test expects the callback to have fired by the time Write() returned.
938 | 0 | ASSERT_TRUE(cb->Called());
939 | 0 |
|
// NOTE(review): helper not defined in this file; presumably reads the stream
// and compares what it read against inputData - confirm.
940 | 0 | testing::ConsumeAndValidateStream(reader, inputData);
941 | 0 | }
942 | | |
// Read_AsyncWait_Clone: same scenario as a plain AsyncWait() on a pipe reader,
// but with a second input stream obtained from the reader through
// NS_CloneInputStream(). A callback is registered on each stream; neither may
// have fired before the write, and both must have fired after a single
// Write() on the write end.
//
// Only the original reader is consumed and validated at the end; the clone is
// never read by this test.
943 | | TEST(Pipes, Read_AsyncWait_Clone)
944 | 0 | {
945 | 0 | nsCOMPtr<nsIAsyncInputStream> reader;
946 | 0 | nsCOMPtr<nsIAsyncOutputStream> writer;
947 | 0 |
|
948 | 0 | const uint32_t segmentSize = 1024;
949 | 0 | const uint32_t numSegments = 1;
950 | 0 |
|
951 | 0 | nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
952 | 0 | true, true, // non-blocking - reader, writer
953 | 0 | segmentSize, numSegments);
954 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
955 | 0 |
|
956 | 0 | nsCOMPtr<nsIInputStream> clone;
957 | 0 | rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
958 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
959 | 0 |
|
// AsyncWait() is called through nsIAsyncInputStream, so the clone must
// support that interface for the rest of the test to run.
960 | 0 | nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
961 | 0 | ASSERT_TRUE(asyncClone);
962 | 0 |
|
// NOTE(review): testing::CreateData() is not defined in this file; it
// presumably fills inputData with segmentSize bytes of test data - confirm.
963 | 0 | nsTArray<char> inputData;
964 | 0 | testing::CreateData(segmentSize, inputData);
965 | 0 |
|
// cb waits on the original reader, cb2 on the clone.
966 | 0 | RefPtr<testing::InputStreamCallback> cb =
967 | 0 | new testing::InputStreamCallback();
968 | 0 |
|
969 | 0 | RefPtr<testing::InputStreamCallback> cb2 =
970 | 0 | new testing::InputStreamCallback();
971 | 0 |
|
972 | 0 | rv = reader->AsyncWait(cb, 0, 0, nullptr);
973 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
974 | 0 |
|
975 | 0 | ASSERT_FALSE(cb->Called());
976 | 0 |
|
977 | 0 | rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
978 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
979 | 0 |
|
980 | 0 | ASSERT_FALSE(cb2->Called());
981 | 0 |
|
982 | 0 | uint32_t numWritten = 0;
983 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
984 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
985 | 0 |
|
// A single write is expected to have notified both waiting streams.
986 | 0 | ASSERT_TRUE(cb->Called());
987 | 0 | ASSERT_TRUE(cb2->Called());
988 | 0 |
|
989 | 0 | testing::ConsumeAndValidateStream(reader, inputData);
990 | 0 | }
991 | | |
// File-local helpers for the Close_During_Read_* tests.
992 | | namespace {
993 | |
// Segment callback handed to ReadSegments() by TestCloseDuringRead().
//
// Closes aReader from inside the callback, then appends the aCount bytes at
// aFromSegment to the nsTArray<char> that aClosure points to and reports all
// aCount bytes as consumed through aWriteCountOut. Always returns NS_OK.
//
// Every pointer argument must be non-null and aToOffset must be 0; both
// conditions are enforced with MOZ_RELEASE_ASSERT.
994 | | nsresult
995 | | CloseDuringReadFunc(nsIInputStream *aReader,
996 | | void* aClosure,
997 | | const char* aFromSegment,
998 | | uint32_t aToOffset,
999 | | uint32_t aCount,
1000 | | uint32_t* aWriteCountOut)
1001 | 0 | {
1002 | 0 | MOZ_RELEASE_ASSERT(aReader);
1003 | 0 | MOZ_RELEASE_ASSERT(aClosure);
1004 | 0 | MOZ_RELEASE_ASSERT(aFromSegment);
1005 | 0 | MOZ_RELEASE_ASSERT(aWriteCountOut);
1006 | 0 | MOZ_RELEASE_ASSERT(aToOffset == 0);
1007 | 0 |
|
1008 | 0 | // This is insanity and you probably should not do this under normal
1009 | 0 | // conditions. We want to simulate the case where the pipe is closed
1010 | 0 | // (possibly from other end on another thread) simultaneously with the
1011 | 0 | // read. This is the easiest way to trigger this case in a synchronous
1012 | 0 | // gtest.
1013 | 0 | MOZ_ALWAYS_SUCCEEDS(aReader->Close());
1014 | 0 |
|
// The segment data is still copied out after the Close() above; the caller
// later asserts that the copied bytes equal what was written.
1015 | 0 | nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
1016 | 0 | buffer->AppendElements(aFromSegment, aCount);
1017 | 0 |
|
1018 | 0 | *aWriteCountOut = aCount;
1019 | 0 |
|
1020 | 0 | return NS_OK;
1021 | 0 | }
1022 | |
// Shared body of the Close_During_Read_* tests.
//
// Creates a pipe with NS_NewPipe(), passing aSegmentSize both as the segment
// size and as the maximum size, writes the data produced by
// testing::CreateData(aDataSize, ...) into it, and reads it back through
// ReadSegments() with CloseDuringReadFunc, which closes the reader in the
// middle of the read.
//
// The read must still succeed, report inputData.Length() bytes read and yield
// exactly the data that was written. Afterwards Available() on the reader
// must return NS_BASE_STREAM_CLOSED.
1023 | | void
1024 | | TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize)
1025 | 0 | {
1026 | 0 | nsCOMPtr<nsIInputStream> reader;
1027 | 0 | nsCOMPtr<nsIOutputStream> writer;
1028 | 0 |
|
1029 | 0 | const uint32_t maxSize = aSegmentSize;
1030 | 0 |
|
1031 | 0 | nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
1032 | 0 | aSegmentSize, maxSize);
1033 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
1034 | 0 |
|
1035 | 0 | nsTArray<char> inputData;
1036 | 0 |
|
1037 | 0 | testing::CreateData(aDataSize, inputData);
1038 | 0 |
|
// NOTE(review): only the result code of Write() is checked; numWritten is
// never compared against inputData.Length().
1039 | 0 | uint32_t numWritten = 0;
1040 | 0 | rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
1041 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
1042 | 0 |
|
1043 | 0 | nsTArray<char> outputData;
1044 | 0 |
|
// outputData is passed as the closure; CloseDuringReadFunc appends to it.
1045 | 0 | uint32_t numRead = 0;
1046 | 0 | rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
1047 | 0 | inputData.Length(), &numRead);
1048 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
1049 | 0 | ASSERT_EQ(inputData.Length(), numRead);
1050 | 0 |
|
1051 | 0 | ASSERT_EQ(inputData, outputData);
1052 | 0 |
|
// The callback closed the reader, so the stream must now report closed.
1053 | 0 | uint64_t available;
1054 | 0 | rv = reader->Available(&available);
1055 | 0 | ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
1056 | 0 | }
1057 | |
1058 | | } // namespace
1059 | | |
// Runs TestCloseDuringRead() with a segment size of 1024 and a data size of
// 512, i.e. the data written is smaller than the segment size.
1060 | | TEST(Pipes, Close_During_Read_Partial_Segment)
1061 | 0 | {
1062 | 0 | TestCloseDuringRead(1024, 512);
1063 | 0 | }
1064 | | |
// Runs TestCloseDuringRead() with a segment size of 1024 and a data size of
// 1024, i.e. the data written is exactly as large as the segment size.
1065 | | TEST(Pipes, Close_During_Read_Full_Segment)
1066 | 0 | {
1067 | 0 | TestCloseDuringRead(1024, 1024);
1068 | 0 | }
1069 | | |
// Interfaces: checks that the reader produced by NS_NewPipe() (called with
// only the two stream out-arguments) can be queried for each interface listed
// below. ASSERT_TRUE aborts the test at the first do_QueryInterface() that
// yields a null pointer.
1070 | | TEST(Pipes, Interfaces)
1071 | 0 | {
1072 | 0 | nsCOMPtr<nsIInputStream> reader;
1073 | 0 | nsCOMPtr<nsIOutputStream> writer;
1074 | 0 |
|
1075 | 0 | nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
1076 | 0 | ASSERT_TRUE(NS_SUCCEEDED(rv));
1077 | 0 |
|
1078 | 0 | nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
1079 | 0 | ASSERT_TRUE(readerType1);
1080 | 0 |
|
1081 | 0 | nsCOMPtr<nsISeekableStream> readerType2 = do_QueryInterface(reader);
1082 | 0 | ASSERT_TRUE(readerType2);
1083 | 0 |
|
1084 | 0 | nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
1085 | 0 | ASSERT_TRUE(readerType3);
1086 | 0 |
|
1087 | 0 | nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
1088 | 0 | ASSERT_TRUE(readerType4);
1089 | 0 |
|
1090 | 0 | nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
1091 | 0 | ASSERT_TRUE(readerType5);
1092 | 0 |
|
1093 | 0 | nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
1094 | 0 | ASSERT_TRUE(readerType6);
1095 | 0 | }