Coverage Report

Created: 2025-08-03 10:06

/src/node/src/stream_wrap.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
#include "stream_wrap.h"
23
#include "stream_base-inl.h"
24
25
#include "env-inl.h"
26
#include "handle_wrap.h"
27
#include "node_buffer.h"
28
#include "node_errors.h"
29
#include "node_external_reference.h"
30
#include "pipe_wrap.h"
31
#include "req_wrap-inl.h"
32
#include "tcp_wrap.h"
33
#include "udp_wrap.h"
34
#include "util-inl.h"
35
36
#include <cstring>  // memcpy()
37
#include <climits>  // INT_MAX
38
39
40
namespace node {
41
42
using errors::TryCatchScope;
43
using v8::Context;
44
using v8::DontDelete;
45
using v8::EscapableHandleScope;
46
using v8::FunctionCallbackInfo;
47
using v8::FunctionTemplate;
48
using v8::HandleScope;
49
using v8::Isolate;
50
using v8::JustVoid;
51
using v8::Local;
52
using v8::Maybe;
53
using v8::MaybeLocal;
54
using v8::Nothing;
55
using v8::Object;
56
using v8::PropertyAttribute;
57
using v8::ReadOnly;
58
using v8::Signature;
59
using v8::Value;
60
61
1.13k
// Constructor callback installed on the ShutdownWrap and WriteWrap function
// templates built in LibuvStreamWrap::Initialize(). It asserts that the
// function was invoked as a constructor and hands the receiver to
// StreamReq::ResetObject().
void IsConstructCallCallback(const FunctionCallbackInfo<Value>& args) {
62
1.13k
  CHECK(args.IsConstructCall());
63
1.13k
  StreamReq::ResetObject(args.This());
64
1.13k
}
65
66
// Binding initializer for `stream_wrap` (registered through
// NODE_BINDING_CONTEXT_AWARE_INTERNAL at the bottom of this file).
//
// Installs on `target`:
//   - the "ShutdownWrap" and "WriteWrap" constructors,
//   - the kReadBytesOrError, kArrayBufferOffset, kBytesWritten and
//     kLastWriteWasAsync constants,
//   - "streamBaseState", the JS array backing env->stream_base_state().
// It also stores both instance templates on the Environment.
// `unused` and `priv` are not read by this function.
void LibuvStreamWrap::Initialize(Local<Object> target,
67
                                 Local<Value> unused,
68
                                 Local<Context> context,
69
85.7k
                                 void* priv) {
70
85.7k
  Environment* env = Environment::GetCurrent(context);
71
85.7k
  Isolate* isolate = env->isolate();
72
73
85.7k
  // ShutdownWrap: template for shutdown request objects.
  Local<FunctionTemplate> sw =
74
85.7k
      NewFunctionTemplate(isolate, IsConstructCallCallback);
75
85.7k
  sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
76
77
  // we need to set handle and callback to null,
78
  // so that those fields are created and functions
79
  // do not become megamorphic
80
  // Fields:
81
  // - oncomplete
82
  // - callback
83
  // - handle
84
85.7k
  sw->InstanceTemplate()->Set(env->oncomplete_string(), v8::Null(isolate));
85
85.7k
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "callback"),
86
85.7k
                              v8::Null(isolate));
87
85.7k
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "handle"),
88
85.7k
                              v8::Null(isolate));
89
90
85.7k
  sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
91
92
85.7k
  SetConstructorFunction(context, target, "ShutdownWrap", sw);
93
85.7k
  env->set_shutdown_wrap_template(sw->InstanceTemplate());
94
95
85.7k
  // WriteWrap: template for write request objects. Unlike `sw`, no fields are
  // pre-set to null on its instance template.
  // NOTE(review): this template is created with FunctionTemplate::New() while
  // `sw` above goes through NewFunctionTemplate() — confirm the difference is
  // intentional.
  Local<FunctionTemplate> ww =
96
85.7k
      FunctionTemplate::New(isolate, IsConstructCallCallback);
97
85.7k
  ww->InstanceTemplate()->SetInternalFieldCount(
98
85.7k
      StreamReq::kInternalFieldCount);
99
85.7k
  ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
100
85.7k
  SetConstructorFunction(context, target, "WriteWrap", ww);
101
85.7k
  env->set_write_wrap_template(ww->InstanceTemplate());
102
103
85.7k
  // Export the state constants alongside the array they are used with.
  // NOTE(review): presumably these are indices into `streamBaseState` —
  // confirm against the StreamBase declaration.
  NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
104
85.7k
  NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
105
85.7k
  NODE_DEFINE_CONSTANT(target, kBytesWritten);
106
85.7k
  NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
107
85.7k
  target
108
85.7k
      ->Set(context,
109
85.7k
            FIXED_ONE_BYTE_STRING(isolate, "streamBaseState"),
110
85.7k
            env->stream_base_state().GetJSArray())
111
85.7k
      .Check();
112
85.7k
}
113
114
// Registers the native callbacks referenced by this file's templates
// (IsConstructCallCallback, GetWriteQueueSize, SetBlocking) with `registry`,
// then lets StreamBase register its own references.
void LibuvStreamWrap::RegisterExternalReferences(
115
0
    ExternalReferenceRegistry* registry) {
116
0
  registry->Register(IsConstructCallCallback);
117
0
  registry->Register(GetWriteQueueSize);
118
0
  registry->Register(SetBlocking);
119
0
  StreamBase::RegisterExternalReferences(registry);
120
0
}
121
122
// Constructs the wrap around an existing libuv stream.
//
// `stream` is passed to the HandleWrap base reinterpreted as a uv_handle_t*
// and is also kept in `stream_`. `provider` is forwarded to HandleWrap.
// After the bases are initialized, the StreamBase part is attached to the JS
// `object` via StreamBase::AttachToObject().
LibuvStreamWrap::LibuvStreamWrap(Environment* env,
123
                                 Local<Object> object,
124
                                 uv_stream_t* stream,
125
                                 AsyncWrap::ProviderType provider)
126
86.7k
    : HandleWrap(env,
127
86.7k
                 object,
128
86.7k
                 reinterpret_cast<uv_handle_t*>(stream),
129
86.7k
                 provider),
130
86.7k
      StreamBase(env),
131
86.7k
      stream_(stream) {
132
86.7k
  StreamBase::AttachToObject(object);
133
86.7k
}
134
135
136
// Returns the "LibuvStreamWrap" function template for `env`, creating it and
// caching it on the Environment the first time it is requested.
//
// The template inherits from HandleWrap's constructor template and carries:
//   - a read-only, non-deletable accessor (env->write_queue_size_string())
//     whose getter is GetWriteQueueSize,
//   - the prototype method "setBlocking",
//   - whatever StreamBase::AddMethods() installs.
Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
137
172k
    Environment* env) {
138
172k
  Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
139
172k
  // An empty handle means the template has not been built yet for this env.
  if (tmpl.IsEmpty()) {
140
85.7k
    Isolate* isolate = env->isolate();
141
85.7k
    // No callback is supplied for the constructor itself.
    tmpl = NewFunctionTemplate(isolate, nullptr);
142
85.7k
    tmpl->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "LibuvStreamWrap"));
143
85.7k
    tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
144
85.7k
    tmpl->InstanceTemplate()->SetInternalFieldCount(
145
85.7k
        StreamBase::kInternalFieldCount);
146
85.7k
    // The getter is created with a Signature tied to `tmpl`.
    Local<FunctionTemplate> get_write_queue_size =
147
85.7k
        FunctionTemplate::New(isolate,
148
85.7k
                              GetWriteQueueSize,
149
85.7k
                              Local<Value>(),
150
85.7k
                              Signature::New(isolate, tmpl));
151
85.7k
    // Getter only: an empty FunctionTemplate is passed in the setter slot.
    tmpl->PrototypeTemplate()->SetAccessorProperty(
152
85.7k
        env->write_queue_size_string(),
153
85.7k
        get_write_queue_size,
154
85.7k
        Local<FunctionTemplate>(),
155
85.7k
        static_cast<PropertyAttribute>(ReadOnly | DontDelete));
156
85.7k
    SetProtoMethod(isolate, tmpl, "setBlocking", SetBlocking);
157
85.7k
    StreamBase::AddMethods(env, tmpl);
158
85.7k
    // Cache so later calls take the fast path above.
    env->set_libuv_stream_wrap_ctor_template(tmpl);
159
85.7k
  }
160
172k
  return tmpl;
161
172k
}
162
163
164
0
// Returns the LibuvStreamWrap stored in `object`.
//
// CHECK-fails if the constructor template has not been created for `env` yet
// or if `object` is not an instance of it; otherwise returns the result of
// Unwrap<LibuvStreamWrap>(object).
LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
165
0
  Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
166
0
  CHECK(!sw.IsEmpty() && sw->HasInstance(object));
167
0
  return Unwrap<LibuvStreamWrap>(object);
168
0
}
169
170
171
0
// Returns the file descriptor associated with this stream.
//
// On _WIN32 builds this is the stored `fd_` member. Elsewhere the descriptor
// is queried from libuv with uv_fileno(); -1 is returned when stream() is
// null.
int LibuvStreamWrap::GetFD() {
172
#ifdef _WIN32
173
  return fd_;
174
#else
175
0
  int fd = -1;
176
0
  if (stream() != nullptr)
177
0
    // The return code of uv_fileno() is ignored; `fd` keeps its -1
    // initializer unless uv_fileno() writes to it.
    uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
178
0
  return fd;
179
0
#endif
180
0
}
181
182
183
1.13k
// Reports liveness by delegating to HandleWrap::IsAlive() for this wrap.
bool LibuvStreamWrap::IsAlive() {
184
1.13k
  return HandleWrap::IsAlive(this);
185
1.13k
}
186
187
188
0
// Returns the result of uv_is_closing() for the underlying stream handle.
// NOTE(review): unlike GetFD(), stream() is not checked for null here —
// confirm callers guarantee a non-null stream.
bool LibuvStreamWrap::IsClosing() {
189
0
  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
190
0
}
191
192
193
1.94k
// Returns this object viewed as its AsyncWrap base.
AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
194
1.94k
  return static_cast<AsyncWrap*>(this);
195
1.94k
}
196
197
198
1.94k
// Returns whatever is_named_pipe_ipc() reports for this stream.
bool LibuvStreamWrap::IsIPCPipe() {
199
1.94k
  return is_named_pipe_ipc();
200
1.94k
}
201
202
0
// Starts reading from the underlying stream and returns uv_read_start()'s
// result code.
//
// Both callbacks are capture-less lambdas that recover the LibuvStreamWrap
// from the handle's `data` pointer:
//   - the allocation callback forwards to OnUvAlloc();
//   - the read callback forwards to OnUvRead() inside a verbose
//     TryCatchScope. The Maybe<void> returned by OnUvRead() is discarded.
int LibuvStreamWrap::ReadStart() {
203
0
  return uv_read_start(
204
0
      stream(),
205
0
      [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
206
0
        static_cast<LibuvStreamWrap*>(handle->data)
207
0
            ->OnUvAlloc(suggested_size, buf);
208
0
      },
209
0
      [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
210
0
        LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(stream->data);
211
0
        TryCatchScope try_catch(wrap->env());
212
0
        try_catch.SetVerbose(true);
213
0
        wrap->OnUvRead(nread, buf);
214
0
      });
215
0
}
216
217
218
0
// Stops reading from the underlying stream; returns uv_read_stop()'s result
// code unchanged.
int LibuvStreamWrap::ReadStop() {
219
0
  return uv_read_stop(stream());
220
0
}
221
222
223
0
// Allocation callback used by ReadStart(): fills `*buf` with the buffer
// returned by EmitAlloc(suggested_size). A HandleScope and the environment's
// context are entered before EmitAlloc() is called.
void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
224
0
  HandleScope scope(env()->isolate());
225
0
  Context::Scope context_scope(env()->context());
226
227
0
  *buf = EmitAlloc(suggested_size);
228
0
}
229
230
// Instantiates a new `WrapType` JS object and accepts the handle pending on
// `parent` into it.
//
// `WrapType` must derive from LibuvStreamWrap or UDPWrap (enforced at compile
// time). Returns an empty MaybeLocal if WrapType::Instantiate() fails.
// A non-zero result from uv_accept() aborts the process.
template <class WrapType>
231
static MaybeLocal<Object> AcceptHandle(Environment* env,
232
0
                                       LibuvStreamWrap* parent) {
233
0
  static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
234
0
                std::is_base_of<UDPWrap, WrapType>::value,
235
0
                "Can only accept stream handles");
236
237
0
  EscapableHandleScope scope(env->isolate());
238
0
  Local<Object> wrap_obj;
239
240
0
  if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
241
0
    return Local<Object>();
242
243
0
  // Recover the native wrap behind the new object and its libuv handle; the
  // handle is reinterpreted as a uv_stream_t* so it can be given to
  // uv_accept().
  HandleWrap* wrap = Unwrap<HandleWrap>(wrap_obj);
244
0
  CHECK_NOT_NULL(wrap);
245
0
  uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
246
0
  CHECK_NOT_NULL(stream);
247
248
0
  if (uv_accept(parent->stream(), stream))
249
0
    ABORT();
250
251
0
  // Hand the object out of the local EscapableHandleScope.
  return scope.Escape(wrap_obj);
252
0
}
Unexecuted instantiation: stream_wrap.cc:v8::MaybeLocal<v8::Object> node::AcceptHandle<node::TCPWrap>(node::Environment*, node::LibuvStreamWrap*)
Unexecuted instantiation: stream_wrap.cc:v8::MaybeLocal<v8::Object> node::AcceptHandle<node::PipeWrap>(node::Environment*, node::LibuvStreamWrap*)
Unexecuted instantiation: stream_wrap.cc:v8::MaybeLocal<v8::Object> node::AcceptHandle<node::UDPWrap>(node::Environment*, node::LibuvStreamWrap*)
253
254
0
// Read callback used by ReadStart().
//
// When this stream is a named-pipe IPC channel with a pending handle and
// `nread` is positive, the pending handle (TCP, named pipe or UDP) is
// accepted into a new wrap object, which is stored on this wrap's JS object
// under env()->pending_handle_string(). Afterwards the read result is
// forwarded to EmitRead().
//
// Returns Nothing<void>() if accepting the handle or setting the property
// fails (EmitRead() is not called in that case); otherwise JustVoid().
Maybe<void> LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
255
0
  HandleScope scope(env()->isolate());
256
0
  Context::Scope context_scope(env()->context());
257
0
  // UV_UNKNOWN_HANDLE doubles as "no pending handle" below.
  uv_handle_type type = UV_UNKNOWN_HANDLE;
258
259
0
  if (is_named_pipe_ipc() &&
260
0
      uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
261
0
    type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
262
0
  }
263
264
  // We should not be getting this callback if someone has already called
265
  // uv_close() on the handle.
266
0
  CHECK_EQ(persistent().IsEmpty(), false);
267
268
0
  if (nread > 0) {
269
0
    MaybeLocal<Object> pending_obj;
270
271
0
    // Pick the wrap class that matches the pending handle type; any other
    // type must be the "nothing pending" sentinel.
    if (type == UV_TCP) {
272
0
      pending_obj = AcceptHandle<TCPWrap>(env(), this);
273
0
    } else if (type == UV_NAMED_PIPE) {
274
0
      pending_obj = AcceptHandle<PipeWrap>(env(), this);
275
0
    } else if (type == UV_UDP) {
276
0
      pending_obj = AcceptHandle<UDPWrap>(env(), this);
277
0
    } else {
278
0
      CHECK_EQ(type, UV_UNKNOWN_HANDLE);
279
0
    }
280
281
0
    Local<Object> local_pending_obj;
282
0
    // Only when a handle was pending: bail out if it could not be accepted
    // or if attaching it to the JS object failed.
    if (type != UV_UNKNOWN_HANDLE &&
283
0
        (!pending_obj.ToLocal(&local_pending_obj) ||
284
0
         object()
285
0
             ->Set(env()->context(),
286
0
                   env()->pending_handle_string(),
287
0
                   local_pending_obj)
288
0
             .IsNothing())) {
289
0
      return Nothing<void>();
290
0
    }
291
0
  }
292
293
0
  EmitRead(nread, *buf);
294
0
  return JustVoid();
295
0
}
296
297
// Accessor getter installed by GetConstructorTemplate(): returns the
// stream's `write_queue_size`, or 0 when the wrap has no stream.
// The wrap is unwrapped from info.This(); ASSIGN_OR_RETURN_UNWRAP returns
// early if that fails.
void LibuvStreamWrap::GetWriteQueueSize(
298
0
    const FunctionCallbackInfo<Value>& info) {
299
0
  LibuvStreamWrap* wrap;
300
0
  ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
301
302
0
  if (wrap->stream() == nullptr) {
303
0
    info.GetReturnValue().Set(0);
304
0
    return;
305
0
  }
306
307
0
  // NOTE(review): the value is stored in a uint32_t; if the libuv field is
  // wider than 32 bits, larger queue sizes would be truncated — confirm.
  uint32_t write_queue_size = wrap->stream()->write_queue_size;
308
0
  info.GetReturnValue().Set(write_queue_size);
309
0
}
310
311
312
0
// JS method "setBlocking": calls uv_stream_set_blocking() on the wrapped
// stream and returns its result code to JS.
//
// Requires at least one argument (CHECK-enforced). Returns UV_EINVAL without
// touching the stream when the wrap is no longer alive. `enable` is whatever
// args[0]->IsTrue() reports.
// NOTE(review): the wrap is taken from args.Holder() here while
// GetWriteQueueSize() uses info.This() — confirm whether the two should
// agree.
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
313
0
  LibuvStreamWrap* wrap;
314
0
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
315
316
0
  CHECK_GT(args.Length(), 0);
317
0
  if (!wrap->IsAlive())
318
0
    return args.GetReturnValue().Set(UV_EINVAL);
319
320
0
  bool enable = args[0]->IsTrue();
321
0
  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
322
0
}
323
324
// Concrete request-wrap types used by the functions below: shutdown and write
// requests built on ReqWrap<uv_shutdown_t> and ReqWrap<uv_write_t>.
typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
325
typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
326
327
0
// Heap-allocates a LibuvShutdownWrap for this stream, bound to `object`, and
// returns it as a raw pointer.
// NOTE(review): who deletes the returned wrap is not visible in this file.
ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
328
0
  return new LibuvShutdownWrap(this, object);
329
0
}
330
331
809
// Heap-allocates a LibuvWriteWrap for this stream, bound to `object`, and
// returns it as a raw pointer.
// NOTE(review): who deletes the returned wrap is not visible in this file.
WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
332
809
  return new LibuvWriteWrap(this, object);
333
809
}
334
335
336
0
// Dispatches uv_shutdown() on this stream through `req_wrap_`, with
// AfterUvShutdown as the completion callback. Returns Dispatch()'s result.
// `req_wrap_` is downcast to LibuvShutdownWrap, the type produced by
// CreateShutdownWrap().
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
337
0
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
338
0
  return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
339
0
}
340
341
342
0
// Completion callback passed to uv_shutdown() by DoShutdown(): recovers the
// LibuvShutdownWrap that owns `req`, enters a HandleScope and the
// environment's context, and reports `status` through Done().
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
343
0
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
344
0
      LibuvShutdownWrap::from_req(req));
345
0
  CHECK_NOT_NULL(req_wrap);
346
0
  HandleScope scope(req_wrap->env()->isolate());
347
0
  Context::Scope context_scope(req_wrap->env()->context());
348
0
  req_wrap->Done(status);
349
0
}
350
351
352
// Attempts to write `*count` buffers starting at `*bufs` with
// uv_try_write().
//
// Returns 0 when uv_try_write() reports UV_ENOSYS or UV_EAGAIN (`*bufs` and
// `*count` are left untouched) and after any successful full or partial
// write; returns the negative error code for every other failure.
//
// NOTE: A call to this function can change both `*bufs` and `*count`,
353
// advancing the buffer pointer and decrementing the count, and can also
354
// shift the base and shrink the length of a partially written buffer. This
355
// is required in order to skip the data that was already written via
// uv_try_write(), so that only the remainder is left for the caller.
356
1.13k
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
357
1.13k
  int err;
358
1.13k
  size_t written;
359
1.13k
  uv_buf_t* vbufs = *bufs;
360
1.13k
  size_t vcount = *count;
361
362
1.13k
  err = uv_try_write(stream(), vbufs, vcount);
363
1.13k
  // These two codes are not treated as errors: nothing was written and the
  // caller's buffers are reported back unchanged.
  if (err == UV_ENOSYS || err == UV_EAGAIN)
364
809
    return 0;
365
330
  if (err < 0)
366
96
    return err;
367
368
  // Slice off the buffers: skip all written buffers and slice the one that
369
  // was partially written.
370
234
  // A non-negative result is the number of bytes written.
  written = err;
371
468
  for (; vcount > 0; vbufs++, vcount--) {
372
    // Slice: this buffer was only partially written; advance it past the
    // written bytes and stop.
373
234
    if (vbufs[0].len > written) {
374
0
      vbufs[0].base += written;
375
0
      vbufs[0].len -= written;
376
0
      written = 0;
377
0
      break;
378
379
    // Discard: this buffer was written completely; account for it and move
    // on to the next one.
380
234
    } else {
381
234
      written -= vbufs[0].len;
382
234
    }
383
234
  }
384
385
234
  // Publish the remaining (unwritten) buffers to the caller.
  *bufs = vbufs;
386
234
  *count = vcount;
387
388
234
  return 0;
389
330
}
390
391
392
// Dispatches uv_write2() on this stream through `req_wrap`, writing `count`
// buffers from `bufs` and passing `send_handle` along, with AfterUvWrite as
// the completion callback. Returns Dispatch()'s result.
// `req_wrap` is downcast to LibuvWriteWrap, the type produced by
// CreateWriteWrap().
int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
393
                             uv_buf_t* bufs,
394
                             size_t count,
395
809
                             uv_stream_t* send_handle) {
396
809
  LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
397
809
  return w->Dispatch(uv_write2,
398
809
                     stream(),
399
809
                     bufs,
400
809
                     count,
401
809
                     send_handle,
402
809
                     AfterUvWrite);
403
809
}
404
405
406
407
809
// Completion callback passed to uv_write2() by DoWrite(): recovers the
// LibuvWriteWrap that owns `req`, enters a HandleScope and the environment's
// context, and reports `status` through Done().
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
408
809
  LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
409
809
      LibuvWriteWrap::from_req(req));
410
809
  CHECK_NOT_NULL(req_wrap);
411
809
  HandleScope scope(req_wrap->env()->isolate());
412
809
  Context::Scope context_scope(req_wrap->env()->context());
413
809
  req_wrap->Done(status);
414
809
}
415
416
}  // namespace node
417
418
// Register the `stream_wrap` internal binding: LibuvStreamWrap::Initialize
// is its initializer and LibuvStreamWrap::RegisterExternalReferences supplies
// its external references.
NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_wrap,
419
                                    node::LibuvStreamWrap::Initialize)
420
NODE_BINDING_EXTERNAL_REFERENCE(
421
    stream_wrap, node::LibuvStreamWrap::RegisterExternalReferences)