/src/mozilla-central/xpcom/io/nsPipe3.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ |
2 | | /* vim: set ts=8 sts=2 et sw=2 tw=80: */ |
3 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
4 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
5 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
6 | | |
7 | | #include <algorithm> |
8 | | #include "mozilla/Attributes.h" |
9 | | #include "mozilla/IntegerPrintfMacros.h" |
10 | | #include "mozilla/ReentrantMonitor.h" |
11 | | #include "nsIBufferedStreams.h" |
12 | | #include "nsICloneableInputStream.h" |
13 | | #include "nsIPipe.h" |
14 | | #include "nsIEventTarget.h" |
15 | | #include "nsISeekableStream.h" |
16 | | #include "mozilla/RefPtr.h" |
17 | | #include "nsSegmentedBuffer.h" |
18 | | #include "nsStreamUtils.h" |
19 | | #include "nsCOMPtr.h" |
20 | | #include "nsCRT.h" |
21 | | #include "mozilla/Logging.h" |
22 | | #include "nsIClassInfoImpl.h" |
23 | | #include "nsAlgorithm.h" |
24 | | #include "nsMemory.h" |
25 | | #include "nsIAsyncInputStream.h" |
26 | | #include "nsIAsyncOutputStream.h" |
27 | | |
28 | | using namespace mozilla; |
29 | | |
30 | | #ifdef LOG |
31 | | #undef LOG |
32 | | #endif |
33 | | // |
34 | | // set MOZ_LOG=nsPipe:5 |
35 | | // |
36 | | static LazyLogModule sPipeLog("nsPipe"); |
37 | 0 | #define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args) |
38 | | |
39 | 0 | #define DEFAULT_SEGMENT_SIZE 4096 |
40 | 0 | #define DEFAULT_SEGMENT_COUNT 16 |
41 | | |
42 | | class nsPipe; |
43 | | class nsPipeEvents; |
44 | | class nsPipeInputStream; |
45 | | class nsPipeOutputStream; |
46 | | class AutoReadSegment; |
47 | | |
48 | | namespace { |
49 | | |
50 | | enum MonitorAction |
51 | | { |
52 | | DoNotNotifyMonitor, |
53 | | NotifyMonitor |
54 | | }; |
55 | | |
56 | | enum SegmentChangeResult |
57 | | { |
58 | | SegmentNotChanged, |
59 | | SegmentAdvanceBufferRead |
60 | | }; |
61 | | |
62 | | } // namespace |
63 | | |
64 | | //----------------------------------------------------------------------------- |
65 | | |
66 | | // this class is used to delay notifications until the end of a particular |
67 | | // scope. it helps avoid the complexity of issuing callbacks while inside |
68 | | // a critical section. |
69 | | class nsPipeEvents |
70 | | { |
71 | | public: |
72 | 0 | nsPipeEvents() { } |
73 | | ~nsPipeEvents(); |
74 | | |
75 | | inline void NotifyInputReady(nsIAsyncInputStream* aStream, |
76 | | nsIInputStreamCallback* aCallback) |
77 | 0 | { |
78 | 0 | mInputList.AppendElement(InputEntry(aStream, aCallback)); |
79 | 0 | } |
80 | | |
81 | | inline void NotifyOutputReady(nsIAsyncOutputStream* aStream, |
82 | | nsIOutputStreamCallback* aCallback) |
83 | 0 | { |
84 | 0 | MOZ_DIAGNOSTIC_ASSERT(!mOutputCallback); |
85 | 0 | mOutputStream = aStream; |
86 | 0 | mOutputCallback = aCallback; |
87 | 0 | } |
88 | | |
89 | | private: |
90 | | struct InputEntry |
91 | | { |
92 | | InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback) |
93 | | : mStream(aStream) |
94 | | , mCallback(aCallback) |
95 | 0 | { |
96 | 0 | MOZ_DIAGNOSTIC_ASSERT(mStream); |
97 | 0 | MOZ_DIAGNOSTIC_ASSERT(mCallback); |
98 | 0 | } |
99 | | |
100 | | nsCOMPtr<nsIAsyncInputStream> mStream; |
101 | | nsCOMPtr<nsIInputStreamCallback> mCallback; |
102 | | }; |
103 | | |
104 | | nsTArray<InputEntry> mInputList; |
105 | | |
106 | | nsCOMPtr<nsIAsyncOutputStream> mOutputStream; |
107 | | nsCOMPtr<nsIOutputStreamCallback> mOutputCallback; |
108 | | }; |
109 | | |
110 | | //----------------------------------------------------------------------------- |
111 | | |
112 | | // This class is used to maintain input stream state. It's broken out from the |
113 | | // nsPipeInputStream class because generally the nsPipe should be modifying |
114 | | // this state and not the input stream itself. |
115 | | struct nsPipeReadState |
116 | | { |
117 | | nsPipeReadState() |
118 | | : mReadCursor(nullptr) |
119 | | , mReadLimit(nullptr) |
120 | | , mSegment(0) |
121 | | , mAvailable(0) |
122 | | , mActiveRead(false) |
123 | | , mNeedDrain(false) |
124 | 0 | { } |
125 | | |
126 | | char* mReadCursor; |
127 | | char* mReadLimit; |
128 | | int32_t mSegment; |
129 | | uint32_t mAvailable; |
130 | | |
131 | | // This flag is managed using the AutoReadSegment RAII stack class. |
132 | | bool mActiveRead; |
133 | | |
134 | | // Set to indicate that the input stream has closed and should be drained, |
135 | | // but that drain has been delayed due to an active read. When the read |
136 | | // completes, this flag indicates the drain should then be performed. |
137 | | bool mNeedDrain; |
138 | | }; |
139 | | |
140 | | //----------------------------------------------------------------------------- |
141 | | |
142 | | // an input end of a pipe (maintained as a list of refs within the pipe) |
143 | | class nsPipeInputStream final |
144 | | : public nsIAsyncInputStream |
145 | | , public nsISeekableStream |
146 | | , public nsISearchableInputStream |
147 | | , public nsICloneableInputStream |
148 | | , public nsIClassInfo |
149 | | , public nsIBufferedInputStream |
150 | | { |
151 | | public: |
152 | | // Pipe input streams preserve their refcount changes when record/replaying, |
153 | | // as otherwise the thread which destroys the stream may vary between |
154 | | // recording and replaying. |
155 | | NS_DECL_THREADSAFE_ISUPPORTS_WITH_RECORDING(recordreplay::Behavior::Preserve) |
156 | | NS_DECL_NSIINPUTSTREAM |
157 | | NS_DECL_NSIASYNCINPUTSTREAM |
158 | | NS_DECL_NSISEEKABLESTREAM |
159 | | NS_DECL_NSISEARCHABLEINPUTSTREAM |
160 | | NS_DECL_NSICLONEABLEINPUTSTREAM |
161 | | NS_DECL_NSICLASSINFO |
162 | | NS_DECL_NSIBUFFEREDINPUTSTREAM |
163 | | |
164 | | explicit nsPipeInputStream(nsPipe* aPipe) |
165 | | : mPipe(aPipe) |
166 | | , mLogicalOffset(0) |
167 | | , mInputStatus(NS_OK) |
168 | | , mBlocking(true) |
169 | | , mBlocked(false) |
170 | | , mCallbackFlags(0) |
171 | 0 | { } |
172 | | |
173 | | explicit nsPipeInputStream(const nsPipeInputStream& aOther) |
174 | | : mPipe(aOther.mPipe) |
175 | | , mLogicalOffset(aOther.mLogicalOffset) |
176 | | , mInputStatus(aOther.mInputStatus) |
177 | | , mBlocking(aOther.mBlocking) |
178 | | , mBlocked(false) |
179 | | , mCallbackFlags(0) |
180 | | , mReadState(aOther.mReadState) |
181 | 0 | { } |
182 | | |
183 | | nsresult Fill(); |
184 | | void SetNonBlocking(bool aNonBlocking) |
185 | 0 | { |
186 | 0 | mBlocking = !aNonBlocking; |
187 | 0 | } |
188 | | |
189 | | uint32_t Available(); |
190 | | |
191 | | // synchronously wait for the pipe to become readable. |
192 | | nsresult Wait(); |
193 | | |
194 | | // These two don't acquire the monitor themselves. Instead they |
195 | | // expect their caller to have done so and to pass the monitor as |
196 | | // evidence. |
197 | | MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&, |
198 | | const ReentrantMonitorAutoEnter& ev); |
199 | | MonitorAction OnInputException(nsresult, nsPipeEvents&, |
200 | | const ReentrantMonitorAutoEnter& ev); |
201 | | |
202 | | nsPipeReadState& ReadState() |
203 | 0 | { |
204 | 0 | return mReadState; |
205 | 0 | } |
206 | | |
207 | | const nsPipeReadState& ReadState() const |
208 | 0 | { |
209 | 0 | return mReadState; |
210 | 0 | } |
211 | | |
212 | | nsresult Status() const; |
213 | | |
214 | | // A version of Status() that doesn't acquire the monitor. |
215 | | nsresult Status(const ReentrantMonitorAutoEnter& ev) const; |
216 | | |
217 | | private: |
218 | | virtual ~nsPipeInputStream(); |
219 | | |
220 | | RefPtr<nsPipe> mPipe; |
221 | | |
222 | | int64_t mLogicalOffset; |
223 | | // Individual input streams can be closed without affecting the rest of the |
224 | | // pipe. So track individual input stream status separately. |mInputStatus| |
225 | | // is protected by |mPipe->mReentrantMonitor|. |
226 | | nsresult mInputStatus; |
227 | | bool mBlocking; |
228 | | |
229 | | // these variables can only be accessed while inside the pipe's monitor |
230 | | bool mBlocked; |
231 | | nsCOMPtr<nsIInputStreamCallback> mCallback; |
232 | | uint32_t mCallbackFlags; |
233 | | |
234 | | // requires pipe's monitor; usually treat as an opaque token to pass to nsPipe |
235 | | nsPipeReadState mReadState; |
236 | | }; |
237 | | |
238 | | //----------------------------------------------------------------------------- |
239 | | |
240 | | // the output end of a pipe (allocated as a member of the pipe). |
241 | | class nsPipeOutputStream |
242 | | : public nsIAsyncOutputStream |
243 | | , public nsIClassInfo |
244 | | { |
245 | | public: |
246 | | // since this class will be allocated as a member of the pipe, we do not |
247 | | // need our own ref count. instead, we share the lifetime (the ref count) |
248 | | // of the entire pipe. this macro is just convenience since it does not |
249 | | // declare a mRefCount variable; however, don't let the name fool you... |
250 | | // we are not inheriting from nsPipe ;-) |
251 | | NS_DECL_ISUPPORTS_INHERITED |
252 | | |
253 | | NS_DECL_NSIOUTPUTSTREAM |
254 | | NS_DECL_NSIASYNCOUTPUTSTREAM |
255 | | NS_DECL_NSICLASSINFO |
256 | | |
257 | | explicit nsPipeOutputStream(nsPipe* aPipe) |
258 | | : mPipe(aPipe) |
259 | | , mWriterRefCnt(0) |
260 | | , mLogicalOffset(0) |
261 | | , mBlocking(true) |
262 | | , mBlocked(false) |
263 | | , mWritable(true) |
264 | | , mCallbackFlags(0) |
265 | 0 | { } |
266 | | |
267 | | void SetNonBlocking(bool aNonBlocking) |
268 | 0 | { |
269 | 0 | mBlocking = !aNonBlocking; |
270 | 0 | } |
271 | | void SetWritable(bool aWritable) |
272 | 0 | { |
273 | 0 | mWritable = aWritable; |
274 | 0 | } |
275 | | |
276 | | // synchronously wait for the pipe to become writable. |
277 | | nsresult Wait(); |
278 | | |
279 | | MonitorAction OnOutputWritable(nsPipeEvents&); |
280 | | MonitorAction OnOutputException(nsresult, nsPipeEvents&); |
281 | | |
282 | | private: |
283 | | nsPipe* mPipe; |
284 | | |
285 | | // separate refcnt so that we know when to close the producer |
286 | | ThreadSafeAutoRefCntWithRecording<recordreplay::Behavior::Preserve> mWriterRefCnt; |
287 | | int64_t mLogicalOffset; |
288 | | bool mBlocking; |
289 | | |
290 | | // these variables can only be accessed while inside the pipe's monitor |
291 | | bool mBlocked; |
292 | | bool mWritable; |
293 | | nsCOMPtr<nsIOutputStreamCallback> mCallback; |
294 | | uint32_t mCallbackFlags; |
295 | | }; |
296 | | |
297 | | //----------------------------------------------------------------------------- |
298 | | |
299 | | class nsPipe final : public nsIPipe |
300 | | { |
301 | | public: |
302 | | friend class nsPipeInputStream; |
303 | | friend class nsPipeOutputStream; |
304 | | friend class AutoReadSegment; |
305 | | |
306 | | // As for nsPipeInputStream, preserve refcount changes when recording or |
307 | | // replaying. |
308 | | NS_DECL_THREADSAFE_ISUPPORTS_WITH_RECORDING(recordreplay::Behavior::Preserve) |
309 | | NS_DECL_NSIPIPE |
310 | | |
311 | | // nsPipe methods: |
312 | | nsPipe(); |
313 | | |
314 | | private: |
315 | | ~nsPipe(); |
316 | | |
317 | | // |
318 | | // Methods below may only be called while inside the pipe's monitor. Some |
319 | | // of these methods require passing a ReentrantMonitorAutoEnter to prove the |
320 | | // monitor is held. |
321 | | // |
322 | | |
323 | | void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex, |
324 | | char*& aCursor, char*& aLimit); |
325 | | SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState, |
326 | | const ReentrantMonitorAutoEnter &ev); |
327 | | bool ReadSegmentBeingWritten(nsPipeReadState& aReadState); |
328 | | uint32_t CountSegmentReferences(int32_t aSegment); |
329 | | void SetAllNullReadCursors(); |
330 | | bool AllReadCursorsMatchWriteCursor(); |
331 | | void RollBackAllReadCursors(char* aWriteCursor); |
332 | | void UpdateAllReadCursors(char* aWriteCursor); |
333 | | void ValidateAllReadCursors(); |
334 | | uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState, |
335 | | const ReentrantMonitorAutoEnter& ev) const; |
336 | | bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const; |
337 | | |
338 | | // |
339 | | // methods below may be called while outside the pipe's monitor |
340 | | // |
341 | | |
342 | | void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents); |
343 | | nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen); |
344 | | void AdvanceWriteCursor(uint32_t aCount); |
345 | | |
346 | | void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason); |
347 | | void OnPipeException(nsresult aReason, bool aOutputOnly = false); |
348 | | |
349 | | nsresult CloneInputStream(nsPipeInputStream* aOriginal, |
350 | | nsIInputStream** aCloneOut); |
351 | | |
352 | | // methods below should only be called by AutoReadSegment |
353 | | nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment, |
354 | | uint32_t& aLength); |
355 | | void ReleaseReadSegment(nsPipeReadState& aReadState, |
356 | | nsPipeEvents& aEvents); |
357 | | void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount); |
358 | | |
359 | | // We can't inherit from both nsIInputStream and nsIOutputStream |
360 | | // because they collide on their Close method. Consequently we nest their |
361 | | // implementations to avoid the extra object allocation. |
362 | | nsPipeOutputStream mOutput; |
363 | | |
364 | | // Since the input stream can be cloned, we may have more than one. Use |
365 | | // a weak reference as the streams will clear their entry here in their |
366 | | // destructor. Using a strong reference would create a reference cycle. |
367 | | // Only usable while mReentrantMonitor is locked. |
368 | | nsTArray<nsPipeInputStream*> mInputList; |
369 | | |
370 | | // But hold a strong ref to our original input stream. For backward |
371 | | // compatibility we need to be able to consistently return this same |
372 | | // object from GetInputStream(). Note, mOriginalInput is also stored |
373 | | // in mInputList as a weak ref. |
374 | | RefPtr<nsPipeInputStream> mOriginalInput; |
375 | | |
376 | | ReentrantMonitor mReentrantMonitor; |
377 | | nsSegmentedBuffer mBuffer; |
378 | | |
379 | | // The maximum number of segments to allow to be buffered in advance |
380 | | // of the fastest reader. This collection of segments is called |
381 | | // the "advance buffer". |
382 | | uint32_t mMaxAdvanceBufferSegmentCount; |
383 | | |
384 | | int32_t mWriteSegment; |
385 | | char* mWriteCursor; |
386 | | char* mWriteLimit; |
387 | | |
388 | | // |mStatus| is protected by |mReentrantMonitor|. |
389 | | nsresult mStatus; |
390 | | bool mInited; |
391 | | }; |
392 | | |
393 | | //----------------------------------------------------------------------------- |
394 | | |
395 | | // RAII class representing an active read segment. When it goes out of scope |
396 | | // it automatically updates the read cursor and releases the read segment. |
397 | | class MOZ_STACK_CLASS AutoReadSegment final |
398 | | { |
399 | | public: |
400 | | AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState, |
401 | | uint32_t aMaxLength) |
402 | | : mPipe(aPipe) |
403 | | , mReadState(aReadState) |
404 | | , mStatus(NS_ERROR_FAILURE) |
405 | | , mSegment(nullptr) |
406 | | , mLength(0) |
407 | | , mOffset(0) |
408 | 0 | { |
409 | 0 | MOZ_DIAGNOSTIC_ASSERT(mPipe); |
410 | 0 | MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead); |
411 | 0 | mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength); |
412 | 0 | if (NS_SUCCEEDED(mStatus)) { |
413 | 0 | MOZ_DIAGNOSTIC_ASSERT(mReadState.mActiveRead); |
414 | 0 | MOZ_DIAGNOSTIC_ASSERT(mSegment); |
415 | 0 | mLength = std::min(mLength, aMaxLength); |
416 | 0 | MOZ_DIAGNOSTIC_ASSERT(mLength); |
417 | 0 | } |
418 | 0 | } |
419 | | |
420 | | ~AutoReadSegment() |
421 | 0 | { |
422 | 0 | if (NS_SUCCEEDED(mStatus)) { |
423 | 0 | if (mOffset) { |
424 | 0 | mPipe->AdvanceReadCursor(mReadState, mOffset); |
425 | 0 | } else { |
426 | 0 | nsPipeEvents events; |
427 | 0 | mPipe->ReleaseReadSegment(mReadState, events); |
428 | 0 | } |
429 | 0 | } |
430 | 0 | MOZ_DIAGNOSTIC_ASSERT(!mReadState.mActiveRead); |
431 | 0 | } |
432 | | |
433 | | nsresult Status() const |
434 | 0 | { |
435 | 0 | return mStatus; |
436 | 0 | } |
437 | | |
438 | | const char* Data() const |
439 | 0 | { |
440 | 0 | MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus)); |
441 | 0 | MOZ_DIAGNOSTIC_ASSERT(mSegment); |
442 | 0 | return mSegment + mOffset; |
443 | 0 | } |
444 | | |
445 | | uint32_t Length() const |
446 | 0 | { |
447 | 0 | MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus)); |
448 | 0 | MOZ_DIAGNOSTIC_ASSERT(mLength >= mOffset); |
449 | 0 | return mLength - mOffset; |
450 | 0 | } |
451 | | |
452 | | void |
453 | | Advance(uint32_t aCount) |
454 | 0 | { |
455 | 0 | MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mStatus)); |
456 | 0 | MOZ_DIAGNOSTIC_ASSERT(aCount <= (mLength - mOffset)); |
457 | 0 | mOffset += aCount; |
458 | 0 | } |
459 | | |
460 | | nsPipeReadState& |
461 | | ReadState() const |
462 | 0 | { |
463 | 0 | return mReadState; |
464 | 0 | } |
465 | | |
466 | | private: |
467 | | // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment |
468 | | nsPipe* mPipe; |
469 | | nsPipeReadState& mReadState; |
470 | | nsresult mStatus; |
471 | | const char* mSegment; |
472 | | uint32_t mLength; |
473 | | uint32_t mOffset; |
474 | | }; |
475 | | |
476 | | // |
477 | | // NOTES on buffer architecture: |
478 | | // |
479 | | // +-----------------+ - - mBuffer.GetSegment(0) |
480 | | // | | |
481 | | // + - - - - - - - - + - - nsPipeReadState.mReadCursor |
482 | | // |/////////////////| |
483 | | // |/////////////////| |
484 | | // |/////////////////| |
485 | | // |/////////////////| |
486 | | // +-----------------+ - - nsPipeReadState.mReadLimit |
487 | | // | |
488 | | // +-----------------+ |
489 | | // |/////////////////| |
490 | | // |/////////////////| |
491 | | // |/////////////////| |
492 | | // |/////////////////| |
493 | | // |/////////////////| |
494 | | // |/////////////////| |
495 | | // +-----------------+ |
496 | | // | |
497 | | // +-----------------+ - - mBuffer.GetSegment(mWriteSegment) |
498 | | // |/////////////////| |
499 | | // |/////////////////| |
500 | | // |/////////////////| |
501 | | // + - - - - - - - - + - - mWriteCursor |
502 | | // | | |
503 | | // | | |
504 | | // +-----------------+ - - mWriteLimit |
505 | | // |
506 | | // (shaded region contains data) |
507 | | // |
508 | | // NOTE: Each input stream produced by the nsPipe contains its own, separate |
509 | | // nsPipeReadState. This means there are multiple mReadCursor and |
510 | | // mReadLimit values in play. The pipe cannot discard old data until |
511 | | // all mReadCursors have moved beyond that point in the stream. |
512 | | // |
513 | | // Likewise, each input stream reader will have its own amount of |
514 | | // buffered data. The pipe size threshold, however, is only applied |
515 | | // to the input stream that is being read fastest. We call this |
516 | | // the "advance buffer" in that it's in advance of all readers. We |
517 | | // allow slower input streams to buffer more data so that we don't |
518 | | // stall processing of the faster input stream. |
519 | | // |
520 | | // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for |
521 | | // small allocations (e.g., 64 byte allocations). this means that buffers may |
522 | | // be allocated back-to-back. in the diagram above, for example, mReadLimit |
523 | | // would actually be pointing at the beginning of the next segment. when |
524 | | // making changes to this file, please keep this fact in mind. |
525 | | // |
526 | | |
527 | | //----------------------------------------------------------------------------- |
528 | | // nsPipe methods: |
529 | | //----------------------------------------------------------------------------- |
530 | | |
531 | | nsPipe::nsPipe() |
532 | | : mOutput(this) |
533 | | , mOriginalInput(new nsPipeInputStream(this)) |
534 | | , mReentrantMonitor("nsPipe.mReentrantMonitor") |
535 | | , mMaxAdvanceBufferSegmentCount(0) |
536 | | , mWriteSegment(-1) |
537 | | , mWriteCursor(nullptr) |
538 | | , mWriteLimit(nullptr) |
539 | | , mStatus(NS_OK) |
540 | | , mInited(false) |
541 | 0 | { |
542 | 0 | mInputList.AppendElement(mOriginalInput); |
543 | 0 | } |
544 | | |
545 | | nsPipe::~nsPipe() |
546 | 0 | { |
547 | 0 | } |
548 | | |
549 | | NS_IMPL_ADDREF(nsPipe) |
550 | | NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe) |
551 | | |
552 | | NS_IMETHODIMP_(MozExternalRefCountType) |
553 | | nsPipe::Release() |
554 | 0 | { |
555 | 0 | MOZ_DIAGNOSTIC_ASSERT(int32_t(mRefCnt) > 0, "dup release"); |
556 | 0 | nsrefcnt count = --mRefCnt; |
557 | 0 | NS_LOG_RELEASE(this, count, "nsPipe"); |
558 | 0 | if (count == 0) { |
559 | 0 | delete (this); |
560 | 0 | return 0; |
561 | 0 | } |
562 | 0 | // Avoid racing on |mOriginalInput| by only looking at it when |
563 | 0 | // the refcount is 1, that is, we are the only pointer (hence only |
564 | 0 | // thread) to access it. |
565 | 0 | if (count == 1 && mOriginalInput) { |
566 | 0 | mOriginalInput = nullptr; |
567 | 0 | return 1; |
568 | 0 | } |
569 | 0 | return count; |
570 | 0 | } |
571 | | |
572 | | NS_IMETHODIMP |
573 | | nsPipe::Init(bool aNonBlockingIn, |
574 | | bool aNonBlockingOut, |
575 | | uint32_t aSegmentSize, |
576 | | uint32_t aSegmentCount) |
577 | 0 | { |
578 | 0 | mInited = true; |
579 | 0 |
|
580 | 0 | if (aSegmentSize == 0) { |
581 | 0 | aSegmentSize = DEFAULT_SEGMENT_SIZE; |
582 | 0 | } |
583 | 0 | if (aSegmentCount == 0) { |
584 | 0 | aSegmentCount = DEFAULT_SEGMENT_COUNT; |
585 | 0 | } |
586 | 0 |
|
587 | 0 | // protect against overflow |
588 | 0 | uint32_t maxCount = uint32_t(-1) / aSegmentSize; |
589 | 0 | if (aSegmentCount > maxCount) { |
590 | 0 | aSegmentCount = maxCount; |
591 | 0 | } |
592 | 0 |
|
593 | 0 | // The internal buffer is always "infinite" so that we can allow |
594 | 0 | // the size to expand when cloned streams are read at different |
595 | 0 | // rates. We enforce a limit on how much data can be buffered |
596 | 0 | // ahead of the fastest reader in GetWriteSegment(). |
597 | 0 | nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX); |
598 | 0 | if (NS_FAILED(rv)) { |
599 | 0 | return rv; |
600 | 0 | } |
601 | 0 | |
602 | 0 | mMaxAdvanceBufferSegmentCount = aSegmentCount; |
603 | 0 |
|
604 | 0 | mOutput.SetNonBlocking(aNonBlockingOut); |
605 | 0 | mOriginalInput->SetNonBlocking(aNonBlockingIn); |
606 | 0 |
|
607 | 0 | return NS_OK; |
608 | 0 | } |
609 | | |
610 | | NS_IMETHODIMP |
611 | | nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream) |
612 | 0 | { |
613 | 0 | if (NS_WARN_IF(!mInited)) { |
614 | 0 | return NS_ERROR_NOT_INITIALIZED; |
615 | 0 | } |
616 | 0 | RefPtr<nsPipeInputStream> ref = mOriginalInput; |
617 | 0 | ref.forget(aInputStream); |
618 | 0 | return NS_OK; |
619 | 0 | } |
620 | | |
621 | | NS_IMETHODIMP |
622 | | nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream) |
623 | 0 | { |
624 | 0 | if (NS_WARN_IF(!mInited)) { |
625 | 0 | return NS_ERROR_NOT_INITIALIZED; |
626 | 0 | } |
627 | 0 | NS_ADDREF(*aOutputStream = &mOutput); |
628 | 0 | return NS_OK; |
629 | 0 | } |
630 | | |
631 | | void |
632 | | nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex, |
633 | | char*& aCursor, char*& aLimit) |
634 | 0 | { |
635 | 0 | if (aIndex == 0) { |
636 | 0 | MOZ_DIAGNOSTIC_ASSERT(!aReadState.mReadCursor || mBuffer.GetSegmentCount()); |
637 | 0 | aCursor = aReadState.mReadCursor; |
638 | 0 | aLimit = aReadState.mReadLimit; |
639 | 0 | } else { |
640 | 0 | uint32_t absoluteIndex = aReadState.mSegment + aIndex; |
641 | 0 | uint32_t numSegments = mBuffer.GetSegmentCount(); |
642 | 0 | if (absoluteIndex >= numSegments) { |
643 | 0 | aCursor = aLimit = nullptr; |
644 | 0 | } else { |
645 | 0 | aCursor = mBuffer.GetSegment(absoluteIndex); |
646 | 0 | if (mWriteSegment == (int32_t)absoluteIndex) { |
647 | 0 | aLimit = mWriteCursor; |
648 | 0 | } else { |
649 | 0 | aLimit = aCursor + mBuffer.GetSegmentSize(); |
650 | 0 | } |
651 | 0 | } |
652 | 0 | } |
653 | 0 | } |
654 | | |
655 | | nsresult |
656 | | nsPipe::GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment, |
657 | | uint32_t& aLength) |
658 | 0 | { |
659 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
660 | 0 |
|
661 | 0 | if (aReadState.mReadCursor == aReadState.mReadLimit) { |
662 | 0 | return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK; |
663 | 0 | } |
664 | 0 |
|
665 | 0 | // The input stream locks the pipe while getting the buffer to read from, |
666 | 0 | // but then unlocks while actual data copying is taking place. In |
667 | 0 | // order to avoid deleting the buffer out from under this lockless read |
668 | 0 | // set a flag to indicate a read is active. This flag is only modified |
669 | 0 | // while the lock is held. |
670 | 0 | MOZ_DIAGNOSTIC_ASSERT(!aReadState.mActiveRead); |
671 | 0 | aReadState.mActiveRead = true; |
672 | 0 |
|
673 | 0 | aSegment = aReadState.mReadCursor; |
674 | 0 | aLength = aReadState.mReadLimit - aReadState.mReadCursor; |
675 | 0 | MOZ_DIAGNOSTIC_ASSERT(aLength <= aReadState.mAvailable); |
676 | 0 |
|
677 | 0 | return NS_OK; |
678 | 0 | } |
679 | | |
680 | | void |
681 | | nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents) |
682 | 0 | { |
683 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
684 | 0 |
|
685 | 0 | MOZ_DIAGNOSTIC_ASSERT(aReadState.mActiveRead); |
686 | 0 | aReadState.mActiveRead = false; |
687 | 0 |
|
688 | 0 | // When a read completes and releases the mActiveRead flag, we may have blocked |
689 | 0 | // a drain from completing. This occurs when the input stream is closed during |
690 | 0 | // the read. In these cases, we need to complete the drain as soon as the |
691 | 0 | // active read completes. |
692 | 0 | if (aReadState.mNeedDrain) { |
693 | 0 | aReadState.mNeedDrain = false; |
694 | 0 | DrainInputStream(aReadState, aEvents); |
695 | 0 | } |
696 | 0 | } |
697 | | |
698 | | void |
699 | | nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead) |
700 | 0 | { |
701 | 0 | MOZ_DIAGNOSTIC_ASSERT(aBytesRead > 0); |
702 | 0 |
|
703 | 0 | nsPipeEvents events; |
704 | 0 | { |
705 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
706 | 0 |
|
707 | 0 | LOG(("III advancing read cursor by %u\n", aBytesRead)); |
708 | 0 | MOZ_DIAGNOSTIC_ASSERT(aBytesRead <= mBuffer.GetSegmentSize()); |
709 | 0 |
|
710 | 0 | aReadState.mReadCursor += aBytesRead; |
711 | 0 | MOZ_DIAGNOSTIC_ASSERT(aReadState.mReadCursor <= aReadState.mReadLimit); |
712 | 0 |
|
713 | 0 | MOZ_DIAGNOSTIC_ASSERT(aReadState.mAvailable >= aBytesRead); |
714 | 0 | aReadState.mAvailable -= aBytesRead; |
715 | 0 |
|
716 | 0 | // Check to see if we're at the end of the available read data. If we |
717 | 0 | // are, and this segment is not still being written, then we can possibly |
718 | 0 | // free up the segment. |
719 | 0 | if (aReadState.mReadCursor == aReadState.mReadLimit && |
720 | 0 | !ReadSegmentBeingWritten(aReadState)) { |
721 | 0 |
|
722 | 0 | // Advance the segment position. If we have read any segments from the |
723 | 0 | // advance buffer then we can potentially notify blocked writers. |
724 | 0 | if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead && |
725 | 0 | mOutput.OnOutputWritable(events) == NotifyMonitor) { |
726 | 0 | mon.NotifyAll(); |
727 | 0 | } |
728 | 0 | } |
729 | 0 |
|
730 | 0 | ReleaseReadSegment(aReadState, events); |
731 | 0 | } |
732 | 0 | } |
733 | | |
734 | | SegmentChangeResult |
735 | | nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState, |
736 | | const ReentrantMonitorAutoEnter &ev) |
737 | 0 | { |
738 | 0 | // Calculate how many segments are buffered for this stream to start. |
739 | 0 | uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev); |
740 | 0 |
|
741 | 0 | int32_t currentSegment = aReadState.mSegment; |
742 | 0 |
|
743 | 0 | // Move to the next segment to read |
744 | 0 | aReadState.mSegment += 1; |
745 | 0 |
|
746 | 0 | // If this was the last reference to the first segment, then remove it. |
747 | 0 | if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) { |
748 | 0 |
|
749 | 0 | // shift write and read segment index (-1 indicates an empty buffer). |
750 | 0 | mWriteSegment -= 1; |
751 | 0 |
|
752 | 0 | // Directly modify the current read state. If the associated input |
753 | 0 | // stream is closed simultaneous with reading, then it may not be |
754 | 0 | // in the mInputList any more. |
755 | 0 | aReadState.mSegment -= 1; |
756 | 0 |
|
757 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
758 | 0 | // Skip the current read state structure since we modify it manually |
759 | 0 | // before entering this loop. |
760 | 0 | if (&mInputList[i]->ReadState() == &aReadState) { |
761 | 0 | continue; |
762 | 0 | } |
763 | 0 | mInputList[i]->ReadState().mSegment -= 1; |
764 | 0 | } |
765 | 0 |
|
766 | 0 | // done with this segment |
767 | 0 | mBuffer.DeleteFirstSegment(); |
768 | 0 | LOG(("III deleting first segment\n")); |
769 | 0 | } |
770 | 0 |
|
771 | 0 | if (mWriteSegment < aReadState.mSegment) { |
772 | 0 | // read cursor has hit the end of written data, so reset it |
773 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == (aReadState.mSegment - 1)); |
774 | 0 | aReadState.mReadCursor = nullptr; |
775 | 0 | aReadState.mReadLimit = nullptr; |
776 | 0 | // also, the buffer is completely empty, so reset the write cursor |
777 | 0 | if (mWriteSegment == -1) { |
778 | 0 | mWriteCursor = nullptr; |
779 | 0 | mWriteLimit = nullptr; |
780 | 0 | } |
781 | 0 | } else { |
782 | 0 | // advance read cursor and limit to next buffer segment |
783 | 0 | aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment); |
784 | 0 | if (mWriteSegment == aReadState.mSegment) { |
785 | 0 | aReadState.mReadLimit = mWriteCursor; |
786 | 0 | } else { |
787 | 0 | aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize(); |
788 | 0 | } |
789 | 0 | } |
790 | 0 |
|
791 | 0 | // Calculate how many segments are buffered for the stream after |
792 | 0 | // reading. |
793 | 0 | uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev); |
794 | 0 |
|
795 | 0 | // If the stream has read a segment out of the set of advanced buffer |
796 | 0 | // segments, then the writer may advance. |
797 | 0 | if (startBufferSegments >= mMaxAdvanceBufferSegmentCount && |
798 | 0 | endBufferSegments < mMaxAdvanceBufferSegmentCount) { |
799 | 0 | return SegmentAdvanceBufferRead; |
800 | 0 | } |
801 | 0 | |
802 | 0 | // Otherwise there are no significant changes to the segment structure. |
803 | 0 | return SegmentNotChanged; |
804 | 0 | } |
805 | | |
806 | | void |
807 | | nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents) |
808 | 0 | { |
809 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
810 | 0 |
|
811 | 0 | // If a segment is actively being read in ReadSegments() for this input |
812 | 0 | // stream, then we cannot drain the stream. This can happen because |
813 | 0 | // ReadSegments() does not hold the lock while copying from the buffer. |
814 | 0 | // If we detect this condition, simply note that we need a drain once |
815 | 0 | // the read completes and return immediately. |
816 | 0 | if (aReadState.mActiveRead) { |
817 | 0 | MOZ_DIAGNOSTIC_ASSERT(!aReadState.mNeedDrain); |
818 | 0 | aReadState.mNeedDrain = true; |
819 | 0 | return; |
820 | 0 | } |
821 | 0 | |
822 | 0 | while(mWriteSegment >= aReadState.mSegment) { |
823 | 0 |
|
824 | 0 | // If the last segment to free is still being written to, we're done |
825 | 0 | // draining. We can't free any more. |
826 | 0 | if (ReadSegmentBeingWritten(aReadState)) { |
827 | 0 | break; |
828 | 0 | } |
829 | 0 | |
830 | 0 | // Don't bother checking if this results in an advance buffer segment |
831 | 0 | // read. Since we are draining the entire stream we will read an |
832 | 0 | // advance buffer segment no matter what. |
833 | 0 | AdvanceReadSegment(aReadState, mon); |
834 | 0 | } |
835 | 0 |
|
836 | 0 | // Force the stream into an empty state. Make sure mAvailable, mCursor, and |
837 | 0 | // mReadLimit are consistent with one another. |
838 | 0 | aReadState.mAvailable = 0; |
839 | 0 | aReadState.mReadCursor = nullptr; |
840 | 0 | aReadState.mReadLimit = nullptr; |
841 | 0 |
|
842 | 0 | // Remove the input stream from the pipe's list of streams. This will |
843 | 0 | // prevent the pipe from holding the stream alive or trying to update |
844 | 0 | // its read state any further. |
845 | 0 | DebugOnly<uint32_t> numRemoved = 0; |
846 | 0 | mInputList.RemoveElementsBy([&](nsPipeInputStream* aEntry) { |
847 | 0 | bool result = &aReadState == &aEntry->ReadState(); |
848 | 0 | numRemoved += result ? 1 : 0; |
849 | 0 | return result; |
850 | 0 | }); |
851 | 0 | MOZ_ASSERT(numRemoved == 1); |
852 | 0 |
|
853 | 0 | // If we have read any segments from the advance buffer then we can |
854 | 0 | // potentially notify blocked writers. |
855 | 0 | if (!IsAdvanceBufferFull(mon) && |
856 | 0 | mOutput.OnOutputWritable(aEvents) == NotifyMonitor) { |
857 | 0 | mon.NotifyAll(); |
858 | 0 | } |
859 | 0 | } |
860 | | |
861 | | bool |
862 | | nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState) |
863 | 0 | { |
864 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
865 | 0 | bool beingWritten = mWriteSegment == aReadState.mSegment && |
866 | 0 | mWriteLimit > mWriteCursor; |
867 | 0 | MOZ_DIAGNOSTIC_ASSERT(!beingWritten || aReadState.mReadLimit == mWriteCursor); |
868 | 0 | return beingWritten; |
869 | 0 | } |
870 | | |
871 | | nsresult |
872 | | nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) |
873 | 0 | { |
874 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
875 | 0 |
|
876 | 0 | if (NS_FAILED(mStatus)) { |
877 | 0 | return mStatus; |
878 | 0 | } |
879 | 0 | |
880 | 0 | // write cursor and limit may both be null indicating an empty buffer. |
881 | 0 | if (mWriteCursor == mWriteLimit) { |
882 | 0 | // The pipe is full if we have hit our limit on advance data buffering. |
883 | 0 | // This means the fastest reader is still reading slower than data is |
884 | 0 | // being written into the pipe. |
885 | 0 | if (IsAdvanceBufferFull(mon)) { |
886 | 0 | return NS_BASE_STREAM_WOULD_BLOCK; |
887 | 0 | } |
888 | 0 | |
889 | 0 | // The nsSegmentedBuffer is configured to be "infinite", so this |
890 | 0 | // should never return nullptr here. |
891 | 0 | char* seg = mBuffer.AppendNewSegment(); |
892 | 0 | if (!seg) { |
893 | 0 | return NS_ERROR_OUT_OF_MEMORY; |
894 | 0 | } |
895 | 0 | |
896 | 0 | LOG(("OOO appended new segment\n")); |
897 | 0 | mWriteCursor = seg; |
898 | 0 | mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize(); |
899 | 0 | ++mWriteSegment; |
900 | 0 | } |
901 | 0 |
|
902 | 0 | // make sure read cursor is initialized |
903 | 0 | SetAllNullReadCursors(); |
904 | 0 |
|
905 | 0 | // check to see if we can roll-back our read and write cursors to the |
906 | 0 | // beginning of the current/first segment. this is purely an optimization. |
907 | 0 | if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) { |
908 | 0 | char* head = mBuffer.GetSegment(0); |
909 | 0 | LOG(("OOO rolling back write cursor %" PRId64 " bytes\n", |
910 | 0 | static_cast<int64_t>(mWriteCursor - head))); |
911 | 0 | RollBackAllReadCursors(head); |
912 | 0 | mWriteCursor = head; |
913 | 0 | } |
914 | 0 |
|
915 | 0 | aSegment = mWriteCursor; |
916 | 0 | aSegmentLen = mWriteLimit - mWriteCursor; |
917 | 0 | return NS_OK; |
918 | 0 | } |
919 | | |
920 | | void |
921 | | nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) |
922 | 0 | { |
923 | 0 | MOZ_DIAGNOSTIC_ASSERT(aBytesWritten > 0); |
924 | 0 |
|
925 | 0 | nsPipeEvents events; |
926 | 0 | { |
927 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
928 | 0 |
|
929 | 0 | LOG(("OOO advancing write cursor by %u\n", aBytesWritten)); |
930 | 0 |
|
931 | 0 | char* newWriteCursor = mWriteCursor + aBytesWritten; |
932 | 0 | MOZ_DIAGNOSTIC_ASSERT(newWriteCursor <= mWriteLimit); |
933 | 0 |
|
934 | 0 | // update read limit if reading in the same segment |
935 | 0 | UpdateAllReadCursors(newWriteCursor); |
936 | 0 |
|
937 | 0 | mWriteCursor = newWriteCursor; |
938 | 0 |
|
939 | 0 | ValidateAllReadCursors(); |
940 | 0 |
|
941 | 0 | // update the writable flag on the output stream |
942 | 0 | if (mWriteCursor == mWriteLimit) { |
943 | 0 | mOutput.SetWritable(!IsAdvanceBufferFull(mon)); |
944 | 0 | } |
945 | 0 |
|
946 | 0 | // notify input stream that pipe now contains additional data |
947 | 0 | bool needNotify = false; |
948 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
949 | 0 | if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon) |
950 | 0 | == NotifyMonitor) { |
951 | 0 | needNotify = true; |
952 | 0 | } |
953 | 0 | } |
954 | 0 |
|
955 | 0 | if (needNotify) { |
956 | 0 | mon.NotifyAll(); |
957 | 0 | } |
958 | 0 | } |
959 | 0 | } |
960 | | |
961 | | void |
962 | | nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason) |
963 | 0 | { |
964 | 0 | MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason)); |
965 | 0 |
|
966 | 0 | nsPipeEvents events; |
967 | 0 | { |
968 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
969 | 0 |
|
970 | 0 | // Its possible to re-enter this method when we call OnPipeException() or |
971 | 0 | // OnInputExection() below. If there is a caller stuck in our synchronous |
972 | 0 | // Wait() method, then they will get woken up with a failure code which |
973 | 0 | // re-enters this method. Therefore, gracefully handle unknown streams |
974 | 0 | // here. |
975 | 0 |
|
976 | 0 | // If we only have one stream open and it is the given stream, then shut |
977 | 0 | // down the entire pipe. |
978 | 0 | if (mInputList.Length() == 1) { |
979 | 0 | if (mInputList[0] == aStream) { |
980 | 0 | OnPipeException(aReason); |
981 | 0 | } |
982 | 0 | return; |
983 | 0 | } |
984 | 0 |
|
985 | 0 | // Otherwise just close the particular stream that hit an exception. |
986 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
987 | 0 | if (mInputList[i] != aStream) { |
988 | 0 | continue; |
989 | 0 | } |
990 | 0 | |
991 | 0 | MonitorAction action = mInputList[i]->OnInputException(aReason, events, |
992 | 0 | mon); |
993 | 0 |
|
994 | 0 | // Notify after element is removed in case we re-enter as a result. |
995 | 0 | if (action == NotifyMonitor) { |
996 | 0 | mon.NotifyAll(); |
997 | 0 | } |
998 | 0 |
|
999 | 0 | return; |
1000 | 0 | } |
1001 | 0 | } |
1002 | 0 | } |
1003 | | |
1004 | | void |
1005 | | nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) |
1006 | 0 | { |
1007 | 0 | LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n", |
1008 | 0 | static_cast<uint32_t>(aReason), aOutputOnly)); |
1009 | 0 |
|
1010 | 0 | nsPipeEvents events; |
1011 | 0 | { |
1012 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
1013 | 0 |
|
1014 | 0 | // if we've already hit an exception, then ignore this one. |
1015 | 0 | if (NS_FAILED(mStatus)) { |
1016 | 0 | return; |
1017 | 0 | } |
1018 | 0 | |
1019 | 0 | mStatus = aReason; |
1020 | 0 |
|
1021 | 0 | bool needNotify = false; |
1022 | 0 |
|
1023 | 0 | // OnInputException() can drain the stream and remove it from |
1024 | 0 | // mInputList. So iterate over a temp list instead. |
1025 | 0 | nsTArray<nsPipeInputStream*> list(mInputList); |
1026 | 0 | for (uint32_t i = 0; i < list.Length(); ++i) { |
1027 | 0 | // an output-only exception applies to the input end if the pipe has |
1028 | 0 | // zero bytes available. |
1029 | 0 | if (aOutputOnly && list[i]->Available()) { |
1030 | 0 | continue; |
1031 | 0 | } |
1032 | 0 | |
1033 | 0 | if (list[i]->OnInputException(aReason, events, mon) == NotifyMonitor) { |
1034 | 0 | needNotify = true; |
1035 | 0 | } |
1036 | 0 | } |
1037 | 0 |
|
1038 | 0 | if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) { |
1039 | 0 | needNotify = true; |
1040 | 0 | } |
1041 | 0 |
|
1042 | 0 | // Notify after we have removed any input streams from mInputList |
1043 | 0 | if (needNotify) { |
1044 | 0 | mon.NotifyAll(); |
1045 | 0 | } |
1046 | 0 | } |
1047 | 0 | } |
1048 | | |
1049 | | nsresult |
1050 | | nsPipe::CloneInputStream(nsPipeInputStream* aOriginal, |
1051 | | nsIInputStream** aCloneOut) |
1052 | 0 | { |
1053 | 0 | ReentrantMonitorAutoEnter mon(mReentrantMonitor); |
1054 | 0 | RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal); |
1055 | 0 | mInputList.AppendElement(ref); |
1056 | 0 | nsCOMPtr<nsIAsyncInputStream> downcast = ref.forget(); |
1057 | 0 | downcast.forget(aCloneOut); |
1058 | 0 | return NS_OK; |
1059 | 0 | } |
1060 | | |
1061 | | uint32_t |
1062 | | nsPipe::CountSegmentReferences(int32_t aSegment) |
1063 | 0 | { |
1064 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
1065 | 0 | uint32_t count = 0; |
1066 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1067 | 0 | if (aSegment >= mInputList[i]->ReadState().mSegment) { |
1068 | 0 | count += 1; |
1069 | 0 | } |
1070 | 0 | } |
1071 | 0 | return count; |
1072 | 0 | } |
1073 | | |
1074 | | void |
1075 | | nsPipe::SetAllNullReadCursors() |
1076 | 0 | { |
1077 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
1078 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1079 | 0 | nsPipeReadState& readState = mInputList[i]->ReadState(); |
1080 | 0 | if (!readState.mReadCursor) { |
1081 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment); |
1082 | 0 | readState.mReadCursor = readState.mReadLimit = mWriteCursor; |
1083 | 0 | } |
1084 | 0 | } |
1085 | 0 | } |
1086 | | |
1087 | | bool |
1088 | | nsPipe::AllReadCursorsMatchWriteCursor() |
1089 | 0 | { |
1090 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
1091 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1092 | 0 | const nsPipeReadState& readState = mInputList[i]->ReadState(); |
1093 | 0 | if (readState.mSegment != mWriteSegment || |
1094 | 0 | readState.mReadCursor != mWriteCursor) { |
1095 | 0 | return false; |
1096 | 0 | } |
1097 | 0 | } |
1098 | 0 | return true; |
1099 | 0 | } |
1100 | | |
1101 | | void |
1102 | | nsPipe::RollBackAllReadCursors(char* aWriteCursor) |
1103 | 0 | { |
1104 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
1105 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1106 | 0 | nsPipeReadState& readState = mInputList[i]->ReadState(); |
1107 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteSegment == readState.mSegment); |
1108 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadCursor); |
1109 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteCursor == readState.mReadLimit); |
1110 | 0 | readState.mReadCursor = aWriteCursor; |
1111 | 0 | readState.mReadLimit = aWriteCursor; |
1112 | 0 | } |
1113 | 0 | } |
1114 | | |
1115 | | void |
1116 | | nsPipe::UpdateAllReadCursors(char* aWriteCursor) |
1117 | 0 | { |
1118 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
1119 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1120 | 0 | nsPipeReadState& readState = mInputList[i]->ReadState(); |
1121 | 0 | if (mWriteSegment == readState.mSegment && |
1122 | 0 | readState.mReadLimit == mWriteCursor) { |
1123 | 0 | readState.mReadLimit = aWriteCursor; |
1124 | 0 | } |
1125 | 0 | } |
1126 | 0 | } |
1127 | | |
1128 | | void |
1129 | | nsPipe::ValidateAllReadCursors() |
1130 | 0 | { |
1131 | 0 | mReentrantMonitor.AssertCurrentThreadIn(); |
1132 | 0 | // The only way mReadCursor == mWriteCursor is if: |
1133 | 0 | // |
1134 | 0 | // - mReadCursor is at the start of a segment (which, based on how |
1135 | 0 | // nsSegmentedBuffer works, means that this segment is the "first" |
1136 | 0 | // segment) |
1137 | 0 | // - mWriteCursor points at the location past the end of the current |
1138 | 0 | // write segment (so the current write filled the current write |
1139 | 0 | // segment, so we've incremented mWriteCursor to point past the end |
1140 | 0 | // of it) |
1141 | 0 | // - the segment to which data has just been written is located |
1142 | 0 | // exactly one segment's worth of bytes before the first segment |
1143 | 0 | // where mReadCursor is located |
1144 | 0 | // |
1145 | 0 | // Consequently, the byte immediately after the end of the current |
1146 | 0 | // write segment is the first byte of the first segment, so |
1147 | 0 | // mReadCursor == mWriteCursor. (Another way to think about this is |
1148 | 0 | // to consider the buffer architecture diagram above, but consider it |
1149 | 0 | // with an arena allocator which allocates from the *end* of the |
1150 | 0 | // arena to the *beginning* of the arena.) |
1151 | | #ifdef DEBUG |
1152 | | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1153 | | const nsPipeReadState& state = mInputList[i]->ReadState(); |
1154 | | MOZ_ASSERT(state.mReadCursor != mWriteCursor || |
1155 | | (mBuffer.GetSegment(state.mSegment) == state.mReadCursor && |
1156 | | mWriteCursor == mWriteLimit)); |
1157 | | } |
1158 | | #endif |
1159 | | } |
1160 | | |
1161 | | uint32_t |
1162 | | nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState, |
1163 | | const ReentrantMonitorAutoEnter& ev) const |
1164 | 0 | { |
1165 | 0 | // The write segment can be smaller than the current reader position |
1166 | 0 | // in some cases. For example, when the first write segment has not |
1167 | 0 | // been allocated yet mWriteSegment is negative. In these cases |
1168 | 0 | // the stream is effectively using zero segments. |
1169 | 0 | if (mWriteSegment < aReadState.mSegment) { |
1170 | 0 | return 0; |
1171 | 0 | } |
1172 | 0 | |
1173 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= 0); |
1174 | 0 | MOZ_DIAGNOSTIC_ASSERT(aReadState.mSegment >= 0); |
1175 | 0 |
|
1176 | 0 | // Otherwise at least one segment is being used. We add one here |
1177 | 0 | // since a single segment is being used when the write and read |
1178 | 0 | // segment indices are the same. |
1179 | 0 | return 1 + mWriteSegment - aReadState.mSegment; |
1180 | 0 | } |
1181 | | |
1182 | | bool |
1183 | | nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const |
1184 | 0 | { |
1185 | 0 | // If we have fewer total segments than the limit we can immediately |
1186 | 0 | // determine we are not full. Note, we must add one to mWriteSegment |
1187 | 0 | // to convert from a index to a count. |
1188 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1); |
1189 | 0 | MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX); |
1190 | 0 | uint32_t totalWriteSegments = mWriteSegment + 1; |
1191 | 0 | if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) { |
1192 | 0 | return false; |
1193 | 0 | } |
1194 | 0 | |
1195 | 0 | // Otherwise we must inspect all of our reader streams. We need |
1196 | 0 | // to determine the buffer depth of the fastest reader. |
1197 | 0 | uint32_t minBufferSegments = UINT32_MAX; |
1198 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1199 | 0 | // Only count buffer segments from input streams that are open. |
1200 | 0 | if (NS_FAILED(mInputList[i]->Status(ev))) { |
1201 | 0 | continue; |
1202 | 0 | } |
1203 | 0 | const nsPipeReadState& state = mInputList[i]->ReadState(); |
1204 | 0 | uint32_t bufferSegments = GetBufferSegmentCount(state, ev); |
1205 | 0 | minBufferSegments = std::min(minBufferSegments, bufferSegments); |
1206 | 0 | // We only care if any reader has fewer segments buffered than |
1207 | 0 | // our threshold. We can stop once we hit that threshold. |
1208 | 0 | if (minBufferSegments < mMaxAdvanceBufferSegmentCount) { |
1209 | 0 | return false; |
1210 | 0 | } |
1211 | 0 | } |
1212 | 0 |
|
1213 | 0 | // Note, its possible for minBufferSegments to exceed our |
1214 | 0 | // mMaxAdvanceBufferSegmentCount here. This happens when a cloned |
1215 | 0 | // reader gets far behind, but then the fastest reader stream is |
1216 | 0 | // closed. This leaves us with a single stream that is buffered |
1217 | 0 | // beyond our max. Naturally we continue to indicate the pipe |
1218 | 0 | // is full at this point. |
1219 | 0 |
|
1220 | 0 | return true; |
1221 | 0 | } |
1222 | | |
1223 | | //----------------------------------------------------------------------------- |
1224 | | // nsPipeEvents methods: |
1225 | | //----------------------------------------------------------------------------- |
1226 | | |
1227 | | nsPipeEvents::~nsPipeEvents() |
1228 | 0 | { |
1229 | 0 | // dispatch any pending events |
1230 | 0 |
|
1231 | 0 | for (uint32_t i = 0; i < mInputList.Length(); ++i) { |
1232 | 0 | mInputList[i].mCallback->OnInputStreamReady(mInputList[i].mStream); |
1233 | 0 | } |
1234 | 0 | mInputList.Clear(); |
1235 | 0 |
|
1236 | 0 | if (mOutputCallback) { |
1237 | 0 | mOutputCallback->OnOutputStreamReady(mOutputStream); |
1238 | 0 | mOutputCallback = nullptr; |
1239 | 0 | mOutputStream = nullptr; |
1240 | 0 | } |
1241 | 0 | } |
1242 | | |
1243 | | //----------------------------------------------------------------------------- |
1244 | | // nsPipeInputStream methods: |
1245 | | //----------------------------------------------------------------------------- |
1246 | | |
// Reference counting (AddRef/Release) for nsPipeInputStream, generated by
// the XPCOM helper macros.
NS_IMPL_ADDREF(nsPipeInputStream);
NS_IMPL_RELEASE(nsPipeInputStream);

// Interface table for nsPipeInputStream: the interfaces listed below are
// the ones the class can be queried for.  nsIInputStream and nsISupports
// are reachable through more than one base, so they are resolved via
// nsIAsyncInputStream using the _AMBIGUOUS entries.
NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
  NS_INTERFACE_TABLE_BEGIN
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISeekableStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
    NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
    NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
                                       nsIAsyncInputStream)
    NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
                                       nsIAsyncInputStream)
  NS_INTERFACE_TABLE_END
NS_INTERFACE_TABLE_TAIL

// Interface list reported through the class-info interface getter.
NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
                            nsIInputStream,
                            nsIAsyncInputStream,
                            nsISeekableStream,
                            nsISearchableInputStream,
                            nsICloneableInputStream,
                            nsIBufferedInputStream)

// Class-info implementation flagged as thread-safe.
NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
1274 | | |
1275 | | NS_IMETHODIMP |
1276 | | nsPipeInputStream::Init(nsIInputStream*, uint32_t) |
1277 | 0 | { |
1278 | 0 | MOZ_CRASH("nsPipeInputStream should never be initialized with " |
1279 | 0 | "nsIBufferedInputStream::Init!\n"); |
1280 | 0 | } |
1281 | | |
1282 | | NS_IMETHODIMP |
1283 | | nsPipeInputStream::GetData(nsIInputStream **aResult) |
1284 | 0 | { |
1285 | 0 | // as this was not created with init() we are not |
1286 | 0 | // wrapping anything |
1287 | 0 | return NS_ERROR_NOT_IMPLEMENTED; |
1288 | 0 | } |
1289 | | |
1290 | | uint32_t |
1291 | | nsPipeInputStream::Available() |
1292 | 0 | { |
1293 | 0 | mPipe->mReentrantMonitor.AssertCurrentThreadIn(); |
1294 | 0 | return mReadState.mAvailable; |
1295 | 0 | } |
1296 | | |
1297 | | nsresult |
1298 | | nsPipeInputStream::Wait() |
1299 | 0 | { |
1300 | 0 | MOZ_DIAGNOSTIC_ASSERT(mBlocking); |
1301 | 0 |
|
1302 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1303 | 0 |
|
1304 | 0 | while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) { |
1305 | 0 | LOG(("III pipe input: waiting for data\n")); |
1306 | 0 |
|
1307 | 0 | mBlocked = true; |
1308 | 0 | mon.Wait(); |
1309 | 0 | mBlocked = false; |
1310 | 0 |
|
1311 | 0 | LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n", |
1312 | 0 | static_cast<uint32_t>(Status(mon)), mReadState.mAvailable)); |
1313 | 0 | } |
1314 | 0 |
|
1315 | 0 | return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon); |
1316 | 0 | } |
1317 | | |
1318 | | MonitorAction |
1319 | | nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, |
1320 | | nsPipeEvents& aEvents, |
1321 | | const ReentrantMonitorAutoEnter& ev) |
1322 | 0 | { |
1323 | 0 | MonitorAction result = DoNotNotifyMonitor; |
1324 | 0 |
|
1325 | 0 | mPipe->mReentrantMonitor.AssertCurrentThreadIn(); |
1326 | 0 | mReadState.mAvailable += aBytesWritten; |
1327 | 0 |
|
1328 | 0 | if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { |
1329 | 0 | aEvents.NotifyInputReady(this, mCallback); |
1330 | 0 | mCallback = nullptr; |
1331 | 0 | mCallbackFlags = 0; |
1332 | 0 | } else if (mBlocked) { |
1333 | 0 | result = NotifyMonitor; |
1334 | 0 | } |
1335 | 0 |
|
1336 | 0 | return result; |
1337 | 0 | } |
1338 | | |
1339 | | MonitorAction |
1340 | | nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents, |
1341 | | const ReentrantMonitorAutoEnter& ev) |
1342 | 0 | { |
1343 | 0 | LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n", |
1344 | 0 | this, static_cast<uint32_t>(aReason))); |
1345 | 0 |
|
1346 | 0 | MonitorAction result = DoNotNotifyMonitor; |
1347 | 0 |
|
1348 | 0 | MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason)); |
1349 | 0 |
|
1350 | 0 | if (NS_SUCCEEDED(mInputStatus)) { |
1351 | 0 | mInputStatus = aReason; |
1352 | 0 | } |
1353 | 0 |
|
1354 | 0 | // force count of available bytes to zero. |
1355 | 0 | mPipe->DrainInputStream(mReadState, aEvents); |
1356 | 0 |
|
1357 | 0 | if (mCallback) { |
1358 | 0 | aEvents.NotifyInputReady(this, mCallback); |
1359 | 0 | mCallback = nullptr; |
1360 | 0 | mCallbackFlags = 0; |
1361 | 0 | } else if (mBlocked) { |
1362 | 0 | result = NotifyMonitor; |
1363 | 0 | } |
1364 | 0 |
|
1365 | 0 | return result; |
1366 | 0 | } |
1367 | | |
1368 | | NS_IMETHODIMP |
1369 | | nsPipeInputStream::CloseWithStatus(nsresult aReason) |
1370 | 0 | { |
1371 | 0 | LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n", |
1372 | 0 | this, static_cast<uint32_t>(aReason))); |
1373 | 0 |
|
1374 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1375 | 0 |
|
1376 | 0 | if (NS_FAILED(mInputStatus)) { |
1377 | 0 | return NS_OK; |
1378 | 0 | } |
1379 | 0 | |
1380 | 0 | if (NS_SUCCEEDED(aReason)) { |
1381 | 0 | aReason = NS_BASE_STREAM_CLOSED; |
1382 | 0 | } |
1383 | 0 |
|
1384 | 0 | mPipe->OnInputStreamException(this, aReason); |
1385 | 0 | return NS_OK; |
1386 | 0 | } |
1387 | | |
1388 | | NS_IMETHODIMP |
1389 | | nsPipeInputStream::Close() |
1390 | 0 | { |
1391 | 0 | return CloseWithStatus(NS_BASE_STREAM_CLOSED); |
1392 | 0 | } |
1393 | | |
1394 | | NS_IMETHODIMP |
1395 | | nsPipeInputStream::Available(uint64_t* aResult) |
1396 | 0 | { |
1397 | 0 | // nsPipeInputStream supports under 4GB stream only |
1398 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1399 | 0 |
|
1400 | 0 | // return error if closed |
1401 | 0 | if (!mReadState.mAvailable && NS_FAILED(Status(mon))) { |
1402 | 0 | return Status(mon); |
1403 | 0 | } |
1404 | 0 | |
1405 | 0 | *aResult = (uint64_t)mReadState.mAvailable; |
1406 | 0 | return NS_OK; |
1407 | 0 | } |
1408 | | |
1409 | | NS_IMETHODIMP |
1410 | | nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter, |
1411 | | void* aClosure, |
1412 | | uint32_t aCount, |
1413 | | uint32_t* aReadCount) |
1414 | 0 | { |
1415 | 0 | LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount)); |
1416 | 0 |
|
1417 | 0 | nsresult rv = NS_OK; |
1418 | 0 |
|
1419 | 0 | *aReadCount = 0; |
1420 | 0 | while (aCount) { |
1421 | 0 | AutoReadSegment segment(mPipe, mReadState, aCount); |
1422 | 0 | rv = segment.Status(); |
1423 | 0 | if (NS_FAILED(rv)) { |
1424 | 0 | // ignore this error if we've already read something. |
1425 | 0 | if (*aReadCount > 0) { |
1426 | 0 | rv = NS_OK; |
1427 | 0 | break; |
1428 | 0 | } |
1429 | 0 | if (rv == NS_BASE_STREAM_WOULD_BLOCK) { |
1430 | 0 | // pipe is empty |
1431 | 0 | if (!mBlocking) { |
1432 | 0 | break; |
1433 | 0 | } |
1434 | 0 | // wait for some data to be written to the pipe |
1435 | 0 | rv = Wait(); |
1436 | 0 | if (NS_SUCCEEDED(rv)) { |
1437 | 0 | continue; |
1438 | 0 | } |
1439 | 0 | } |
1440 | 0 | // ignore this error, just return. |
1441 | 0 | if (rv == NS_BASE_STREAM_CLOSED) { |
1442 | 0 | rv = NS_OK; |
1443 | 0 | break; |
1444 | 0 | } |
1445 | 0 | mPipe->OnInputStreamException(this, rv); |
1446 | 0 | break; |
1447 | 0 | } |
1448 | 0 | |
1449 | 0 | uint32_t writeCount; |
1450 | 0 | while (segment.Length()) { |
1451 | 0 | writeCount = 0; |
1452 | 0 |
|
1453 | 0 | rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure, |
1454 | 0 | segment.Data(), *aReadCount, segment.Length(), &writeCount); |
1455 | 0 |
|
1456 | 0 | if (NS_FAILED(rv) || writeCount == 0) { |
1457 | 0 | aCount = 0; |
1458 | 0 | // any errors returned from the writer end here: do not |
1459 | 0 | // propagate to the caller of ReadSegments. |
1460 | 0 | rv = NS_OK; |
1461 | 0 | break; |
1462 | 0 | } |
1463 | 0 | |
1464 | 0 | MOZ_DIAGNOSTIC_ASSERT(writeCount <= segment.Length()); |
1465 | 0 | segment.Advance(writeCount); |
1466 | 0 | aCount -= writeCount; |
1467 | 0 | *aReadCount += writeCount; |
1468 | 0 | mLogicalOffset += writeCount; |
1469 | 0 | } |
1470 | 0 | } |
1471 | 0 |
|
1472 | 0 | return rv; |
1473 | 0 | } |
1474 | | |
1475 | | NS_IMETHODIMP |
1476 | | nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount) |
1477 | 0 | { |
1478 | 0 | return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount); |
1479 | 0 | } |
1480 | | |
1481 | | NS_IMETHODIMP |
1482 | | nsPipeInputStream::IsNonBlocking(bool* aNonBlocking) |
1483 | 0 | { |
1484 | 0 | *aNonBlocking = !mBlocking; |
1485 | 0 | return NS_OK; |
1486 | 0 | } |
1487 | | |
1488 | | NS_IMETHODIMP |
1489 | | nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback, |
1490 | | uint32_t aFlags, |
1491 | | uint32_t aRequestedCount, |
1492 | | nsIEventTarget* aTarget) |
1493 | 0 | { |
1494 | 0 | LOG(("III AsyncWait [this=%p]\n", this)); |
1495 | 0 |
|
1496 | 0 | nsPipeEvents pipeEvents; |
1497 | 0 | { |
1498 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1499 | 0 |
|
1500 | 0 | // replace a pending callback |
1501 | 0 | mCallback = nullptr; |
1502 | 0 | mCallbackFlags = 0; |
1503 | 0 |
|
1504 | 0 | if (!aCallback) { |
1505 | 0 | return NS_OK; |
1506 | 0 | } |
1507 | 0 | |
1508 | 0 | nsCOMPtr<nsIInputStreamCallback> proxy; |
1509 | 0 | if (aTarget) { |
1510 | 0 | proxy = NS_NewInputStreamReadyEvent("nsPipeInputStream::AsyncWait", |
1511 | 0 | aCallback, aTarget); |
1512 | 0 | aCallback = proxy; |
1513 | 0 | } |
1514 | 0 |
|
1515 | 0 | if (NS_FAILED(Status(mon)) || |
1516 | 0 | (mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) { |
1517 | 0 | // stream is already closed or readable; post event. |
1518 | 0 | pipeEvents.NotifyInputReady(this, aCallback); |
1519 | 0 | } else { |
1520 | 0 | // queue up callback object to be notified when data becomes available |
1521 | 0 | mCallback = aCallback; |
1522 | 0 | mCallbackFlags = aFlags; |
1523 | 0 | } |
1524 | 0 | } |
1525 | 0 | return NS_OK; |
1526 | 0 | } |
1527 | | |
1528 | | NS_IMETHODIMP |
1529 | | nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset) |
1530 | 0 | { |
1531 | 0 | MOZ_ASSERT_UNREACHABLE("nsPipeInputStream::Seek"); |
1532 | 0 | return NS_ERROR_NOT_IMPLEMENTED; |
1533 | 0 | } |
1534 | | |
1535 | | NS_IMETHODIMP |
1536 | | nsPipeInputStream::Tell(int64_t* aOffset) |
1537 | 0 | { |
1538 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1539 | 0 |
|
1540 | 0 | // return error if closed |
1541 | 0 | if (!mReadState.mAvailable && NS_FAILED(Status(mon))) { |
1542 | 0 | return Status(mon); |
1543 | 0 | } |
1544 | 0 | |
1545 | 0 | *aOffset = mLogicalOffset; |
1546 | 0 | return NS_OK; |
1547 | 0 | } |
1548 | | |
1549 | | NS_IMETHODIMP |
1550 | | nsPipeInputStream::SetEOF() |
1551 | 0 | { |
1552 | 0 | MOZ_ASSERT_UNREACHABLE("nsPipeInputStream::SetEOF"); |
1553 | 0 | return NS_ERROR_NOT_IMPLEMENTED; |
1554 | 0 | } |
1555 | | |
1556 | | static bool strings_equal(bool aIgnoreCase, |
1557 | | const char* aS1, const char* aS2, uint32_t aLen) |
1558 | 0 | { |
1559 | 0 | return aIgnoreCase |
1560 | 0 | ? !nsCRT::strncasecmp(aS1, aS2, aLen) : !strncmp(aS1, aS2, aLen); |
1561 | 0 | } |
1562 | | |
1563 | | NS_IMETHODIMP |
1564 | | nsPipeInputStream::Search(const char* aForString, |
1565 | | bool aIgnoreCase, |
1566 | | bool* aFound, |
1567 | | uint32_t* aOffsetSearchedTo) |
1568 | 0 | { |
1569 | 0 | LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase)); |
1570 | 0 |
|
1571 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1572 | 0 |
|
1573 | 0 | char* cursor1; |
1574 | 0 | char* limit1; |
1575 | 0 | uint32_t index = 0, offset = 0; |
1576 | 0 | uint32_t strLen = strlen(aForString); |
1577 | 0 |
|
1578 | 0 | mPipe->PeekSegment(mReadState, 0, cursor1, limit1); |
1579 | 0 | if (cursor1 == limit1) { |
1580 | 0 | *aFound = false; |
1581 | 0 | *aOffsetSearchedTo = 0; |
1582 | 0 | LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); |
1583 | 0 | return NS_OK; |
1584 | 0 | } |
1585 | 0 |
|
1586 | 0 | while (true) { |
1587 | 0 | uint32_t i, len1 = limit1 - cursor1; |
1588 | 0 |
|
1589 | 0 | // check if the string is in the buffer segment |
1590 | 0 | for (i = 0; i < len1 - strLen + 1; i++) { |
1591 | 0 | if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) { |
1592 | 0 | *aFound = true; |
1593 | 0 | *aOffsetSearchedTo = offset + i; |
1594 | 0 | LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); |
1595 | 0 | return NS_OK; |
1596 | 0 | } |
1597 | 0 | } |
1598 | 0 |
|
1599 | 0 | // get the next segment |
1600 | 0 | char* cursor2; |
1601 | 0 | char* limit2; |
1602 | 0 | uint32_t len2; |
1603 | 0 |
|
1604 | 0 | index++; |
1605 | 0 | offset += len1; |
1606 | 0 |
|
1607 | 0 | mPipe->PeekSegment(mReadState, index, cursor2, limit2); |
1608 | 0 | if (cursor2 == limit2) { |
1609 | 0 | *aFound = false; |
1610 | 0 | *aOffsetSearchedTo = offset - strLen + 1; |
1611 | 0 | LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); |
1612 | 0 | return NS_OK; |
1613 | 0 | } |
1614 | 0 | len2 = limit2 - cursor2; |
1615 | 0 |
|
1616 | 0 | // check if the string is straddling the next buffer segment |
1617 | 0 | uint32_t lim = XPCOM_MIN(strLen, len2 + 1); |
1618 | 0 | for (i = 0; i < lim; ++i) { |
1619 | 0 | uint32_t strPart1Len = strLen - i - 1; |
1620 | 0 | uint32_t strPart2Len = strLen - strPart1Len; |
1621 | 0 | const char* strPart2 = &aForString[strLen - strPart2Len]; |
1622 | 0 | uint32_t bufSeg1Offset = len1 - strPart1Len; |
1623 | 0 | if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString, strPart1Len) && |
1624 | 0 | strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) { |
1625 | 0 | *aFound = true; |
1626 | 0 | *aOffsetSearchedTo = offset - strPart1Len; |
1627 | 0 | LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo)); |
1628 | 0 | return NS_OK; |
1629 | 0 | } |
1630 | 0 | } |
1631 | 0 |
|
1632 | 0 | // finally continue with the next buffer |
1633 | 0 | cursor1 = cursor2; |
1634 | 0 | limit1 = limit2; |
1635 | 0 | } |
1636 | 0 |
|
1637 | 0 | MOZ_ASSERT_UNREACHABLE("can't get here"); |
1638 | 0 | return NS_ERROR_UNEXPECTED; // keep compiler happy |
1639 | 0 | } |
1640 | | |
1641 | | NS_IMETHODIMP |
1642 | | nsPipeInputStream::GetCloneable(bool* aCloneableOut) |
1643 | 0 | { |
1644 | 0 | *aCloneableOut = true; |
1645 | 0 | return NS_OK; |
1646 | 0 | } |
1647 | | |
1648 | | NS_IMETHODIMP |
1649 | | nsPipeInputStream::Clone(nsIInputStream** aCloneOut) |
1650 | 0 | { |
1651 | 0 | return mPipe->CloneInputStream(this, aCloneOut); |
1652 | 0 | } |
1653 | | |
1654 | | nsresult |
1655 | | nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const |
1656 | 0 | { |
1657 | 0 | if (NS_FAILED(mInputStatus)) { |
1658 | 0 | return mInputStatus; |
1659 | 0 | } |
1660 | 0 | |
1661 | 0 | if (mReadState.mAvailable) { |
1662 | 0 | // Still something to read and this input stream state is OK. |
1663 | 0 | return NS_OK; |
1664 | 0 | } |
1665 | 0 | |
1666 | 0 | // Nothing to read, just fall through to the pipe's state that |
1667 | 0 | // may reflect state of its output stream side (already closed). |
1668 | 0 | return mPipe->mStatus; |
1669 | 0 | } |
1670 | | |
1671 | | nsresult |
1672 | | nsPipeInputStream::Status() const |
1673 | 0 | { |
1674 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1675 | 0 | return Status(mon); |
1676 | 0 | } |
1677 | | |
1678 | | nsPipeInputStream::~nsPipeInputStream() |
1679 | 0 | { |
1680 | 0 | Close(); |
1681 | 0 | } |
1682 | | |
1683 | | //----------------------------------------------------------------------------- |
1684 | | // nsPipeOutputStream methods: |
1685 | | //----------------------------------------------------------------------------- |
1686 | | |
// XPCOM boilerplate for nsPipeOutputStream.
//
// Interfaces listed for QueryInterface: nsIOutputStream, nsIAsyncOutputStream
// and nsIClassInfo. AddRef()/Release() are not generated here; they are
// written by hand further down in this file.
1687 | | NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream, |
1688 | | nsIOutputStream, |
1689 | | nsIAsyncOutputStream, |
1690 | | nsIClassInfo) |
1691 | | |
// Interface list for the class-info getter: the same two stream interfaces,
// without nsIClassInfo itself.
1692 | | NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream, |
1693 | | nsIOutputStream, |
1694 | | nsIAsyncOutputStream) |
1695 | | |
// NOTE(review): presumably supplies the class-info implementation flagged as
// thread-safe -- confirm against nsIClassInfoImpl.h.
1696 | | NS_IMPL_THREADSAFE_CI(nsPipeOutputStream) |
1697 | | |
1698 | | nsresult |
1699 | | nsPipeOutputStream::Wait() |
1700 | 0 | { |
1701 | 0 | MOZ_DIAGNOSTIC_ASSERT(mBlocking); |
1702 | 0 |
|
1703 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1704 | 0 |
|
1705 | 0 | if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) { |
1706 | 0 | LOG(("OOO pipe output: waiting for space\n")); |
1707 | 0 | mBlocked = true; |
1708 | 0 | mon.Wait(); |
1709 | 0 | mBlocked = false; |
1710 | 0 | LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n", |
1711 | 0 | static_cast<uint32_t>(mPipe->mStatus), mWritable)); |
1712 | 0 | } |
1713 | 0 |
|
1714 | 0 | return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; |
1715 | 0 | } |
1716 | | |
1717 | | MonitorAction |
1718 | | nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) |
1719 | 0 | { |
1720 | 0 | MonitorAction result = DoNotNotifyMonitor; |
1721 | 0 |
|
1722 | 0 | mWritable = true; |
1723 | 0 |
|
1724 | 0 | if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) { |
1725 | 0 | aEvents.NotifyOutputReady(this, mCallback); |
1726 | 0 | mCallback = nullptr; |
1727 | 0 | mCallbackFlags = 0; |
1728 | 0 | } else if (mBlocked) { |
1729 | 0 | result = NotifyMonitor; |
1730 | 0 | } |
1731 | 0 |
|
1732 | 0 | return result; |
1733 | 0 | } |
1734 | | |
1735 | | MonitorAction |
1736 | | nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents) |
1737 | 0 | { |
1738 | 0 | LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n", |
1739 | 0 | this, static_cast<uint32_t>(aReason))); |
1740 | 0 |
|
1741 | 0 | MonitorAction result = DoNotNotifyMonitor; |
1742 | 0 |
|
1743 | 0 | MOZ_DIAGNOSTIC_ASSERT(NS_FAILED(aReason)); |
1744 | 0 | mWritable = false; |
1745 | 0 |
|
1746 | 0 | if (mCallback) { |
1747 | 0 | aEvents.NotifyOutputReady(this, mCallback); |
1748 | 0 | mCallback = nullptr; |
1749 | 0 | mCallbackFlags = 0; |
1750 | 0 | } else if (mBlocked) { |
1751 | 0 | result = NotifyMonitor; |
1752 | 0 | } |
1753 | 0 |
|
1754 | 0 | return result; |
1755 | 0 | } |
1756 | | |
1757 | | |
1758 | | NS_IMETHODIMP_(MozExternalRefCountType) |
1759 | | nsPipeOutputStream::AddRef() |
1760 | 0 | { |
1761 | 0 | ++mWriterRefCnt; |
1762 | 0 | return mPipe->AddRef(); |
1763 | 0 | } |
1764 | | |
1765 | | NS_IMETHODIMP_(MozExternalRefCountType) |
1766 | | nsPipeOutputStream::Release() |
1767 | 0 | { |
1768 | 0 | if (--mWriterRefCnt == 0) { |
1769 | 0 | Close(); |
1770 | 0 | } |
1771 | 0 | return mPipe->Release(); |
1772 | 0 | } |
1773 | | |
1774 | | NS_IMETHODIMP |
1775 | | nsPipeOutputStream::CloseWithStatus(nsresult aReason) |
1776 | 0 | { |
1777 | 0 | LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n", |
1778 | 0 | this, static_cast<uint32_t>(aReason))); |
1779 | 0 |
|
1780 | 0 | if (NS_SUCCEEDED(aReason)) { |
1781 | 0 | aReason = NS_BASE_STREAM_CLOSED; |
1782 | 0 | } |
1783 | 0 |
|
1784 | 0 | // input stream may remain open |
1785 | 0 | mPipe->OnPipeException(aReason, true); |
1786 | 0 | return NS_OK; |
1787 | 0 | } |
1788 | | |
1789 | | NS_IMETHODIMP |
1790 | | nsPipeOutputStream::Close() |
1791 | 0 | { |
1792 | 0 | return CloseWithStatus(NS_BASE_STREAM_CLOSED); |
1793 | 0 | } |
1794 | | |
1795 | | NS_IMETHODIMP |
1796 | | nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader, |
1797 | | void* aClosure, |
1798 | | uint32_t aCount, |
1799 | | uint32_t* aWriteCount) |
1800 | 0 | { |
1801 | 0 | LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount)); |
1802 | 0 |
|
1803 | 0 | nsresult rv = NS_OK; |
1804 | 0 |
|
1805 | 0 | char* segment; |
1806 | 0 | uint32_t segmentLen; |
1807 | 0 |
|
1808 | 0 | *aWriteCount = 0; |
1809 | 0 | while (aCount) { |
1810 | 0 | rv = mPipe->GetWriteSegment(segment, segmentLen); |
1811 | 0 | if (NS_FAILED(rv)) { |
1812 | 0 | if (rv == NS_BASE_STREAM_WOULD_BLOCK) { |
1813 | 0 | // pipe is full |
1814 | 0 | if (!mBlocking) { |
1815 | 0 | // ignore this error if we've already written something |
1816 | 0 | if (*aWriteCount > 0) { |
1817 | 0 | rv = NS_OK; |
1818 | 0 | } |
1819 | 0 | break; |
1820 | 0 | } |
1821 | 0 | // wait for the pipe to have an empty segment. |
1822 | 0 | rv = Wait(); |
1823 | 0 | if (NS_SUCCEEDED(rv)) { |
1824 | 0 | continue; |
1825 | 0 | } |
1826 | 0 | } |
1827 | 0 | mPipe->OnPipeException(rv); |
1828 | 0 | break; |
1829 | 0 | } |
1830 | 0 | |
1831 | 0 | // write no more than aCount |
1832 | 0 | if (segmentLen > aCount) { |
1833 | 0 | segmentLen = aCount; |
1834 | 0 | } |
1835 | 0 |
|
1836 | 0 | uint32_t readCount, originalLen = segmentLen; |
1837 | 0 | while (segmentLen) { |
1838 | 0 | readCount = 0; |
1839 | 0 |
|
1840 | 0 | rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen, &readCount); |
1841 | 0 |
|
1842 | 0 | if (NS_FAILED(rv) || readCount == 0) { |
1843 | 0 | aCount = 0; |
1844 | 0 | // any errors returned from the aReader end here: do not |
1845 | 0 | // propagate to the caller of WriteSegments. |
1846 | 0 | rv = NS_OK; |
1847 | 0 | break; |
1848 | 0 | } |
1849 | 0 | |
1850 | 0 | MOZ_DIAGNOSTIC_ASSERT(readCount <= segmentLen); |
1851 | 0 | segment += readCount; |
1852 | 0 | segmentLen -= readCount; |
1853 | 0 | aCount -= readCount; |
1854 | 0 | *aWriteCount += readCount; |
1855 | 0 | mLogicalOffset += readCount; |
1856 | 0 | } |
1857 | 0 |
|
1858 | 0 | if (segmentLen < originalLen) { |
1859 | 0 | mPipe->AdvanceWriteCursor(originalLen - segmentLen); |
1860 | 0 | } |
1861 | 0 | } |
1862 | 0 |
|
1863 | 0 | return rv; |
1864 | 0 | } |
1865 | | |
1866 | | static nsresult |
1867 | | nsReadFromRawBuffer(nsIOutputStream* aOutStr, |
1868 | | void* aClosure, |
1869 | | char* aToRawSegment, |
1870 | | uint32_t aOffset, |
1871 | | uint32_t aCount, |
1872 | | uint32_t* aReadCount) |
1873 | 0 | { |
1874 | 0 | const char* fromBuf = (const char*)aClosure; |
1875 | 0 | memcpy(aToRawSegment, &fromBuf[aOffset], aCount); |
1876 | 0 | *aReadCount = aCount; |
1877 | 0 | return NS_OK; |
1878 | 0 | } |
1879 | | |
1880 | | NS_IMETHODIMP |
1881 | | nsPipeOutputStream::Write(const char* aFromBuf, |
1882 | | uint32_t aBufLen, |
1883 | | uint32_t* aWriteCount) |
1884 | 0 | { |
1885 | 0 | return WriteSegments(nsReadFromRawBuffer, (void*)aFromBuf, aBufLen, aWriteCount); |
1886 | 0 | } |
1887 | | |
1888 | | NS_IMETHODIMP |
1889 | | nsPipeOutputStream::Flush(void) |
1890 | 0 | { |
1891 | 0 | // nothing to do |
1892 | 0 | return NS_OK; |
1893 | 0 | } |
1894 | | |
1895 | | static nsresult |
1896 | | nsReadFromInputStream(nsIOutputStream* aOutStr, |
1897 | | void* aClosure, |
1898 | | char* aToRawSegment, |
1899 | | uint32_t aOffset, |
1900 | | uint32_t aCount, |
1901 | | uint32_t* aReadCount) |
1902 | 0 | { |
1903 | 0 | nsIInputStream* fromStream = (nsIInputStream*)aClosure; |
1904 | 0 | return fromStream->Read(aToRawSegment, aCount, aReadCount); |
1905 | 0 | } |
1906 | | |
1907 | | NS_IMETHODIMP |
1908 | | nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream, |
1909 | | uint32_t aCount, |
1910 | | uint32_t* aWriteCount) |
1911 | 0 | { |
1912 | 0 | return WriteSegments(nsReadFromInputStream, aFromStream, aCount, aWriteCount); |
1913 | 0 | } |
1914 | | |
1915 | | NS_IMETHODIMP |
1916 | | nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking) |
1917 | 0 | { |
1918 | 0 | *aNonBlocking = !mBlocking; |
1919 | 0 | return NS_OK; |
1920 | 0 | } |
1921 | | |
1922 | | NS_IMETHODIMP |
1923 | | nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback, |
1924 | | uint32_t aFlags, |
1925 | | uint32_t aRequestedCount, |
1926 | | nsIEventTarget* aTarget) |
1927 | 0 | { |
1928 | 0 | LOG(("OOO AsyncWait [this=%p]\n", this)); |
1929 | 0 |
|
1930 | 0 | nsPipeEvents pipeEvents; |
1931 | 0 | { |
1932 | 0 | ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor); |
1933 | 0 |
|
1934 | 0 | // replace a pending callback |
1935 | 0 | mCallback = nullptr; |
1936 | 0 | mCallbackFlags = 0; |
1937 | 0 |
|
1938 | 0 | if (!aCallback) { |
1939 | 0 | return NS_OK; |
1940 | 0 | } |
1941 | 0 | |
1942 | 0 | nsCOMPtr<nsIOutputStreamCallback> proxy; |
1943 | 0 | if (aTarget) { |
1944 | 0 | proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget); |
1945 | 0 | aCallback = proxy; |
1946 | 0 | } |
1947 | 0 |
|
1948 | 0 | if (NS_FAILED(mPipe->mStatus) || |
1949 | 0 | (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) { |
1950 | 0 | // stream is already closed or writable; post event. |
1951 | 0 | pipeEvents.NotifyOutputReady(this, aCallback); |
1952 | 0 | } else { |
1953 | 0 | // queue up callback object to be notified when data becomes available |
1954 | 0 | mCallback = aCallback; |
1955 | 0 | mCallbackFlags = aFlags; |
1956 | 0 | } |
1957 | 0 | } |
1958 | 0 | return NS_OK; |
1959 | 0 | } |
1960 | | |
1961 | | //////////////////////////////////////////////////////////////////////////////// |
1962 | | |
1963 | | nsresult |
1964 | | NS_NewPipe(nsIInputStream** aPipeIn, |
1965 | | nsIOutputStream** aPipeOut, |
1966 | | uint32_t aSegmentSize, |
1967 | | uint32_t aMaxSize, |
1968 | | bool aNonBlockingInput, |
1969 | | bool aNonBlockingOutput) |
1970 | 0 | { |
1971 | 0 | if (aSegmentSize == 0) { |
1972 | 0 | aSegmentSize = DEFAULT_SEGMENT_SIZE; |
1973 | 0 | } |
1974 | 0 |
|
1975 | 0 | // Handle aMaxSize of UINT32_MAX as a special case |
1976 | 0 | uint32_t segmentCount; |
1977 | 0 | if (aMaxSize == UINT32_MAX) { |
1978 | 0 | segmentCount = UINT32_MAX; |
1979 | 0 | } else { |
1980 | 0 | segmentCount = aMaxSize / aSegmentSize; |
1981 | 0 | } |
1982 | 0 |
|
1983 | 0 | nsIAsyncInputStream* in; |
1984 | 0 | nsIAsyncOutputStream* out; |
1985 | 0 | nsresult rv = NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput, |
1986 | 0 | aSegmentSize, segmentCount); |
1987 | 0 | if (NS_FAILED(rv)) { |
1988 | 0 | return rv; |
1989 | 0 | } |
1990 | 0 | |
1991 | 0 | *aPipeIn = in; |
1992 | 0 | *aPipeOut = out; |
1993 | 0 | return NS_OK; |
1994 | 0 | } |
1995 | | |
1996 | | nsresult |
1997 | | NS_NewPipe2(nsIAsyncInputStream** aPipeIn, |
1998 | | nsIAsyncOutputStream** aPipeOut, |
1999 | | bool aNonBlockingInput, |
2000 | | bool aNonBlockingOutput, |
2001 | | uint32_t aSegmentSize, |
2002 | | uint32_t aSegmentCount) |
2003 | 0 | { |
2004 | 0 | nsPipe* pipe = new nsPipe(); |
2005 | 0 | nsresult rv = pipe->Init(aNonBlockingInput, |
2006 | 0 | aNonBlockingOutput, |
2007 | 0 | aSegmentSize, |
2008 | 0 | aSegmentCount); |
2009 | 0 | if (NS_FAILED(rv)) { |
2010 | 0 | NS_ADDREF(pipe); |
2011 | 0 | NS_RELEASE(pipe); |
2012 | 0 | return rv; |
2013 | 0 | } |
2014 | 0 |
|
2015 | 0 | // These always succeed because the pipe is initialized above. |
2016 | 0 | MOZ_ALWAYS_SUCCEEDS(pipe->GetInputStream(aPipeIn)); |
2017 | 0 | MOZ_ALWAYS_SUCCEEDS(pipe->GetOutputStream(aPipeOut)); |
2018 | 0 | return NS_OK; |
2019 | 0 | } |
2020 | | |
2021 | | nsresult |
2022 | | nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult) |
2023 | 0 | { |
2024 | 0 | if (aOuter) { |
2025 | 0 | return NS_ERROR_NO_AGGREGATION; |
2026 | 0 | } |
2027 | 0 | nsPipe* pipe = new nsPipe(); |
2028 | 0 | NS_ADDREF(pipe); |
2029 | 0 | nsresult rv = pipe->QueryInterface(aIID, aResult); |
2030 | 0 | NS_RELEASE(pipe); |
2031 | 0 | return rv; |
2032 | 0 | } |
2033 | | |
2034 | | //////////////////////////////////////////////////////////////////////////////// |