Coverage Report

Created: 2025-03-04 07:22

/src/serenity/Userland/Libraries/LibWeb/Streams/ReadableStream.cpp
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2022, Linus Groh <linusg@serenityos.org>
3
 * Copyright (c) 2023-2024, Shannon Booth <shannon@serenityos.org>
4
 * Copyright (c) 2024, Kenneth Myhra <kennethmyhra@serenityos.org>
5
 *
6
 * SPDX-License-Identifier: BSD-2-Clause
7
 */
8
9
#include <LibJS/Runtime/PromiseCapability.h>
10
#include <LibWeb/Bindings/Intrinsics.h>
11
#include <LibWeb/Bindings/ReadableStreamPrototype.h>
12
#include <LibWeb/DOM/AbortSignal.h>
13
#include <LibWeb/Streams/AbstractOperations.h>
14
#include <LibWeb/Streams/ReadableByteStreamController.h>
15
#include <LibWeb/Streams/ReadableStream.h>
16
#include <LibWeb/Streams/ReadableStreamBYOBReader.h>
17
#include <LibWeb/Streams/ReadableStreamDefaultController.h>
18
#include <LibWeb/Streams/ReadableStreamDefaultReader.h>
19
#include <LibWeb/Streams/UnderlyingSource.h>
20
#include <LibWeb/WebIDL/ExceptionOr.h>
21
22
namespace Web::Streams {
23
24
JS_DEFINE_ALLOCATOR(ReadableStream);
25
26
// https://streams.spec.whatwg.org/#rs-constructor
27
WebIDL::ExceptionOr<JS::NonnullGCPtr<ReadableStream>> ReadableStream::construct_impl(JS::Realm& realm, Optional<JS::Handle<JS::Object>> const& underlying_source_object, QueuingStrategy const& strategy)
{
    auto& vm = realm.vm();

    // The object that plays the role of "this" in the spec steps below.
    auto stream = realm.heap().allocate<ReadableStream>(realm, realm);

    // 1. If underlyingSource is missing, set it to null.
    JS::Value underlying_source = JS::js_null();
    if (underlying_source_object.has_value())
        underlying_source = JS::Value(underlying_source_object.value());

    // 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource.
    auto source_dict = TRY(UnderlyingSource::from_value(vm, underlying_source));

    // 3. Perform ! InitializeReadableStream(this).
    // NOTE(review): No call is made for this step here; presumably the freshly allocated object
    //               already carries the initial slot values. Confirm against ReadableStream.h.

    // 4. If underlyingSourceDict["type"] is "bytes":
    bool const is_byte_stream = source_dict.type.has_value() && source_dict.type.value() == ReadableStreamType::Bytes;
    if (is_byte_stream) {
        // 1. If strategy["size"] exists, throw a RangeError exception.
        if (strategy.size)
            return WebIDL::SimpleException { WebIDL::SimpleExceptionType::RangeError, "Size strategy not allowed for byte stream"sv };

        // 2. Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
        auto byte_high_water_mark = TRY(extract_high_water_mark(strategy, 0));

        // 3. Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark).
        TRY(set_up_readable_byte_stream_controller_from_underlying_source(*stream, underlying_source, source_dict, byte_high_water_mark));

        return stream;
    }

    // 5. Otherwise,

    // 1. Assert: underlyingSourceDict["type"] does not exist.
    VERIFY(!source_dict.type.has_value());

    // 2. Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
    auto size_algorithm = extract_size_algorithm(vm, strategy);

    // 3. Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
    auto default_high_water_mark = TRY(extract_high_water_mark(strategy, 1));

    // 4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm).
    TRY(set_up_readable_stream_default_controller_from_underlying_source(*stream, underlying_source, source_dict, default_high_water_mark, size_algorithm));

    return stream;
}
70
71
// https://streams.spec.whatwg.org/#rs-from
72
WebIDL::ExceptionOr<JS::NonnullGCPtr<ReadableStream>> ReadableStream::from(JS::VM& vm, JS::Value async_iterable)
{
    // 1. Return ? ReadableStreamFromIterable(asyncIterable).
    // Any exception from the abstract operation is propagated to the caller by TRY.
    auto stream = TRY(readable_stream_from_iterable(vm, async_iterable));
    return stream;
}
77
78
// Forwards the realm to the base class; no other state is set up in the constructor body.
ReadableStream::ReadableStream(JS::Realm& realm)
    : Base(realm)
{
}
82
83
0
// Out-of-line defaulted destructor.
ReadableStream::~ReadableStream() = default;
84
85
// https://streams.spec.whatwg.org/#rs-locked
86
bool ReadableStream::locked() const
87
0
{
88
    // 1. Return ! IsReadableStreamLocked(this).
89
0
    return is_readable_stream_locked(*this);
90
0
}
91
92
// https://streams.spec.whatwg.org/#rs-cancel
93
JS::NonnullGCPtr<JS::Object> ReadableStream::cancel(JS::Value reason)
{
    auto& realm = this->realm();

    // 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
    bool const stream_is_locked = is_readable_stream_locked(*this);
    if (stream_is_locked) {
        auto type_error = JS::TypeError::create(realm, "Cannot cancel a locked stream"sv);
        auto rejected = WebIDL::create_rejected_promise(realm, JS::Value { type_error });
        return rejected->promise();
    }

    // 2. Return ! ReadableStreamCancel(this, reason).
    auto cancel_promise = readable_stream_cancel(*this, reason);
    return cancel_promise->promise();
}
106
107
// https://streams.spec.whatwg.org/#rs-get-reader
108
WebIDL::ExceptionOr<ReadableStreamReader> ReadableStream::get_reader(ReadableStreamGetReaderOptions const& options)
{
    // 1. If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
    if (!options.mode.has_value()) {
        auto default_reader = TRY(acquire_readable_stream_default_reader(*this));
        return ReadableStreamReader { default_reader };
    }

    // 2. Assert: options["mode"] is "byob".
    VERIFY(*options.mode == Bindings::ReadableStreamReaderMode::Byob);

    // 3. Return ? AcquireReadableStreamBYOBReader(this).
    auto byob_reader = TRY(acquire_readable_stream_byob_reader(*this));
    return ReadableStreamReader { byob_reader };
}
120
121
// https://streams.spec.whatwg.org/#rs-pipe-through
WebIDL::ExceptionOr<JS::NonnullGCPtr<ReadableStream>> ReadableStream::pipe_through(ReadableWritablePair transform, StreamPipeOptions const& options)
{
    // 1. If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
    bool const source_is_locked = is_readable_stream_locked(*this);
    if (source_is_locked)
        return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Failed to execute 'pipeThrough' on 'ReadableStream': Cannot pipe a locked stream"sv };

    // 2. If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
    bool const sink_is_locked = is_writable_stream_locked(*transform.writable);
    if (sink_is_locked)
        return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Failed to execute 'pipeThrough' on 'ReadableStream': parameter 1's 'writable' is locked"sv };

    // 3. Let signal be options["signal"] if it exists, or undefined otherwise.
    JS::Value signal = JS::js_undefined();
    if (options.signal)
        signal = JS::Value(options.signal);

    // 4. Let promise be ! ReadableStreamPipeTo(this, transform["writable"], options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
    auto pipe_promise = readable_stream_pipe_to(*this, *transform.writable, options.prevent_close, options.prevent_abort, options.prevent_cancel, signal);

    // 5. Set promise.[[PromiseIsHandled]] to true.
    WebIDL::mark_promise_as_handled(*pipe_promise);

    // 6. Return transform["readable"].
    return JS::NonnullGCPtr { *transform.readable };
}
143
144
// https://streams.spec.whatwg.org/#rs-pipe-to
JS::NonnullGCPtr<JS::Object> ReadableStream::pipe_to(WritableStream& destination, StreamPipeOptions const& options)
{
    auto& realm = this->realm();

    // 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
    if (is_readable_stream_locked(*this)) {
        auto exception = JS::TypeError::create(realm, "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe a locked stream"sv);
        return WebIDL::create_rejected_promise(realm, JS::Value { exception })->promise();
    }

    // 2. If ! IsWritableStreamLocked(destination) is true, return a promise rejected with a TypeError exception.
    if (is_writable_stream_locked(destination)) {
        auto exception = JS::TypeError::create(realm, "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"sv);
        return WebIDL::create_rejected_promise(realm, JS::Value { exception })->promise();
    }

    // 3. Let signal be options["signal"] if it exists, or undefined otherwise.
    auto signal = options.signal ? JS::Value(options.signal) : JS::js_undefined();

    // 4. Return ! ReadableStreamPipeTo(this, destination, options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
    return readable_stream_pipe_to(*this, destination, options.prevent_close, options.prevent_abort, options.prevent_cancel, signal)->promise();
}
168
169
// https://streams.spec.whatwg.org/#readablestream-tee
170
WebIDL::ExceptionOr<ReadableStreamPair> ReadableStream::tee()
{
    // To tee a ReadableStream stream, return ? ReadableStreamTee(stream, true).
    // The literal `true` is the second spec argument shown in the step above.
    auto branches = TRY(readable_stream_tee(realm(), *this, true));
    return branches;
}
175
176
// https://streams.spec.whatwg.org/#readablestream-close
177
void ReadableStream::close()
178
0
{
179
0
    controller()->visit(
180
        // 1. If stream.[[controller]] implements ReadableByteStreamController
181
0
        [&](JS::NonnullGCPtr<ReadableByteStreamController> controller) {
182
            // 1. Perform ! ReadableByteStreamControllerClose(stream.[[controller]]).
183
0
            MUST(readable_byte_stream_controller_close(controller));
184
185
            // 2. If stream.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(stream.[[controller]], 0).
186
0
            if (!controller->pending_pull_intos().is_empty())
187
0
                MUST(readable_byte_stream_controller_respond(controller, 0));
188
0
        },
189
190
        // 2. Otherwise, perform ! ReadableStreamDefaultControllerClose(stream.[[controller]]).
191
0
        [&](JS::NonnullGCPtr<ReadableStreamDefaultController> controller) {
192
0
            readable_stream_default_controller_close(*controller);
193
0
        });
194
0
}
195
196
// https://streams.spec.whatwg.org/#readablestream-error
197
void ReadableStream::error(JS::Value error)
{
    // Dispatch on which controller alternative stream.[[controller]] currently holds.
    controller()->visit(
        // 1. If stream.[[controller]] implements ReadableByteStreamController, then perform
        //    ! ReadableByteStreamControllerError(stream.[[controller]], e).
        [&error](JS::NonnullGCPtr<ReadableByteStreamController> byte_controller) {
            readable_byte_stream_controller_error(byte_controller, error);
        },

        // 2. Otherwise, perform ! ReadableStreamDefaultControllerError(stream.[[controller]], e).
        [&error](JS::NonnullGCPtr<ReadableStreamDefaultController> default_controller) {
            readable_stream_default_controller_error(default_controller, error);
        });
}
211
212
void ReadableStream::initialize(JS::Realm& realm)
{
    // Run the base class initialization first, then install this interface's prototype.
    Base::initialize(realm);

    WEB_SET_PROTOTYPE_FOR_INTERFACE(ReadableStream);
}
217
218
void ReadableStream::visit_edges(Cell::Visitor& visitor)
{
    Base::visit_edges(visitor);

    // The controller is optional; visit whichever alternative it holds, if any.
    if (m_controller.has_value()) {
        m_controller->visit([&visitor](auto& held_controller) {
            visitor.visit(held_controller);
        });
    }

    visitor.visit(m_stored_error);

    // The reader is optional as well; visit whichever alternative it holds, if any.
    if (m_reader.has_value()) {
        m_reader->visit([&visitor](auto& held_reader) {
            visitor.visit(held_reader);
        });
    }
}
227
228
// https://streams.spec.whatwg.org/#readablestream-readable
229
bool ReadableStream::is_readable() const
230
0
{
231
    // A ReadableStream stream is readable if stream.[[state]] is "readable".
232
0
    return m_state == State::Readable;
233
0
}
234
235
// https://streams.spec.whatwg.org/#readablestream-closed
236
bool ReadableStream::is_closed() const
237
0
{
238
    // A ReadableStream stream is closed if stream.[[state]] is "closed".
239
0
    return m_state == State::Closed;
240
0
}
241
242
// https://streams.spec.whatwg.org/#readablestream-errored
243
bool ReadableStream::is_errored() const
244
0
{
245
    // A ReadableStream stream is errored if stream.[[state]] is "errored".
246
0
    return m_state == State::Errored;
247
0
}
248
// https://streams.spec.whatwg.org/#readablestream-locked
249
bool ReadableStream::is_locked() const
250
0
{
251
    // A ReadableStream stream is locked if ! IsReadableStreamLocked(stream) returns true.
252
0
    return is_readable_stream_locked(*this);
253
0
}
254
255
// https://streams.spec.whatwg.org/#readablestream-disturbed
256
bool ReadableStream::is_disturbed() const
257
0
{
258
    // A ReadableStream stream is disturbed if stream.[[disturbed]] is true.
259
0
    return m_disturbed;
260
0
}
261
262
}