Coverage Report

Created: 2025-10-31 09:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/node/src/stream_wrap.cc
Line
Count
Source
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
#include "stream_wrap.h"
23
#include "stream_base-inl.h"
24
25
#include "env-inl.h"
26
#include "handle_wrap.h"
27
#include "node_buffer.h"
28
#include "node_errors.h"
29
#include "node_external_reference.h"
30
#include "pipe_wrap.h"
31
#include "req_wrap-inl.h"
32
#include "tcp_wrap.h"
33
#include "udp_wrap.h"
34
#include "util-inl.h"
35
36
#include <cstring>  // memcpy()
37
#include <climits>  // INT_MAX
38
39
40
namespace node {
41
42
using errors::TryCatchScope;
43
using v8::Context;
44
using v8::DontDelete;
45
using v8::EscapableHandleScope;
46
using v8::FunctionCallbackInfo;
47
using v8::FunctionTemplate;
48
using v8::HandleScope;
49
using v8::Isolate;
50
using v8::JustVoid;
51
using v8::Local;
52
using v8::Maybe;
53
using v8::MaybeLocal;
54
using v8::Nothing;
55
using v8::Object;
56
using v8::PropertyAttribute;
57
using v8::ReadOnly;
58
using v8::Signature;
59
using v8::Value;
60
61
0
void IsConstructCallCallback(const FunctionCallbackInfo<Value>& args) {
62
0
  CHECK(args.IsConstructCall());
63
0
  StreamReq::ResetObject(args.This());
64
0
}
65
66
void LibuvStreamWrap::Initialize(Local<Object> target,
67
                                 Local<Value> unused,
68
                                 Local<Context> context,
69
0
                                 void* priv) {
70
0
  Environment* env = Environment::GetCurrent(context);
71
0
  Isolate* isolate = env->isolate();
72
73
0
  Local<FunctionTemplate> sw =
74
0
      NewFunctionTemplate(isolate, IsConstructCallCallback);
75
0
  sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kInternalFieldCount);
76
77
  // we need to set handle and callback to null,
78
  // so that those fields are created and functions
79
  // do not become megamorphic
80
  // Fields:
81
  // - oncomplete
82
  // - callback
83
  // - handle
84
0
  sw->InstanceTemplate()->Set(env->oncomplete_string(), v8::Null(isolate));
85
0
  sw->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "callback"),
86
0
                              v8::Null(isolate));
87
0
  sw->InstanceTemplate()->Set(env->handle_string(), v8::Null(isolate));
88
89
0
  sw->Inherit(AsyncWrap::GetConstructorTemplate(env));
90
91
0
  SetConstructorFunction(context, target, "ShutdownWrap", sw);
92
0
  env->set_shutdown_wrap_template(sw->InstanceTemplate());
93
94
0
  Local<FunctionTemplate> ww =
95
0
      FunctionTemplate::New(isolate, IsConstructCallCallback);
96
0
  ww->InstanceTemplate()->SetInternalFieldCount(
97
0
      StreamReq::kInternalFieldCount);
98
0
  ww->Inherit(AsyncWrap::GetConstructorTemplate(env));
99
0
  SetConstructorFunction(context, target, "WriteWrap", ww);
100
0
  env->set_write_wrap_template(ww->InstanceTemplate());
101
102
0
  NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
103
0
  NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
104
0
  NODE_DEFINE_CONSTANT(target, kBytesWritten);
105
0
  NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
106
0
  target
107
0
      ->Set(context,
108
0
            FIXED_ONE_BYTE_STRING(isolate, "streamBaseState"),
109
0
            env->stream_base_state().GetJSArray())
110
0
      .Check();
111
0
}
112
113
void LibuvStreamWrap::RegisterExternalReferences(
114
0
    ExternalReferenceRegistry* registry) {
115
0
  registry->Register(IsConstructCallCallback);
116
0
  registry->Register(GetWriteQueueSize);
117
0
  registry->Register(SetBlocking);
118
0
  StreamBase::RegisterExternalReferences(registry);
119
0
}
120
121
LibuvStreamWrap::LibuvStreamWrap(Environment* env,
122
                                 Local<Object> object,
123
                                 uv_stream_t* stream,
124
                                 AsyncWrap::ProviderType provider)
125
0
    : HandleWrap(env,
126
0
                 object,
127
0
                 reinterpret_cast<uv_handle_t*>(stream),
128
0
                 provider),
129
0
      StreamBase(env),
130
0
      stream_(stream) {
131
0
  StreamBase::AttachToObject(object);
132
0
}
133
134
135
Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
136
0
    Environment* env) {
137
0
  Local<FunctionTemplate> tmpl = env->libuv_stream_wrap_ctor_template();
138
0
  if (tmpl.IsEmpty()) {
139
0
    Isolate* isolate = env->isolate();
140
0
    tmpl = NewFunctionTemplate(isolate, nullptr);
141
0
    tmpl->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "LibuvStreamWrap"));
142
0
    tmpl->Inherit(HandleWrap::GetConstructorTemplate(env));
143
0
    tmpl->InstanceTemplate()->SetInternalFieldCount(
144
0
        StreamBase::kInternalFieldCount);
145
0
    Local<FunctionTemplate> get_write_queue_size =
146
0
        FunctionTemplate::New(isolate,
147
0
                              GetWriteQueueSize,
148
0
                              Local<Value>(),
149
0
                              Signature::New(isolate, tmpl));
150
0
    tmpl->PrototypeTemplate()->SetAccessorProperty(
151
0
        env->write_queue_size_string(),
152
0
        get_write_queue_size,
153
0
        Local<FunctionTemplate>(),
154
0
        static_cast<PropertyAttribute>(ReadOnly | DontDelete));
155
0
    SetProtoMethod(isolate, tmpl, "setBlocking", SetBlocking);
156
0
    StreamBase::AddMethods(env, tmpl);
157
0
    env->set_libuv_stream_wrap_ctor_template(tmpl);
158
0
  }
159
0
  return tmpl;
160
0
}
161
162
163
0
LibuvStreamWrap* LibuvStreamWrap::From(Environment* env, Local<Object> object) {
164
0
  Local<FunctionTemplate> sw = env->libuv_stream_wrap_ctor_template();
165
0
  CHECK(!sw.IsEmpty() && sw->HasInstance(object));
166
0
  return Unwrap<LibuvStreamWrap>(object);
167
0
}
168
169
170
0
int LibuvStreamWrap::GetFD() {
171
#ifdef _WIN32
172
  return fd_;
173
#else
174
0
  int fd = -1;
175
0
  if (stream() != nullptr)
176
0
    uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
177
0
  return fd;
178
0
#endif
179
0
}
180
181
182
0
bool LibuvStreamWrap::IsAlive() {
183
0
  return HandleWrap::IsAlive(this);
184
0
}
185
186
187
0
bool LibuvStreamWrap::IsClosing() {
188
0
  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
189
0
}
190
191
192
0
AsyncWrap* LibuvStreamWrap::GetAsyncWrap() {
193
0
  return static_cast<AsyncWrap*>(this);
194
0
}
195
196
197
0
bool LibuvStreamWrap::IsIPCPipe() {
198
0
  return is_named_pipe_ipc();
199
0
}
200
201
0
int LibuvStreamWrap::ReadStart() {
202
0
  return uv_read_start(
203
0
      stream(),
204
0
      [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
205
0
        static_cast<LibuvStreamWrap*>(handle->data)
206
0
            ->OnUvAlloc(suggested_size, buf);
207
0
      },
208
0
      [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
209
0
        LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(stream->data);
210
0
        TryCatchScope try_catch(wrap->env());
211
0
        try_catch.SetVerbose(true);
212
0
        wrap->OnUvRead(nread, buf);
213
0
      });
214
0
}
215
216
217
0
int LibuvStreamWrap::ReadStop() {
218
0
  return uv_read_stop(stream());
219
0
}
220
221
222
0
void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
223
0
  HandleScope scope(env()->isolate());
224
0
  Context::Scope context_scope(env()->context());
225
226
0
  *buf = EmitAlloc(suggested_size);
227
0
}
228
229
template <class WrapType>
230
static MaybeLocal<Object> AcceptHandle(Environment* env,
231
0
                                       LibuvStreamWrap* parent) {
232
0
  static_assert(std::is_base_of<LibuvStreamWrap, WrapType>::value ||
233
0
                std::is_base_of<UDPWrap, WrapType>::value,
234
0
                "Can only accept stream handles");
235
236
0
  EscapableHandleScope scope(env->isolate());
237
0
  Local<Object> wrap_obj;
238
239
0
  if (!WrapType::Instantiate(env, parent, WrapType::SOCKET).ToLocal(&wrap_obj))
240
0
    return Local<Object>();
241
242
0
  HandleWrap* wrap = BaseObject::Unwrap<HandleWrap>(wrap_obj);
243
0
  CHECK_NOT_NULL(wrap);
244
0
  uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
245
0
  CHECK_NOT_NULL(stream);
246
247
0
  if (uv_accept(parent->stream(), stream))
248
0
    ABORT();
249
250
0
  return scope.Escape(wrap_obj);
251
0
}
Unexecuted instantiation: stream_wrap.cc:v8::MaybeLocal<v8::Object> node::AcceptHandle<node::TCPWrap>(node::Environment*, node::LibuvStreamWrap*)
Unexecuted instantiation: stream_wrap.cc:v8::MaybeLocal<v8::Object> node::AcceptHandle<node::PipeWrap>(node::Environment*, node::LibuvStreamWrap*)
Unexecuted instantiation: stream_wrap.cc:v8::MaybeLocal<v8::Object> node::AcceptHandle<node::UDPWrap>(node::Environment*, node::LibuvStreamWrap*)
252
253
0
Maybe<void> LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
254
0
  HandleScope scope(env()->isolate());
255
0
  Context::Scope context_scope(env()->context());
256
0
  uv_handle_type type = UV_UNKNOWN_HANDLE;
257
258
0
  if (is_named_pipe_ipc() &&
259
0
      uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
260
0
    type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
261
0
  }
262
263
  // We should not be getting this callback if someone has already called
264
  // uv_close() on the handle.
265
0
  CHECK_EQ(persistent().IsEmpty(), false);
266
267
0
  if (nread > 0) {
268
0
    MaybeLocal<Object> pending_obj;
269
270
0
    if (type == UV_TCP) {
271
0
      pending_obj = AcceptHandle<TCPWrap>(env(), this);
272
0
    } else if (type == UV_NAMED_PIPE) {
273
0
      pending_obj = AcceptHandle<PipeWrap>(env(), this);
274
0
    } else if (type == UV_UDP) {
275
0
      pending_obj = AcceptHandle<UDPWrap>(env(), this);
276
0
    } else {
277
0
      CHECK_EQ(type, UV_UNKNOWN_HANDLE);
278
0
    }
279
280
0
    Local<Object> local_pending_obj;
281
0
    if (type != UV_UNKNOWN_HANDLE &&
282
0
        (!pending_obj.ToLocal(&local_pending_obj) ||
283
0
         object()
284
0
             ->Set(env()->context(),
285
0
                   env()->pending_handle_string(),
286
0
                   local_pending_obj)
287
0
             .IsNothing())) {
288
0
      return Nothing<void>();
289
0
    }
290
0
  }
291
292
0
  EmitRead(nread, *buf);
293
0
  return JustVoid();
294
0
}
295
296
void LibuvStreamWrap::GetWriteQueueSize(
297
0
    const FunctionCallbackInfo<Value>& info) {
298
0
  LibuvStreamWrap* wrap;
299
0
  ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
300
301
0
  if (wrap->stream() == nullptr) {
302
0
    info.GetReturnValue().Set(0);
303
0
    return;
304
0
  }
305
306
0
  uint32_t write_queue_size = wrap->stream()->write_queue_size;
307
0
  info.GetReturnValue().Set(write_queue_size);
308
0
}
309
310
311
0
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
312
0
  LibuvStreamWrap* wrap;
313
0
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
314
315
0
  CHECK_GT(args.Length(), 0);
316
0
  if (!wrap->IsAlive())
317
0
    return args.GetReturnValue().Set(UV_EINVAL);
318
319
0
  bool enable = args[0]->IsTrue();
320
0
  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
321
0
}
322
323
typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>> LibuvShutdownWrap;
324
typedef SimpleWriteWrap<ReqWrap<uv_write_t>> LibuvWriteWrap;
325
326
0
ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
327
0
  return new LibuvShutdownWrap(this, object);
328
0
}
329
330
0
WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
331
0
  return new LibuvWriteWrap(this, object);
332
0
}
333
334
335
0
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
336
0
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
337
0
  return req_wrap->Dispatch(uv_shutdown, stream(), AfterUvShutdown);
338
0
}
339
340
341
0
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
342
0
  LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
343
0
      LibuvShutdownWrap::from_req(req));
344
0
  CHECK_NOT_NULL(req_wrap);
345
0
  HandleScope scope(req_wrap->env()->isolate());
346
0
  Context::Scope context_scope(req_wrap->env()->context());
347
0
  req_wrap->Done(status);
348
0
}
349
350
351
// NOTE: Call to this function could change both `buf`'s and `count`'s
352
// values, shifting their base and decrementing their length. This is
353
// required in order to skip the data that was successfully written via
354
// uv_try_write().
355
0
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
356
0
  int err;
357
0
  size_t written;
358
0
  uv_buf_t* vbufs = *bufs;
359
0
  size_t vcount = *count;
360
361
0
  err = uv_try_write(stream(), vbufs, vcount);
362
0
  if (err == UV_ENOSYS || err == UV_EAGAIN)
363
0
    return 0;
364
0
  if (err < 0)
365
0
    return err;
366
367
  // Slice off the buffers: skip all written buffers and slice the one that
368
  // was partially written.
369
0
  written = err;
370
0
  for (; vcount > 0; vbufs++, vcount--) {
371
    // Slice
372
0
    if (vbufs[0].len > written) {
373
0
      vbufs[0].base += written;
374
0
      vbufs[0].len -= written;
375
0
      written = 0;
376
0
      break;
377
378
    // Discard
379
0
    } else {
380
0
      written -= vbufs[0].len;
381
0
    }
382
0
  }
383
384
0
  *bufs = vbufs;
385
0
  *count = vcount;
386
387
0
  return 0;
388
0
}
389
390
391
int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
392
                             uv_buf_t* bufs,
393
                             size_t count,
394
0
                             uv_stream_t* send_handle) {
395
0
  LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
396
0
  return w->Dispatch(uv_write2,
397
0
                     stream(),
398
0
                     bufs,
399
0
                     count,
400
0
                     send_handle,
401
0
                     AfterUvWrite);
402
0
}
403
404
405
406
0
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
407
0
  LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
408
0
      LibuvWriteWrap::from_req(req));
409
0
  CHECK_NOT_NULL(req_wrap);
410
0
  HandleScope scope(req_wrap->env()->isolate());
411
0
  Context::Scope context_scope(req_wrap->env()->context());
412
0
  req_wrap->Done(status);
413
0
}
414
415
}  // namespace node
416
417
NODE_BINDING_CONTEXT_AWARE_INTERNAL(stream_wrap,
418
                                    node::LibuvStreamWrap::Initialize)
419
NODE_BINDING_EXTERNAL_REFERENCE(
420
    stream_wrap, node::LibuvStreamWrap::RegisterExternalReferences)