Coverage Report

Created: 2026-01-21 08:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/node/src/quic/streams.h
Line
Count
Source
1
#pragma once
2
3
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
4
5
#include <aliased_struct.h>
6
#include <async_wrap.h>
7
#include <base_object.h>
8
#include <dataqueue/queue.h>
9
#include <env.h>
10
#include <memory_tracker.h>
11
#include <node_blob.h>
12
#include <node_bob.h>
13
#include <node_http_common.h>
14
#include <util.h>
15
#include "bindingdata.h"
16
#include "data.h"
17
18
namespace node::quic {
19
20
class Session;
21
class Stream;
22
23
using Ngtcp2Source = bob::SourceImpl<ngtcp2_vec>;
24
25
// When a request to open a stream is made before a Session is able to actually
26
// open a stream (either because the handshake is not yet sufficiently complete
27
// or concurrency limits are temporarily reached) then the request to open the
28
// stream is represented as a queued PendingStream.
29
//
30
// The PendingStream instance itself is owned by the stream created but a
31
// reference sits in a linked list in the session.
32
//
33
// The PendingStream request can be canceled by dropping the PendingStream
34
// instance before it can be fulfilled, at which point it is removed from the
35
// pending stream queue.
36
//
37
// Note that only locally initiated streams can be created in a pending state.
38
class PendingStream final {
39
 public:
40
  PendingStream(Direction direction,
41
                Stream* stream,
42
                BaseObjectWeakPtr<Session> session);
43
  DISALLOW_COPY_AND_MOVE(PendingStream)
44
  ~PendingStream();
45
46
  // Called when the stream has been opened. Transitions the stream from a
47
  // pending state to an opened state.
48
  void fulfill(stream_id id);
49
50
  // Called when opening the stream fails or is canceled. Transitions the
51
  // stream into a closed/destroyed state.
52
  void reject(QuicError error = QuicError());
53
54
0
  inline Direction direction() const { return direction_; }
55
56
 private:
57
  Direction direction_;
58
  Stream* stream_;
59
  BaseObjectWeakPtr<Session> session_;
60
  bool waiting_ = true;
61
62
  ListNode<PendingStream> pending_stream_queue_;
63
64
 public:
65
  using PendingStreamQueue =
66
      ListHead<PendingStream, &PendingStream::pending_stream_queue_>;
67
};
68
69
// QUIC Stream's are simple data flows that may be:
70
//
71
// * Bidirectional (both sides can send) or Unidirectional (one side can send)
72
// * Server or Client Initiated
73
//
74
// The flow direction and origin of the stream are important in determining the
75
// write and read state (Open or Closed). Specifically:
76
//
77
// Bidirectional Stream States:
78
// +--------+--------------+----------+----------+
79
// |   ON   | Initiated By | Readable | Writable |
80
// +--------+--------------+----------+----------+
81
// | Server |   Server     |    Y     |    Y     |
82
// +--------+--------------+----------+----------+
83
// | Server |   Client     |    Y     |    Y     |
84
// +--------+--------------+----------+----------+
85
// | Client |   Server     |    Y     |    Y     |
86
// +--------+--------------+----------+----------+
87
// | Client |   Client     |    Y     |    Y     |
88
// +--------+--------------+----------+----------+
89
//
90
// Unidirectional Stream States
91
// +--------+--------------+----------+----------+
92
// |   ON   | Initiated By | Readable | Writable |
93
// +--------+--------------+----------+----------+
94
// | Server |   Server     |    N     |    Y     |
95
// +--------+--------------+----------+----------+
96
// | Server |   Client     |    Y     |    N     |
97
// +--------+--------------+----------+----------+
98
// | Client |   Server     |    Y     |    N     |
99
// +--------+--------------+----------+----------+
100
// | Client |   Client     |    N     |    Y     |
101
// +--------+--------------+----------+----------+
102
//
103
// All data sent via the Stream is buffered internally until either receipt is
104
// acknowledged from the peer or attempts to send are abandoned. The fact that
105
// data is buffered in memory makes it essential that the flow control for the
106
// session and the stream are properly handled. For now, we are largely relying
107
// on ngtcp2's default flow control mechanisms which generally should be doing
108
// the right thing. From the JavaScript side, the application pushes data into
109
// the stream's outbound queue and ngtcp2 pulls data from that queue as it is
110
// able. The stream outbound has a high watermark. The JS side can choose to
111
// continue writing data even after the high watermark is reached but this
112
// risks using up large amounts of memory if the session is slow to send data
113
// or the peer is slow to acknowledge receipt. The JavaScript side needs to
114
// be aware of this risk and pay proper attention to the backpressure signals.
115
//
116
// A Stream may be in a fully closed state (No longer readable nor writable)
117
// state but still have unacknowledged data in both the inbound and outbound
118
// queues.
119
//
120
// A Stream is gracefully closed when (a) both read and write states are closed,
121
// (b) all queued data has been acknowledged.
122
//
123
// The Stream may be forcefully closed immediately using destroy(err). This
124
// causes all queued outbound data to be cleared, pending JavaScript writes
125
// to be abandoned, the Stream to be immediately closed at the ngtcp2 level
126
// without waiting for any outstanding acknowledgements. Keep in mind, however,
127
// that the peer is not notified that the stream is destroyed and may attempt
128
// to continue sending data and acknowledgements until it is able to determine
129
// that the stream is gone. Any data that has already been received and is in
130
// the inbound queue is preserved and may be read by the application.
131
//
132
// QUIC streams in general do not have headers. Some QUIC applications, however,
133
// may associate headers with the stream (HTTP/3 for instance). As a
134
// convenience, the Stream class will hold onto these headers for the
135
// application.
136
//
137
// Streams may be created in a pending state. This means that while the Stream
138
// object is created, it has not yet been opened in ngtcp2 and therefore has
139
// no official status yet. Certain operations can still be performed on the
140
// stream object such as providing data, adding headers, or destroying the
141
// stream.
142
//
143
// When a stream is created the data source for the stream must be given.
144
// If no data source is given, then the stream is assumed to not have any
145
// outbound data. If the stream was created as bidirectional, the outbound
146
// side will be closed. The data source can be fixed length or may support
147
// streaming. What this means practically is, when a stream is opened,
148
// you must already have a sense of whether that will provide data or
149
// not. When in doubt, specify a streaming data source, which can produce
150
// zero-length output.
151
class Stream final : public AsyncWrap,
152
                     public Ngtcp2Source,
153
                     public DataQueue::BackpressureListener {
154
 public:
155
  using Header = NgHeaderBase<BindingData>;
156
157
  // Acquire a DataQueue from the given value if it is valid. The return
158
  // follows the typical V8 rules for Maybe types. If an error occurs,
159
  // the Maybe will be empty and an exception will be set on the isolate.
160
  static v8::Maybe<std::shared_ptr<DataQueue>> GetDataQueueFromSource(
161
      Environment* env, v8::Local<v8::Value> value);
162
163
  // The stream_user_data field is from ngtcp2 and will point to the
164
  // Stream instance associated with the stream_id.
165
  static Stream* From(void* stream_user_data);
166
167
  JS_CONSTRUCTOR(Stream);
168
  JS_BINDING_INIT_BOILERPLATE();
169
170
  // Creates a new non-pending stream. The directionality of the stream
171
  // is inferred from the stream id.
172
  static BaseObjectPtr<Stream> Create(
173
      Session* session,
174
      stream_id id,
175
      std::shared_ptr<DataQueue> source = nullptr);
176
177
  // Creates a new pending stream.
178
  static BaseObjectPtr<Stream> Create(
179
      Session* session,
180
      Direction direction,
181
      std::shared_ptr<DataQueue> source = nullptr);
182
183
  // The constructor is only public to be visible by MakeDetachedBaseObject.
184
  // Call Create to create new instances of Stream.
185
  Stream(BaseObjectWeakPtr<Session> session,
186
         v8::Local<v8::Object> obj,
187
         stream_id id,
188
         std::shared_ptr<DataQueue> source);
189
190
  // Creates the stream in a pending state. The constructor is only public
191
  // to be visible to MakeDetachedBaseObject. Call Create to create new
192
  // instances of Stream.
193
  Stream(BaseObjectWeakPtr<Session> session,
194
         v8::Local<v8::Object> obj,
195
         Direction direction,
196
         std::shared_ptr<DataQueue> source);
197
  DISALLOW_COPY_AND_MOVE(Stream)
198
  ~Stream() override;
199
200
  // While the stream is still pending, the id will be kMaxStreamId,
201
  // inidicating the maximum possible stream id is kMaxStreamId - 1.
202
  stream_id id() const;
203
204
  // While the stream is still pending, the origin will be invalid.
205
  Side origin() const;
206
207
  Direction direction() const;
208
209
  Session& session() const;
210
211
  // True if this stream was created in a pending state and is still waiting
212
  // to be created.
213
  bool is_pending() const;
214
215
  // True if we've completely sent all outbound data for this stream.
216
  // Importantly, this does not necessarily mean that we are completely
217
  // done with the outbound data. We may still be waiting on outbound
218
  // data to be acknowledged by the remote peer.
219
  bool is_eos() const;
220
221
  // True if this stream is still in a readable state.
222
  bool is_readable() const;
223
224
  // True if this stream is still in a writable state.
225
  bool is_writable() const;
226
227
  // Called by the session/application to indicate that the specified number
228
  // of bytes have been acknowledged by the peer.
229
  void Acknowledge(size_t datalen);
230
231
  // Called by the session/application to indicate that the specified number
232
  // of bytes have been transmitted to the peer.  This is an initial
233
  // indication occuring the first time data is sent. It does not indicate
234
  // that the data has been retransmitted due to loss or has been
235
  // acknowledged to have been received by the peer.
236
  void Commit(size_t datalen);
237
238
  void EndWritable();
239
  void EndReadable(std::optional<uint64_t> maybe_final_size = std::nullopt);
240
  void EntryRead(size_t amount) override;
241
242
  // Pulls data from the internal outbound DataQueue configured for this stream.
243
  // This is called by the session/application when it is preparing to send
244
  // data to the peer. There is no guarantee that the requested amount of data
245
  // will actually be sent. The amount of data actually sent is indicated
246
  // by the datalen argument to the Commit() method.
247
  int DoPull(bob::Next<ngtcp2_vec> next,
248
             int options,
249
             ngtcp2_vec* data,
250
             size_t count,
251
             size_t max_count_hint) override;
252
253
  // Forcefully close the stream immediately. Data already queued in the
254
  // inbound is preserved but new data will not be accepted. All pending
255
  // writes are abandoned, and the stream is immediately closed at the ngtcp2
256
  // level without waiting for any outstanding acknowledgements.
257
  void Destroy(QuicError error = QuicError());
258
259
  struct ReceiveDataFlags final {
260
    // Identifies the final chunk of data that the peer will send for the
261
    // stream.
262
    bool fin = false;
263
    // Indicates that this chunk of data was received in a 0RTT packet before
264
    // the TLS handshake completed, suggesting that is is not as secure and
265
    // could be replayed by an attacker.
266
    bool early = false;
267
  };
268
269
  void ReceiveData(const uint8_t* data, size_t len, ReceiveDataFlags flags);
270
  void ReceiveStopSending(QuicError error);
271
  void ReceiveStreamReset(uint64_t final_size, QuicError error);
272
273
  // Currently, only HTTP/3 streams support headers. These methods are here
274
  // to support that. They are not used when using any other QUIC application.
275
276
  void BeginHeaders(HeadersKind kind);
277
  void set_headers_kind(HeadersKind kind);
278
  // Returns false if the header cannot be added. This will typically happen
279
  // if the application does not support headers, a maximum number of headers
280
  // have already been added, or the maximum total header length is reached.
281
  bool AddHeader(const Header& header);
282
283
  SET_NO_MEMORY_INFO()
284
  SET_MEMORY_INFO_NAME(Stream)
285
  SET_SELF_SIZE(Stream)
286
287
  struct State;
288
  struct Stats;
289
290
 private:
291
  struct Impl;
292
  struct PendingHeaders;
293
294
  class Outbound;
295
296
  // Gets a reader for the data received for this stream from the peer,
297
  BaseObjectPtr<Blob::Reader> get_reader();
298
299
  void set_final_size(uint64_t amount);
300
  void set_outbound(std::shared_ptr<DataQueue> source);
301
302
  bool is_local_unidirectional() const;
303
  bool is_remote_unidirectional() const;
304
305
  // JavaScript callouts
306
307
  // Notifies the JavaScript side that the stream has been destroyed.
308
  void EmitClose(const QuicError& error);
309
310
  // Notifies the JavaScript side that the stream has been reset.
311
  void EmitReset(const QuicError& error);
312
313
  // Notifies the JavaScript side that the application is ready to receive
314
  // trailing headers. Any trailing headers must be sent immediately, and
315
  // synchronously when this callback is triggered.
316
  void EmitWantTrailers();
317
318
  // Notifies the JavaScript side that sending data on the stream has been
319
  // blocked because of flow control restriction.
320
  void EmitBlocked();
321
322
  // Delivers the set of inbound headers that have been collected.
323
  void EmitHeaders();
324
325
  void NotifyReadableEnded(error_code code);
326
  void NotifyWritableEnded(error_code code);
327
328
  // When a pending stream is finally opened, the NotifyStreamOpened method
329
  // will be called and the id will be assigned.
330
  void NotifyStreamOpened(stream_id id);
331
  void EnqueuePendingHeaders(HeadersKind kind,
332
                             v8::Local<v8::Array> headers,
333
                             HeadersFlags flags);
334
335
  AliasedStruct<Stats> stats_;
336
  AliasedStruct<State> state_;
337
  BaseObjectWeakPtr<Session> session_;
338
  std::unique_ptr<Outbound> outbound_;
339
  std::shared_ptr<DataQueue> inbound_;
340
341
  // If the stream cannot be opened yet, it will be created in a pending state.
342
  // Once the owning session is able to, it will complete opening of the stream
343
  // and the stream id will be assigned.
344
  std::optional<std::unique_ptr<PendingStream>> maybe_pending_stream_ =
345
      std::nullopt;
346
  std::vector<std::unique_ptr<PendingHeaders>> pending_headers_queue_;
347
  error_code pending_close_read_code_ = 0;
348
  error_code pending_close_write_code_ = 0;
349
350
  struct PendingPriority {
351
    StreamPriority priority;
352
    StreamPriorityFlags flags;
353
  };
354
  std::optional<PendingPriority> pending_priority_ = std::nullopt;
355
356
  // The headers_ field holds a block of headers that have been received and
357
  // are being buffered for delivery to the JavaScript side.
358
  // TODO(@jasnell): Use v8::Global instead of v8::Local here.
359
  v8::LocalVector<v8::Value> headers_;
360
361
  // The headers_kind_ field indicates the kind of headers that are being
362
  // buffered.
363
  HeadersKind headers_kind_ = HeadersKind::INITIAL;
364
365
  // The headers_length_ field holds the total length of the headers that have
366
  // been buffered.
367
  size_t headers_length_ = 0;
368
369
  friend struct Impl;
370
  friend class PendingStream;
371
  friend class Http3ApplicationImpl;
372
  friend class DefaultApplication;
373
374
 public:
375
  // The Queue/Schedule/Unschedule here are part of the mechanism used to
376
  // determine which streams have data to send on the session. When a stream
377
  // potentially has data available, it will be scheduled in the Queue. Then,
378
  // when the Session::Application starts sending pending data, it will check
379
  // the queue to see if there are streams waiting. If there are, it will grab
380
  // one and check to see if there is data to send. When a stream does not have
381
  // data to send (such as when it is initially created or is using an async
382
  // source that is still waiting for data to be pushed) it will not appear in
383
  // the queue.
384
  ListNode<Stream> stream_queue_;
385
  using Queue = ListHead<Stream, &Stream::stream_queue_>;
386
387
  void Schedule(Queue* queue);
388
  void Unschedule();
389
};
390
391
}  // namespace node::quic
392
393
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS