/src/node/src/quic/streams.h
Line | Count | Source |
1 | | #pragma once |
2 | | |
3 | | #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
4 | | |
5 | | #include <aliased_struct.h> |
6 | | #include <async_wrap.h> |
7 | | #include <base_object.h> |
8 | | #include <dataqueue/queue.h> |
9 | | #include <env.h> |
10 | | #include <memory_tracker.h> |
11 | | #include <node_blob.h> |
12 | | #include <node_bob.h> |
13 | | #include <node_http_common.h> |
14 | | #include <util.h> |
15 | | #include "bindingdata.h" |
16 | | #include "data.h" |
17 | | |
18 | | namespace node::quic { |
19 | | |
20 | | class Session; |
21 | | class Stream; |
22 | | |
23 | | using Ngtcp2Source = bob::SourceImpl<ngtcp2_vec>; |
24 | | |
25 | | // When a request to open a stream is made before a Session is able to actually |
26 | | // open a stream (either because the handshake is not yet sufficiently complete |
27 | | // or concurrency limits are temporarily reached) then the request to open the |
28 | | // stream is represented as a queued PendingStream. |
29 | | // |
30 | | // The PendingStream instance itself is owned by the stream created but a |
31 | | // reference sits in a linked list in the session. |
32 | | // |
33 | | // The PendingStream request can be canceled by dropping the PendingStream |
34 | | // instance before it can be fulfilled, at which point it is removed from the |
35 | | // pending stream queue. |
36 | | // |
37 | | // Note that only locally initiated streams can be created in a pending state. |
38 | | class PendingStream final { |
39 | | public: |
40 | | PendingStream(Direction direction, |
41 | | Stream* stream, |
42 | | BaseObjectWeakPtr<Session> session); |
43 | | DISALLOW_COPY_AND_MOVE(PendingStream) |
44 | | ~PendingStream(); |
45 | | |
46 | | // Called when the stream has been opened. Transitions the stream from a |
47 | | // pending state to an opened state. |
48 | | void fulfill(stream_id id); |
49 | | |
50 | | // Called when opening the stream fails or is canceled. Transitions the |
51 | | // stream into a closed/destroyed state. |
52 | | void reject(QuicError error = QuicError()); |
53 | | |
54 | 0 | inline Direction direction() const { return direction_; } |
55 | | |
56 | | private: |
57 | | Direction direction_; |
58 | | Stream* stream_; |
59 | | BaseObjectWeakPtr<Session> session_; |
60 | | bool waiting_ = true; |
61 | | |
62 | | ListNode<PendingStream> pending_stream_queue_; |
63 | | |
64 | | public: |
65 | | using PendingStreamQueue = |
66 | | ListHead<PendingStream, &PendingStream::pending_stream_queue_>; |
67 | | }; |
68 | | |
69 | | // QUIC Stream's are simple data flows that may be: |
70 | | // |
71 | | // * Bidirectional (both sides can send) or Unidirectional (one side can send) |
72 | | // * Server or Client Initiated |
73 | | // |
74 | | // The flow direction and origin of the stream are important in determining the |
75 | | // write and read state (Open or Closed). Specifically: |
76 | | // |
77 | | // Bidirectional Stream States: |
78 | | // +--------+--------------+----------+----------+ |
79 | | // | ON | Initiated By | Readable | Writable | |
80 | | // +--------+--------------+----------+----------+ |
81 | | // | Server | Server | Y | Y | |
82 | | // +--------+--------------+----------+----------+ |
83 | | // | Server | Client | Y | Y | |
84 | | // +--------+--------------+----------+----------+ |
85 | | // | Client | Server | Y | Y | |
86 | | // +--------+--------------+----------+----------+ |
87 | | // | Client | Client | Y | Y | |
88 | | // +--------+--------------+----------+----------+ |
89 | | // |
90 | | // Unidirectional Stream States |
91 | | // +--------+--------------+----------+----------+ |
92 | | // | ON | Initiated By | Readable | Writable | |
93 | | // +--------+--------------+----------+----------+ |
94 | | // | Server | Server | N | Y | |
95 | | // +--------+--------------+----------+----------+ |
96 | | // | Server | Client | Y | N | |
97 | | // +--------+--------------+----------+----------+ |
98 | | // | Client | Server | Y | N | |
99 | | // +--------+--------------+----------+----------+ |
100 | | // | Client | Client | N | Y | |
101 | | // +--------+--------------+----------+----------+ |
102 | | // |
103 | | // All data sent via the Stream is buffered internally until either receipt is |
104 | | // acknowledged from the peer or attempts to send are abandoned. The fact that |
105 | | // data is buffered in memory makes it essential that the flow control for the |
106 | | // session and the stream are properly handled. For now, we are largely relying |
107 | | // on ngtcp2's default flow control mechanisms which generally should be doing |
108 | | // the right thing. From the JavaScript side, the application pushes data into |
109 | | // the stream's outbound queue and ngtcp2 pulls data from that queue as it is |
110 | | // able. The stream outbound has a high watermark. The JS side can choose to |
111 | | // continue writing data even after the high watermark is reached but this |
112 | | // risks using up large amounts of memory if the session is slow to send data |
113 | | // or the peer is slow to acknowledge receipt. The JavaScript side needs to |
114 | | // be aware of this risk and pay proper attention to the backpressure signals. |
115 | | // |
116 | | // A Stream may be in a fully closed state (No longer readable nor writable) |
117 | | // state but still have unacknowledged data in both the inbound and outbound |
118 | | // queues. |
119 | | // |
120 | | // A Stream is gracefully closed when (a) both read and write states are closed, |
121 | | // (b) all queued data has been acknowledged. |
122 | | // |
123 | | // The Stream may be forcefully closed immediately using destroy(err). This |
124 | | // causes all queued outbound data to be cleared, pending JavaScript writes |
125 | | // to be abandoned, the Stream to be immediately closed at the ngtcp2 level |
126 | | // without waiting for any outstanding acknowledgements. Keep in mind, however, |
127 | | // that the peer is not notified that the stream is destroyed and may attempt |
128 | | // to continue sending data and acknowledgements until it is able to determine |
129 | | // that the stream is gone. Any data that has already been received and is in |
130 | | // the inbound queue is preserved and may be read by the application. |
131 | | // |
132 | | // QUIC streams in general do not have headers. Some QUIC applications, however, |
133 | | // may associate headers with the stream (HTTP/3 for instance). As a |
134 | | // convenience, the Stream class will hold onto these headers for the |
135 | | // application. |
136 | | // |
137 | | // Streams may be created in a pending state. This means that while the Stream |
138 | | // object is created, it has not yet been opened in ngtcp2 and therefore has |
139 | | // no official status yet. Certain operations can still be performed on the |
140 | | // stream object such as providing data, adding headers, or destroying the |
141 | | // stream. |
142 | | // |
143 | | // When a stream is created the data source for the stream must be given. |
144 | | // If no data source is given, then the stream is assumed to not have any |
145 | | // outbound data. If the stream was created as bidirectional, the outbound |
146 | | // side will be closed. The data source can be fixed length or may support |
147 | | // streaming. What this means practically is, when a stream is opened, |
148 | | // you must already have a sense of whether that will provide data or |
149 | | // not. When in doubt, specify a streaming data source, which can produce |
150 | | // zero-length output. |
151 | | class Stream final : public AsyncWrap, |
152 | | public Ngtcp2Source, |
153 | | public DataQueue::BackpressureListener { |
154 | | public: |
155 | | using Header = NgHeaderBase<BindingData>; |
156 | | |
157 | | // Acquire a DataQueue from the given value if it is valid. The return |
158 | | // follows the typical V8 rules for Maybe types. If an error occurs, |
159 | | // the Maybe will be empty and an exception will be set on the isolate. |
160 | | static v8::Maybe<std::shared_ptr<DataQueue>> GetDataQueueFromSource( |
161 | | Environment* env, v8::Local<v8::Value> value); |
162 | | |
163 | | // The stream_user_data field is from ngtcp2 and will point to the |
164 | | // Stream instance associated with the stream_id. |
165 | | static Stream* From(void* stream_user_data); |
166 | | |
167 | | JS_CONSTRUCTOR(Stream); |
168 | | JS_BINDING_INIT_BOILERPLATE(); |
169 | | |
170 | | // Creates a new non-pending stream. The directionality of the stream |
171 | | // is inferred from the stream id. |
172 | | static BaseObjectPtr<Stream> Create( |
173 | | Session* session, |
174 | | stream_id id, |
175 | | std::shared_ptr<DataQueue> source = nullptr); |
176 | | |
177 | | // Creates a new pending stream. |
178 | | static BaseObjectPtr<Stream> Create( |
179 | | Session* session, |
180 | | Direction direction, |
181 | | std::shared_ptr<DataQueue> source = nullptr); |
182 | | |
183 | | // The constructor is only public to be visible by MakeDetachedBaseObject. |
184 | | // Call Create to create new instances of Stream. |
185 | | Stream(BaseObjectWeakPtr<Session> session, |
186 | | v8::Local<v8::Object> obj, |
187 | | stream_id id, |
188 | | std::shared_ptr<DataQueue> source); |
189 | | |
190 | | // Creates the stream in a pending state. The constructor is only public |
191 | | // to be visible to MakeDetachedBaseObject. Call Create to create new |
192 | | // instances of Stream. |
193 | | Stream(BaseObjectWeakPtr<Session> session, |
194 | | v8::Local<v8::Object> obj, |
195 | | Direction direction, |
196 | | std::shared_ptr<DataQueue> source); |
197 | | DISALLOW_COPY_AND_MOVE(Stream) |
198 | | ~Stream() override; |
199 | | |
200 | | // While the stream is still pending, the id will be kMaxStreamId, |
201 | | // inidicating the maximum possible stream id is kMaxStreamId - 1. |
202 | | stream_id id() const; |
203 | | |
204 | | // While the stream is still pending, the origin will be invalid. |
205 | | Side origin() const; |
206 | | |
207 | | Direction direction() const; |
208 | | |
209 | | Session& session() const; |
210 | | |
211 | | // True if this stream was created in a pending state and is still waiting |
212 | | // to be created. |
213 | | bool is_pending() const; |
214 | | |
215 | | // True if we've completely sent all outbound data for this stream. |
216 | | // Importantly, this does not necessarily mean that we are completely |
217 | | // done with the outbound data. We may still be waiting on outbound |
218 | | // data to be acknowledged by the remote peer. |
219 | | bool is_eos() const; |
220 | | |
221 | | // True if this stream is still in a readable state. |
222 | | bool is_readable() const; |
223 | | |
224 | | // True if this stream is still in a writable state. |
225 | | bool is_writable() const; |
226 | | |
227 | | // Called by the session/application to indicate that the specified number |
228 | | // of bytes have been acknowledged by the peer. |
229 | | void Acknowledge(size_t datalen); |
230 | | |
231 | | // Called by the session/application to indicate that the specified number |
232 | | // of bytes have been transmitted to the peer. This is an initial |
233 | | // indication occuring the first time data is sent. It does not indicate |
234 | | // that the data has been retransmitted due to loss or has been |
235 | | // acknowledged to have been received by the peer. |
236 | | void Commit(size_t datalen); |
237 | | |
238 | | void EndWritable(); |
239 | | void EndReadable(std::optional<uint64_t> maybe_final_size = std::nullopt); |
240 | | void EntryRead(size_t amount) override; |
241 | | |
242 | | // Pulls data from the internal outbound DataQueue configured for this stream. |
243 | | // This is called by the session/application when it is preparing to send |
244 | | // data to the peer. There is no guarantee that the requested amount of data |
245 | | // will actually be sent. The amount of data actually sent is indicated |
246 | | // by the datalen argument to the Commit() method. |
247 | | int DoPull(bob::Next<ngtcp2_vec> next, |
248 | | int options, |
249 | | ngtcp2_vec* data, |
250 | | size_t count, |
251 | | size_t max_count_hint) override; |
252 | | |
253 | | // Forcefully close the stream immediately. Data already queued in the |
254 | | // inbound is preserved but new data will not be accepted. All pending |
255 | | // writes are abandoned, and the stream is immediately closed at the ngtcp2 |
256 | | // level without waiting for any outstanding acknowledgements. |
257 | | void Destroy(QuicError error = QuicError()); |
258 | | |
259 | | struct ReceiveDataFlags final { |
260 | | // Identifies the final chunk of data that the peer will send for the |
261 | | // stream. |
262 | | bool fin = false; |
263 | | // Indicates that this chunk of data was received in a 0RTT packet before |
264 | | // the TLS handshake completed, suggesting that is is not as secure and |
265 | | // could be replayed by an attacker. |
266 | | bool early = false; |
267 | | }; |
268 | | |
269 | | void ReceiveData(const uint8_t* data, size_t len, ReceiveDataFlags flags); |
270 | | void ReceiveStopSending(QuicError error); |
271 | | void ReceiveStreamReset(uint64_t final_size, QuicError error); |
272 | | |
273 | | // Currently, only HTTP/3 streams support headers. These methods are here |
274 | | // to support that. They are not used when using any other QUIC application. |
275 | | |
276 | | void BeginHeaders(HeadersKind kind); |
277 | | void set_headers_kind(HeadersKind kind); |
278 | | // Returns false if the header cannot be added. This will typically happen |
279 | | // if the application does not support headers, a maximum number of headers |
280 | | // have already been added, or the maximum total header length is reached. |
281 | | bool AddHeader(const Header& header); |
282 | | |
283 | | SET_NO_MEMORY_INFO() |
284 | | SET_MEMORY_INFO_NAME(Stream) |
285 | | SET_SELF_SIZE(Stream) |
286 | | |
287 | | struct State; |
288 | | struct Stats; |
289 | | |
290 | | private: |
291 | | struct Impl; |
292 | | struct PendingHeaders; |
293 | | |
294 | | class Outbound; |
295 | | |
296 | | // Gets a reader for the data received for this stream from the peer, |
297 | | BaseObjectPtr<Blob::Reader> get_reader(); |
298 | | |
299 | | void set_final_size(uint64_t amount); |
300 | | void set_outbound(std::shared_ptr<DataQueue> source); |
301 | | |
302 | | bool is_local_unidirectional() const; |
303 | | bool is_remote_unidirectional() const; |
304 | | |
305 | | // JavaScript callouts |
306 | | |
307 | | // Notifies the JavaScript side that the stream has been destroyed. |
308 | | void EmitClose(const QuicError& error); |
309 | | |
310 | | // Notifies the JavaScript side that the stream has been reset. |
311 | | void EmitReset(const QuicError& error); |
312 | | |
313 | | // Notifies the JavaScript side that the application is ready to receive |
314 | | // trailing headers. Any trailing headers must be sent immediately, and |
315 | | // synchronously when this callback is triggered. |
316 | | void EmitWantTrailers(); |
317 | | |
318 | | // Notifies the JavaScript side that sending data on the stream has been |
319 | | // blocked because of flow control restriction. |
320 | | void EmitBlocked(); |
321 | | |
322 | | // Delivers the set of inbound headers that have been collected. |
323 | | void EmitHeaders(); |
324 | | |
325 | | void NotifyReadableEnded(error_code code); |
326 | | void NotifyWritableEnded(error_code code); |
327 | | |
328 | | // When a pending stream is finally opened, the NotifyStreamOpened method |
329 | | // will be called and the id will be assigned. |
330 | | void NotifyStreamOpened(stream_id id); |
331 | | void EnqueuePendingHeaders(HeadersKind kind, |
332 | | v8::Local<v8::Array> headers, |
333 | | HeadersFlags flags); |
334 | | |
335 | | AliasedStruct<Stats> stats_; |
336 | | AliasedStruct<State> state_; |
337 | | BaseObjectWeakPtr<Session> session_; |
338 | | std::unique_ptr<Outbound> outbound_; |
339 | | std::shared_ptr<DataQueue> inbound_; |
340 | | |
341 | | // If the stream cannot be opened yet, it will be created in a pending state. |
342 | | // Once the owning session is able to, it will complete opening of the stream |
343 | | // and the stream id will be assigned. |
344 | | std::optional<std::unique_ptr<PendingStream>> maybe_pending_stream_ = |
345 | | std::nullopt; |
346 | | std::vector<std::unique_ptr<PendingHeaders>> pending_headers_queue_; |
347 | | error_code pending_close_read_code_ = 0; |
348 | | error_code pending_close_write_code_ = 0; |
349 | | |
350 | | struct PendingPriority { |
351 | | StreamPriority priority; |
352 | | StreamPriorityFlags flags; |
353 | | }; |
354 | | std::optional<PendingPriority> pending_priority_ = std::nullopt; |
355 | | |
356 | | // The headers_ field holds a block of headers that have been received and |
357 | | // are being buffered for delivery to the JavaScript side. |
358 | | // TODO(@jasnell): Use v8::Global instead of v8::Local here. |
359 | | v8::LocalVector<v8::Value> headers_; |
360 | | |
361 | | // The headers_kind_ field indicates the kind of headers that are being |
362 | | // buffered. |
363 | | HeadersKind headers_kind_ = HeadersKind::INITIAL; |
364 | | |
365 | | // The headers_length_ field holds the total length of the headers that have |
366 | | // been buffered. |
367 | | size_t headers_length_ = 0; |
368 | | |
369 | | friend struct Impl; |
370 | | friend class PendingStream; |
371 | | friend class Http3ApplicationImpl; |
372 | | friend class DefaultApplication; |
373 | | |
374 | | public: |
375 | | // The Queue/Schedule/Unschedule here are part of the mechanism used to |
376 | | // determine which streams have data to send on the session. When a stream |
377 | | // potentially has data available, it will be scheduled in the Queue. Then, |
378 | | // when the Session::Application starts sending pending data, it will check |
379 | | // the queue to see if there are streams waiting. If there are, it will grab |
380 | | // one and check to see if there is data to send. When a stream does not have |
381 | | // data to send (such as when it is initially created or is using an async |
382 | | // source that is still waiting for data to be pushed) it will not appear in |
383 | | // the queue. |
384 | | ListNode<Stream> stream_queue_; |
385 | | using Queue = ListHead<Stream, &Stream::stream_queue_>; |
386 | | |
387 | | void Schedule(Queue* queue); |
388 | | void Unschedule(); |
389 | | }; |
390 | | |
391 | | } // namespace node::quic |
392 | | |
393 | | #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |