Coverage Report

Created: 2025-08-28 09:57

/src/node/src/stream_base.cc
Line
Count
Source (jump to first uncovered line)
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "env-inl.h"
6
#include "js_stream.h"
7
#include "node.h"
8
#include "node_buffer.h"
9
#include "node_errors.h"
10
#include "node_external_reference.h"
11
#include "string_bytes.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <climits>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::BackingStore;
22
using v8::ConstructorBehavior;
23
using v8::Context;
24
using v8::DontDelete;
25
using v8::DontEnum;
26
using v8::External;
27
using v8::Function;
28
using v8::FunctionCallbackInfo;
29
using v8::FunctionTemplate;
30
using v8::HandleScope;
31
using v8::Integer;
32
using v8::Isolate;
33
using v8::Local;
34
using v8::MaybeLocal;
35
using v8::Object;
36
using v8::PropertyAttribute;
37
using v8::ReadOnly;
38
using v8::SideEffectType;
39
using v8::Signature;
40
using v8::String;
41
using v8::Value;
42
43
0
// Issues a shutdown request on this stream.
//
// `req_wrap_obj` is the JS object that backs the request; when it is empty a
// new one is instantiated from env->shutdown_wrap_template().
//
// Returns the status reported by DoShutdown(), or UV_EBUSY when either the
// request object cannot be instantiated or the message from Error() cannot be
// stored on it.
int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
  Environment* env = stream_env();
  v8::HandleScope scope(env->isolate());

  if (req_wrap_obj.IsEmpty()) {
    MaybeLocal<Object> fresh_obj =
        env->shutdown_wrap_template()->NewInstance(env->context());
    if (!fresh_obj.ToLocal(&req_wrap_obj)) return UV_EBUSY;
    StreamReq::ResetObject(req_wrap_obj);
  }

  // Declared before the trigger scope so that it is destroyed after it.
  BaseObjectPtr<AsyncWrap> keep_alive;
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
  ShutdownWrap* const wrap = CreateShutdownWrap(req_wrap_obj);
  if (wrap != nullptr) keep_alive.reset(wrap->GetAsyncWrap());

  const int status = DoShutdown(wrap);

  // DoShutdown() reported a failure: dispose of the request wrap here.
  if (status != 0 && wrap != nullptr) wrap->Dispose();

  const char* const message = Error();
  if (message == nullptr) return status;

  // Expose the stream's error message on the request object as `error`.
  const bool stored = !req_wrap_obj
                           ->Set(env->context(),
                                 env->error_string(),
                                 OneByteString(env->isolate(), message))
                           .IsNothing();
  if (!stored) return UV_EBUSY;

  ClearError();
  return status;
}
81
82
// Writes `count` buffers from `bufs` to the stream.
//
// The total byte count is added to bytes_written_ up front. Unless a
// `send_handle` is given, the stream has no DoTryWrite(), or `skip_try_write`
// is set, DoTryWrite() is attempted first; if it fails or consumes every
// buffer the result is returned without creating a write request.
//
// Otherwise a WriteWrap is created on `req_wrap_obj` (instantiated from
// env->write_wrap_template() when empty) and handed to DoWrite(). The
// returned StreamWriteResult carries {async, err, wrap, bytes, wrap_obj};
// `wrap` is nullptr when DoWrite() returned a non-zero status.
StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
                                    size_t count,
                                    uv_stream_t* send_handle,
                                    v8::Local<v8::Object> req_wrap_obj,
                                    bool skip_try_write) {
  Environment* env = stream_env();

  size_t total_bytes = 0;
  for (const uv_buf_t* it = bufs; it != bufs + count; ++it)
    total_bytes += it->len;
  bytes_written_ += total_bytes;

  const bool attempt_try_write =
      send_handle == nullptr && HasDoTryWrite() && !skip_try_write;
  if (attempt_try_write) {
    // DoTryWrite() may advance `bufs` and shrink `count` in place.
    const int try_err = DoTryWrite(&bufs, &count);
    if (try_err != 0 || count == 0)
      return StreamWriteResult{false, try_err, nullptr, total_bytes, {}};
  }

  v8::HandleScope scope(env->isolate());

  if (req_wrap_obj.IsEmpty()) {
    MaybeLocal<Object> fresh_obj =
        env->write_wrap_template()->NewInstance(env->context());
    if (!fresh_obj.ToLocal(&req_wrap_obj))
      return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
    StreamReq::ResetObject(req_wrap_obj);
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
  WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
  BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap());

  const int err = DoWrite(req_wrap, bufs, count, send_handle);
  const bool async = (err == 0);
  if (err != 0) {
    // DoWrite() reported a failure: dispose of the request wrap here.
    req_wrap->Dispose();
    req_wrap = nullptr;
  }

  const char* const message = Error();
  if (message != nullptr) {
    // Expose the stream's error message on the request object as `error`.
    const bool stored = !req_wrap_obj
                             ->Set(env->context(),
                                   env->error_string(),
                                   OneByteString(env->isolate(), message))
                             .IsNothing();
    if (!stored) return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
    ClearError();
  }

  return StreamWriteResult{
      async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)};
}
139
140
// Explicit instantiations of WriteString<>() for the four encodings that
// AddMethods() binds to the write*String prototype methods
// (writeAsciiString, writeUtf8String, writeUcs2String, writeLatin1String).
template int StreamBase::WriteString<ASCII>(
141
    const FunctionCallbackInfo<Value>& args);
142
template int StreamBase::WriteString<UTF8>(
143
    const FunctionCallbackInfo<Value>& args);
144
template int StreamBase::WriteString<UCS2>(
145
    const FunctionCallbackInfo<Value>& args);
146
template int StreamBase::WriteString<LATIN1>(
147
    const FunctionCallbackInfo<Value>& args);
148
149
150
0
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
151
0
  return ReadStart();
152
0
}
153
154
155
0
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
156
0
  return ReadStop();
157
0
}
158
159
0
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
160
0
  CHECK(Buffer::HasInstance(args[0]));
161
162
0
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
163
0
  PushStreamListener(new CustomBufferJSListener(buf));
164
0
  return 0;
165
0
}
166
167
0
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
168
0
  CHECK(args[0]->IsObject());
169
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
170
171
0
  return Shutdown(req_wrap_obj);
172
0
}
173
174
1.05k
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
175
1.05k
  env_->stream_base_state()[kBytesWritten] = res.bytes;
176
1.05k
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
177
1.05k
}
178
179
0
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
180
0
  Environment* env = Environment::GetCurrent(args);
181
0
  Isolate* isolate = env->isolate();
182
0
  Local<Context> context = env->context();
183
184
0
  CHECK(args[0]->IsObject());
185
0
  CHECK(args[1]->IsArray());
186
187
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
188
0
  Local<Array> chunks = args[1].As<Array>();
189
0
  bool all_buffers = args[2]->IsTrue();
190
191
0
  size_t count;
192
0
  if (all_buffers)
193
0
    count = chunks->Length();
194
0
  else
195
0
    count = chunks->Length() >> 1;
196
197
0
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
198
199
0
  size_t storage_size = 0;
200
0
  size_t offset;
201
202
0
  if (!all_buffers) {
203
    // Determine storage size first
204
0
    for (size_t i = 0; i < count; i++) {
205
0
      Local<Value> chunk;
206
0
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
207
0
        return -1;
208
209
0
      if (Buffer::HasInstance(chunk))
210
0
        continue;
211
        // Buffer chunk, no additional storage required
212
213
      // String chunk
214
0
      Local<String> string;
215
0
      if (!chunk->ToString(context).ToLocal(&string))
216
0
        return -1;
217
0
      Local<Value> next_chunk;
218
0
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
219
0
        return -1;
220
0
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
221
0
      size_t chunk_size;
222
0
      if ((encoding == UTF8 &&
223
0
             string->Length() > 65535 &&
224
0
             !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
225
0
              !StringBytes::StorageSize(isolate, string, encoding)
226
0
                  .To(&chunk_size)) {
227
0
        return -1;
228
0
      }
229
0
      storage_size += chunk_size;
230
0
    }
231
232
0
    if (storage_size > INT_MAX)
233
0
      return UV_ENOBUFS;
234
0
  } else {
235
0
    for (size_t i = 0; i < count; i++) {
236
0
      Local<Value> chunk;
237
0
      if (!chunks->Get(context, i).ToLocal(&chunk))
238
0
        return -1;
239
0
      bufs[i].base = Buffer::Data(chunk);
240
0
      bufs[i].len = Buffer::Length(chunk);
241
0
    }
242
0
  }
243
244
0
  std::unique_ptr<BackingStore> bs;
245
0
  if (storage_size > 0) {
246
0
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
247
0
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
248
0
  }
249
250
0
  offset = 0;
251
0
  if (!all_buffers) {
252
0
    for (size_t i = 0; i < count; i++) {
253
0
      Local<Value> chunk;
254
0
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
255
0
        return -1;
256
257
      // Write buffer
258
0
      if (Buffer::HasInstance(chunk)) {
259
0
        bufs[i].base = Buffer::Data(chunk);
260
0
        bufs[i].len = Buffer::Length(chunk);
261
0
        continue;
262
0
      }
263
264
      // Write string
265
0
      CHECK_LE(offset, storage_size);
266
0
      char* str_storage =
267
0
          static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
268
0
      size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
269
270
0
      Local<String> string;
271
0
      if (!chunk->ToString(context).ToLocal(&string))
272
0
        return -1;
273
0
      Local<Value> next_chunk;
274
0
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
275
0
        return -1;
276
0
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
277
0
      str_size = StringBytes::Write(isolate,
278
0
                                    str_storage,
279
0
                                    str_size,
280
0
                                    string,
281
0
                                    encoding);
282
0
      bufs[i].base = str_storage;
283
0
      bufs[i].len = str_size;
284
0
      offset += str_size;
285
0
    }
286
0
  }
287
288
0
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
289
0
  SetWriteResult(res);
290
0
  if (res.wrap != nullptr && storage_size > 0)
291
0
    res.wrap->SetBackingStore(std::move(bs));
292
0
  return res.err;
293
0
}
294
295
296
0
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
297
0
  CHECK(args[0]->IsObject());
298
299
0
  Environment* env = Environment::GetCurrent(args);
300
301
0
  if (!args[1]->IsUint8Array()) {
302
0
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
303
0
    return 0;
304
0
  }
305
306
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
307
0
  uv_buf_t buf;
308
0
  buf.base = Buffer::Data(args[1]);
309
0
  buf.len = Buffer::Length(args[1]);
310
311
0
  uv_stream_t* send_handle = nullptr;
312
313
0
  if (args[2]->IsObject() && IsIPCPipe()) {
314
0
    Local<Object> send_handle_obj = args[2].As<Object>();
315
316
0
    HandleWrap* wrap;
317
0
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
318
0
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
319
    // Reference LibuvStreamWrap instance to prevent it from being garbage
320
    // collected before `AfterWrite` is called.
321
0
    if (req_wrap_obj->Set(env->context(),
322
0
                          env->handle_string(),
323
0
                          send_handle_obj).IsNothing()) {
324
0
      return -1;
325
0
    }
326
0
  }
327
328
0
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
329
0
  SetWriteResult(res);
330
331
0
  return res.err;
332
0
}
333
334
335
template <enum encoding enc>
336
1.05k
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
337
1.05k
  Environment* env = Environment::GetCurrent(args);
338
1.05k
  Isolate* isolate = env->isolate();
339
1.05k
  CHECK(args[0]->IsObject());
340
1.05k
  CHECK(args[1]->IsString());
341
342
1.05k
  Local<Object> req_wrap_obj = args[0].As<Object>();
343
1.05k
  Local<String> string = args[1].As<String>();
344
1.05k
  Local<Object> send_handle_obj;
345
1.05k
  if (args[2]->IsObject())
346
0
    send_handle_obj = args[2].As<Object>();
347
348
  // Compute the size of the storage that the string will be flattened into.
349
  // For UTF8 strings that are very long, go ahead and take the hit for
350
  // computing their actual size, rather than tripling the storage.
351
1.05k
  size_t storage_size;
352
1.05k
  if ((enc == UTF8 &&
353
1.05k
         string->Length() > 65535 &&
354
1.05k
         !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
355
1.05k
          !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
356
0
    return -1;
357
0
  }
358
359
1.05k
  if (storage_size > INT_MAX)
360
0
    return UV_ENOBUFS;
361
362
  // Try writing immediately if write size isn't too big
363
1.05k
  char stack_storage[16384];  // 16kb
364
1.05k
  size_t data_size;
365
1.05k
  size_t synchronously_written = 0;
366
1.05k
  uv_buf_t buf;
367
368
1.05k
  bool try_write = HasDoTryWrite() && storage_size <= sizeof(stack_storage) &&
369
1.05k
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
370
1.05k
  if (try_write) {
371
1.05k
    data_size = StringBytes::Write(isolate,
372
1.05k
                                   stack_storage,
373
1.05k
                                   storage_size,
374
1.05k
                                   string,
375
1.05k
                                   enc);
376
1.05k
    buf = uv_buf_init(stack_storage, data_size);
377
378
1.05k
    uv_buf_t* bufs = &buf;
379
1.05k
    size_t count = 1;
380
1.05k
    const int err = DoTryWrite(&bufs, &count);
381
    // Keep track of the bytes written here, because we're taking a shortcut
382
    // by using `DoTryWrite()` directly instead of using the utilities
383
    // provided by `Write()`.
384
1.05k
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
385
1.05k
    bytes_written_ += synchronously_written;
386
387
    // Immediate failure or success
388
1.05k
    if (err != 0 || count == 0) {
389
330
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
390
330
      return err;
391
330
    }
392
393
    // Partial write
394
729
    CHECK_EQ(count, 1);
395
729
  }
396
397
729
  std::unique_ptr<BackingStore> bs;
398
399
729
  if (try_write) {
400
    // Copy partial data
401
729
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
402
729
    bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
403
729
    memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
404
729
    data_size = buf.len;
405
729
  } else {
406
    // Write it
407
0
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
408
0
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
409
0
    data_size = StringBytes::Write(isolate,
410
0
                                   static_cast<char*>(bs->Data()),
411
0
                                   storage_size,
412
0
                                   string,
413
0
                                   enc);
414
0
  }
415
416
729
  CHECK_LE(data_size, storage_size);
417
418
729
  buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
419
420
729
  uv_stream_t* send_handle = nullptr;
421
422
729
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
423
0
    HandleWrap* wrap;
424
0
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
425
0
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
426
    // Reference LibuvStreamWrap instance to prevent it from being garbage
427
    // collected before `AfterWrite` is called.
428
0
    if (req_wrap_obj->Set(env->context(),
429
0
                          env->handle_string(),
430
0
                          send_handle_obj).IsNothing()) {
431
0
      return -1;
432
0
    }
433
0
  }
434
435
729
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
436
729
  res.bytes += synchronously_written;
437
438
729
  SetWriteResult(res);
439
729
  if (res.wrap != nullptr)
440
729
    res.wrap->SetBackingStore(std::move(bs));
441
442
729
  return res.err;
443
729
}
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&)
int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&)
Line
Count
Source
336
1.05k
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
337
1.05k
  Environment* env = Environment::GetCurrent(args);
338
1.05k
  Isolate* isolate = env->isolate();
339
1.05k
  CHECK(args[0]->IsObject());
340
1.05k
  CHECK(args[1]->IsString());
341
342
1.05k
  Local<Object> req_wrap_obj = args[0].As<Object>();
343
1.05k
  Local<String> string = args[1].As<String>();
344
1.05k
  Local<Object> send_handle_obj;
345
1.05k
  if (args[2]->IsObject())
346
0
    send_handle_obj = args[2].As<Object>();
347
348
  // Compute the size of the storage that the string will be flattened into.
349
  // For UTF8 strings that are very long, go ahead and take the hit for
350
  // computing their actual size, rather than tripling the storage.
351
1.05k
  size_t storage_size;
352
1.05k
  if ((enc == UTF8 &&
353
1.05k
         string->Length() > 65535 &&
354
1.05k
         !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
355
1.05k
          !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
356
0
    return -1;
357
0
  }
358
359
1.05k
  if (storage_size > INT_MAX)
360
0
    return UV_ENOBUFS;
361
362
  // Try writing immediately if write size isn't too big
363
1.05k
  char stack_storage[16384];  // 16kb
364
1.05k
  size_t data_size;
365
1.05k
  size_t synchronously_written = 0;
366
1.05k
  uv_buf_t buf;
367
368
1.05k
  bool try_write = HasDoTryWrite() && storage_size <= sizeof(stack_storage) &&
369
1.05k
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
370
1.05k
  if (try_write) {
371
1.05k
    data_size = StringBytes::Write(isolate,
372
1.05k
                                   stack_storage,
373
1.05k
                                   storage_size,
374
1.05k
                                   string,
375
1.05k
                                   enc);
376
1.05k
    buf = uv_buf_init(stack_storage, data_size);
377
378
1.05k
    uv_buf_t* bufs = &buf;
379
1.05k
    size_t count = 1;
380
1.05k
    const int err = DoTryWrite(&bufs, &count);
381
    // Keep track of the bytes written here, because we're taking a shortcut
382
    // by using `DoTryWrite()` directly instead of using the utilities
383
    // provided by `Write()`.
384
1.05k
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
385
1.05k
    bytes_written_ += synchronously_written;
386
387
    // Immediate failure or success
388
1.05k
    if (err != 0 || count == 0) {
389
330
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
390
330
      return err;
391
330
    }
392
393
    // Partial write
394
729
    CHECK_EQ(count, 1);
395
729
  }
396
397
729
  std::unique_ptr<BackingStore> bs;
398
399
729
  if (try_write) {
400
    // Copy partial data
401
729
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
402
729
    bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
403
729
    memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
404
729
    data_size = buf.len;
405
729
  } else {
406
    // Write it
407
0
    NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
408
0
    bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
409
0
    data_size = StringBytes::Write(isolate,
410
0
                                   static_cast<char*>(bs->Data()),
411
0
                                   storage_size,
412
0
                                   string,
413
0
                                   enc);
414
0
  }
415
416
729
  CHECK_LE(data_size, storage_size);
417
418
729
  buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
419
420
729
  uv_stream_t* send_handle = nullptr;
421
422
729
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
423
0
    HandleWrap* wrap;
424
0
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
425
0
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
426
    // Reference LibuvStreamWrap instance to prevent it from being garbage
427
    // collected before `AfterWrite` is called.
428
0
    if (req_wrap_obj->Set(env->context(),
429
0
                          env->handle_string(),
430
0
                          send_handle_obj).IsNothing()) {
431
0
      return -1;
432
0
    }
433
0
  }
434
435
729
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
436
729
  res.bytes += synchronously_written;
437
438
729
  SetWriteResult(res);
439
729
  if (res.wrap != nullptr)
440
729
    res.wrap->SetBackingStore(std::move(bs));
441
442
729
  return res.err;
443
729
}
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&)
444
445
446
// Invokes the JS `onread` callback stored in the wrap object's
// kOnReadFunctionField internal field.
//
// `nread` and `offset` are published through the environment's
// stream_base_state() array (slots kReadBytesOrError and kArrayBufferOffset);
// the callback itself receives a single argument: `ab`, or undefined when
// `ab` is empty. With DONT_SKIP_NREAD_CHECKS, debug builds additionally
// assert that an empty `ab` comes with offset 0 and nread <= 0, and that a
// non-empty `ab` comes with nread >= 0.
//
// Returns whatever MakeCallback() returns.
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
                                                 Local<ArrayBuffer> ab,
                                                 size_t offset,
                                                 StreamBaseJSChecks checks) {
  DCHECK_EQ(static_cast<int32_t>(nread), nread);
  DCHECK_LE(offset, INT32_MAX);

  if (checks == DONT_SKIP_NREAD_CHECKS) {
    if (!ab.IsEmpty()) {
      DCHECK_GE(nread, 0);
    } else {
      DCHECK_EQ(offset, 0);
      DCHECK_LE(nread, 0);
    }
  }

  env_->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread);
  env_->stream_base_state()[kArrayBufferOffset] = offset;

  Local<Value> buffer_arg;
  if (ab.IsEmpty())
    buffer_arg = Undefined(env_->isolate());
  else
    buffer_arg = ab;
  Local<Value> argv[] = {buffer_arg};

  AsyncWrap* wrap = GetAsyncWrap();
  CHECK_NOT_NULL(wrap);

  Local<Object> wrap_object = wrap->object();
  Local<Value> onread =
      wrap_object->GetInternalField(StreamBase::kOnReadFunctionField)
          .As<Value>();
  CHECK(onread->IsFunction());

  return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
}
479
480
481
0
// This implementation always reports that the stream is not an IPC pipe.
bool StreamBase::IsIPCPipe() {
  constexpr bool kIsIpcPipe = false;
  return kIsIpcPipe;
}
484
485
486
0
// This implementation has no file descriptor to report and always returns -1.
int StreamBase::GetFD() {
  constexpr int kNoFd = -1;
  return kNoFd;
}
489
490
491
757
// Returns the JS object of the AsyncWrap associated with this stream.
Local<Object> StreamBase::GetObject() {
  AsyncWrap* const async_wrap = GetAsyncWrap();
  return async_wrap->object();
}
494
495
void StreamBase::AddMethod(Isolate* isolate,
496
                           Local<Signature> signature,
497
                           enum PropertyAttribute attributes,
498
                           Local<FunctionTemplate> t,
499
                           JSMethodFunction* stream_method,
500
826k
                           Local<String> string) {
501
826k
  Local<FunctionTemplate> templ =
502
826k
      NewFunctionTemplate(isolate,
503
826k
                          stream_method,
504
826k
                          signature,
505
826k
                          ConstructorBehavior::kThrow,
506
826k
                          SideEffectType::kHasNoSideEffect);
507
826k
  t->PrototypeTemplate()->SetAccessorProperty(
508
826k
      string, templ, Local<FunctionTemplate>(), attributes);
509
826k
}
510
511
84.4k
// Convenience overload: forwards to AddMethods(IsolateData*, ...) using the
// isolate data of `env`.
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
  IsolateData* const isolate_data = env->isolate_data();
  AddMethods(isolate_data, t);
}
514
515
void StreamBase::AddMethods(IsolateData* isolate_data,
516
206k
                            Local<FunctionTemplate> t) {
517
206k
  Isolate* isolate = isolate_data->isolate();
518
206k
  HandleScope scope(isolate);
519
520
206k
  enum PropertyAttribute attributes =
521
206k
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
522
206k
  Local<Signature> sig = Signature::New(isolate, t);
523
524
206k
  AddMethod(isolate, sig, attributes, t, GetFD, isolate_data->fd_string());
525
206k
  AddMethod(isolate,
526
206k
            sig,
527
206k
            attributes,
528
206k
            t,
529
206k
            GetExternal,
530
206k
            isolate_data->external_stream_string());
531
206k
  AddMethod(isolate,
532
206k
            sig,
533
206k
            attributes,
534
206k
            t,
535
206k
            GetBytesRead,
536
206k
            isolate_data->bytes_read_string());
537
206k
  AddMethod(isolate,
538
206k
            sig,
539
206k
            attributes,
540
206k
            t,
541
206k
            GetBytesWritten,
542
206k
            isolate_data->bytes_written_string());
543
206k
  SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
544
206k
  SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
545
206k
  SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
546
206k
  SetProtoMethod(
547
206k
      isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
548
206k
  SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
549
206k
  SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
550
206k
  SetProtoMethod(isolate,
551
206k
                 t,
552
206k
                 "writeAsciiString",
553
206k
                 JSMethod<&StreamBase::WriteString<ASCII>>);
554
206k
  SetProtoMethod(
555
206k
      isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
556
206k
  SetProtoMethod(
557
206k
      isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
558
206k
  SetProtoMethod(isolate,
559
206k
                 t,
560
206k
                 "writeLatin1String",
561
206k
                 JSMethod<&StreamBase::WriteString<LATIN1>>);
562
206k
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
563
206k
                              True(isolate));
564
206k
  t->PrototypeTemplate()->SetAccessor(
565
206k
      FIXED_ONE_BYTE_STRING(isolate, "onread"),
566
206k
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
567
206k
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
568
206k
                                   &Value::IsFunction>);
569
206k
}
570
571
void StreamBase::RegisterExternalReferences(
572
0
    ExternalReferenceRegistry* registry) {
573
  // This function is called by a single thread during start up, so it is safe
574
  // to use a local static variable here.
575
0
  static bool is_registered = false;
576
0
  if (is_registered) return;
577
0
  registry->Register(GetFD);
578
0
  registry->Register(GetExternal);
579
0
  registry->Register(GetBytesRead);
580
0
  registry->Register(GetBytesWritten);
581
0
  registry->Register(JSMethod<&StreamBase::ReadStartJS>);
582
0
  registry->Register(JSMethod<&StreamBase::ReadStopJS>);
583
0
  registry->Register(JSMethod<&StreamBase::Shutdown>);
584
0
  registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
585
0
  registry->Register(JSMethod<&StreamBase::Writev>);
586
0
  registry->Register(JSMethod<&StreamBase::WriteBuffer>);
587
0
  registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
588
0
  registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
589
0
  registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
590
0
  registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
591
0
  registry->Register(
592
0
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
593
0
  registry->Register(
594
0
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
595
0
                                   &Value::IsFunction>);
596
0
  is_registered = true;
597
0
}
598
599
0
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
600
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
601
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
602
0
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
603
604
0
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
605
606
0
  args.GetReturnValue().Set(wrap->GetFD());
607
0
}
608
609
0
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
610
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
611
0
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
612
613
  // uint64_t -> double. 53bits is enough for all real cases.
614
0
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
615
0
}
616
617
0
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
618
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
619
0
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
620
621
  // uint64_t -> double. 53bits is enough for all real cases.
622
0
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
623
0
}
624
625
0
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
626
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
627
0
  if (wrap == nullptr) return;
628
629
0
  Local<External> ext = External::New(args.GetIsolate(), wrap);
630
0
  args.GetReturnValue().Set(ext);
631
0
}
632
633
// Adapts a StreamBase member function to a V8 function callback.
//
// Looks up the StreamBase attached to the holder object and returns silently
// when there is none. A stream that is no longer alive yields UV_EINVAL.
// Otherwise `Method` runs inside a default-trigger-async-id scope for the
// stream's AsyncWrap and its int result becomes the JS return value.
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
  if (wrap == nullptr) return;

  if (!wrap->IsAlive()) {
    args.GetReturnValue().Set(UV_EINVAL);
    return;
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
  const int result = (wrap->*Method)(args);
  args.GetReturnValue().Set(result);
}
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStartJS>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStopJS>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Shutdown>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::UseUserBuffer>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Writev>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::WriteBuffer>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
Line
Count
Source
634
1.05k
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
635
1.05k
  StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>());
636
1.05k
  if (wrap == nullptr) return;
637
638
1.05k
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
639
640
1.05k
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap());
641
1.05k
  args.GetReturnValue().Set((wrap->*Method)(args));
642
1.05k
}
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
643
644
0
// This implementation performs no write: `bufs` and `count` are left
// untouched and 0 is returned.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  // No TryWrite by default
  const int status = 0;
  return status;
}
648
649
650
729
// This implementation never has an error message and always returns nullptr.
const char* StreamResource::Error() const {
  const char* const no_message = nullptr;
  return no_message;
}
653
654
655
0
void StreamResource::ClearError() {
656
  // No-op
657
0
}
658
659
660
0
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
661
0
  CHECK_NOT_NULL(stream_);
662
0
  Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
663
0
  return env->allocate_managed_buffer(suggested_size);
664
0
}
665
666
0
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
667
0
  CHECK_NOT_NULL(stream_);
668
0
  StreamBase* stream = static_cast<StreamBase*>(stream_);
669
0
  Environment* env = stream->stream_env();
670
0
  Isolate* isolate = env->isolate();
671
0
  HandleScope handle_scope(isolate);
672
0
  Context::Scope context_scope(env->context());
673
0
  std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);
674
675
0
  if (nread <= 0)  {
676
0
    if (nread < 0)
677
0
      stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
678
0
    return;
679
0
  }
680
681
0
  CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
682
0
  if (static_cast<size_t>(nread) != bs->ByteLength()) {
683
0
    std::unique_ptr<BackingStore> old_bs = std::move(bs);
684
0
    bs = ArrayBuffer::NewBackingStore(isolate, nread);
685
0
    memcpy(static_cast<char*>(bs->Data()),
686
0
           static_cast<char*>(old_bs->Data()),
687
0
           nread);
688
0
  }
689
690
0
  stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
691
0
}
692
693
694
0
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
695
0
  return buffer_;
696
0
}
697
698
699
0
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
700
0
  CHECK_NOT_NULL(stream_);
701
702
0
  StreamBase* stream = static_cast<StreamBase*>(stream_);
703
0
  Environment* env = stream->stream_env();
704
0
  HandleScope handle_scope(env->isolate());
705
0
  Context::Scope context_scope(env->context());
706
707
  // In the case that there's an error and buf is null, return immediately.
708
  // This can happen on unices when POLLHUP is received and UV_EOF is returned
709
  // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows.
710
0
  if (buf.base == nullptr && nread < 0) {
711
0
    stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
712
0
    return;
713
0
  }
714
715
0
  CHECK_EQ(buf.base, buffer_.base);
716
717
0
  MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
718
0
                             Local<ArrayBuffer>(),
719
0
                             0,
720
0
                             StreamBase::SKIP_NREAD_CHECKS);
721
0
  Local<Value> next_buf_v;
722
0
  if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
723
0
    buffer_.base = Buffer::Data(next_buf_v);
724
0
    buffer_.len = Buffer::Length(next_buf_v);
725
0
  }
726
0
}
727
728
729
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
730
729
    StreamReq* req_wrap, int status) {
731
729
  StreamBase* stream = static_cast<StreamBase*>(stream_);
732
729
  Environment* env = stream->stream_env();
733
729
  if (!env->can_call_into_js()) return;
734
0
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
735
0
  HandleScope handle_scope(env->isolate());
736
0
  Context::Scope context_scope(env->context());
737
0
  CHECK(!async_wrap->persistent().IsEmpty());
738
0
  Local<Object> req_wrap_obj = async_wrap->object();
739
740
0
  Local<Value> argv[] = {
741
0
    Integer::New(env->isolate(), status),
742
0
    stream->GetObject(),
743
0
    Undefined(env->isolate())
744
0
  };
745
746
0
  const char* msg = stream->Error();
747
0
  if (msg != nullptr) {
748
0
    argv[2] = OneByteString(env->isolate(), msg);
749
0
    stream->ClearError();
750
0
  }
751
752
0
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
753
0
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
754
0
}
755
756
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
757
729
    WriteWrap* req_wrap, int status) {
758
729
  OnStreamAfterReqFinished(req_wrap, status);
759
729
}
760
761
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
762
0
    ShutdownWrap* req_wrap, int status) {
763
0
  OnStreamAfterReqFinished(req_wrap, status);
764
0
}
765
766
0
void ShutdownWrap::OnDone(int status) {
767
0
  stream()->EmitAfterShutdown(this, status);
768
0
  Dispose();
769
0
}
770
771
729
void WriteWrap::OnDone(int status) {
772
729
  stream()->EmitAfterWrite(this, status);
773
729
  Dispose();
774
729
}
775
776
86.4k
StreamListener::~StreamListener() {
777
86.4k
  if (stream_ != nullptr)
778
84.5k
    stream_->RemoveStreamListener(this);
779
86.4k
}
780
781
0
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
782
0
  CHECK_NOT_NULL(previous_listener_);
783
0
  previous_listener_->OnStreamAfterShutdown(w, status);
784
0
}
785
786
0
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
787
0
  CHECK_NOT_NULL(previous_listener_);
788
0
  previous_listener_->OnStreamAfterWrite(w, status);
789
0
}
790
791
84.5k
StreamResource::~StreamResource() {
792
85.3k
  while (listener_ != nullptr) {
793
757
    StreamListener* listener = listener_;
794
757
    listener->OnStreamDestroy();
795
    // Remove the listener if it didn’t remove itself. This makes the logic
796
    // in `OnStreamDestroy()` implementations easier, because they
797
    // may call generic cleanup functions which can just remove the
798
    // listener unconditionally.
799
757
    if (listener == listener_)
800
757
      RemoveStreamListener(listener_);
801
757
  }
802
84.5k
}
803
804
85.3k
void StreamResource::RemoveStreamListener(StreamListener* listener) {
805
85.3k
  CHECK_NOT_NULL(listener);
806
807
85.3k
  StreamListener* previous;
808
85.3k
  StreamListener* current;
809
810
  // Remove from the linked list.
811
  // No loop condition because we want a crash if listener is not found.
812
85.3k
  for (current = listener_, previous = nullptr;;
813
86.0k
       previous = current, current = current->previous_listener_) {
814
86.0k
    CHECK_NOT_NULL(current);
815
86.0k
    if (current == listener) {
816
85.3k
      if (previous != nullptr)
817
757
        previous->previous_listener_ = current->previous_listener_;
818
84.5k
      else
819
84.5k
        listener_ = listener->previous_listener_;
820
85.3k
      break;
821
85.3k
    }
822
86.0k
  }
823
824
85.3k
  listener->stream_ = nullptr;
825
85.3k
  listener->previous_listener_ = nullptr;
826
85.3k
}
827
828
ShutdownWrap* StreamBase::CreateShutdownWrap(
829
0
    Local<Object> object) {
830
0
  auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
831
0
  wrap->MakeWeak();
832
0
  return wrap;
833
0
}
834
835
WriteWrap* StreamBase::CreateWriteWrap(
836
0
    Local<Object> object) {
837
0
  auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
838
0
  wrap->MakeWeak();
839
0
  return wrap;
840
0
}
841
842
729
void StreamReq::Done(int status, const char* error_str) {
843
729
  AsyncWrap* async_wrap = GetAsyncWrap();
844
729
  Environment* env = async_wrap->env();
845
729
  if (error_str != nullptr) {
846
0
    v8::HandleScope handle_scope(env->isolate());
847
0
    if (async_wrap->object()
848
0
            ->Set(env->context(),
849
0
                  env->error_string(),
850
0
                  OneByteString(env->isolate(), error_str))
851
0
            .IsNothing()) {
852
0
      return;
853
0
    }
854
0
  }
855
856
729
  OnDone(status);
857
729
}
858
859
}  // namespace node