Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/xpcom/io/nsInputStreamTee.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this
5
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include <stdlib.h>
8
#include "mozilla/Logging.h"
9
10
#include "mozilla/Mutex.h"
11
#include "mozilla/Attributes.h"
12
#include "nsIInputStreamTee.h"
13
#include "nsIInputStream.h"
14
#include "nsIOutputStream.h"
15
#include "nsCOMPtr.h"
16
#include "nsAutoPtr.h"
17
#include "nsIEventTarget.h"
18
#include "nsThreadUtils.h"
19
20
using namespace mozilla;
21
22
#ifdef LOG
23
#undef LOG
24
#endif
25
26
static LazyLogModule sTeeLog("nsInputStreamTee");
27
0
#define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args)
28
29
class nsInputStreamTee final : public nsIInputStreamTee
30
{
31
public:
32
  NS_DECL_THREADSAFE_ISUPPORTS
33
  NS_DECL_NSIINPUTSTREAM
34
  NS_DECL_NSIINPUTSTREAMTEE
35
36
  nsInputStreamTee();
37
  bool SinkIsValid();
38
  void InvalidateSink();
39
40
private:
41
  ~nsInputStreamTee()
42
0
  {
43
0
  }
44
45
  nsresult TeeSegment(const char* aBuf, uint32_t aCount);
46
47
  static nsresult WriteSegmentFun(nsIInputStream*, void*, const char*,
48
                                  uint32_t, uint32_t, uint32_t*);
49
50
private:
51
  nsCOMPtr<nsIInputStream>  mSource;
52
  nsCOMPtr<nsIOutputStream> mSink;
53
  nsCOMPtr<nsIEventTarget>  mEventTarget;
54
  nsWriteSegmentFun         mWriter;  // for implementing ReadSegments
55
  void*                     mClosure; // for implementing ReadSegments
56
  nsAutoPtr<Mutex>          mLock; // synchronize access to mSinkIsValid
57
  bool                      mSinkIsValid; // False if TeeWriteEvent fails
58
};
59
60
class nsInputStreamTeeWriteEvent : public Runnable
61
{
62
public:
63
  // aTee's lock is held across construction of this object
64
  nsInputStreamTeeWriteEvent(const char* aBuf,
65
                             uint32_t aCount,
66
                             nsIOutputStream* aSink,
67
                             nsInputStreamTee* aTee)
68
    : mozilla::Runnable("nsInputStreamTeeWriteEvent")
69
0
  {
70
0
    // copy the buffer - will be free'd by dtor
71
0
    mBuf = (char*)malloc(aCount);
72
0
    if (mBuf) {
73
0
      memcpy(mBuf, (char*)aBuf, aCount);
74
0
    }
75
0
    mCount = aCount;
76
0
    mSink = aSink;
77
0
    bool isNonBlocking;
78
0
    mSink->IsNonBlocking(&isNonBlocking);
79
0
    NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking");
80
0
    mTee = aTee;
81
0
  }
82
83
  NS_IMETHOD Run() override
84
0
  {
85
0
    if (!mBuf) {
86
0
      NS_WARNING("nsInputStreamTeeWriteEvent::Run() "
87
0
                 "memory not allocated\n");
88
0
      return NS_OK;
89
0
    }
90
0
    MOZ_ASSERT(mSink, "mSink is null!");
91
0
92
0
    //  The output stream could have been invalidated between when
93
0
    //  this event was dispatched and now, so check before writing.
94
0
    if (!mTee->SinkIsValid()) {
95
0
      return NS_OK;
96
0
    }
97
0
98
0
    LOG(("nsInputStreamTeeWriteEvent::Run() [%p]"
99
0
         "will write %u bytes to %p\n",
100
0
         this, mCount, mSink.get()));
101
0
102
0
    uint32_t totalBytesWritten = 0;
103
0
    while (mCount) {
104
0
      nsresult rv;
105
0
      uint32_t bytesWritten = 0;
106
0
      rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten);
107
0
      if (NS_FAILED(rv)) {
108
0
        LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32 " in writing",
109
0
             this, static_cast<uint32_t>(rv)));
110
0
        mTee->InvalidateSink();
111
0
        break;
112
0
      }
113
0
      totalBytesWritten += bytesWritten;
114
0
      NS_ASSERTION(bytesWritten <= mCount, "wrote too much");
115
0
      mCount -= bytesWritten;
116
0
    }
117
0
    return NS_OK;
118
0
  }
119
120
protected:
121
  virtual ~nsInputStreamTeeWriteEvent()
122
0
  {
123
0
    if (mBuf) {
124
0
      free(mBuf);
125
0
    }
126
0
    mBuf = nullptr;
127
0
  }
128
129
private:
130
  char* mBuf;
131
  uint32_t mCount;
132
  nsCOMPtr<nsIOutputStream> mSink;
133
  // back pointer to the tee that created this runnable
134
  RefPtr<nsInputStreamTee> mTee;
135
};
136
137
// Creates a tee with no ReadSegments writer/closure recorded, no lock (which
// TeeSegment treats as the synchronous case) and a sink flagged as valid.
nsInputStreamTee::nsInputStreamTee()
  : mWriter(nullptr), mClosure(nullptr), mLock(nullptr), mSinkIsValid(true)
{
}
144
145
bool
146
nsInputStreamTee::SinkIsValid()
147
0
{
148
0
  MutexAutoLock lock(*mLock);
149
0
  return mSinkIsValid;
150
0
}
151
152
void
153
nsInputStreamTee::InvalidateSink()
154
0
{
155
0
  MutexAutoLock lock(*mLock);
156
0
  mSinkIsValid = false;
157
0
}
158
159
// Copies aCount bytes starting at aBuf to the sink.
//
// With no sink this is a no-op.  When mLock exists (asynchronous tee) the
// bytes are handed to an nsInputStreamTeeWriteEvent dispatched to
// mEventTarget and the dispatch result is returned.  Otherwise the bytes are
// written to mSink right here; a failed write only drops mSink and is not
// reported to the caller, so the synchronous path always returns NS_OK.
nsresult
nsInputStreamTee::TeeSegment(const char* aBuf, uint32_t aCount)
{
  if (!mSink) {
    return NS_OK;  // nothing to copy to
  }

  if (mLock) {
    // Asynchronous tee: hand the write over to the event target.
    NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null.");
    if (!SinkIsValid()) {
      return NS_OK;  // an earlier write event failed; stop copying
    }
    nsCOMPtr<nsIRunnable> writeEvent =
      new nsInputStreamTeeWriteEvent(aBuf, aCount, mSink, this);
    LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n",
         this, aCount));
    return mEventTarget->Dispatch(writeEvent, NS_DISPATCH_NORMAL);
  }

  // Synchronous tee: keep writing until the whole segment is consumed.
  NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null.");
  const char* cursor = aBuf;
  while (aCount) {
    uint32_t bytesWritten = 0;
    const nsresult rv = mSink->Write(cursor, aCount, &bytesWritten);
    if (NS_FAILED(rv)) {
      // Not fatal for the reader: forget the sink and carry on as if
      // nothing had happened.
      NS_WARNING("Write failed (non-fatal)");
      // catch possible misuse of the input stream tee
      NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream");
      mSink = nullptr;
      break;
    }
    cursor += bytesWritten;
    NS_ASSERTION(bytesWritten <= aCount, "wrote too much");
    aCount -= bytesWritten;
  }
  return NS_OK;
}
198
199
// Segment writer passed to mSource->ReadSegments().  aClosure is the
// nsInputStreamTee; the call is forwarded to the writer and closure saved by
// ReadSegments(), and whatever that writer consumed is then copied to the
// sink through TeeSegment().  If the writer failed or consumed nothing, its
// result is returned unchanged and nothing is copied.
nsresult
nsInputStreamTee::WriteSegmentFun(nsIInputStream* aIn, void* aClosure,
                                  const char* aFromSegment, uint32_t aOffset,
                                  uint32_t aCount, uint32_t* aWriteCount)
{
  nsInputStreamTee* self = static_cast<nsInputStreamTee*>(aClosure);
  const nsresult rv = self->mWriter(aIn, self->mClosure, aFromSegment,
                                    aOffset, aCount, aWriteCount);

  if (NS_SUCCEEDED(rv) && (*aWriteCount != 0)) {
    return self->TeeSegment(aFromSegment, *aWriteCount);
  }

  NS_ASSERTION((NS_FAILED(rv) ? (*aWriteCount == 0) : true),
               "writer returned an error with non-zero writeCount");
  return rv;
}
215
216
// nsISupports implementation exposing nsIInputStreamTee and nsIInputStream.
NS_IMPL_ISUPPORTS(nsInputStreamTee, nsIInputStreamTee, nsIInputStream)
219
// Closes the source stream and drops the references to both the source and
// the sink.  Returns NS_ERROR_NOT_INITIALIZED when there is no source,
// otherwise whatever the source's Close() returned.
NS_IMETHODIMP
nsInputStreamTee::Close()
{
  if (NS_WARN_IF(!mSource)) {
    return NS_ERROR_NOT_INITIALIZED;
  }

  const nsresult closeStatus = mSource->Close();

  // Release both ends regardless of how the close went.
  mSource = nullptr;
  mSink = nullptr;

  return closeStatus;
}
230
231
// Forwards to the source stream's Available(); fails with
// NS_ERROR_NOT_INITIALIZED when no source has been set.
NS_IMETHODIMP
nsInputStreamTee::Available(uint64_t* aAvail)
{
  return NS_WARN_IF(!mSource) ? NS_ERROR_NOT_INITIALIZED
                              : mSource->Available(aAvail);
}
239
240
// Reads up to aCount bytes from the source into aBuf.  If the read succeeded
// and produced data, the bytes are also passed to TeeSegment() and its
// result is returned; otherwise the source's result is returned as is.
// Fails with NS_ERROR_NOT_INITIALIZED when no source has been set.
NS_IMETHODIMP
nsInputStreamTee::Read(char* aBuf, uint32_t aCount, uint32_t* aBytesRead)
{
  if (NS_WARN_IF(!mSource)) {
    return NS_ERROR_NOT_INITIALIZED;
  }

  const nsresult readStatus = mSource->Read(aBuf, aCount, aBytesRead);
  if (NS_SUCCEEDED(readStatus) && (*aBytesRead != 0)) {
    return TeeSegment(aBuf, *aBytesRead);
  }

  return readStatus;
}
254
255
// Reads segments from the source through WriteSegmentFun, which relays each
// segment to aWriter/aClosure and then copies what was consumed to the sink.
// The writer and closure are stored in members so that WriteSegmentFun can
// reach them through the tee pointer it receives as its closure.
// Fails with NS_ERROR_NOT_INITIALIZED when no source has been set.
NS_IMETHODIMP
nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter,
                               void* aClosure,
                               uint32_t aCount,
                               uint32_t* aBytesRead)
{
  if (NS_WARN_IF(!mSource)) {
    return NS_ERROR_NOT_INITIALIZED;
  }

  mClosure = aClosure;
  mWriter = aWriter;

  return mSource->ReadSegments(nsInputStreamTee::WriteSegmentFun, this,
                               aCount, aBytesRead);
}
270
271
// Reports the source stream's blocking mode; fails with
// NS_ERROR_NOT_INITIALIZED when no source has been set.
NS_IMETHODIMP
nsInputStreamTee::IsNonBlocking(bool* aResult)
{
  return NS_WARN_IF(!mSource) ? NS_ERROR_NOT_INITIALIZED
                              : mSource->IsNonBlocking(aResult);
}
279
280
// Replaces the stream that data is read from.  Always succeeds; passing
// null leaves the tee without a source.
NS_IMETHODIMP
nsInputStreamTee::SetSource(nsIInputStream* aSource)
{
  mSource = aSource;
  return NS_OK;
}
286
287
// Hands out an owning reference to the current source stream (null if none
// is set) through aSource.  Always returns NS_OK.
NS_IMETHODIMP
nsInputStreamTee::GetSource(nsIInputStream** aSource)
{
  nsCOMPtr<nsIInputStream> source = mSource;
  source.forget(aSource);
  return NS_OK;
}
293
294
// Replaces the stream that read data is copied to.  Always returns NS_OK.
// Debug builds additionally report an error when the new sink is not a
// blocking stream (or its blocking mode cannot be queried).
NS_IMETHODIMP
nsInputStreamTee::SetSink(nsIOutputStream* aSink)
{
#ifdef DEBUG
  if (aSink) {
    bool nonBlocking = false;
    const bool queryFailed = NS_FAILED(aSink->IsNonBlocking(&nonBlocking));
    if (queryFailed || nonBlocking) {
      NS_ERROR("aSink should be a blocking stream");
    }
  }
#endif
  mSink = aSink;
  return NS_OK;
}
309
310
// Hands out an owning reference to the current sink stream (null if none is
// set) through aSink.  Always returns NS_OK.
NS_IMETHODIMP
nsInputStreamTee::GetSink(nsIOutputStream** aSink)
{
  nsCOMPtr<nsIOutputStream> sink = mSink;
  sink.forget(aSink);
  return NS_OK;
}
316
317
// Sets the event target that write events are dispatched to.  A non-null
// target makes this an asynchronous tee, which needs mLock to synchronize
// access to mSinkIsValid.  Always returns NS_OK.
//
// The mutex is created only once.  Write events that were already
// dispatched keep a reference to this tee and lock *mLock from Run(), so
// replacing the mutex on a later call would destroy it underneath them.
//
// NOTE(review): passing null after a non-null target leaves mLock in place,
// so TeeSegment still takes its asynchronous branch with a null
// mEventTarget -- confirm that callers never do this.
NS_IMETHODIMP
nsInputStreamTee::SetEventTarget(nsIEventTarget* aEventTarget)
{
  mEventTarget = aEventTarget;
  if (mEventTarget && !mLock) {
    // Only need synchronization if this is an async tee
    mLock = new Mutex("nsInputStreamTee.mLock");
  }
  return NS_OK;
}
327
328
// Hands out an owning reference to the current event target (null if none
// is set) through aEventTarget.  Always returns NS_OK.
NS_IMETHODIMP
nsInputStreamTee::GetEventTarget(nsIEventTarget** aEventTarget)
{
  nsCOMPtr<nsIEventTarget> target = mEventTarget;
  target.forget(aEventTarget);
  return NS_OK;
}
334
335
336
// Creates an nsInputStreamTee, configures its source, sink and event target
// in that order, and returns it through aResult.  The first setter that
// fails ends the setup; its error is returned and aResult is left untouched.
nsresult
NS_NewInputStreamTeeAsync(nsIInputStream** aResult,
                          nsIInputStream* aSource,
                          nsIOutputStream* aSink,
                          nsIEventTarget* aEventTarget)
{
  nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee();

  nsresult rv = tee->SetSource(aSource);
  if (NS_SUCCEEDED(rv)) {
    rv = tee->SetSink(aSink);
  }
  if (NS_SUCCEEDED(rv)) {
    rv = tee->SetEventTarget(aEventTarget);
  }
  if (NS_FAILED(rv)) {
    return rv;
  }

  tee.forget(aResult);
  return rv;
}
363
364
// Creates a synchronous tee: same as NS_NewInputStreamTeeAsync() with no
// event target.
nsresult
NS_NewInputStreamTee(nsIInputStream** aResult,
                     nsIInputStream* aSource,
                     nsIOutputStream* aSink)
{
  nsIEventTarget* const noEventTarget = nullptr;
  return NS_NewInputStreamTeeAsync(aResult, aSource, aSink, noEventTarget);
}
371
372
#undef LOG