/src/node/src/stream_base.cc
Line | Count | Source |
1 | | #include "stream_base.h" // NOLINT(build/include_inline) |
2 | | #include "stream_base-inl.h" |
3 | | #include "stream_wrap.h" |
4 | | |
5 | | #include "env-inl.h" |
6 | | #include "js_stream.h" |
7 | | #include "node.h" |
8 | | #include "node_buffer.h" |
9 | | #include "node_errors.h" |
10 | | #include "node_external_reference.h" |
11 | | #include "string_bytes.h" |
12 | | #include "util-inl.h" |
13 | | #include "v8.h" |
14 | | |
15 | | #include <climits> // INT_MAX |
16 | | |
17 | | namespace node { |
18 | | |
19 | | using v8::Array; |
20 | | using v8::ArrayBuffer; |
21 | | using v8::BackingStore; |
22 | | using v8::BackingStoreInitializationMode; |
23 | | using v8::ConstructorBehavior; |
24 | | using v8::Context; |
25 | | using v8::DontDelete; |
26 | | using v8::DontEnum; |
27 | | using v8::External; |
28 | | using v8::Function; |
29 | | using v8::FunctionCallbackInfo; |
30 | | using v8::FunctionTemplate; |
31 | | using v8::HandleScope; |
32 | | using v8::Integer; |
33 | | using v8::Isolate; |
34 | | using v8::Local; |
35 | | using v8::MaybeLocal; |
36 | | using v8::Object; |
37 | | using v8::PropertyAttribute; |
38 | | using v8::ReadOnly; |
39 | | using v8::SideEffectType; |
40 | | using v8::Signature; |
41 | | using v8::String; |
42 | | using v8::Value; |
43 | | |
44 | 0 | int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) { |
45 | 0 | Environment* env = stream_env(); |
46 | |
|
47 | 0 | v8::HandleScope handle_scope(env->isolate()); |
48 | |
|
49 | 0 | if (req_wrap_obj.IsEmpty()) { |
50 | 0 | if (!env->shutdown_wrap_template() |
51 | 0 | ->NewInstance(env->context()) |
52 | 0 | .ToLocal(&req_wrap_obj)) { |
53 | 0 | return UV_EBUSY; |
54 | 0 | } |
55 | 0 | StreamReq::ResetObject(req_wrap_obj); |
56 | 0 | } |
57 | | |
58 | 0 | BaseObjectPtr<AsyncWrap> req_wrap_ptr; |
59 | 0 | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); |
60 | 0 | ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); |
61 | 0 | if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap()); |
62 | 0 | int err = DoShutdown(req_wrap); |
63 | |
|
64 | 0 | if (err != 0 && req_wrap != nullptr) { |
65 | 0 | req_wrap->Dispose(); |
66 | 0 | } |
67 | |
|
68 | 0 | const char* msg = Error(); |
69 | 0 | if (msg != nullptr) { |
70 | 0 | if (req_wrap_obj |
71 | 0 | ->Set(env->context(), |
72 | 0 | env->error_string(), |
73 | 0 | OneByteString(env->isolate(), msg)) |
74 | 0 | .IsNothing()) { |
75 | 0 | return UV_EBUSY; |
76 | 0 | } |
77 | 0 | ClearError(); |
78 | 0 | } |
79 | | |
80 | 0 | return err; |
81 | 0 | } |
82 | | |
83 | | StreamWriteResult StreamBase::Write(uv_buf_t* bufs, |
84 | | size_t count, |
85 | | uv_stream_t* send_handle, |
86 | | v8::Local<v8::Object> req_wrap_obj, |
87 | 0 | bool skip_try_write) { |
88 | 0 | Environment* env = stream_env(); |
89 | 0 | int err; |
90 | |
|
91 | 0 | size_t total_bytes = 0; |
92 | 0 | for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len; |
93 | 0 | bytes_written_ += total_bytes; |
94 | |
|
95 | 0 | if (send_handle == nullptr && HasDoTryWrite() && !skip_try_write) { |
96 | 0 | err = DoTryWrite(&bufs, &count); |
97 | 0 | if (err != 0 || count == 0) { |
98 | 0 | return StreamWriteResult{false, err, nullptr, total_bytes, {}}; |
99 | 0 | } |
100 | 0 | } |
101 | | |
102 | 0 | v8::HandleScope handle_scope(env->isolate()); |
103 | |
|
104 | 0 | if (req_wrap_obj.IsEmpty()) { |
105 | 0 | if (!env->write_wrap_template() |
106 | 0 | ->NewInstance(env->context()) |
107 | 0 | .ToLocal(&req_wrap_obj)) { |
108 | 0 | return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; |
109 | 0 | } |
110 | 0 | StreamReq::ResetObject(req_wrap_obj); |
111 | 0 | } |
112 | | |
113 | 0 | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); |
114 | 0 | WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); |
115 | 0 | BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap()); |
116 | |
|
117 | 0 | err = DoWrite(req_wrap, bufs, count, send_handle); |
118 | 0 | bool async = err == 0; |
119 | |
|
120 | 0 | if (!async) { |
121 | 0 | req_wrap->Dispose(); |
122 | 0 | req_wrap = nullptr; |
123 | 0 | } |
124 | |
|
125 | 0 | const char* msg = Error(); |
126 | 0 | if (msg != nullptr) { |
127 | 0 | if (req_wrap_obj |
128 | 0 | ->Set(env->context(), |
129 | 0 | env->error_string(), |
130 | 0 | OneByteString(env->isolate(), msg)) |
131 | 0 | .IsNothing()) { |
132 | 0 | return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; |
133 | 0 | } |
134 | 0 | ClearError(); |
135 | 0 | } |
136 | | |
137 | 0 | return StreamWriteResult{ |
138 | 0 | async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)}; |
139 | 0 | } |
140 | | |
141 | | template int StreamBase::WriteString<ASCII>( |
142 | | const FunctionCallbackInfo<Value>& args); |
143 | | template int StreamBase::WriteString<UTF8>( |
144 | | const FunctionCallbackInfo<Value>& args); |
145 | | template int StreamBase::WriteString<UCS2>( |
146 | | const FunctionCallbackInfo<Value>& args); |
147 | | template int StreamBase::WriteString<LATIN1>( |
148 | | const FunctionCallbackInfo<Value>& args); |
149 | | |
150 | | |
151 | 0 | int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) { |
152 | 0 | return ReadStart(); |
153 | 0 | } |
154 | | |
155 | | |
156 | 0 | int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) { |
157 | 0 | return ReadStop(); |
158 | 0 | } |
159 | | |
160 | 0 | int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) { |
161 | 0 | CHECK(Buffer::HasInstance(args[0])); |
162 | | |
163 | 0 | uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0])); |
164 | 0 | PushStreamListener(new CustomBufferJSListener(buf)); |
165 | 0 | return 0; |
166 | 0 | } |
167 | | |
168 | 0 | int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) { |
169 | 0 | CHECK(args[0]->IsObject()); |
170 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
171 | |
|
172 | 0 | return Shutdown(req_wrap_obj); |
173 | 0 | } |
174 | | |
175 | 0 | void StreamBase::SetWriteResult(const StreamWriteResult& res) { |
176 | 0 | env_->stream_base_state()[kBytesWritten] = res.bytes; |
177 | 0 | env_->stream_base_state()[kLastWriteWasAsync] = res.async; |
178 | 0 | } |
179 | | |
180 | 0 | int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { |
181 | 0 | Environment* env = Environment::GetCurrent(args); |
182 | 0 | Isolate* isolate = env->isolate(); |
183 | 0 | Local<Context> context = env->context(); |
184 | |
|
185 | 0 | CHECK(args[0]->IsObject()); |
186 | 0 | CHECK(args[1]->IsArray()); |
187 | | |
188 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
189 | 0 | Local<Array> chunks = args[1].As<Array>(); |
190 | 0 | bool all_buffers = args[2]->IsTrue(); |
191 | |
|
192 | 0 | size_t count; |
193 | 0 | if (all_buffers) |
194 | 0 | count = chunks->Length(); |
195 | 0 | else |
196 | 0 | count = chunks->Length() >> 1; |
197 | |
|
198 | 0 | MaybeStackBuffer<uv_buf_t, 16> bufs(count); |
199 | |
|
200 | 0 | size_t storage_size = 0; |
201 | 0 | size_t offset; |
202 | |
|
203 | 0 | if (!all_buffers) { |
204 | | // Determine storage size first |
205 | 0 | for (size_t i = 0; i < count; i++) { |
206 | 0 | Local<Value> chunk; |
207 | 0 | if (!chunks->Get(context, i * 2).ToLocal(&chunk)) |
208 | 0 | return -1; |
209 | | |
210 | 0 | if (Buffer::HasInstance(chunk)) |
211 | 0 | continue; |
212 | | // Buffer chunk, no additional storage required |
213 | | |
214 | | // String chunk |
215 | 0 | Local<String> string; |
216 | 0 | if (!chunk->ToString(context).ToLocal(&string)) |
217 | 0 | return -1; |
218 | 0 | Local<Value> next_chunk; |
219 | 0 | if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk)) |
220 | 0 | return -1; |
221 | 0 | enum encoding encoding = ParseEncoding(isolate, next_chunk); |
222 | 0 | size_t chunk_size; |
223 | 0 | if ((encoding == UTF8 && |
224 | 0 | string->Length() > 65535 && |
225 | 0 | !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) || |
226 | 0 | !StringBytes::StorageSize(isolate, string, encoding) |
227 | 0 | .To(&chunk_size)) { |
228 | 0 | return -1; |
229 | 0 | } |
230 | 0 | storage_size += chunk_size; |
231 | 0 | } |
232 | | |
233 | 0 | if (storage_size > INT_MAX) |
234 | 0 | return UV_ENOBUFS; |
235 | 0 | } else { |
236 | 0 | for (size_t i = 0; i < count; i++) { |
237 | 0 | Local<Value> chunk; |
238 | 0 | if (!chunks->Get(context, i).ToLocal(&chunk)) |
239 | 0 | return -1; |
240 | 0 | bufs[i].base = Buffer::Data(chunk); |
241 | 0 | bufs[i].len = Buffer::Length(chunk); |
242 | 0 | } |
243 | 0 | } |
244 | | |
245 | 0 | std::unique_ptr<BackingStore> bs; |
246 | 0 | if (storage_size > 0) { |
247 | 0 | bs = ArrayBuffer::NewBackingStore( |
248 | 0 | isolate, storage_size, BackingStoreInitializationMode::kUninitialized); |
249 | 0 | } |
250 | |
|
251 | 0 | offset = 0; |
252 | 0 | if (!all_buffers) { |
253 | 0 | for (size_t i = 0; i < count; i++) { |
254 | 0 | Local<Value> chunk; |
255 | 0 | if (!chunks->Get(context, i * 2).ToLocal(&chunk)) |
256 | 0 | return -1; |
257 | | |
258 | | // Write buffer |
259 | 0 | if (Buffer::HasInstance(chunk)) { |
260 | 0 | bufs[i].base = Buffer::Data(chunk); |
261 | 0 | bufs[i].len = Buffer::Length(chunk); |
262 | 0 | continue; |
263 | 0 | } |
264 | | |
265 | | // Write string |
266 | 0 | CHECK_LE(offset, storage_size); |
267 | 0 | char* str_storage = |
268 | 0 | static_cast<char*>(bs ? bs->Data() : nullptr) + offset; |
269 | 0 | size_t str_size = (bs ? bs->ByteLength() : 0) - offset; |
270 | |
|
271 | 0 | Local<String> string; |
272 | 0 | if (!chunk->ToString(context).ToLocal(&string)) |
273 | 0 | return -1; |
274 | 0 | Local<Value> next_chunk; |
275 | 0 | if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk)) |
276 | 0 | return -1; |
277 | 0 | enum encoding encoding = ParseEncoding(isolate, next_chunk); |
278 | 0 | str_size = StringBytes::Write(isolate, |
279 | 0 | str_storage, |
280 | 0 | str_size, |
281 | 0 | string, |
282 | 0 | encoding); |
283 | 0 | bufs[i].base = str_storage; |
284 | 0 | bufs[i].len = str_size; |
285 | 0 | offset += str_size; |
286 | 0 | } |
287 | 0 | } |
288 | | |
289 | 0 | StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); |
290 | 0 | SetWriteResult(res); |
291 | 0 | if (res.wrap != nullptr && storage_size > 0) |
292 | 0 | res.wrap->SetBackingStore(std::move(bs)); |
293 | 0 | return res.err; |
294 | 0 | } |
295 | | |
296 | | |
297 | 0 | int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { |
298 | 0 | CHECK(args[0]->IsObject()); |
299 | | |
300 | 0 | Environment* env = Environment::GetCurrent(args); |
301 | |
|
302 | 0 | if (!args[1]->IsUint8Array()) { |
303 | 0 | node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer"); |
304 | 0 | return 0; |
305 | 0 | } |
306 | | |
307 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
308 | 0 | uv_buf_t buf; |
309 | 0 | buf.base = Buffer::Data(args[1]); |
310 | 0 | buf.len = Buffer::Length(args[1]); |
311 | |
|
312 | 0 | uv_stream_t* send_handle = nullptr; |
313 | |
|
314 | 0 | if (args[2]->IsObject() && IsIPCPipe()) { |
315 | 0 | Local<Object> send_handle_obj = args[2].As<Object>(); |
316 | |
|
317 | 0 | HandleWrap* wrap; |
318 | 0 | ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); |
319 | 0 | send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); |
320 | | // Reference LibuvStreamWrap instance to prevent it from being garbage |
321 | | // collected before `AfterWrite` is called. |
322 | 0 | if (req_wrap_obj->Set(env->context(), |
323 | 0 | env->handle_string(), |
324 | 0 | send_handle_obj).IsNothing()) { |
325 | 0 | return -1; |
326 | 0 | } |
327 | 0 | } |
328 | | |
329 | 0 | StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); |
330 | 0 | SetWriteResult(res); |
331 | |
|
332 | 0 | return res.err; |
333 | 0 | } |
334 | | |
335 | | |
336 | | template <enum encoding enc> |
337 | 0 | int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { |
338 | 0 | Environment* env = Environment::GetCurrent(args); |
339 | 0 | Isolate* isolate = env->isolate(); |
340 | 0 | CHECK(args[0]->IsObject()); |
341 | 0 | CHECK(args[1]->IsString()); |
342 | | |
343 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
344 | 0 | Local<String> string = args[1].As<String>(); |
345 | 0 | Local<Object> send_handle_obj; |
346 | 0 | if (args[2]->IsObject()) |
347 | 0 | send_handle_obj = args[2].As<Object>(); |
348 | | |
349 | | // Compute the size of the storage that the string will be flattened into. |
350 | | // For UTF8 strings that are very long, go ahead and take the hit for |
351 | | // computing their actual size, rather than tripling the storage. |
352 | 0 | size_t storage_size; |
353 | 0 | if ((enc == UTF8 && |
354 | 0 | string->Length() > 65535 && |
355 | 0 | !StringBytes::Size(isolate, string, enc).To(&storage_size)) || |
356 | 0 | !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) { |
357 | 0 | return -1; |
358 | 0 | } |
359 | | |
360 | 0 | if (storage_size > INT_MAX) |
361 | 0 | return UV_ENOBUFS; |
362 | | |
363 | | // Try writing immediately if write size isn't too big |
364 | 0 | char stack_storage[16384]; // 16kb |
365 | 0 | size_t data_size; |
366 | 0 | size_t synchronously_written = 0; |
367 | 0 | uv_buf_t buf; |
368 | |
|
369 | 0 | bool try_write = HasDoTryWrite() && storage_size <= sizeof(stack_storage) && |
370 | 0 | (!IsIPCPipe() || send_handle_obj.IsEmpty()); |
371 | 0 | if (try_write) { |
372 | 0 | data_size = StringBytes::Write(isolate, |
373 | 0 | stack_storage, |
374 | 0 | storage_size, |
375 | 0 | string, |
376 | 0 | enc); |
377 | 0 | buf = uv_buf_init(stack_storage, data_size); |
378 | |
|
379 | 0 | uv_buf_t* bufs = &buf; |
380 | 0 | size_t count = 1; |
381 | 0 | const int err = DoTryWrite(&bufs, &count); |
382 | | // Keep track of the bytes written here, because we're taking a shortcut |
383 | | // by using `DoTryWrite()` directly instead of using the utilities |
384 | | // provided by `Write()`. |
385 | 0 | synchronously_written = count == 0 ? data_size : data_size - buf.len; |
386 | 0 | bytes_written_ += synchronously_written; |
387 | | |
388 | | // Immediate failure or success |
389 | 0 | if (err != 0 || count == 0) { |
390 | 0 | SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} }); |
391 | 0 | return err; |
392 | 0 | } |
393 | | |
394 | | // Partial write |
395 | 0 | CHECK_EQ(count, 1); |
396 | 0 | } |
397 | | |
398 | 0 | std::unique_ptr<BackingStore> bs; |
399 | |
|
400 | 0 | if (try_write) { |
401 | | // Copy partial data |
402 | 0 | bs = ArrayBuffer::NewBackingStore( |
403 | 0 | isolate, buf.len, BackingStoreInitializationMode::kUninitialized); |
404 | 0 | memcpy(bs->Data(), buf.base, buf.len); |
405 | 0 | data_size = buf.len; |
406 | 0 | } else { |
407 | | // Write it |
408 | 0 | bs = ArrayBuffer::NewBackingStore( |
409 | 0 | isolate, storage_size, BackingStoreInitializationMode::kUninitialized); |
410 | 0 | data_size = StringBytes::Write(isolate, |
411 | 0 | static_cast<char*>(bs->Data()), |
412 | 0 | storage_size, |
413 | 0 | string, |
414 | 0 | enc); |
415 | 0 | } |
416 | |
|
417 | 0 | CHECK_LE(data_size, storage_size); |
418 | | |
419 | 0 | buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size); |
420 | |
|
421 | 0 | uv_stream_t* send_handle = nullptr; |
422 | |
|
423 | 0 | if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { |
424 | 0 | HandleWrap* wrap; |
425 | 0 | ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); |
426 | 0 | send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); |
427 | | // Reference LibuvStreamWrap instance to prevent it from being garbage |
428 | | // collected before `AfterWrite` is called. |
429 | 0 | if (req_wrap_obj->Set(env->context(), |
430 | 0 | env->handle_string(), |
431 | 0 | send_handle_obj).IsNothing()) { |
432 | 0 | return -1; |
433 | 0 | } |
434 | 0 | } |
435 | | |
436 | 0 | StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write); |
437 | 0 | res.bytes += synchronously_written; |
438 | |
|
439 | 0 | SetWriteResult(res); |
440 | 0 | if (res.wrap != nullptr) |
441 | 0 | res.wrap->SetBackingStore(std::move(bs)); |
442 | |
|
443 | 0 | return res.err; |
444 | 0 | } Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&) |
445 | | |
446 | | |
447 | | MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread, |
448 | | Local<ArrayBuffer> ab, |
449 | | size_t offset, |
450 | 0 | StreamBaseJSChecks checks) { |
451 | 0 | Environment* env = env_; |
452 | |
|
453 | 0 | DCHECK_EQ(static_cast<int32_t>(nread), nread); |
454 | 0 | DCHECK_LE(offset, INT32_MAX); |
455 | |
|
456 | 0 | if (checks == DONT_SKIP_NREAD_CHECKS) { |
457 | 0 | if (ab.IsEmpty()) { |
458 | 0 | DCHECK_EQ(offset, 0); |
459 | 0 | DCHECK_LE(nread, 0); |
460 | 0 | } else { |
461 | 0 | DCHECK_GE(nread, 0); |
462 | 0 | } |
463 | 0 | } |
464 | |
|
465 | 0 | env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread); |
466 | 0 | env->stream_base_state()[kArrayBufferOffset] = offset; |
467 | |
|
468 | 0 | Local<Value> argv[] = { |
469 | 0 | ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>() |
470 | 0 | }; |
471 | |
|
472 | 0 | AsyncWrap* wrap = GetAsyncWrap(); |
473 | 0 | CHECK_NOT_NULL(wrap); |
474 | 0 | Local<Value> onread = wrap->object() |
475 | 0 | ->GetInternalField(StreamBase::kOnReadFunctionField) |
476 | 0 | .As<Value>(); |
477 | 0 | CHECK(onread->IsFunction()); |
478 | 0 | return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); |
479 | 0 | } |
480 | | |
481 | | |
482 | 0 | bool StreamBase::IsIPCPipe() { |
483 | 0 | return false; |
484 | 0 | } |
485 | | |
486 | | |
487 | 0 | int StreamBase::GetFD() { |
488 | 0 | return -1; |
489 | 0 | } |
490 | | |
491 | | |
492 | 0 | Local<Object> StreamBase::GetObject() { |
493 | 0 | return GetAsyncWrap()->object(); |
494 | 0 | } |
495 | | |
496 | | void StreamBase::AddAccessor(v8::Isolate* isolate, |
497 | | v8::Local<v8::Signature> signature, |
498 | | enum v8::PropertyAttribute attributes, |
499 | | v8::Local<v8::FunctionTemplate> t, |
500 | | JSMethodFunction* getter, |
501 | | JSMethodFunction* setter, |
502 | 35 | v8::Local<v8::String> string) { |
503 | 35 | Local<FunctionTemplate> getter_templ = |
504 | 35 | NewFunctionTemplate(isolate, |
505 | 35 | getter, |
506 | 35 | signature, |
507 | 35 | ConstructorBehavior::kThrow, |
508 | 35 | SideEffectType::kHasNoSideEffect); |
509 | 35 | Local<FunctionTemplate> setter_templ = |
510 | 35 | NewFunctionTemplate(isolate, |
511 | 35 | setter, |
512 | 35 | signature, |
513 | 35 | ConstructorBehavior::kThrow, |
514 | 35 | SideEffectType::kHasSideEffect); |
515 | 35 | t->PrototypeTemplate()->SetAccessorProperty( |
516 | 35 | string, getter_templ, setter_templ, attributes); |
517 | 35 | } |
518 | | |
519 | | void StreamBase::AddMethod(Isolate* isolate, |
520 | | Local<Signature> signature, |
521 | | enum PropertyAttribute attributes, |
522 | | Local<FunctionTemplate> t, |
523 | | JSMethodFunction* stream_method, |
524 | 140 | Local<String> string) { |
525 | 140 | Local<FunctionTemplate> templ = |
526 | 140 | NewFunctionTemplate(isolate, |
527 | 140 | stream_method, |
528 | 140 | signature, |
529 | 140 | ConstructorBehavior::kThrow, |
530 | 140 | SideEffectType::kHasNoSideEffect); |
531 | 140 | t->PrototypeTemplate()->SetAccessorProperty( |
532 | 140 | string, templ, Local<FunctionTemplate>(), attributes); |
533 | 140 | } |
534 | | |
535 | 0 | void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) { |
536 | 0 | AddMethods(env->isolate_data(), t); |
537 | 0 | } |
538 | | |
539 | | void StreamBase::AddMethods(IsolateData* isolate_data, |
540 | 35 | Local<FunctionTemplate> t) { |
541 | 35 | Isolate* isolate = isolate_data->isolate(); |
542 | 35 | HandleScope scope(isolate); |
543 | | |
544 | 35 | enum PropertyAttribute attributes = |
545 | 35 | static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum); |
546 | 35 | Local<Signature> sig = Signature::New(isolate, t); |
547 | | |
548 | 35 | AddMethod(isolate, sig, attributes, t, GetFD, isolate_data->fd_string()); |
549 | 35 | AddMethod(isolate, |
550 | 35 | sig, |
551 | 35 | attributes, |
552 | 35 | t, |
553 | 35 | GetExternal, |
554 | 35 | isolate_data->external_stream_string()); |
555 | 35 | AddMethod(isolate, |
556 | 35 | sig, |
557 | 35 | attributes, |
558 | 35 | t, |
559 | 35 | GetBytesRead, |
560 | 35 | isolate_data->bytes_read_string()); |
561 | 35 | AddMethod(isolate, |
562 | 35 | sig, |
563 | 35 | attributes, |
564 | 35 | t, |
565 | 35 | GetBytesWritten, |
566 | 35 | isolate_data->bytes_written_string()); |
567 | 35 | SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>); |
568 | 35 | SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>); |
569 | 35 | SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>); |
570 | 35 | SetProtoMethod( |
571 | 35 | isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>); |
572 | 35 | SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>); |
573 | 35 | SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); |
574 | 35 | SetProtoMethod(isolate, |
575 | 35 | t, |
576 | 35 | "writeAsciiString", |
577 | 35 | JSMethod<&StreamBase::WriteString<ASCII>>); |
578 | 35 | SetProtoMethod( |
579 | 35 | isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>); |
580 | 35 | SetProtoMethod( |
581 | 35 | isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>); |
582 | 35 | SetProtoMethod(isolate, |
583 | 35 | t, |
584 | 35 | "writeLatin1String", |
585 | 35 | JSMethod<&StreamBase::WriteString<LATIN1>>); |
586 | 35 | t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"), |
587 | 35 | True(isolate)); |
588 | 35 | AddAccessor(isolate, |
589 | 35 | sig, |
590 | 35 | static_cast<PropertyAttribute>(DontDelete | DontEnum), |
591 | 35 | t, |
592 | 35 | BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>, |
593 | 35 | BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField, |
594 | 35 | &Value::IsFunction>, |
595 | 35 | FIXED_ONE_BYTE_STRING(isolate, "onread")); |
596 | 35 | } |
597 | | |
598 | | void StreamBase::RegisterExternalReferences( |
599 | 0 | ExternalReferenceRegistry* registry) { |
600 | | // This function is called by a single thread during start up, so it is safe |
601 | | // to use a local static variable here. |
602 | 0 | static bool is_registered = false; |
603 | 0 | if (is_registered) return; |
604 | 0 | registry->Register(GetFD); |
605 | 0 | registry->Register(GetExternal); |
606 | 0 | registry->Register(GetBytesRead); |
607 | 0 | registry->Register(GetBytesWritten); |
608 | 0 | registry->Register(JSMethod<&StreamBase::ReadStartJS>); |
609 | 0 | registry->Register(JSMethod<&StreamBase::ReadStopJS>); |
610 | 0 | registry->Register(JSMethod<&StreamBase::Shutdown>); |
611 | 0 | registry->Register(JSMethod<&StreamBase::UseUserBuffer>); |
612 | 0 | registry->Register(JSMethod<&StreamBase::Writev>); |
613 | 0 | registry->Register(JSMethod<&StreamBase::WriteBuffer>); |
614 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>); |
615 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>); |
616 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>); |
617 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>); |
618 | 0 | registry->Register( |
619 | 0 | BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>); |
620 | 0 | registry->Register( |
621 | 0 | BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField, |
622 | 0 | &Value::IsFunction>); |
623 | 0 | is_registered = true; |
624 | 0 | } |
625 | | |
626 | 0 | void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) { |
627 | | // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD(). |
628 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
629 | 0 | if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL); |
630 | | |
631 | 0 | if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); |
632 | | |
633 | 0 | args.GetReturnValue().Set(wrap->GetFD()); |
634 | 0 | } |
635 | | |
636 | 0 | void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) { |
637 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
638 | 0 | if (wrap == nullptr) return args.GetReturnValue().Set(0); |
639 | | |
640 | | // uint64_t -> double. 53bits is enough for all real cases. |
641 | 0 | args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_)); |
642 | 0 | } |
643 | | |
644 | 0 | void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) { |
645 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
646 | 0 | if (wrap == nullptr) return args.GetReturnValue().Set(0); |
647 | | |
648 | | // uint64_t -> double. 53bits is enough for all real cases. |
649 | 0 | args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_)); |
650 | 0 | } |
651 | | |
652 | 0 | void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) { |
653 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
654 | 0 | if (wrap == nullptr) return; |
655 | | |
656 | 0 | Local<External> ext = External::New(args.GetIsolate(), wrap); |
657 | 0 | args.GetReturnValue().Set(ext); |
658 | 0 | } |
659 | | |
660 | | template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)> |
661 | 0 | void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) { |
662 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
663 | 0 | if (wrap == nullptr) return; |
664 | | |
665 | 0 | if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); |
666 | | |
667 | 0 | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap()); |
668 | 0 | args.GetReturnValue().Set((wrap->*Method)(args)); |
669 | 0 | } Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStartJS>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStopJS>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Shutdown>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::UseUserBuffer>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Writev>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::WriteBuffer>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) |
670 | | |
671 | 0 | int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { |
672 | | // No TryWrite by default |
673 | 0 | return 0; |
674 | 0 | } |
675 | | |
676 | | |
677 | 0 | const char* StreamResource::Error() const { |
678 | 0 | return nullptr; |
679 | 0 | } |
680 | | |
681 | | |
682 | 0 | void StreamResource::ClearError() { |
683 | | // No-op |
684 | 0 | } |
685 | | |
686 | | |
687 | 0 | uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { |
688 | 0 | CHECK_NOT_NULL(stream_); |
689 | 0 | Environment* env = static_cast<StreamBase*>(stream_)->stream_env(); |
690 | 0 | return env->allocate_managed_buffer(suggested_size); |
691 | 0 | } |
692 | | |
693 | 0 | void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { |
694 | 0 | CHECK_NOT_NULL(stream_); |
695 | 0 | StreamBase* stream = static_cast<StreamBase*>(stream_); |
696 | 0 | Environment* env = stream->stream_env(); |
697 | 0 | Isolate* isolate = env->isolate(); |
698 | 0 | HandleScope handle_scope(isolate); |
699 | 0 | Context::Scope context_scope(env->context()); |
700 | 0 | std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_); |
701 | |
|
702 | 0 | if (nread <= 0) { |
703 | 0 | if (nread < 0) |
704 | 0 | stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>()); |
705 | 0 | return; |
706 | 0 | } |
707 | | |
708 | 0 | CHECK_LE(static_cast<size_t>(nread), bs->ByteLength()); |
709 | 0 | if (static_cast<size_t>(nread) != bs->ByteLength()) { |
710 | 0 | std::unique_ptr<BackingStore> old_bs = std::move(bs); |
711 | 0 | bs = ArrayBuffer::NewBackingStore( |
712 | 0 | isolate, nread, BackingStoreInitializationMode::kUninitialized); |
713 | 0 | memcpy(bs->Data(), old_bs->Data(), nread); |
714 | 0 | } |
715 | |
|
716 | 0 | stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs))); |
717 | 0 | } |
718 | | |
719 | | |
720 | 0 | uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) { |
721 | 0 | return buffer_; |
722 | 0 | } |
723 | | |
724 | | |
725 | 0 | void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { |
726 | 0 | CHECK_NOT_NULL(stream_); |
727 | | |
728 | 0 | StreamBase* stream = static_cast<StreamBase*>(stream_); |
729 | 0 | Environment* env = stream->stream_env(); |
730 | 0 | HandleScope handle_scope(env->isolate()); |
731 | 0 | Context::Scope context_scope(env->context()); |
732 | | |
733 | | // In the case that there's an error and buf is null, return immediately. |
734 | | // This can happen on unices when POLLHUP is received and UV_EOF is returned |
735 | | // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows. |
736 | 0 | if (buf.base == nullptr && nread < 0) { |
737 | 0 | stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>()); |
738 | 0 | return; |
739 | 0 | } |
740 | | |
741 | 0 | CHECK_EQ(buf.base, buffer_.base); |
742 | | |
743 | 0 | MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread, |
744 | 0 | Local<ArrayBuffer>(), |
745 | 0 | 0, |
746 | 0 | StreamBase::SKIP_NREAD_CHECKS); |
747 | 0 | Local<Value> next_buf_v; |
748 | 0 | if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) { |
749 | 0 | buffer_.base = Buffer::Data(next_buf_v); |
750 | 0 | buffer_.len = Buffer::Length(next_buf_v); |
751 | 0 | } |
752 | 0 | } |
753 | | |
754 | | |
755 | | void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( |
756 | 0 | StreamReq* req_wrap, int status) { |
757 | 0 | StreamBase* stream = static_cast<StreamBase*>(stream_); |
758 | 0 | Environment* env = stream->stream_env(); |
759 | 0 | if (!env->can_call_into_js()) return; |
760 | 0 | AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); |
761 | 0 | HandleScope handle_scope(env->isolate()); |
762 | 0 | Context::Scope context_scope(env->context()); |
763 | 0 | CHECK(!async_wrap->persistent().IsEmpty()); |
764 | 0 | Local<Object> req_wrap_obj = async_wrap->object(); |
765 | |
|
766 | 0 | Local<Value> argv[] = { |
767 | 0 | Integer::New(env->isolate(), status), |
768 | 0 | stream->GetObject(), |
769 | 0 | Undefined(env->isolate()) |
770 | 0 | }; |
771 | |
|
772 | 0 | const char* msg = stream->Error(); |
773 | 0 | if (msg != nullptr) { |
774 | 0 | argv[2] = OneByteString(env->isolate(), msg); |
775 | 0 | stream->ClearError(); |
776 | 0 | } |
777 | |
|
778 | 0 | if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) |
779 | 0 | async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); |
780 | 0 | } |
781 | | |
782 | | void ReportWritesToJSStreamListener::OnStreamAfterWrite( |
783 | 0 | WriteWrap* req_wrap, int status) { |
784 | 0 | OnStreamAfterReqFinished(req_wrap, status); |
785 | 0 | } |
786 | | |
787 | | void ReportWritesToJSStreamListener::OnStreamAfterShutdown( |
788 | 0 | ShutdownWrap* req_wrap, int status) { |
789 | 0 | OnStreamAfterReqFinished(req_wrap, status); |
790 | 0 | } |
791 | | |
792 | 0 | void ShutdownWrap::OnDone(int status) { |
793 | 0 | stream()->EmitAfterShutdown(this, status); |
794 | 0 | Dispose(); |
795 | 0 | } |
796 | | |
797 | 0 | void WriteWrap::OnDone(int status) { |
798 | 0 | stream()->EmitAfterWrite(this, status); |
799 | 0 | Dispose(); |
800 | 0 | } |
801 | | |
802 | 0 | StreamListener::~StreamListener() { |
803 | 0 | if (stream_ != nullptr) |
804 | 0 | stream_->RemoveStreamListener(this); |
805 | 0 | } |
806 | | |
807 | 0 | void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { |
808 | 0 | CHECK_NOT_NULL(previous_listener_); |
809 | 0 | previous_listener_->OnStreamAfterShutdown(w, status); |
810 | 0 | } |
811 | | |
812 | 0 | void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { |
813 | 0 | CHECK_NOT_NULL(previous_listener_); |
814 | 0 | previous_listener_->OnStreamAfterWrite(w, status); |
815 | 0 | } |
816 | | |
817 | 0 | StreamResource::~StreamResource() { |
818 | 0 | while (listener_ != nullptr) { |
819 | 0 | StreamListener* listener = listener_; |
820 | 0 | listener->OnStreamDestroy(); |
821 | | // Remove the listener if it didn’t remove itself. This makes the logic |
822 | | // in `OnStreamDestroy()` implementations easier, because they |
823 | | // may call generic cleanup functions which can just remove the |
824 | | // listener unconditionally. |
825 | 0 | if (listener == listener_) |
826 | 0 | RemoveStreamListener(listener_); |
827 | 0 | } |
828 | 0 | } |
829 | | |
830 | 0 | void StreamResource::RemoveStreamListener(StreamListener* listener) { |
831 | 0 | CHECK_NOT_NULL(listener); |
832 | | |
833 | 0 | StreamListener* previous; |
834 | 0 | StreamListener* current; |
835 | | |
836 | | // Remove from the linked list. |
837 | | // No loop condition because we want a crash if listener is not found. |
838 | 0 | for (current = listener_, previous = nullptr;; |
839 | 0 | previous = current, current = current->previous_listener_) { |
840 | 0 | CHECK_NOT_NULL(current); |
841 | 0 | if (current == listener) { |
842 | 0 | if (previous != nullptr) |
843 | 0 | previous->previous_listener_ = current->previous_listener_; |
844 | 0 | else |
845 | 0 | listener_ = listener->previous_listener_; |
846 | 0 | break; |
847 | 0 | } |
848 | 0 | } |
849 | | |
850 | 0 | listener->stream_ = nullptr; |
851 | 0 | listener->previous_listener_ = nullptr; |
852 | 0 | } |
853 | | |
854 | | ShutdownWrap* StreamBase::CreateShutdownWrap( |
855 | 0 | Local<Object> object) { |
856 | 0 | auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object); |
857 | 0 | wrap->MakeWeak(); |
858 | 0 | return wrap; |
859 | 0 | } |
860 | | |
861 | | WriteWrap* StreamBase::CreateWriteWrap( |
862 | 0 | Local<Object> object) { |
863 | 0 | auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object); |
864 | 0 | wrap->MakeWeak(); |
865 | 0 | return wrap; |
866 | 0 | } |
867 | | |
868 | 0 | void StreamReq::Done(int status, const char* error_str) { |
869 | 0 | AsyncWrap* async_wrap = GetAsyncWrap(); |
870 | 0 | Environment* env = async_wrap->env(); |
871 | 0 | if (error_str != nullptr) { |
872 | 0 | v8::HandleScope handle_scope(env->isolate()); |
873 | 0 | if (async_wrap->object() |
874 | 0 | ->Set(env->context(), |
875 | 0 | env->error_string(), |
876 | 0 | OneByteString(env->isolate(), error_str)) |
877 | 0 | .IsNothing()) { |
878 | 0 | return; |
879 | 0 | } |
880 | 0 | } |
881 | | |
882 | 0 | OnDone(status); |
883 | 0 | } |
884 | | |
885 | | } // namespace node |