Coverage Report

Created: 2025-12-10 07:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/node/src/stream_base.cc
Line
Count
Source
1
#include "stream_base.h"  // NOLINT(build/include_inline)
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "env-inl.h"
6
#include "js_stream.h"
7
#include "node.h"
8
#include "node_buffer.h"
9
#include "node_errors.h"
10
#include "node_external_reference.h"
11
#include "string_bytes.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <climits>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::ArrayBuffer;
21
using v8::BackingStore;
22
using v8::BackingStoreInitializationMode;
23
using v8::ConstructorBehavior;
24
using v8::Context;
25
using v8::DontDelete;
26
using v8::DontEnum;
27
using v8::External;
28
using v8::Function;
29
using v8::FunctionCallbackInfo;
30
using v8::FunctionTemplate;
31
using v8::HandleScope;
32
using v8::Integer;
33
using v8::Isolate;
34
using v8::Local;
35
using v8::MaybeLocal;
36
using v8::Object;
37
using v8::PropertyAttribute;
38
using v8::ReadOnly;
39
using v8::SideEffectType;
40
using v8::Signature;
41
using v8::String;
42
using v8::Value;
43
44
0
// Issues a shutdown request on this stream.
//
// If `req_wrap_obj` is empty, a request object is instantiated from the
// environment's shutdown-wrap template and reset via StreamReq::ResetObject().
//
// Returns the value produced by DoShutdown(), or UV_EBUSY when the request
// object cannot be instantiated or the pending error message cannot be stored
// on it.
int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
  Environment* const environment = stream_env();
  v8::Isolate* const isolate = environment->isolate();
  v8::HandleScope scope(isolate);

  if (req_wrap_obj.IsEmpty()) {
    const bool instantiated = environment->shutdown_wrap_template()
                                  ->NewInstance(environment->context())
                                  .ToLocal(&req_wrap_obj);
    if (!instantiated) return UV_EBUSY;
    StreamReq::ResetObject(req_wrap_obj);
  }

  // Declared ahead of the trigger scope so that it is destroyed after it.
  BaseObjectPtr<AsyncWrap> request_ref;
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
  ShutdownWrap* const request = CreateShutdownWrap(req_wrap_obj);
  const bool have_request = request != nullptr;
  if (have_request) request_ref.reset(request->GetAsyncWrap());

  const int status = DoShutdown(request);
  // A non-zero status means the request is disposed of right here.
  if (have_request && status != 0) request->Dispose();

  // Surface any error message the stream recorded on the request object.
  const char* const message = Error();
  if (message == nullptr) return status;

  const bool stored = !req_wrap_obj
                           ->Set(environment->context(),
                                 environment->error_string(),
                                 OneByteString(isolate, message))
                           .IsNothing();
  if (!stored) return UV_EBUSY;
  ClearError();

  return status;
}
82
83
// Writes `count` buffers from `bufs` to the stream.
//
// Unless `skip_try_write` is set or a `send_handle` is supplied, DoTryWrite()
// is attempted first (when the stream reports HasDoTryWrite()); if that fails
// or consumes every buffer, the result is returned without creating a request.
// Otherwise a WriteWrap is created on `req_wrap_obj` (instantiated from the
// environment's write-wrap template when empty) and handed to DoWrite().
//
// The returned StreamWriteResult carries, in order: whether the write is
// asynchronous, the status code, the WriteWrap (nullptr unless asynchronous),
// the byte total of the incoming buffers, and a reference to the request's
// AsyncWrap.
StreamWriteResult StreamBase::Write(uv_buf_t* bufs,
                                    size_t count,
                                    uv_stream_t* send_handle,
                                    v8::Local<v8::Object> req_wrap_obj,
                                    bool skip_try_write) {
  Environment* const environment = stream_env();

  // Account for every byte up front, before any buffer is consumed.
  size_t total_bytes = 0;
  for (const uv_buf_t* it = bufs; it != bufs + count; ++it)
    total_bytes += it->len;
  bytes_written_ += total_bytes;

  const bool attempt_try_write =
      send_handle == nullptr && HasDoTryWrite() && !skip_try_write;
  if (attempt_try_write) {
    // DoTryWrite() receives `bufs` and `count` by address and may update them.
    const int try_status = DoTryWrite(&bufs, &count);
    if (try_status != 0 || count == 0)
      return StreamWriteResult{false, try_status, nullptr, total_bytes, {}};
  }

  v8::HandleScope scope(environment->isolate());

  if (req_wrap_obj.IsEmpty()) {
    const bool instantiated = environment->write_wrap_template()
                                  ->NewInstance(environment->context())
                                  .ToLocal(&req_wrap_obj);
    if (!instantiated)
      return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
    StreamReq::ResetObject(req_wrap_obj);
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap());
  WriteWrap* request = CreateWriteWrap(req_wrap_obj);
  BaseObjectPtr<AsyncWrap> request_ref(request->GetAsyncWrap());

  const int write_status = DoWrite(request, bufs, count, send_handle);
  const bool is_async = write_status == 0;
  if (!is_async) {
    // The request will not complete later, so dispose of it now.
    request->Dispose();
    request = nullptr;
  }

  // Surface any error message the stream recorded on the request object.
  const char* const message = Error();
  if (message != nullptr) {
    const bool stored = !req_wrap_obj
                             ->Set(environment->context(),
                                   environment->error_string(),
                                   OneByteString(environment->isolate(),
                                                 message))
                             .IsNothing();
    if (!stored) return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}};
    ClearError();
  }

  return StreamWriteResult{
      is_async, write_status, request, total_bytes, std::move(request_ref)};
}
140
141
// Explicit instantiations of StreamBase::WriteString() for the ASCII, UTF8,
// UCS2 and LATIN1 encodings.
template int StreamBase::WriteString<ASCII>(
    const FunctionCallbackInfo<Value>&);
template int StreamBase::WriteString<UTF8>(
    const FunctionCallbackInfo<Value>&);
template int StreamBase::WriteString<UCS2>(
    const FunctionCallbackInfo<Value>&);
template int StreamBase::WriteString<LATIN1>(
    const FunctionCallbackInfo<Value>&);
149
150
151
0
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
152
0
  return ReadStart();
153
0
}
154
155
156
0
int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
157
0
  return ReadStop();
158
0
}
159
160
0
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
161
0
  CHECK(Buffer::HasInstance(args[0]));
162
163
0
  uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
164
0
  PushStreamListener(new CustomBufferJSListener(buf));
165
0
  return 0;
166
0
}
167
168
0
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
169
0
  CHECK(args[0]->IsObject());
170
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
171
172
0
  return Shutdown(req_wrap_obj);
173
0
}
174
175
0
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
176
0
  env_->stream_base_state()[kBytesWritten] = res.bytes;
177
0
  env_->stream_base_state()[kLastWriteWasAsync] = res.async;
178
0
}
179
180
0
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
181
0
  Environment* env = Environment::GetCurrent(args);
182
0
  Isolate* isolate = env->isolate();
183
0
  Local<Context> context = env->context();
184
185
0
  CHECK(args[0]->IsObject());
186
0
  CHECK(args[1]->IsArray());
187
188
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
189
0
  Local<Array> chunks = args[1].As<Array>();
190
0
  bool all_buffers = args[2]->IsTrue();
191
192
0
  size_t count;
193
0
  if (all_buffers)
194
0
    count = chunks->Length();
195
0
  else
196
0
    count = chunks->Length() >> 1;
197
198
0
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
199
200
0
  size_t storage_size = 0;
201
0
  size_t offset;
202
203
0
  if (!all_buffers) {
204
    // Determine storage size first
205
0
    for (size_t i = 0; i < count; i++) {
206
0
      Local<Value> chunk;
207
0
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
208
0
        return -1;
209
210
0
      if (Buffer::HasInstance(chunk))
211
0
        continue;
212
        // Buffer chunk, no additional storage required
213
214
      // String chunk
215
0
      Local<String> string;
216
0
      if (!chunk->ToString(context).ToLocal(&string))
217
0
        return -1;
218
0
      Local<Value> next_chunk;
219
0
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
220
0
        return -1;
221
0
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
222
0
      size_t chunk_size;
223
0
      if ((encoding == UTF8 &&
224
0
             string->Length() > 65535 &&
225
0
             !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
226
0
              !StringBytes::StorageSize(isolate, string, encoding)
227
0
                  .To(&chunk_size)) {
228
0
        return -1;
229
0
      }
230
0
      storage_size += chunk_size;
231
0
    }
232
233
0
    if (storage_size > INT_MAX)
234
0
      return UV_ENOBUFS;
235
0
  } else {
236
0
    for (size_t i = 0; i < count; i++) {
237
0
      Local<Value> chunk;
238
0
      if (!chunks->Get(context, i).ToLocal(&chunk))
239
0
        return -1;
240
0
      bufs[i].base = Buffer::Data(chunk);
241
0
      bufs[i].len = Buffer::Length(chunk);
242
0
    }
243
0
  }
244
245
0
  std::unique_ptr<BackingStore> bs;
246
0
  if (storage_size > 0) {
247
0
    bs = ArrayBuffer::NewBackingStore(
248
0
        isolate, storage_size, BackingStoreInitializationMode::kUninitialized);
249
0
  }
250
251
0
  offset = 0;
252
0
  if (!all_buffers) {
253
0
    for (size_t i = 0; i < count; i++) {
254
0
      Local<Value> chunk;
255
0
      if (!chunks->Get(context, i * 2).ToLocal(&chunk))
256
0
        return -1;
257
258
      // Write buffer
259
0
      if (Buffer::HasInstance(chunk)) {
260
0
        bufs[i].base = Buffer::Data(chunk);
261
0
        bufs[i].len = Buffer::Length(chunk);
262
0
        continue;
263
0
      }
264
265
      // Write string
266
0
      CHECK_LE(offset, storage_size);
267
0
      char* str_storage =
268
0
          static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
269
0
      size_t str_size = (bs ? bs->ByteLength() : 0) - offset;
270
271
0
      Local<String> string;
272
0
      if (!chunk->ToString(context).ToLocal(&string))
273
0
        return -1;
274
0
      Local<Value> next_chunk;
275
0
      if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
276
0
        return -1;
277
0
      enum encoding encoding = ParseEncoding(isolate, next_chunk);
278
0
      str_size = StringBytes::Write(isolate,
279
0
                                    str_storage,
280
0
                                    str_size,
281
0
                                    string,
282
0
                                    encoding);
283
0
      bufs[i].base = str_storage;
284
0
      bufs[i].len = str_size;
285
0
      offset += str_size;
286
0
    }
287
0
  }
288
289
0
  StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
290
0
  SetWriteResult(res);
291
0
  if (res.wrap != nullptr && storage_size > 0)
292
0
    res.wrap->SetBackingStore(std::move(bs));
293
0
  return res.err;
294
0
}
295
296
297
0
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
298
0
  CHECK(args[0]->IsObject());
299
300
0
  Environment* env = Environment::GetCurrent(args);
301
302
0
  if (!args[1]->IsUint8Array()) {
303
0
    node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer");
304
0
    return 0;
305
0
  }
306
307
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
308
0
  uv_buf_t buf;
309
0
  buf.base = Buffer::Data(args[1]);
310
0
  buf.len = Buffer::Length(args[1]);
311
312
0
  uv_stream_t* send_handle = nullptr;
313
314
0
  if (args[2]->IsObject() && IsIPCPipe()) {
315
0
    Local<Object> send_handle_obj = args[2].As<Object>();
316
317
0
    HandleWrap* wrap;
318
0
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
319
0
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
320
    // Reference LibuvStreamWrap instance to prevent it from being garbage
321
    // collected before `AfterWrite` is called.
322
0
    if (req_wrap_obj->Set(env->context(),
323
0
                          env->handle_string(),
324
0
                          send_handle_obj).IsNothing()) {
325
0
      return -1;
326
0
    }
327
0
  }
328
329
0
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
330
0
  SetWriteResult(res);
331
332
0
  return res.err;
333
0
}
334
335
336
template <enum encoding enc>
337
0
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
338
0
  Environment* env = Environment::GetCurrent(args);
339
0
  Isolate* isolate = env->isolate();
340
0
  CHECK(args[0]->IsObject());
341
0
  CHECK(args[1]->IsString());
342
343
0
  Local<Object> req_wrap_obj = args[0].As<Object>();
344
0
  Local<String> string = args[1].As<String>();
345
0
  Local<Object> send_handle_obj;
346
0
  if (args[2]->IsObject())
347
0
    send_handle_obj = args[2].As<Object>();
348
349
  // Compute the size of the storage that the string will be flattened into.
350
  // For UTF8 strings that are very long, go ahead and take the hit for
351
  // computing their actual size, rather than tripling the storage.
352
0
  size_t storage_size;
353
0
  if ((enc == UTF8 &&
354
0
         string->Length() > 65535 &&
355
0
         !StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
356
0
          !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
357
0
    return -1;
358
0
  }
359
360
0
  if (storage_size > INT_MAX)
361
0
    return UV_ENOBUFS;
362
363
  // Try writing immediately if write size isn't too big
364
0
  char stack_storage[16384];  // 16kb
365
0
  size_t data_size;
366
0
  size_t synchronously_written = 0;
367
0
  uv_buf_t buf;
368
369
0
  bool try_write = HasDoTryWrite() && storage_size <= sizeof(stack_storage) &&
370
0
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
371
0
  if (try_write) {
372
0
    data_size = StringBytes::Write(isolate,
373
0
                                   stack_storage,
374
0
                                   storage_size,
375
0
                                   string,
376
0
                                   enc);
377
0
    buf = uv_buf_init(stack_storage, data_size);
378
379
0
    uv_buf_t* bufs = &buf;
380
0
    size_t count = 1;
381
0
    const int err = DoTryWrite(&bufs, &count);
382
    // Keep track of the bytes written here, because we're taking a shortcut
383
    // by using `DoTryWrite()` directly instead of using the utilities
384
    // provided by `Write()`.
385
0
    synchronously_written = count == 0 ? data_size : data_size - buf.len;
386
0
    bytes_written_ += synchronously_written;
387
388
    // Immediate failure or success
389
0
    if (err != 0 || count == 0) {
390
0
      SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} });
391
0
      return err;
392
0
    }
393
394
    // Partial write
395
0
    CHECK_EQ(count, 1);
396
0
  }
397
398
0
  std::unique_ptr<BackingStore> bs;
399
400
0
  if (try_write) {
401
    // Copy partial data
402
0
    bs = ArrayBuffer::NewBackingStore(
403
0
        isolate, buf.len, BackingStoreInitializationMode::kUninitialized);
404
0
    memcpy(bs->Data(), buf.base, buf.len);
405
0
    data_size = buf.len;
406
0
  } else {
407
    // Write it
408
0
    bs = ArrayBuffer::NewBackingStore(
409
0
        isolate, storage_size, BackingStoreInitializationMode::kUninitialized);
410
0
    data_size = StringBytes::Write(isolate,
411
0
                                   static_cast<char*>(bs->Data()),
412
0
                                   storage_size,
413
0
                                   string,
414
0
                                   enc);
415
0
  }
416
417
0
  CHECK_LE(data_size, storage_size);
418
419
0
  buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);
420
421
0
  uv_stream_t* send_handle = nullptr;
422
423
0
  if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
424
0
    HandleWrap* wrap;
425
0
    ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
426
0
    send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
427
    // Reference LibuvStreamWrap instance to prevent it from being garbage
428
    // collected before `AfterWrite` is called.
429
0
    if (req_wrap_obj->Set(env->context(),
430
0
                          env->handle_string(),
431
0
                          send_handle_obj).IsNothing()) {
432
0
      return -1;
433
0
    }
434
0
  }
435
436
0
  StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write);
437
0
  res.bytes += synchronously_written;
438
439
0
  SetWriteResult(res);
440
0
  if (res.wrap != nullptr)
441
0
    res.wrap->SetBackingStore(std::move(bs));
442
443
0
  return res.err;
444
0
}
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&)
445
446
447
// Invokes the function stored in the kOnReadFunctionField internal field of
// this stream's wrapper object.
//
// `nread` and `offset` are published through the environment's
// stream_base_state() array (kReadBytesOrError / kArrayBufferOffset); the only
// call argument is `ab`, or `undefined` when `ab` is empty. When `checks` is
// DONT_SKIP_NREAD_CHECKS, debug builds additionally verify that an empty `ab`
// comes with `offset == 0` and `nread <= 0`, and a non-empty one with
// `nread >= 0`.
//
// Returns whatever MakeCallback() returns.
MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
                                                 Local<ArrayBuffer> ab,
                                                 size_t offset,
                                                 StreamBaseJSChecks checks) {
  Environment* const environment = env_;

  DCHECK_EQ(static_cast<int32_t>(nread), nread);
  DCHECK_LE(offset, INT32_MAX);

  if (checks == DONT_SKIP_NREAD_CHECKS) {
    if (!ab.IsEmpty()) {
      DCHECK_GE(nread, 0);
    } else {
      DCHECK_EQ(offset, 0);
      DCHECK_LE(nread, 0);
    }
  }

  environment->stream_base_state()[kReadBytesOrError] =
      static_cast<int32_t>(nread);
  environment->stream_base_state()[kArrayBufferOffset] = offset;

  Local<Value> payload;
  if (ab.IsEmpty()) {
    payload = Undefined(environment->isolate()).As<Value>();
  } else {
    payload = ab.As<Value>();
  }
  Local<Value> argv[] = {payload};

  AsyncWrap* const async_wrap = GetAsyncWrap();
  CHECK_NOT_NULL(async_wrap);

  Local<Object> self = async_wrap->object();
  Local<Value> callback =
      self->GetInternalField(StreamBase::kOnReadFunctionField).As<Value>();
  CHECK(callback->IsFunction());

  return async_wrap->MakeCallback(
      callback.As<Function>(), arraysize(argv), argv);
}
480
481
482
0
// Base implementation: reports that this stream is not an IPC pipe.
// NOTE(review): presumably overridden by pipe-backed streams; confirm in the
// header.
bool StreamBase::IsIPCPipe() {
  constexpr bool kIsIpcPipe = false;
  return kIsIpcPipe;
}
485
486
487
0
// Base implementation: there is no file descriptor to report, so -1 is
// returned.
int StreamBase::GetFD() {
  constexpr int kNoFileDescriptor = -1;
  return kNoFileDescriptor;
}
490
491
492
0
// Returns the JS object held by this stream's AsyncWrap.
Local<Object> StreamBase::GetObject() {
  AsyncWrap* const async_wrap = GetAsyncWrap();
  return async_wrap->object();
}
495
496
void StreamBase::AddAccessor(v8::Isolate* isolate,
497
                             v8::Local<v8::Signature> signature,
498
                             enum v8::PropertyAttribute attributes,
499
                             v8::Local<v8::FunctionTemplate> t,
500
                             JSMethodFunction* getter,
501
                             JSMethodFunction* setter,
502
35
                             v8::Local<v8::String> string) {
503
35
  Local<FunctionTemplate> getter_templ =
504
35
      NewFunctionTemplate(isolate,
505
35
                          getter,
506
35
                          signature,
507
35
                          ConstructorBehavior::kThrow,
508
35
                          SideEffectType::kHasNoSideEffect);
509
35
  Local<FunctionTemplate> setter_templ =
510
35
      NewFunctionTemplate(isolate,
511
35
                          setter,
512
35
                          signature,
513
35
                          ConstructorBehavior::kThrow,
514
35
                          SideEffectType::kHasSideEffect);
515
35
  t->PrototypeTemplate()->SetAccessorProperty(
516
35
      string, getter_templ, setter_templ, attributes);
517
35
}
518
519
void StreamBase::AddMethod(Isolate* isolate,
520
                           Local<Signature> signature,
521
                           enum PropertyAttribute attributes,
522
                           Local<FunctionTemplate> t,
523
                           JSMethodFunction* stream_method,
524
140
                           Local<String> string) {
525
140
  Local<FunctionTemplate> templ =
526
140
      NewFunctionTemplate(isolate,
527
140
                          stream_method,
528
140
                          signature,
529
140
                          ConstructorBehavior::kThrow,
530
140
                          SideEffectType::kHasNoSideEffect);
531
140
  t->PrototypeTemplate()->SetAccessorProperty(
532
140
      string, templ, Local<FunctionTemplate>(), attributes);
533
140
}
534
535
0
// Convenience overload: forwards to the IsolateData overload using the
// environment's isolate data.
void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
  IsolateData* const isolate_data = env->isolate_data();
  AddMethods(isolate_data, t);
}
538
539
void StreamBase::AddMethods(IsolateData* isolate_data,
540
35
                            Local<FunctionTemplate> t) {
541
35
  Isolate* isolate = isolate_data->isolate();
542
35
  HandleScope scope(isolate);
543
544
35
  enum PropertyAttribute attributes =
545
35
      static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum);
546
35
  Local<Signature> sig = Signature::New(isolate, t);
547
548
35
  AddMethod(isolate, sig, attributes, t, GetFD, isolate_data->fd_string());
549
35
  AddMethod(isolate,
550
35
            sig,
551
35
            attributes,
552
35
            t,
553
35
            GetExternal,
554
35
            isolate_data->external_stream_string());
555
35
  AddMethod(isolate,
556
35
            sig,
557
35
            attributes,
558
35
            t,
559
35
            GetBytesRead,
560
35
            isolate_data->bytes_read_string());
561
35
  AddMethod(isolate,
562
35
            sig,
563
35
            attributes,
564
35
            t,
565
35
            GetBytesWritten,
566
35
            isolate_data->bytes_written_string());
567
35
  SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
568
35
  SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
569
35
  SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>);
570
35
  SetProtoMethod(
571
35
      isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>);
572
35
  SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>);
573
35
  SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
574
35
  SetProtoMethod(isolate,
575
35
                 t,
576
35
                 "writeAsciiString",
577
35
                 JSMethod<&StreamBase::WriteString<ASCII>>);
578
35
  SetProtoMethod(
579
35
      isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>);
580
35
  SetProtoMethod(
581
35
      isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>);
582
35
  SetProtoMethod(isolate,
583
35
                 t,
584
35
                 "writeLatin1String",
585
35
                 JSMethod<&StreamBase::WriteString<LATIN1>>);
586
35
  t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"),
587
35
                              True(isolate));
588
35
  AddAccessor(isolate,
589
35
              sig,
590
35
              static_cast<PropertyAttribute>(DontDelete | DontEnum),
591
35
              t,
592
35
              BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>,
593
35
              BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
594
35
                                           &Value::IsFunction>,
595
35
              FIXED_ONE_BYTE_STRING(isolate, "onread"));
596
35
}
597
598
void StreamBase::RegisterExternalReferences(
599
0
    ExternalReferenceRegistry* registry) {
600
  // This function is called by a single thread during start up, so it is safe
601
  // to use a local static variable here.
602
0
  static bool is_registered = false;
603
0
  if (is_registered) return;
604
0
  registry->Register(GetFD);
605
0
  registry->Register(GetExternal);
606
0
  registry->Register(GetBytesRead);
607
0
  registry->Register(GetBytesWritten);
608
0
  registry->Register(JSMethod<&StreamBase::ReadStartJS>);
609
0
  registry->Register(JSMethod<&StreamBase::ReadStopJS>);
610
0
  registry->Register(JSMethod<&StreamBase::Shutdown>);
611
0
  registry->Register(JSMethod<&StreamBase::UseUserBuffer>);
612
0
  registry->Register(JSMethod<&StreamBase::Writev>);
613
0
  registry->Register(JSMethod<&StreamBase::WriteBuffer>);
614
0
  registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>);
615
0
  registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>);
616
0
  registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>);
617
0
  registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>);
618
0
  registry->Register(
619
0
      BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>);
620
0
  registry->Register(
621
0
      BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField,
622
0
                                   &Value::IsFunction>);
623
0
  is_registered = true;
624
0
}
625
626
0
void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
627
  // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD().
628
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
629
0
  if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL);
630
631
0
  if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL);
632
633
0
  args.GetReturnValue().Set(wrap->GetFD());
634
0
}
635
636
0
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
637
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
638
0
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
639
640
  // uint64_t -> double. 53bits is enough for all real cases.
641
0
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
642
0
}
643
644
0
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
645
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
646
0
  if (wrap == nullptr) return args.GetReturnValue().Set(0);
647
648
  // uint64_t -> double. 53bits is enough for all real cases.
649
0
  args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
650
0
}
651
652
0
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
653
0
  StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>());
654
0
  if (wrap == nullptr) return;
655
656
0
  Local<External> ext = External::New(args.GetIsolate(), wrap);
657
0
  args.GetReturnValue().Set(ext);
658
0
}
659
660
// Generic JS callback trampoline: resolves the StreamBase attached to the
// receiver, then calls the member function `Method` on it and uses its int
// result as the JS return value.
//
// Nothing is returned to JS when no StreamBase is attached; UV_EINVAL is
// returned when the stream is no longer alive.
template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
  StreamBase* const stream = StreamBase::FromObject(args.This().As<Object>());
  if (stream == nullptr) return;

  if (!stream->IsAlive()) {
    args.GetReturnValue().Set(UV_EINVAL);
    return;
  }

  AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(stream->GetAsyncWrap());
  const int result = (stream->*Method)(args);
  args.GetReturnValue().Set(result);
}
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStartJS>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStopJS>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Shutdown>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::UseUserBuffer>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Writev>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::WriteBuffer>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&)
670
671
0
// Base implementation: no try-write is performed. `bufs` and `count` are left
// untouched and 0 is returned.
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  // No TryWrite by default
  constexpr int kNoError = 0;
  return kNoError;
}
675
676
677
0
// Base implementation: there is never a pending error message, so nullptr is
// returned.
const char* StreamResource::Error() const {
  const char* const no_message = nullptr;
  return no_message;
}
680
681
682
0
// Base implementation: nothing to clear.
void StreamResource::ClearError() {
  // Intentionally empty (no-op).
}
685
686
687
0
// Allocates a read buffer of `suggested_size` bytes through the environment's
// allocate_managed_buffer(). Aborts (CHECK) when no stream is attached.
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
  CHECK_NOT_NULL(stream_);
  StreamBase* const stream = static_cast<StreamBase*>(stream_);
  Environment* const environment = stream->stream_env();
  return environment->allocate_managed_buffer(suggested_size);
}
692
693
0
// Delivers the outcome of a read to JS through CallJSOnreadMethod().
//
// The backing store behind `buf_` is always reclaimed from the environment
// first. A negative `nread` is reported with an empty ArrayBuffer, a zero
// `nread` is not reported at all, and a positive `nread` is reported with an
// ArrayBuffer holding exactly `nread` bytes (copied into a smaller backing
// store when the original one is larger).
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
  CHECK_NOT_NULL(stream_);
  StreamBase* const base = static_cast<StreamBase*>(stream_);
  Environment* const environment = base->stream_env();
  Isolate* const isolate = environment->isolate();
  HandleScope scope(isolate);
  Context::Scope context_scope(environment->context());
  std::unique_ptr<BackingStore> store =
      environment->release_managed_buffer(buf_);

  if (nread == 0) return;
  if (nread < 0) {
    base->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }

  const size_t bytes_read = static_cast<size_t>(nread);
  CHECK_LE(bytes_read, store->ByteLength());
  if (bytes_read != store->ByteLength()) {
    // Shrink to fit: copy the bytes that were read into a right-sized store.
    std::unique_ptr<BackingStore> oversized = std::move(store);
    store = ArrayBuffer::NewBackingStore(
        isolate, nread, BackingStoreInitializationMode::kUninitialized);
    memcpy(store->Data(), oversized->Data(), nread);
  }

  Local<ArrayBuffer> array_buffer =
      ArrayBuffer::New(isolate, std::move(store));
  base->CallJSOnreadMethod(nread, array_buffer);
}
718
719
720
0
// Hands out the user-supplied buffer; `suggested_size` is ignored.
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
  const uv_buf_t user_buffer = buffer_;
  return user_buffer;
}
723
724
725
0
// Reports a read into the user-supplied buffer to JS.
//
// The JS callback receives no ArrayBuffer (the data is already in the user's
// buffer); if it returns a value other than `undefined`, that value becomes
// the buffer used for subsequent reads.
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
  CHECK_NOT_NULL(stream_);

  StreamBase* const base = static_cast<StreamBase*>(stream_);
  Environment* const environment = base->stream_env();
  HandleScope scope(environment->isolate());
  Context::Scope context_scope(environment->context());

  // In the case that there's an error and buf is null, return immediately.
  // This can happen on unices when POLLHUP is received and UV_EOF is returned
  // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows.
  const bool error_without_buffer = buf.base == nullptr && nread < 0;
  if (error_without_buffer) {
    base->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
    return;
  }

  CHECK_EQ(buf.base, buffer_.base);

  MaybeLocal<Value> outcome = base->CallJSOnreadMethod(
      nread, Local<ArrayBuffer>(), 0, StreamBase::SKIP_NREAD_CHECKS);

  Local<Value> replacement;
  if (!outcome.ToLocal(&replacement)) return;
  if (replacement->IsUndefined()) return;

  buffer_.base = Buffer::Data(replacement);
  buffer_.len = Buffer::Length(replacement);
}
753
754
755
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
756
0
    StreamReq* req_wrap, int status) {
757
0
  StreamBase* stream = static_cast<StreamBase*>(stream_);
758
0
  Environment* env = stream->stream_env();
759
0
  if (!env->can_call_into_js()) return;
760
0
  AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
761
0
  HandleScope handle_scope(env->isolate());
762
0
  Context::Scope context_scope(env->context());
763
0
  CHECK(!async_wrap->persistent().IsEmpty());
764
0
  Local<Object> req_wrap_obj = async_wrap->object();
765
766
0
  Local<Value> argv[] = {
767
0
    Integer::New(env->isolate(), status),
768
0
    stream->GetObject(),
769
0
    Undefined(env->isolate())
770
0
  };
771
772
0
  const char* msg = stream->Error();
773
0
  if (msg != nullptr) {
774
0
    argv[2] = OneByteString(env->isolate(), msg);
775
0
    stream->ClearError();
776
0
  }
777
778
0
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
779
0
    async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
780
0
}
781
782
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
783
0
    WriteWrap* req_wrap, int status) {
784
0
  OnStreamAfterReqFinished(req_wrap, status);
785
0
}
786
787
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
788
0
    ShutdownWrap* req_wrap, int status) {
789
0
  OnStreamAfterReqFinished(req_wrap, status);
790
0
}
791
792
0
void ShutdownWrap::OnDone(int status) {
793
0
  stream()->EmitAfterShutdown(this, status);
794
0
  Dispose();
795
0
}
796
797
0
void WriteWrap::OnDone(int status) {
798
0
  stream()->EmitAfterWrite(this, status);
799
0
  Dispose();
800
0
}
801
802
0
// Detaches this listener from its stream, if it is still attached to one.
StreamListener::~StreamListener() {
  if (stream_ == nullptr) return;
  stream_->RemoveStreamListener(this);
}
806
807
0
// Default behaviour: pass the notification on to the previous listener in the
// chain, which must exist (CHECKed).
void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
  StreamListener* const next_in_chain = previous_listener_;
  CHECK_NOT_NULL(next_in_chain);
  next_in_chain->OnStreamAfterShutdown(w, status);
}
811
812
0
// Default behaviour: pass the notification on to the previous listener in the
// chain, which must exist (CHECKed).
void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
  StreamListener* const next_in_chain = previous_listener_;
  CHECK_NOT_NULL(next_in_chain);
  next_in_chain->OnStreamAfterWrite(w, status);
}
816
817
0
// Notifies every listener that the stream is being destroyed and detaches
// them one by one, starting from the head of the listener list.
StreamResource::~StreamResource() {
  for (StreamListener* head = listener_; head != nullptr; head = listener_) {
    head->OnStreamDestroy();
    // Remove the listener if it didn't remove itself. This makes the logic
    // in `OnStreamDestroy()` implementations easier, because they
    // may call generic cleanup functions which can just remove the
    // listener unconditionally.
    const bool still_attached = head == listener_;
    if (still_attached) RemoveStreamListener(listener_);
  }
}
829
830
0
// Unlinks `listener` from this stream's listener list and clears its
// back-references. Aborts (CHECK) when `listener` is null or is not part of
// the list.
void StreamResource::RemoveStreamListener(StreamListener* listener) {
  CHECK_NOT_NULL(listener);

  // Walk the list through the link that points at each node, so that the
  // head and the interior nodes are unlinked the same way.
  // There is deliberately no end-of-list exit: running off the end trips the
  // CHECK below, because we want a crash if listener is not found.
  StreamListener** link = &listener_;
  while (true) {
    CHECK_NOT_NULL(*link);
    if (*link == listener) {
      *link = listener->previous_listener_;
      break;
    }
    link = &(*link)->previous_listener_;
  }

  listener->stream_ = nullptr;
  listener->previous_listener_ = nullptr;
}
853
854
// Creates a SimpleShutdownWrap for this stream on top of `object`, makes it
// weak, and returns it.
ShutdownWrap* StreamBase::CreateShutdownWrap(Local<Object> object) {
  SimpleShutdownWrap<AsyncWrap>* const shutdown_wrap =
      new SimpleShutdownWrap<AsyncWrap>(this, object);
  shutdown_wrap->MakeWeak();
  return shutdown_wrap;
}
860
861
// Creates a SimpleWriteWrap for this stream on top of `object`, makes it
// weak, and returns it.
WriteWrap* StreamBase::CreateWriteWrap(Local<Object> object) {
  SimpleWriteWrap<AsyncWrap>* const write_wrap =
      new SimpleWriteWrap<AsyncWrap>(this, object);
  write_wrap->MakeWeak();
  return write_wrap;
}
867
868
0
void StreamReq::Done(int status, const char* error_str) {
869
0
  AsyncWrap* async_wrap = GetAsyncWrap();
870
0
  Environment* env = async_wrap->env();
871
0
  if (error_str != nullptr) {
872
0
    v8::HandleScope handle_scope(env->isolate());
873
0
    if (async_wrap->object()
874
0
            ->Set(env->context(),
875
0
                  env->error_string(),
876
0
                  OneByteString(env->isolate(), error_str))
877
0
            .IsNothing()) {
878
0
      return;
879
0
    }
880
0
  }
881
882
0
  OnDone(status);
883
0
}
884
885
}  // namespace node