/src/node/src/stream_base.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "stream_base.h" // NOLINT(build/include_inline) |
2 | | #include "stream_base-inl.h" |
3 | | #include "stream_wrap.h" |
4 | | |
5 | | #include "env-inl.h" |
6 | | #include "js_stream.h" |
7 | | #include "node.h" |
8 | | #include "node_buffer.h" |
9 | | #include "node_errors.h" |
10 | | #include "node_external_reference.h" |
11 | | #include "string_bytes.h" |
12 | | #include "util-inl.h" |
13 | | #include "v8.h" |
14 | | |
15 | | #include <climits> // INT_MAX |
16 | | |
17 | | namespace node { |
18 | | |
19 | | using v8::Array; |
20 | | using v8::ArrayBuffer; |
21 | | using v8::BackingStore; |
22 | | using v8::ConstructorBehavior; |
23 | | using v8::Context; |
24 | | using v8::DontDelete; |
25 | | using v8::DontEnum; |
26 | | using v8::External; |
27 | | using v8::Function; |
28 | | using v8::FunctionCallbackInfo; |
29 | | using v8::FunctionTemplate; |
30 | | using v8::HandleScope; |
31 | | using v8::Integer; |
32 | | using v8::Isolate; |
33 | | using v8::Local; |
34 | | using v8::MaybeLocal; |
35 | | using v8::Object; |
36 | | using v8::PropertyAttribute; |
37 | | using v8::ReadOnly; |
38 | | using v8::SideEffectType; |
39 | | using v8::Signature; |
40 | | using v8::String; |
41 | | using v8::Value; |
42 | | |
43 | 0 | int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) { |
44 | 0 | Environment* env = stream_env(); |
45 | |
|
46 | 0 | v8::HandleScope handle_scope(env->isolate()); |
47 | |
|
48 | 0 | if (req_wrap_obj.IsEmpty()) { |
49 | 0 | if (!env->shutdown_wrap_template() |
50 | 0 | ->NewInstance(env->context()) |
51 | 0 | .ToLocal(&req_wrap_obj)) { |
52 | 0 | return UV_EBUSY; |
53 | 0 | } |
54 | 0 | StreamReq::ResetObject(req_wrap_obj); |
55 | 0 | } |
56 | | |
57 | 0 | BaseObjectPtr<AsyncWrap> req_wrap_ptr; |
58 | 0 | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); |
59 | 0 | ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); |
60 | 0 | if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap()); |
61 | 0 | int err = DoShutdown(req_wrap); |
62 | |
|
63 | 0 | if (err != 0 && req_wrap != nullptr) { |
64 | 0 | req_wrap->Dispose(); |
65 | 0 | } |
66 | |
|
67 | 0 | const char* msg = Error(); |
68 | 0 | if (msg != nullptr) { |
69 | 0 | if (req_wrap_obj |
70 | 0 | ->Set(env->context(), |
71 | 0 | env->error_string(), |
72 | 0 | OneByteString(env->isolate(), msg)) |
73 | 0 | .IsNothing()) { |
74 | 0 | return UV_EBUSY; |
75 | 0 | } |
76 | 0 | ClearError(); |
77 | 0 | } |
78 | | |
79 | 0 | return err; |
80 | 0 | } |
81 | | |
82 | | StreamWriteResult StreamBase::Write(uv_buf_t* bufs, |
83 | | size_t count, |
84 | | uv_stream_t* send_handle, |
85 | | v8::Local<v8::Object> req_wrap_obj, |
86 | 729 | bool skip_try_write) { |
87 | 729 | Environment* env = stream_env(); |
88 | 729 | int err; |
89 | | |
90 | 729 | size_t total_bytes = 0; |
91 | 1.45k | for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len; |
92 | 729 | bytes_written_ += total_bytes; |
93 | | |
94 | 729 | if (send_handle == nullptr && HasDoTryWrite() && !skip_try_write) { |
95 | 0 | err = DoTryWrite(&bufs, &count); |
96 | 0 | if (err != 0 || count == 0) { |
97 | 0 | return StreamWriteResult{false, err, nullptr, total_bytes, {}}; |
98 | 0 | } |
99 | 0 | } |
100 | | |
101 | 729 | v8::HandleScope handle_scope(env->isolate()); |
102 | | |
103 | 729 | if (req_wrap_obj.IsEmpty()) { |
104 | 0 | if (!env->write_wrap_template() |
105 | 0 | ->NewInstance(env->context()) |
106 | 0 | .ToLocal(&req_wrap_obj)) { |
107 | 0 | return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; |
108 | 0 | } |
109 | 0 | StreamReq::ResetObject(req_wrap_obj); |
110 | 0 | } |
111 | | |
112 | 729 | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); |
113 | 729 | WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); |
114 | 729 | BaseObjectPtr<AsyncWrap> req_wrap_ptr(req_wrap->GetAsyncWrap()); |
115 | | |
116 | 729 | err = DoWrite(req_wrap, bufs, count, send_handle); |
117 | 729 | bool async = err == 0; |
118 | | |
119 | 729 | if (!async) { |
120 | 0 | req_wrap->Dispose(); |
121 | 0 | req_wrap = nullptr; |
122 | 0 | } |
123 | | |
124 | 729 | const char* msg = Error(); |
125 | 729 | if (msg != nullptr) { |
126 | 0 | if (req_wrap_obj |
127 | 0 | ->Set(env->context(), |
128 | 0 | env->error_string(), |
129 | 0 | OneByteString(env->isolate(), msg)) |
130 | 0 | .IsNothing()) { |
131 | 0 | return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; |
132 | 0 | } |
133 | 0 | ClearError(); |
134 | 0 | } |
135 | | |
136 | 729 | return StreamWriteResult{ |
137 | 729 | async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)}; |
138 | 729 | } |
139 | | |
140 | | template int StreamBase::WriteString<ASCII>( |
141 | | const FunctionCallbackInfo<Value>& args); |
142 | | template int StreamBase::WriteString<UTF8>( |
143 | | const FunctionCallbackInfo<Value>& args); |
144 | | template int StreamBase::WriteString<UCS2>( |
145 | | const FunctionCallbackInfo<Value>& args); |
146 | | template int StreamBase::WriteString<LATIN1>( |
147 | | const FunctionCallbackInfo<Value>& args); |
148 | | |
149 | | |
150 | 0 | int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) { |
151 | 0 | return ReadStart(); |
152 | 0 | } |
153 | | |
154 | | |
155 | 0 | int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) { |
156 | 0 | return ReadStop(); |
157 | 0 | } |
158 | | |
159 | 0 | int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) { |
160 | 0 | CHECK(Buffer::HasInstance(args[0])); |
161 | | |
162 | 0 | uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0])); |
163 | 0 | PushStreamListener(new CustomBufferJSListener(buf)); |
164 | 0 | return 0; |
165 | 0 | } |
166 | | |
167 | 0 | int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) { |
168 | 0 | CHECK(args[0]->IsObject()); |
169 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
170 | |
|
171 | 0 | return Shutdown(req_wrap_obj); |
172 | 0 | } |
173 | | |
174 | 1.05k | void StreamBase::SetWriteResult(const StreamWriteResult& res) { |
175 | 1.05k | env_->stream_base_state()[kBytesWritten] = res.bytes; |
176 | 1.05k | env_->stream_base_state()[kLastWriteWasAsync] = res.async; |
177 | 1.05k | } |
178 | | |
179 | 0 | int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { |
180 | 0 | Environment* env = Environment::GetCurrent(args); |
181 | 0 | Isolate* isolate = env->isolate(); |
182 | 0 | Local<Context> context = env->context(); |
183 | |
|
184 | 0 | CHECK(args[0]->IsObject()); |
185 | 0 | CHECK(args[1]->IsArray()); |
186 | | |
187 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
188 | 0 | Local<Array> chunks = args[1].As<Array>(); |
189 | 0 | bool all_buffers = args[2]->IsTrue(); |
190 | |
|
191 | 0 | size_t count; |
192 | 0 | if (all_buffers) |
193 | 0 | count = chunks->Length(); |
194 | 0 | else |
195 | 0 | count = chunks->Length() >> 1; |
196 | |
|
197 | 0 | MaybeStackBuffer<uv_buf_t, 16> bufs(count); |
198 | |
|
199 | 0 | size_t storage_size = 0; |
200 | 0 | size_t offset; |
201 | |
|
202 | 0 | if (!all_buffers) { |
203 | | // Determine storage size first |
204 | 0 | for (size_t i = 0; i < count; i++) { |
205 | 0 | Local<Value> chunk; |
206 | 0 | if (!chunks->Get(context, i * 2).ToLocal(&chunk)) |
207 | 0 | return -1; |
208 | | |
209 | 0 | if (Buffer::HasInstance(chunk)) |
210 | 0 | continue; |
211 | | // Buffer chunk, no additional storage required |
212 | | |
213 | | // String chunk |
214 | 0 | Local<String> string; |
215 | 0 | if (!chunk->ToString(context).ToLocal(&string)) |
216 | 0 | return -1; |
217 | 0 | Local<Value> next_chunk; |
218 | 0 | if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk)) |
219 | 0 | return -1; |
220 | 0 | enum encoding encoding = ParseEncoding(isolate, next_chunk); |
221 | 0 | size_t chunk_size; |
222 | 0 | if ((encoding == UTF8 && |
223 | 0 | string->Length() > 65535 && |
224 | 0 | !StringBytes::Size(isolate, string, encoding).To(&chunk_size)) || |
225 | 0 | !StringBytes::StorageSize(isolate, string, encoding) |
226 | 0 | .To(&chunk_size)) { |
227 | 0 | return -1; |
228 | 0 | } |
229 | 0 | storage_size += chunk_size; |
230 | 0 | } |
231 | | |
232 | 0 | if (storage_size > INT_MAX) |
233 | 0 | return UV_ENOBUFS; |
234 | 0 | } else { |
235 | 0 | for (size_t i = 0; i < count; i++) { |
236 | 0 | Local<Value> chunk; |
237 | 0 | if (!chunks->Get(context, i).ToLocal(&chunk)) |
238 | 0 | return -1; |
239 | 0 | bufs[i].base = Buffer::Data(chunk); |
240 | 0 | bufs[i].len = Buffer::Length(chunk); |
241 | 0 | } |
242 | 0 | } |
243 | | |
244 | 0 | std::unique_ptr<BackingStore> bs; |
245 | 0 | if (storage_size > 0) { |
246 | 0 | NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); |
247 | 0 | bs = ArrayBuffer::NewBackingStore(isolate, storage_size); |
248 | 0 | } |
249 | |
|
250 | 0 | offset = 0; |
251 | 0 | if (!all_buffers) { |
252 | 0 | for (size_t i = 0; i < count; i++) { |
253 | 0 | Local<Value> chunk; |
254 | 0 | if (!chunks->Get(context, i * 2).ToLocal(&chunk)) |
255 | 0 | return -1; |
256 | | |
257 | | // Write buffer |
258 | 0 | if (Buffer::HasInstance(chunk)) { |
259 | 0 | bufs[i].base = Buffer::Data(chunk); |
260 | 0 | bufs[i].len = Buffer::Length(chunk); |
261 | 0 | continue; |
262 | 0 | } |
263 | | |
264 | | // Write string |
265 | 0 | CHECK_LE(offset, storage_size); |
266 | 0 | char* str_storage = |
267 | 0 | static_cast<char*>(bs ? bs->Data() : nullptr) + offset; |
268 | 0 | size_t str_size = (bs ? bs->ByteLength() : 0) - offset; |
269 | |
|
270 | 0 | Local<String> string; |
271 | 0 | if (!chunk->ToString(context).ToLocal(&string)) |
272 | 0 | return -1; |
273 | 0 | Local<Value> next_chunk; |
274 | 0 | if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk)) |
275 | 0 | return -1; |
276 | 0 | enum encoding encoding = ParseEncoding(isolate, next_chunk); |
277 | 0 | str_size = StringBytes::Write(isolate, |
278 | 0 | str_storage, |
279 | 0 | str_size, |
280 | 0 | string, |
281 | 0 | encoding); |
282 | 0 | bufs[i].base = str_storage; |
283 | 0 | bufs[i].len = str_size; |
284 | 0 | offset += str_size; |
285 | 0 | } |
286 | 0 | } |
287 | | |
288 | 0 | StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); |
289 | 0 | SetWriteResult(res); |
290 | 0 | if (res.wrap != nullptr && storage_size > 0) |
291 | 0 | res.wrap->SetBackingStore(std::move(bs)); |
292 | 0 | return res.err; |
293 | 0 | } |
294 | | |
295 | | |
296 | 0 | int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { |
297 | 0 | CHECK(args[0]->IsObject()); |
298 | | |
299 | 0 | Environment* env = Environment::GetCurrent(args); |
300 | |
|
301 | 0 | if (!args[1]->IsUint8Array()) { |
302 | 0 | node::THROW_ERR_INVALID_ARG_TYPE(env, "Second argument must be a buffer"); |
303 | 0 | return 0; |
304 | 0 | } |
305 | | |
306 | 0 | Local<Object> req_wrap_obj = args[0].As<Object>(); |
307 | 0 | uv_buf_t buf; |
308 | 0 | buf.base = Buffer::Data(args[1]); |
309 | 0 | buf.len = Buffer::Length(args[1]); |
310 | |
|
311 | 0 | uv_stream_t* send_handle = nullptr; |
312 | |
|
313 | 0 | if (args[2]->IsObject() && IsIPCPipe()) { |
314 | 0 | Local<Object> send_handle_obj = args[2].As<Object>(); |
315 | |
|
316 | 0 | HandleWrap* wrap; |
317 | 0 | ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); |
318 | 0 | send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); |
319 | | // Reference LibuvStreamWrap instance to prevent it from being garbage |
320 | | // collected before `AfterWrite` is called. |
321 | 0 | if (req_wrap_obj->Set(env->context(), |
322 | 0 | env->handle_string(), |
323 | 0 | send_handle_obj).IsNothing()) { |
324 | 0 | return -1; |
325 | 0 | } |
326 | 0 | } |
327 | | |
328 | 0 | StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); |
329 | 0 | SetWriteResult(res); |
330 | |
|
331 | 0 | return res.err; |
332 | 0 | } |
333 | | |
334 | | |
335 | | template <enum encoding enc> |
336 | 1.05k | int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { |
337 | 1.05k | Environment* env = Environment::GetCurrent(args); |
338 | 1.05k | Isolate* isolate = env->isolate(); |
339 | 1.05k | CHECK(args[0]->IsObject()); |
340 | 1.05k | CHECK(args[1]->IsString()); |
341 | | |
342 | 1.05k | Local<Object> req_wrap_obj = args[0].As<Object>(); |
343 | 1.05k | Local<String> string = args[1].As<String>(); |
344 | 1.05k | Local<Object> send_handle_obj; |
345 | 1.05k | if (args[2]->IsObject()) |
346 | 0 | send_handle_obj = args[2].As<Object>(); |
347 | | |
348 | | // Compute the size of the storage that the string will be flattened into. |
349 | | // For UTF8 strings that are very long, go ahead and take the hit for |
350 | | // computing their actual size, rather than tripling the storage. |
351 | 1.05k | size_t storage_size; |
352 | 1.05k | if ((enc == UTF8 && |
353 | 1.05k | string->Length() > 65535 && |
354 | 1.05k | !StringBytes::Size(isolate, string, enc).To(&storage_size)) || |
355 | 1.05k | !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) { |
356 | 0 | return -1; |
357 | 0 | } |
358 | | |
359 | 1.05k | if (storage_size > INT_MAX) |
360 | 0 | return UV_ENOBUFS; |
361 | | |
362 | | // Try writing immediately if write size isn't too big |
363 | 1.05k | char stack_storage[16384]; // 16kb |
364 | 1.05k | size_t data_size; |
365 | 1.05k | size_t synchronously_written = 0; |
366 | 1.05k | uv_buf_t buf; |
367 | | |
368 | 1.05k | bool try_write = HasDoTryWrite() && storage_size <= sizeof(stack_storage) && |
369 | 1.05k | (!IsIPCPipe() || send_handle_obj.IsEmpty()); |
370 | 1.05k | if (try_write) { |
371 | 1.05k | data_size = StringBytes::Write(isolate, |
372 | 1.05k | stack_storage, |
373 | 1.05k | storage_size, |
374 | 1.05k | string, |
375 | 1.05k | enc); |
376 | 1.05k | buf = uv_buf_init(stack_storage, data_size); |
377 | | |
378 | 1.05k | uv_buf_t* bufs = &buf; |
379 | 1.05k | size_t count = 1; |
380 | 1.05k | const int err = DoTryWrite(&bufs, &count); |
381 | | // Keep track of the bytes written here, because we're taking a shortcut |
382 | | // by using `DoTryWrite()` directly instead of using the utilities |
383 | | // provided by `Write()`. |
384 | 1.05k | synchronously_written = count == 0 ? data_size : data_size - buf.len; |
385 | 1.05k | bytes_written_ += synchronously_written; |
386 | | |
387 | | // Immediate failure or success |
388 | 1.05k | if (err != 0 || count == 0) { |
389 | 330 | SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} }); |
390 | 330 | return err; |
391 | 330 | } |
392 | | |
393 | | // Partial write |
394 | 729 | CHECK_EQ(count, 1); |
395 | 729 | } |
396 | | |
397 | 729 | std::unique_ptr<BackingStore> bs; |
398 | | |
399 | 729 | if (try_write) { |
400 | | // Copy partial data |
401 | 729 | NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); |
402 | 729 | bs = ArrayBuffer::NewBackingStore(isolate, buf.len); |
403 | 729 | memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len); |
404 | 729 | data_size = buf.len; |
405 | 729 | } else { |
406 | | // Write it |
407 | 0 | NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); |
408 | 0 | bs = ArrayBuffer::NewBackingStore(isolate, storage_size); |
409 | 0 | data_size = StringBytes::Write(isolate, |
410 | 0 | static_cast<char*>(bs->Data()), |
411 | 0 | storage_size, |
412 | 0 | string, |
413 | 0 | enc); |
414 | 0 | } |
415 | | |
416 | 729 | CHECK_LE(data_size, storage_size); |
417 | | |
418 | 729 | buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size); |
419 | | |
420 | 729 | uv_stream_t* send_handle = nullptr; |
421 | | |
422 | 729 | if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { |
423 | 0 | HandleWrap* wrap; |
424 | 0 | ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); |
425 | 0 | send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); |
426 | | // Reference LibuvStreamWrap instance to prevent it from being garbage |
427 | | // collected before `AfterWrite` is called. |
428 | 0 | if (req_wrap_obj->Set(env->context(), |
429 | 0 | env->handle_string(), |
430 | 0 | send_handle_obj).IsNothing()) { |
431 | 0 | return -1; |
432 | 0 | } |
433 | 0 | } |
434 | | |
435 | 729 | StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write); |
436 | 729 | res.bytes += synchronously_written; |
437 | | |
438 | 729 | SetWriteResult(res); |
439 | 729 | if (res.wrap != nullptr) |
440 | 729 | res.wrap->SetBackingStore(std::move(bs)); |
441 | | |
442 | 729 | return res.err; |
443 | 729 | } Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&) int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&) Line | Count | Source | 336 | 1.05k | int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { | 337 | 1.05k | Environment* env = Environment::GetCurrent(args); | 338 | 1.05k | Isolate* isolate = env->isolate(); | 339 | 1.05k | CHECK(args[0]->IsObject()); | 340 | 1.05k | CHECK(args[1]->IsString()); | 341 | | | 342 | 1.05k | Local<Object> req_wrap_obj = args[0].As<Object>(); | 343 | 1.05k | Local<String> string = args[1].As<String>(); | 344 | 1.05k | Local<Object> send_handle_obj; | 345 | 1.05k | if (args[2]->IsObject()) | 346 | 0 | send_handle_obj = args[2].As<Object>(); | 347 | | | 348 | | // Compute the size of the storage that the string will be flattened into. | 349 | | // For UTF8 strings that are very long, go ahead and take the hit for | 350 | | // computing their actual size, rather than tripling the storage. | 351 | 1.05k | size_t storage_size; | 352 | 1.05k | if ((enc == UTF8 && | 353 | 1.05k | string->Length() > 65535 && | 354 | 1.05k | !StringBytes::Size(isolate, string, enc).To(&storage_size)) || | 355 | 1.05k | !StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) { | 356 | 0 | return -1; | 357 | 0 | } | 358 | | | 359 | 1.05k | if (storage_size > INT_MAX) | 360 | 0 | return UV_ENOBUFS; | 361 | | | 362 | | // Try writing immediately if write size isn't too big | 363 | 1.05k | char stack_storage[16384]; // 16kb | 364 | 1.05k | size_t data_size; | 365 | 1.05k | size_t synchronously_written = 0; | 366 | 1.05k | uv_buf_t buf; | 367 | | | 368 | 1.05k | bool try_write = HasDoTryWrite() && storage_size <= sizeof(stack_storage) && | 369 | 1.05k | (!IsIPCPipe() || send_handle_obj.IsEmpty()); | 370 | 1.05k | if (try_write) { | 371 | 1.05k | data_size = StringBytes::Write(isolate, | 372 | 1.05k | stack_storage, | 373 | 1.05k | storage_size, | 374 | 1.05k | string, | 375 | 1.05k | enc); | 376 | 1.05k | buf = uv_buf_init(stack_storage, data_size); | 377 | | | 378 | 1.05k | uv_buf_t* bufs = &buf; | 379 | 1.05k | size_t count = 1; | 380 | 1.05k | const int err = DoTryWrite(&bufs, &count); | 381 | | // Keep track of the bytes written here, because we're taking a shortcut | 382 | | // by using `DoTryWrite()` directly instead of using the utilities | 383 | | // provided by `Write()`. | 384 | 1.05k | synchronously_written = count == 0 ? data_size : data_size - buf.len; | 385 | 1.05k | bytes_written_ += synchronously_written; | 386 | | | 387 | | // Immediate failure or success | 388 | 1.05k | if (err != 0 || count == 0) { | 389 | 330 | SetWriteResult(StreamWriteResult { false, err, nullptr, data_size, {} }); | 390 | 330 | return err; | 391 | 330 | } | 392 | | | 393 | | // Partial write | 394 | 729 | CHECK_EQ(count, 1); | 395 | 729 | } | 396 | | | 397 | 729 | std::unique_ptr<BackingStore> bs; | 398 | | | 399 | 729 | if (try_write) { | 400 | | // Copy partial data | 401 | 729 | NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); | 402 | 729 | bs = ArrayBuffer::NewBackingStore(isolate, buf.len); | 403 | 729 | memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len); | 404 | 729 | data_size = buf.len; | 405 | 729 | } else { | 406 | | // Write it | 407 | 0 | NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data()); | 408 | 0 | bs = ArrayBuffer::NewBackingStore(isolate, storage_size); | 409 | 0 | data_size = StringBytes::Write(isolate, | 410 | 0 | static_cast<char*>(bs->Data()), | 411 | 0 | storage_size, | 412 | 0 | string, | 413 | 0 | enc); | 414 | 0 | } | 415 | | | 416 | 729 | CHECK_LE(data_size, storage_size); | 417 | | | 418 | 729 | buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size); | 419 | | | 420 | 729 | uv_stream_t* send_handle = nullptr; | 421 | | | 422 | 729 | if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { | 423 | 0 | HandleWrap* wrap; | 424 | 0 | ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); | 425 | 0 | send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); | 426 | | // Reference LibuvStreamWrap instance to prevent it from being garbage | 427 | | // collected before `AfterWrite` is called. | 428 | 0 | if (req_wrap_obj->Set(env->context(), | 429 | 0 | env->handle_string(), | 430 | 0 | send_handle_obj).IsNothing()) { | 431 | 0 | return -1; | 432 | 0 | } | 433 | 0 | } | 434 | | | 435 | 729 | StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj, try_write); | 436 | 729 | res.bytes += synchronously_written; | 437 | | | 438 | 729 | SetWriteResult(res); | 439 | 729 | if (res.wrap != nullptr) | 440 | 729 | res.wrap->SetBackingStore(std::move(bs)); | 441 | | | 442 | 729 | return res.err; | 443 | 729 | } |
Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&) |
444 | | |
445 | | |
446 | | MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread, |
447 | | Local<ArrayBuffer> ab, |
448 | | size_t offset, |
449 | 0 | StreamBaseJSChecks checks) { |
450 | 0 | Environment* env = env_; |
451 | |
|
452 | 0 | DCHECK_EQ(static_cast<int32_t>(nread), nread); |
453 | 0 | DCHECK_LE(offset, INT32_MAX); |
454 | |
|
455 | 0 | if (checks == DONT_SKIP_NREAD_CHECKS) { |
456 | 0 | if (ab.IsEmpty()) { |
457 | 0 | DCHECK_EQ(offset, 0); |
458 | 0 | DCHECK_LE(nread, 0); |
459 | 0 | } else { |
460 | 0 | DCHECK_GE(nread, 0); |
461 | 0 | } |
462 | 0 | } |
463 | |
|
464 | 0 | env->stream_base_state()[kReadBytesOrError] = static_cast<int32_t>(nread); |
465 | 0 | env->stream_base_state()[kArrayBufferOffset] = offset; |
466 | |
|
467 | 0 | Local<Value> argv[] = { |
468 | 0 | ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>() |
469 | 0 | }; |
470 | |
|
471 | 0 | AsyncWrap* wrap = GetAsyncWrap(); |
472 | 0 | CHECK_NOT_NULL(wrap); |
473 | 0 | Local<Value> onread = wrap->object() |
474 | 0 | ->GetInternalField(StreamBase::kOnReadFunctionField) |
475 | 0 | .As<Value>(); |
476 | 0 | CHECK(onread->IsFunction()); |
477 | 0 | return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); |
478 | 0 | } |
479 | | |
480 | | |
481 | 0 | bool StreamBase::IsIPCPipe() { |
482 | 0 | return false; |
483 | 0 | } |
484 | | |
485 | | |
486 | 0 | int StreamBase::GetFD() { |
487 | 0 | return -1; |
488 | 0 | } |
489 | | |
490 | | |
491 | 757 | Local<Object> StreamBase::GetObject() { |
492 | 757 | return GetAsyncWrap()->object(); |
493 | 757 | } |
494 | | |
495 | | void StreamBase::AddMethod(Isolate* isolate, |
496 | | Local<Signature> signature, |
497 | | enum PropertyAttribute attributes, |
498 | | Local<FunctionTemplate> t, |
499 | | JSMethodFunction* stream_method, |
500 | 826k | Local<String> string) { |
501 | 826k | Local<FunctionTemplate> templ = |
502 | 826k | NewFunctionTemplate(isolate, |
503 | 826k | stream_method, |
504 | 826k | signature, |
505 | 826k | ConstructorBehavior::kThrow, |
506 | 826k | SideEffectType::kHasNoSideEffect); |
507 | 826k | t->PrototypeTemplate()->SetAccessorProperty( |
508 | 826k | string, templ, Local<FunctionTemplate>(), attributes); |
509 | 826k | } |
510 | | |
511 | 84.4k | void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) { |
512 | 84.4k | AddMethods(env->isolate_data(), t); |
513 | 84.4k | } |
514 | | |
515 | | void StreamBase::AddMethods(IsolateData* isolate_data, |
516 | 206k | Local<FunctionTemplate> t) { |
517 | 206k | Isolate* isolate = isolate_data->isolate(); |
518 | 206k | HandleScope scope(isolate); |
519 | | |
520 | 206k | enum PropertyAttribute attributes = |
521 | 206k | static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum); |
522 | 206k | Local<Signature> sig = Signature::New(isolate, t); |
523 | | |
524 | 206k | AddMethod(isolate, sig, attributes, t, GetFD, isolate_data->fd_string()); |
525 | 206k | AddMethod(isolate, |
526 | 206k | sig, |
527 | 206k | attributes, |
528 | 206k | t, |
529 | 206k | GetExternal, |
530 | 206k | isolate_data->external_stream_string()); |
531 | 206k | AddMethod(isolate, |
532 | 206k | sig, |
533 | 206k | attributes, |
534 | 206k | t, |
535 | 206k | GetBytesRead, |
536 | 206k | isolate_data->bytes_read_string()); |
537 | 206k | AddMethod(isolate, |
538 | 206k | sig, |
539 | 206k | attributes, |
540 | 206k | t, |
541 | 206k | GetBytesWritten, |
542 | 206k | isolate_data->bytes_written_string()); |
543 | 206k | SetProtoMethod(isolate, t, "readStart", JSMethod<&StreamBase::ReadStartJS>); |
544 | 206k | SetProtoMethod(isolate, t, "readStop", JSMethod<&StreamBase::ReadStopJS>); |
545 | 206k | SetProtoMethod(isolate, t, "shutdown", JSMethod<&StreamBase::Shutdown>); |
546 | 206k | SetProtoMethod( |
547 | 206k | isolate, t, "useUserBuffer", JSMethod<&StreamBase::UseUserBuffer>); |
548 | 206k | SetProtoMethod(isolate, t, "writev", JSMethod<&StreamBase::Writev>); |
549 | 206k | SetProtoMethod(isolate, t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); |
550 | 206k | SetProtoMethod(isolate, |
551 | 206k | t, |
552 | 206k | "writeAsciiString", |
553 | 206k | JSMethod<&StreamBase::WriteString<ASCII>>); |
554 | 206k | SetProtoMethod( |
555 | 206k | isolate, t, "writeUtf8String", JSMethod<&StreamBase::WriteString<UTF8>>); |
556 | 206k | SetProtoMethod( |
557 | 206k | isolate, t, "writeUcs2String", JSMethod<&StreamBase::WriteString<UCS2>>); |
558 | 206k | SetProtoMethod(isolate, |
559 | 206k | t, |
560 | 206k | "writeLatin1String", |
561 | 206k | JSMethod<&StreamBase::WriteString<LATIN1>>); |
562 | 206k | t->PrototypeTemplate()->Set(FIXED_ONE_BYTE_STRING(isolate, "isStreamBase"), |
563 | 206k | True(isolate)); |
564 | 206k | t->PrototypeTemplate()->SetAccessor( |
565 | 206k | FIXED_ONE_BYTE_STRING(isolate, "onread"), |
566 | 206k | BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>, |
567 | 206k | BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField, |
568 | 206k | &Value::IsFunction>); |
569 | 206k | } |
570 | | |
571 | | void StreamBase::RegisterExternalReferences( |
572 | 0 | ExternalReferenceRegistry* registry) { |
573 | | // This function is called by a single thread during start up, so it is safe |
574 | | // to use a local static variable here. |
575 | 0 | static bool is_registered = false; |
576 | 0 | if (is_registered) return; |
577 | 0 | registry->Register(GetFD); |
578 | 0 | registry->Register(GetExternal); |
579 | 0 | registry->Register(GetBytesRead); |
580 | 0 | registry->Register(GetBytesWritten); |
581 | 0 | registry->Register(JSMethod<&StreamBase::ReadStartJS>); |
582 | 0 | registry->Register(JSMethod<&StreamBase::ReadStopJS>); |
583 | 0 | registry->Register(JSMethod<&StreamBase::Shutdown>); |
584 | 0 | registry->Register(JSMethod<&StreamBase::UseUserBuffer>); |
585 | 0 | registry->Register(JSMethod<&StreamBase::Writev>); |
586 | 0 | registry->Register(JSMethod<&StreamBase::WriteBuffer>); |
587 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<ASCII>>); |
588 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<UTF8>>); |
589 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<UCS2>>); |
590 | 0 | registry->Register(JSMethod<&StreamBase::WriteString<LATIN1>>); |
591 | 0 | registry->Register( |
592 | 0 | BaseObject::InternalFieldGet<StreamBase::kOnReadFunctionField>); |
593 | 0 | registry->Register( |
594 | 0 | BaseObject::InternalFieldSet<StreamBase::kOnReadFunctionField, |
595 | 0 | &Value::IsFunction>); |
596 | 0 | is_registered = true; |
597 | 0 | } |
598 | | |
599 | 0 | void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) { |
600 | | // Mimic implementation of StreamBase::GetFD() and UDPWrap::GetFD(). |
601 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
602 | 0 | if (wrap == nullptr) return args.GetReturnValue().Set(UV_EINVAL); |
603 | | |
604 | 0 | if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); |
605 | | |
606 | 0 | args.GetReturnValue().Set(wrap->GetFD()); |
607 | 0 | } |
608 | | |
609 | 0 | void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) { |
610 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
611 | 0 | if (wrap == nullptr) return args.GetReturnValue().Set(0); |
612 | | |
613 | | // uint64_t -> double. 53bits is enough for all real cases. |
614 | 0 | args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_)); |
615 | 0 | } |
616 | | |
617 | 0 | void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) { |
618 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
619 | 0 | if (wrap == nullptr) return args.GetReturnValue().Set(0); |
620 | | |
621 | | // uint64_t -> double. 53bits is enough for all real cases. |
622 | 0 | args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_)); |
623 | 0 | } |
624 | | |
625 | 0 | void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) { |
626 | 0 | StreamBase* wrap = StreamBase::FromObject(args.This().As<Object>()); |
627 | 0 | if (wrap == nullptr) return; |
628 | | |
629 | 0 | Local<External> ext = External::New(args.GetIsolate(), wrap); |
630 | 0 | args.GetReturnValue().Set(ext); |
631 | 0 | } |
632 | | |
633 | | template <int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)> |
634 | 1.05k | void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) { |
635 | 1.05k | StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>()); |
636 | 1.05k | if (wrap == nullptr) return; |
637 | | |
638 | 1.05k | if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); |
639 | | |
640 | 1.05k | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap()); |
641 | 1.05k | args.GetReturnValue().Set((wrap->*Method)(args)); |
642 | 1.05k | } Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStartJS>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::ReadStopJS>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Shutdown>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::UseUserBuffer>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::Writev>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&node::StreamBase::WriteBuffer>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)0>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) Line | Count | Source | 634 | 1.05k | void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) { | 635 | 1.05k | StreamBase* wrap = StreamBase::FromObject(args.Holder().As<Object>()); | 636 | 1.05k | if (wrap == nullptr) return; | 637 | | | 638 | 1.05k | if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); | 639 | | | 640 | 1.05k | AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap->GetAsyncWrap()); | 641 | 1.05k | args.GetReturnValue().Set((wrap->*Method)(args)); | 642 | 1.05k | } |
Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)3>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) Unexecuted instantiation: void node::StreamBase::JSMethod<&(int node::StreamBase::WriteString<(node::encoding)4>(v8::FunctionCallbackInfo<v8::Value> const&))>(v8::FunctionCallbackInfo<v8::Value> const&) |
643 | | |
644 | 0 | int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { |
645 | | // No TryWrite by default |
646 | 0 | return 0; |
647 | 0 | } |
648 | | |
649 | | |
650 | 729 | const char* StreamResource::Error() const { |
651 | 729 | return nullptr; |
652 | 729 | } |
653 | | |
654 | | |
655 | 0 | void StreamResource::ClearError() { |
656 | | // No-op |
657 | 0 | } |
658 | | |
659 | | |
660 | 0 | uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { |
661 | 0 | CHECK_NOT_NULL(stream_); |
662 | 0 | Environment* env = static_cast<StreamBase*>(stream_)->stream_env(); |
663 | 0 | return env->allocate_managed_buffer(suggested_size); |
664 | 0 | } |
665 | | |
666 | 0 | void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { |
667 | 0 | CHECK_NOT_NULL(stream_); |
668 | 0 | StreamBase* stream = static_cast<StreamBase*>(stream_); |
669 | 0 | Environment* env = stream->stream_env(); |
670 | 0 | Isolate* isolate = env->isolate(); |
671 | 0 | HandleScope handle_scope(isolate); |
672 | 0 | Context::Scope context_scope(env->context()); |
673 | 0 | std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_); |
674 | |
|
675 | 0 | if (nread <= 0) { |
676 | 0 | if (nread < 0) |
677 | 0 | stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>()); |
678 | 0 | return; |
679 | 0 | } |
680 | | |
681 | 0 | CHECK_LE(static_cast<size_t>(nread), bs->ByteLength()); |
682 | 0 | if (static_cast<size_t>(nread) != bs->ByteLength()) { |
683 | 0 | std::unique_ptr<BackingStore> old_bs = std::move(bs); |
684 | 0 | bs = ArrayBuffer::NewBackingStore(isolate, nread); |
685 | 0 | memcpy(static_cast<char*>(bs->Data()), |
686 | 0 | static_cast<char*>(old_bs->Data()), |
687 | 0 | nread); |
688 | 0 | } |
689 | |
|
690 | 0 | stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs))); |
691 | 0 | } |
692 | | |
693 | | |
694 | 0 | uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) { |
695 | 0 | return buffer_; |
696 | 0 | } |
697 | | |
698 | | |
699 | 0 | void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { |
700 | 0 | CHECK_NOT_NULL(stream_); |
701 | | |
702 | 0 | StreamBase* stream = static_cast<StreamBase*>(stream_); |
703 | 0 | Environment* env = stream->stream_env(); |
704 | 0 | HandleScope handle_scope(env->isolate()); |
705 | 0 | Context::Scope context_scope(env->context()); |
706 | | |
707 | | // In the case that there's an error and buf is null, return immediately. |
708 | | // This can happen on unices when POLLHUP is received and UV_EOF is returned |
709 | | // or when getting an error while performing a UV_HANDLE_ZERO_READ on Windows. |
710 | 0 | if (buf.base == nullptr && nread < 0) { |
711 | 0 | stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>()); |
712 | 0 | return; |
713 | 0 | } |
714 | | |
715 | 0 | CHECK_EQ(buf.base, buffer_.base); |
716 | | |
717 | 0 | MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread, |
718 | 0 | Local<ArrayBuffer>(), |
719 | 0 | 0, |
720 | 0 | StreamBase::SKIP_NREAD_CHECKS); |
721 | 0 | Local<Value> next_buf_v; |
722 | 0 | if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) { |
723 | 0 | buffer_.base = Buffer::Data(next_buf_v); |
724 | 0 | buffer_.len = Buffer::Length(next_buf_v); |
725 | 0 | } |
726 | 0 | } |
727 | | |
728 | | |
729 | | void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( |
730 | 729 | StreamReq* req_wrap, int status) { |
731 | 729 | StreamBase* stream = static_cast<StreamBase*>(stream_); |
732 | 729 | Environment* env = stream->stream_env(); |
733 | 729 | if (!env->can_call_into_js()) return; |
734 | 0 | AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); |
735 | 0 | HandleScope handle_scope(env->isolate()); |
736 | 0 | Context::Scope context_scope(env->context()); |
737 | 0 | CHECK(!async_wrap->persistent().IsEmpty()); |
738 | 0 | Local<Object> req_wrap_obj = async_wrap->object(); |
739 | |
|
740 | 0 | Local<Value> argv[] = { |
741 | 0 | Integer::New(env->isolate(), status), |
742 | 0 | stream->GetObject(), |
743 | 0 | Undefined(env->isolate()) |
744 | 0 | }; |
745 | |
|
746 | 0 | const char* msg = stream->Error(); |
747 | 0 | if (msg != nullptr) { |
748 | 0 | argv[2] = OneByteString(env->isolate(), msg); |
749 | 0 | stream->ClearError(); |
750 | 0 | } |
751 | |
|
752 | 0 | if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) |
753 | 0 | async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); |
754 | 0 | } |
755 | | |
756 | | void ReportWritesToJSStreamListener::OnStreamAfterWrite( |
757 | 729 | WriteWrap* req_wrap, int status) { |
758 | 729 | OnStreamAfterReqFinished(req_wrap, status); |
759 | 729 | } |
760 | | |
761 | | void ReportWritesToJSStreamListener::OnStreamAfterShutdown( |
762 | 0 | ShutdownWrap* req_wrap, int status) { |
763 | 0 | OnStreamAfterReqFinished(req_wrap, status); |
764 | 0 | } |
765 | | |
766 | 0 | void ShutdownWrap::OnDone(int status) { |
767 | 0 | stream()->EmitAfterShutdown(this, status); |
768 | 0 | Dispose(); |
769 | 0 | } |
770 | | |
771 | 729 | void WriteWrap::OnDone(int status) { |
772 | 729 | stream()->EmitAfterWrite(this, status); |
773 | 729 | Dispose(); |
774 | 729 | } |
775 | | |
776 | 86.4k | StreamListener::~StreamListener() { |
777 | 86.4k | if (stream_ != nullptr) |
778 | 84.5k | stream_->RemoveStreamListener(this); |
779 | 86.4k | } |
780 | | |
781 | 0 | void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { |
782 | 0 | CHECK_NOT_NULL(previous_listener_); |
783 | 0 | previous_listener_->OnStreamAfterShutdown(w, status); |
784 | 0 | } |
785 | | |
786 | 0 | void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { |
787 | 0 | CHECK_NOT_NULL(previous_listener_); |
788 | 0 | previous_listener_->OnStreamAfterWrite(w, status); |
789 | 0 | } |
790 | | |
791 | 84.5k | StreamResource::~StreamResource() { |
792 | 85.3k | while (listener_ != nullptr) { |
793 | 757 | StreamListener* listener = listener_; |
794 | 757 | listener->OnStreamDestroy(); |
795 | | // Remove the listener if it didn’t remove itself. This makes the logic |
796 | | // in `OnStreamDestroy()` implementations easier, because they |
797 | | // may call generic cleanup functions which can just remove the |
798 | | // listener unconditionally. |
799 | 757 | if (listener == listener_) |
800 | 757 | RemoveStreamListener(listener_); |
801 | 757 | } |
802 | 84.5k | } |
803 | | |
804 | 85.3k | void StreamResource::RemoveStreamListener(StreamListener* listener) { |
805 | 85.3k | CHECK_NOT_NULL(listener); |
806 | | |
807 | 85.3k | StreamListener* previous; |
808 | 85.3k | StreamListener* current; |
809 | | |
810 | | // Remove from the linked list. |
811 | | // No loop condition because we want a crash if listener is not found. |
812 | 85.3k | for (current = listener_, previous = nullptr;; |
813 | 86.0k | previous = current, current = current->previous_listener_) { |
814 | 86.0k | CHECK_NOT_NULL(current); |
815 | 86.0k | if (current == listener) { |
816 | 85.3k | if (previous != nullptr) |
817 | 757 | previous->previous_listener_ = current->previous_listener_; |
818 | 84.5k | else |
819 | 84.5k | listener_ = listener->previous_listener_; |
820 | 85.3k | break; |
821 | 85.3k | } |
822 | 86.0k | } |
823 | | |
824 | 85.3k | listener->stream_ = nullptr; |
825 | 85.3k | listener->previous_listener_ = nullptr; |
826 | 85.3k | } |
827 | | |
828 | | ShutdownWrap* StreamBase::CreateShutdownWrap( |
829 | 0 | Local<Object> object) { |
830 | 0 | auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object); |
831 | 0 | wrap->MakeWeak(); |
832 | 0 | return wrap; |
833 | 0 | } |
834 | | |
835 | | WriteWrap* StreamBase::CreateWriteWrap( |
836 | 0 | Local<Object> object) { |
837 | 0 | auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object); |
838 | 0 | wrap->MakeWeak(); |
839 | 0 | return wrap; |
840 | 0 | } |
841 | | |
842 | 729 | void StreamReq::Done(int status, const char* error_str) { |
843 | 729 | AsyncWrap* async_wrap = GetAsyncWrap(); |
844 | 729 | Environment* env = async_wrap->env(); |
845 | 729 | if (error_str != nullptr) { |
846 | 0 | v8::HandleScope handle_scope(env->isolate()); |
847 | 0 | if (async_wrap->object() |
848 | 0 | ->Set(env->context(), |
849 | 0 | env->error_string(), |
850 | 0 | OneByteString(env->isolate(), error_str)) |
851 | 0 | .IsNothing()) { |
852 | 0 | return; |
853 | 0 | } |
854 | 0 | } |
855 | | |
856 | 729 | OnDone(status); |
857 | 729 | } |
858 | | |
859 | | } // namespace node |