/src/serenity/Userland/Libraries/LibWeb/Streams/ReadableStream.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2022, Linus Groh <linusg@serenityos.org> |
3 | | * Copyright (c) 2023-2024, Shannon Booth <shannon@serenityos.org> |
4 | | * Copyright (c) 2024, Kenneth Myhra <kennethmyhra@serenityos.org> |
5 | | * |
6 | | * SPDX-License-Identifier: BSD-2-Clause |
7 | | */ |
8 | | |
9 | | #include <LibJS/Runtime/PromiseCapability.h> |
10 | | #include <LibWeb/Bindings/Intrinsics.h> |
11 | | #include <LibWeb/Bindings/ReadableStreamPrototype.h> |
12 | | #include <LibWeb/DOM/AbortSignal.h> |
13 | | #include <LibWeb/Streams/AbstractOperations.h> |
14 | | #include <LibWeb/Streams/ReadableByteStreamController.h> |
15 | | #include <LibWeb/Streams/ReadableStream.h> |
16 | | #include <LibWeb/Streams/ReadableStreamBYOBReader.h> |
17 | | #include <LibWeb/Streams/ReadableStreamDefaultController.h> |
18 | | #include <LibWeb/Streams/ReadableStreamDefaultReader.h> |
19 | | #include <LibWeb/Streams/UnderlyingSource.h> |
20 | | #include <LibWeb/WebIDL/ExceptionOr.h> |
21 | | |
22 | | namespace Web::Streams { |
23 | | |
24 | | JS_DEFINE_ALLOCATOR(ReadableStream); |
25 | | |
26 | | // https://streams.spec.whatwg.org/#rs-constructor |
27 | | WebIDL::ExceptionOr<JS::NonnullGCPtr<ReadableStream>> ReadableStream::construct_impl(JS::Realm& realm, Optional<JS::Handle<JS::Object>> const& underlying_source_object, QueuingStrategy const& strategy) |
28 | 0 | { |
29 | 0 | auto& vm = realm.vm(); |
30 | |
|
31 | 0 | auto readable_stream = realm.heap().allocate<ReadableStream>(realm, realm); |
32 | | |
33 | | // 1. If underlyingSource is missing, set it to null. |
34 | 0 | auto underlying_source = underlying_source_object.has_value() ? JS::Value(underlying_source_object.value()) : JS::js_null(); |
35 | | |
36 | | // 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource. |
37 | 0 | auto underlying_source_dict = TRY(UnderlyingSource::from_value(vm, underlying_source)); |
38 | | |
39 | | // 3. Perform ! InitializeReadableStream(this). |
40 | | |
41 | | // 4. If underlyingSourceDict["type"] is "bytes": |
42 | 0 | if (underlying_source_dict.type.has_value() && underlying_source_dict.type.value() == ReadableStreamType::Bytes) { |
43 | | // 1. If strategy["size"] exists, throw a RangeError exception. |
44 | 0 | if (strategy.size) |
45 | 0 | return WebIDL::SimpleException { WebIDL::SimpleExceptionType::RangeError, "Size strategy not allowed for byte stream"sv }; |
46 | | |
47 | | // 2. Let highWaterMark be ? ExtractHighWaterMark(strategy, 0). |
48 | 0 | auto high_water_mark = TRY(extract_high_water_mark(strategy, 0)); |
49 | | |
50 | | // 3. Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark). |
51 | 0 | TRY(set_up_readable_byte_stream_controller_from_underlying_source(*readable_stream, underlying_source, underlying_source_dict, high_water_mark)); |
52 | 0 | } |
53 | | // 5. Otherwise, |
54 | 0 | else { |
55 | | // 1. Assert: underlyingSourceDict["type"] does not exist. |
56 | 0 | VERIFY(!underlying_source_dict.type.has_value()); |
57 | | |
58 | | // 2. Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy). |
59 | 0 | auto size_algorithm = extract_size_algorithm(vm, strategy); |
60 | | |
61 | | // 3. Let highWaterMark be ? ExtractHighWaterMark(strategy, 1). |
62 | 0 | auto high_water_mark = TRY(extract_high_water_mark(strategy, 1)); |
63 | | |
64 | | // 4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm). |
65 | 0 | TRY(set_up_readable_stream_default_controller_from_underlying_source(*readable_stream, underlying_source, underlying_source_dict, high_water_mark, size_algorithm)); |
66 | 0 | } |
67 | | |
68 | 0 | return readable_stream; |
69 | 0 | } |
70 | | |
71 | | // https://streams.spec.whatwg.org/#rs-from |
72 | | WebIDL::ExceptionOr<JS::NonnullGCPtr<ReadableStream>> ReadableStream::from(JS::VM& vm, JS::Value async_iterable) |
73 | 0 | { |
74 | | // 1. Return ? ReadableStreamFromIterable(asyncIterable). |
75 | 0 | return TRY(readable_stream_from_iterable(vm, async_iterable)); |
76 | 0 | } |
77 | | |
78 | | ReadableStream::ReadableStream(JS::Realm& realm) |
79 | 0 | : PlatformObject(realm) |
80 | 0 | { |
81 | 0 | } |
82 | | |
83 | 0 | ReadableStream::~ReadableStream() = default; |
84 | | |
85 | | // https://streams.spec.whatwg.org/#rs-locked |
86 | | bool ReadableStream::locked() const |
87 | 0 | { |
88 | | // 1. Return ! IsReadableStreamLocked(this). |
89 | 0 | return is_readable_stream_locked(*this); |
90 | 0 | } |
91 | | |
92 | | // https://streams.spec.whatwg.org/#rs-cancel |
93 | | JS::NonnullGCPtr<JS::Object> ReadableStream::cancel(JS::Value reason) |
94 | 0 | { |
95 | 0 | auto& realm = this->realm(); |
96 | | |
97 | | // 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception. |
98 | 0 | if (is_readable_stream_locked(*this)) { |
99 | 0 | auto exception = JS::TypeError::create(realm, "Cannot cancel a locked stream"sv); |
100 | 0 | return WebIDL::create_rejected_promise(realm, JS::Value { exception })->promise(); |
101 | 0 | } |
102 | | |
103 | | // 2. Return ! ReadableStreamCancel(this, reason). |
104 | 0 | return readable_stream_cancel(*this, reason)->promise(); |
105 | 0 | } |
106 | | |
107 | | // https://streams.spec.whatwg.org/#rs-get-reader |
108 | | WebIDL::ExceptionOr<ReadableStreamReader> ReadableStream::get_reader(ReadableStreamGetReaderOptions const& options) |
109 | 0 | { |
110 | | // 1. If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this). |
111 | 0 | if (!options.mode.has_value()) |
112 | 0 | return ReadableStreamReader { TRY(acquire_readable_stream_default_reader(*this)) }; |
113 | | |
114 | | // 2. Assert: options["mode"] is "byob". |
115 | 0 | VERIFY(*options.mode == Bindings::ReadableStreamReaderMode::Byob); |
116 | | |
117 | | // 3. Return ? AcquireReadableStreamBYOBReader(this). |
118 | 0 | return ReadableStreamReader { TRY(acquire_readable_stream_byob_reader(*this)) }; |
119 | 0 | } |
120 | | |
121 | | WebIDL::ExceptionOr<JS::NonnullGCPtr<ReadableStream>> ReadableStream::pipe_through(ReadableWritablePair transform, StreamPipeOptions const& options) |
122 | 0 | { |
123 | | // 1. If ! IsReadableStreamLocked(this) is true, throw a TypeError exception. |
124 | 0 | if (is_readable_stream_locked(*this)) |
125 | 0 | return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Failed to execute 'pipeThrough' on 'ReadableStream': Cannot pipe a locked stream"sv }; |
126 | | |
127 | | // 2. If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception. |
128 | 0 | if (is_writable_stream_locked(*transform.writable)) |
129 | 0 | return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Failed to execute 'pipeThrough' on 'ReadableStream': parameter 1's 'writable' is locked"sv }; |
130 | | |
131 | | // 3. Let signal be options["signal"] if it exists, or undefined otherwise. |
132 | 0 | auto signal = options.signal ? JS::Value(options.signal) : JS::js_undefined(); |
133 | | |
134 | | // 4. Let promise be ! ReadableStreamPipeTo(this, transform["writable"], options["preventClose"], options["preventAbort"], options["preventCancel"], signal). |
135 | 0 | auto promise = readable_stream_pipe_to(*this, *transform.writable, options.prevent_close, options.prevent_abort, options.prevent_cancel, signal); |
136 | | |
137 | | // 5. Set promise.[[PromiseIsHandled]] to true. |
138 | 0 | WebIDL::mark_promise_as_handled(*promise); |
139 | | |
140 | | // 6. Return transform["readable"]. |
141 | 0 | return JS::NonnullGCPtr { *transform.readable }; |
142 | 0 | } |
143 | | |
144 | | JS::NonnullGCPtr<JS::Object> ReadableStream::pipe_to(WritableStream& destination, StreamPipeOptions const& options) |
145 | 0 | { |
146 | 0 | auto& realm = this->realm(); |
147 | | |
148 | | // 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception. |
149 | 0 | if (is_readable_stream_locked(*this)) { |
150 | 0 | auto promise = WebIDL::create_promise(realm); |
151 | 0 | WebIDL::reject_promise(realm, promise, JS::TypeError::create(realm, "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe a locked stream"sv)); |
152 | 0 | return promise->promise(); |
153 | 0 | } |
154 | | |
155 | | // 2. If ! IsWritableStreamLocked(destination) is true, return a promise rejected with a TypeError exception. |
156 | 0 | if (is_writable_stream_locked(destination)) { |
157 | 0 | auto promise = WebIDL::create_promise(realm); |
158 | 0 | WebIDL::reject_promise(realm, promise, JS::TypeError::create(realm, "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"sv)); |
159 | 0 | return promise->promise(); |
160 | 0 | } |
161 | | |
162 | | // 3. Let signal be options["signal"] if it exists, or undefined otherwise. |
163 | 0 | auto signal = options.signal ? JS::Value(options.signal) : JS::js_undefined(); |
164 | | |
165 | | // 4. Return ! ReadableStreamPipeTo(this, destination, options["preventClose"], options["preventAbort"], options["preventCancel"], signal). |
166 | 0 | return readable_stream_pipe_to(*this, destination, options.prevent_close, options.prevent_abort, options.prevent_cancel, signal)->promise(); |
167 | 0 | } |
168 | | |
169 | | // https://streams.spec.whatwg.org/#readablestream-tee |
170 | | WebIDL::ExceptionOr<ReadableStreamPair> ReadableStream::tee() |
171 | 0 | { |
172 | | // To tee a ReadableStream stream, return ? ReadableStreamTee(stream, true). |
173 | 0 | return TRY(readable_stream_tee(realm(), *this, true)); |
174 | 0 | } |
175 | | |
176 | | // https://streams.spec.whatwg.org/#readablestream-close |
177 | | void ReadableStream::close() |
178 | 0 | { |
179 | 0 | controller()->visit( |
180 | | // 1. If stream.[[controller]] implements ReadableByteStreamController |
181 | 0 | [&](JS::NonnullGCPtr<ReadableByteStreamController> controller) { |
182 | | // 1. Perform ! ReadableByteStreamControllerClose(stream.[[controller]]). |
183 | 0 | MUST(readable_byte_stream_controller_close(controller)); |
184 | | |
185 | | // 2. If stream.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(stream.[[controller]], 0). |
186 | 0 | if (!controller->pending_pull_intos().is_empty()) |
187 | 0 | MUST(readable_byte_stream_controller_respond(controller, 0)); |
188 | 0 | }, |
189 | | |
190 | | // 2. Otherwise, perform ! ReadableStreamDefaultControllerClose(stream.[[controller]]). |
191 | 0 | [&](JS::NonnullGCPtr<ReadableStreamDefaultController> controller) { |
192 | 0 | readable_stream_default_controller_close(*controller); |
193 | 0 | }); |
194 | 0 | } |
195 | | |
196 | | // https://streams.spec.whatwg.org/#readablestream-error |
197 | | void ReadableStream::error(JS::Value error) |
198 | 0 | { |
199 | 0 | controller()->visit( |
200 | | // 1. If stream.[[controller]] implements ReadableByteStreamController, then perform |
201 | | // ! ReadableByteStreamControllerError(stream.[[controller]], e). |
202 | 0 | [&](JS::NonnullGCPtr<ReadableByteStreamController> controller) { |
203 | 0 | readable_byte_stream_controller_error(controller, error); |
204 | 0 | }, |
205 | | |
206 | | // 2. Otherwise, perform ! ReadableStreamDefaultControllerError(stream.[[controller]], e). |
207 | 0 | [&](JS::NonnullGCPtr<ReadableStreamDefaultController> controller) { |
208 | 0 | readable_stream_default_controller_error(controller, error); |
209 | 0 | }); |
210 | 0 | } |
211 | | |
212 | | void ReadableStream::initialize(JS::Realm& realm) |
213 | 0 | { |
214 | 0 | Base::initialize(realm); |
215 | 0 | WEB_SET_PROTOTYPE_FOR_INTERFACE(ReadableStream); |
216 | 0 | } |
217 | | |
218 | | void ReadableStream::visit_edges(Cell::Visitor& visitor) |
219 | 0 | { |
220 | 0 | Base::visit_edges(visitor); |
221 | 0 | if (m_controller.has_value()) |
222 | 0 | m_controller->visit([&](auto& controller) { visitor.visit(controller); }); Unexecuted instantiation: ReadableStream.cpp:auto Web::Streams::ReadableStream::visit_edges(JS::Cell::Visitor&)::$_0::operator()<JS::NonnullGCPtr<Web::Streams::ReadableStreamDefaultController> >(JS::NonnullGCPtr<Web::Streams::ReadableStreamDefaultController>&) const Unexecuted instantiation: ReadableStream.cpp:auto Web::Streams::ReadableStream::visit_edges(JS::Cell::Visitor&)::$_0::operator()<JS::NonnullGCPtr<Web::Streams::ReadableByteStreamController> >(JS::NonnullGCPtr<Web::Streams::ReadableByteStreamController>&) const |
223 | 0 | visitor.visit(m_stored_error); |
224 | 0 | if (m_reader.has_value()) |
225 | 0 | m_reader->visit([&](auto& reader) { visitor.visit(reader); }); Unexecuted instantiation: ReadableStream.cpp:auto Web::Streams::ReadableStream::visit_edges(JS::Cell::Visitor&)::$_1::operator()<JS::NonnullGCPtr<Web::Streams::ReadableStreamDefaultReader> >(JS::NonnullGCPtr<Web::Streams::ReadableStreamDefaultReader>&) const Unexecuted instantiation: ReadableStream.cpp:auto Web::Streams::ReadableStream::visit_edges(JS::Cell::Visitor&)::$_1::operator()<JS::NonnullGCPtr<Web::Streams::ReadableStreamBYOBReader> >(JS::NonnullGCPtr<Web::Streams::ReadableStreamBYOBReader>&) const |
226 | 0 | } |
227 | | |
228 | | // https://streams.spec.whatwg.org/#readablestream-locked |
229 | | bool ReadableStream::is_readable() const |
230 | 0 | { |
231 | | // A ReadableStream stream is readable if stream.[[state]] is "readable". |
232 | 0 | return m_state == State::Readable; |
233 | 0 | } |
234 | | |
235 | | // https://streams.spec.whatwg.org/#readablestream-closed |
236 | | bool ReadableStream::is_closed() const |
237 | 0 | { |
238 | | // A ReadableStream stream is closed if stream.[[state]] is "closed". |
239 | 0 | return m_state == State::Closed; |
240 | 0 | } |
241 | | |
242 | | // https://streams.spec.whatwg.org/#readablestream-errored |
243 | | bool ReadableStream::is_errored() const |
244 | 0 | { |
245 | | // A ReadableStream stream is errored if stream.[[state]] is "errored". |
246 | 0 | return m_state == State::Errored; |
247 | 0 | } |
248 | | // https://streams.spec.whatwg.org/#readablestream-locked |
249 | | bool ReadableStream::is_locked() const |
250 | 0 | { |
251 | | // A ReadableStream stream is locked if ! IsReadableStreamLocked(stream) returns true. |
252 | 0 | return is_readable_stream_locked(*this); |
253 | 0 | } |
254 | | |
255 | | // https://streams.spec.whatwg.org/#is-readable-stream-disturbed |
256 | | bool ReadableStream::is_disturbed() const |
257 | 0 | { |
258 | | // A ReadableStream stream is disturbed if stream.[[disturbed]] is true. |
259 | 0 | return m_disturbed; |
260 | 0 | } |
261 | | |
262 | | } |