Coverage Report

Created: 2025-08-28 09:57

/src/node/src/quic/application.cc
Line
Count
Source (jump to first uncovered line)
1
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
2
3
#include "application.h"
4
#include <async_wrap-inl.h>
5
#include <debug_utils-inl.h>
6
#include <node_bob.h>
7
#include <node_sockaddr-inl.h>
8
#include <uv.h>
9
#include <v8.h>
10
#include "defs.h"
11
#include "endpoint.h"
12
#include "http3.h"
13
#include "packet.h"
14
#include "session.h"
15
16
namespace node {
17
18
using v8::Just;
19
using v8::Local;
20
using v8::Maybe;
21
using v8::Nothing;
22
using v8::Object;
23
using v8::Value;
24
25
namespace quic {
26
27
// ============================================================================
28
// Session::Application_Options
29
const Session::Application_Options Session::Application_Options::kDefault = {};
30
31
0
Session::Application_Options::operator const nghttp3_settings() const {
32
  // In theory, Application_Options might contain options for more than just
33
  // HTTP/3. Here we extract only the properties that are relevant to HTTP/3.
34
0
  return nghttp3_settings{
35
0
      max_field_section_size,
36
0
      static_cast<size_t>(qpack_max_dtable_capacity),
37
0
      static_cast<size_t>(qpack_encoder_max_dtable_capacity),
38
0
      static_cast<size_t>(qpack_blocked_streams),
39
0
      enable_connect_protocol,
40
0
      enable_datagrams,
41
0
  };
42
0
}
43
44
0
std::string Session::Application_Options::ToString() const {
45
0
  DebugIndentScope indent;
46
0
  auto prefix = indent.Prefix();
47
0
  std::string res("{");
48
0
  res += prefix + "max header pairs: " + std::to_string(max_header_pairs);
49
0
  res += prefix + "max header length: " + std::to_string(max_header_length);
50
0
  res += prefix +
51
0
         "max field section size: " + std::to_string(max_field_section_size);
52
0
  res += prefix + "qpack max dtable capacity: " +
53
0
         std::to_string(qpack_max_dtable_capacity);
54
0
  res += prefix + "qpack encoder max dtable capacity: " +
55
0
         std::to_string(qpack_encoder_max_dtable_capacity);
56
0
  res += prefix +
57
0
         "qpack blocked streams: " + std::to_string(qpack_blocked_streams);
58
0
  res += prefix + "enable connect protocol: " +
59
0
         (enable_connect_protocol ? std::string("yes") : std::string("no"));
60
0
  res += prefix + "enable datagrams: " +
61
0
         (enable_datagrams ? std::string("yes") : std::string("no"));
62
0
  res += indent.Close();
63
0
  return res;
64
0
}
65
66
Maybe<Session::Application_Options> Session::Application_Options::From(
67
0
    Environment* env, Local<Value> value) {
68
0
  if (value.IsEmpty() || (!value->IsUndefined() && !value->IsObject())) {
69
0
    THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object");
70
0
    return Nothing<Application_Options>();
71
0
  }
72
73
0
  Application_Options options;
74
0
  auto& state = BindingData::Get(env);
75
0
  if (value->IsUndefined()) {
76
0
    return Just<Application_Options>(options);
77
0
  }
78
79
0
  auto params = value.As<Object>();
80
81
0
#define SET(name)                                                              \
82
0
  SetOption<Session::Application_Options,                                      \
83
0
            &Session::Application_Options::name>(                              \
84
0
      env, &options, params, state.name##_string())
85
86
0
  if (!SET(max_header_pairs) || !SET(max_header_length) ||
87
0
      !SET(max_field_section_size) || !SET(qpack_max_dtable_capacity) ||
88
0
      !SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams) ||
89
0
      !SET(enable_connect_protocol) || !SET(enable_datagrams)) {
90
0
    return Nothing<Application_Options>();
91
0
  }
92
93
0
#undef SET
94
95
0
  return Just<Application_Options>(options);
96
0
}
97
98
Session::Application::Application(Session* session, const Options& options)
99
0
    : session_(session) {}
100
101
0
bool Session::Application::Start() {
102
  // By default there is nothing to do. Specific implementations may
103
  // override to perform more actions.
104
0
  Debug(session_, "Session application started");
105
0
  return true;
106
0
}
107
108
void Session::Application::AcknowledgeStreamData(Stream* stream,
109
0
                                                 size_t datalen) {
110
0
  Debug(session_,
111
0
        "Application acknowledging stream %" PRIi64 " data: %zu",
112
0
        stream->id(),
113
0
        datalen);
114
0
  DCHECK_NOT_NULL(stream);
115
0
  stream->Acknowledge(datalen);
116
0
}
117
118
0
void Session::Application::BlockStream(int64_t id) {
119
0
  Debug(session_, "Application blocking stream %" PRIi64, id);
120
0
  auto stream = session().FindStream(id);
121
0
  if (stream) stream->EmitBlocked();
122
0
}
123
124
bool Session::Application::CanAddHeader(size_t current_count,
125
                                        size_t current_headers_length,
126
0
                                        size_t this_header_length) {
127
  // By default headers are not supported.
128
0
  Debug(session_, "Application cannot add header");
129
0
  return false;
130
0
}
131
132
bool Session::Application::SendHeaders(const Stream& stream,
133
                                       HeadersKind kind,
134
                                       const v8::Local<v8::Array>& headers,
135
0
                                       HeadersFlags flags) {
136
  // By default do nothing.
137
0
  Debug(session_, "Application cannot send headers");
138
0
  return false;
139
0
}
140
141
0
void Session::Application::ResumeStream(int64_t id) {
142
0
  Debug(session_, "Application resuming stream %" PRIi64, id);
143
  // By default do nothing.
144
0
}
145
146
void Session::Application::ExtendMaxStreams(EndpointLabel label,
147
                                            Direction direction,
148
0
                                            uint64_t max_streams) {
149
0
  Debug(session_, "Application extending max streams");
150
  // By default do nothing.
151
0
}
152
153
void Session::Application::ExtendMaxStreamData(Stream* stream,
154
0
                                               uint64_t max_data) {
155
0
  Debug(session_, "Application extending max stream data");
156
  // By default do nothing.
157
0
}
158
159
void Session::Application::CollectSessionTicketAppData(
160
0
    SessionTicket::AppData* app_data) const {
161
0
  Debug(session_, "Application collecting session ticket app data");
162
  // By default do nothing.
163
0
}
164
165
SessionTicket::AppData::Status
166
Session::Application::ExtractSessionTicketAppData(
167
    const SessionTicket::AppData& app_data,
168
0
    SessionTicket::AppData::Source::Flag flag) {
169
0
  Debug(session_, "Application extracting session ticket app data");
170
  // By default we do not have any application data to retrieve.
171
0
  return flag == SessionTicket::AppData::Source::Flag::STATUS_RENEW
172
0
             ? SessionTicket::AppData::Status::TICKET_USE_RENEW
173
0
             : SessionTicket::AppData::Status::TICKET_USE;
174
0
}
175
176
void Session::Application::SetStreamPriority(const Stream& stream,
177
                                             StreamPriority priority,
178
0
                                             StreamPriorityFlags flags) {
179
0
  Debug(
180
0
      session_, "Application setting stream %" PRIi64 " priority", stream.id());
181
  // By default do nothing.
182
0
}
183
184
0
StreamPriority Session::Application::GetStreamPriority(const Stream& stream) {
185
0
  return StreamPriority::DEFAULT;
186
0
}
187
188
0
Packet* Session::Application::CreateStreamDataPacket() {
189
0
  return Packet::Create(env(),
190
0
                        session_->endpoint_.get(),
191
0
                        session_->remote_address_,
192
0
                        ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
193
0
                        "stream data");
194
0
}
195
196
0
void Session::Application::StreamClose(Stream* stream, QuicError error) {
197
0
  Debug(session_,
198
0
        "Application closing stream %" PRIi64 " with error %s",
199
0
        stream->id(),
200
0
        error);
201
0
  stream->Destroy(error);
202
0
}
203
204
0
void Session::Application::StreamStopSending(Stream* stream, QuicError error) {
205
0
  Debug(session_,
206
0
        "Application stopping sending on stream %" PRIi64 " with error %s",
207
0
        stream->id(),
208
0
        error);
209
0
  DCHECK_NOT_NULL(stream);
210
0
  stream->ReceiveStopSending(error);
211
0
}
212
213
void Session::Application::StreamReset(Stream* stream,
214
                                       uint64_t final_size,
215
0
                                       QuicError error) {
216
0
  Debug(session_,
217
0
        "Application resetting stream %" PRIi64 " with error %s",
218
0
        stream->id(),
219
0
        error);
220
0
  stream->ReceiveStreamReset(final_size, error);
221
0
}
222
223
0
void Session::Application::SendPendingData() {
224
0
  Debug(session_, "Application sending pending data");
225
0
  PathStorage path;
226
227
0
  Packet* packet = nullptr;
228
0
  uint8_t* pos = nullptr;
229
0
  int err = 0;
230
231
0
  size_t maxPacketCount = std::min(static_cast<size_t>(64000),
232
0
                                   ngtcp2_conn_get_send_quantum(*session_));
233
0
  size_t packetSendCount = 0;
234
235
0
  const auto updateTimer = [&] {
236
0
    Debug(session_, "Application updating the session timer");
237
0
    ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime());
238
0
    session_->UpdateTimer();
239
0
  };
240
241
0
  const auto congestionLimited = [&](auto packet) {
242
0
    auto len = pos - ngtcp2_vec(*packet).base;
243
    // We are either congestion limited or done.
244
0
    if (len) {
245
      // Some data was serialized into the packet. We need to send it.
246
0
      packet->Truncate(len);
247
0
      session_->Send(std::move(packet), path);
248
0
    }
249
250
0
    updateTimer();
251
0
  };
252
253
0
  for (;;) {
254
0
    ssize_t ndatalen;
255
0
    StreamData stream_data;
256
257
0
    err = GetStreamData(&stream_data);
258
259
0
    if (err < 0) {
260
0
      session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
261
0
      return session_->Close(Session::CloseMethod::SILENT);
262
0
    }
263
264
0
    if (packet == nullptr) {
265
0
      packet = CreateStreamDataPacket();
266
0
      if (packet == nullptr) {
267
0
        session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
268
0
        return session_->Close(Session::CloseMethod::SILENT);
269
0
      }
270
0
      pos = ngtcp2_vec(*packet).base;
271
0
    }
272
273
0
    ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data);
274
275
0
    if (nwrite <= 0) {
276
0
      switch (nwrite) {
277
0
        case 0:
278
0
          if (stream_data.id >= 0) ResumeStream(stream_data.id);
279
0
          return congestionLimited(std::move(packet));
280
0
        case NGTCP2_ERR_STREAM_DATA_BLOCKED: {
281
0
          session().StreamDataBlocked(stream_data.id);
282
0
          if (session().max_data_left() == 0) {
283
0
            if (stream_data.id >= 0) ResumeStream(stream_data.id);
284
0
            return congestionLimited(std::move(packet));
285
0
          }
286
0
          CHECK_LE(ndatalen, 0);
287
0
          continue;
288
0
        }
289
0
        case NGTCP2_ERR_STREAM_SHUT_WR: {
290
          // Indicates that the writable side of the stream has been closed
291
          // locally or the stream is being reset. In either case, we can't send
292
          // any stream data!
293
0
          CHECK_GE(stream_data.id, 0);
294
          // We need to notify the stream that the writable side has been closed
295
          // and no more outbound data can be sent.
296
0
          CHECK_LE(ndatalen, 0);
297
0
          auto stream = session_->FindStream(stream_data.id);
298
0
          if (stream) stream->EndWritable();
299
0
          continue;
300
0
        }
301
0
        case NGTCP2_ERR_WRITE_MORE: {
302
0
          CHECK_GT(ndatalen, 0);
303
0
          if (!StreamCommit(&stream_data, ndatalen)) return session_->Close();
304
0
          pos += ndatalen;
305
0
          continue;
306
0
        }
307
0
      }
308
309
0
      packet->Done(UV_ECANCELED);
310
0
      session_->last_error_ = QuicError::ForNgtcp2Error(nwrite);
311
0
      return session_->Close(Session::CloseMethod::SILENT);
312
0
    }
313
314
0
    pos += nwrite;
315
0
    if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) {
316
      // Since we are closing the session here, we don't worry about updating
317
      // the pkt tx time. The failed StreamCommit should have updated the
318
      // last_error_ appropriately.
319
0
      packet->Done(UV_ECANCELED);
320
0
      return session_->Close(Session::CloseMethod::SILENT);
321
0
    }
322
323
0
    if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id);
324
325
0
    packet->Truncate(nwrite);
326
0
    session_->Send(std::move(packet), path);
327
328
0
    pos = nullptr;
329
330
0
    if (++packetSendCount == maxPacketCount) {
331
0
      break;
332
0
    }
333
0
  }
334
335
0
  updateTimer();
336
0
}
337
338
ssize_t Session::Application::WriteVStream(PathStorage* path,
339
                                           uint8_t* buf,
340
                                           ssize_t* ndatalen,
341
0
                                           const StreamData& stream_data) {
342
0
  CHECK_LE(stream_data.count, kMaxVectorCount);
343
0
  uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE;
344
0
  if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE;
345
0
  if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
346
0
  ssize_t ret = ngtcp2_conn_writev_stream(
347
0
      *session_,
348
0
      &path->path,
349
0
      nullptr,
350
0
      buf,
351
0
      ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
352
0
      ndatalen,
353
0
      flags,
354
0
      stream_data.id,
355
0
      stream_data.buf,
356
0
      stream_data.count,
357
0
      uv_hrtime());
358
0
  return ret;
359
0
}
360
361
// The DefaultApplication is the default implementation of Session::Application
362
// that is used for all unrecognized ALPN identifiers.
363
class DefaultApplication final : public Session::Application {
364
 public:
365
  // Marked NOLINT because the cpp linter gets confused about this using
366
  // statement not being sorted with the using v8 statements at the top
367
  // of the namespace.
368
  using Application::Application;  // NOLINT
369
370
  bool ReceiveStreamData(Stream* stream,
371
                         const uint8_t* data,
372
                         size_t datalen,
373
0
                         Stream::ReceiveDataFlags flags) override {
374
0
    Debug(&session(), "Default application receiving stream data");
375
0
    DCHECK_NOT_NULL(stream);
376
0
    if (!stream->is_destroyed()) stream->ReceiveData(data, datalen, flags);
377
0
    return true;
378
0
  }
379
380
0
  int GetStreamData(StreamData* stream_data) override {
381
0
    Debug(&session(), "Default application getting stream data");
382
0
    DCHECK_NOT_NULL(stream_data);
383
    // If the queue is empty, there aren't any streams with data yet
384
0
    if (stream_queue_.IsEmpty()) return 0;
385
386
0
    const auto get_length = [](auto vec, size_t count) {
387
0
      CHECK_NOT_NULL(vec);
388
0
      size_t len = 0;
389
0
      for (size_t n = 0; n < count; n++) len += vec[n].len;
390
0
      return len;
391
0
    };
392
393
0
    Stream* stream = stream_queue_.PopFront();
394
0
    CHECK_NOT_NULL(stream);
395
0
    stream_data->stream.reset(stream);
396
0
    stream_data->id = stream->id();
397
0
    auto next =
398
0
        [&](int status, const ngtcp2_vec* data, size_t count, bob::Done done) {
399
0
          switch (status) {
400
0
            case bob::Status::STATUS_BLOCK:
401
              // Fall through
402
0
            case bob::Status::STATUS_WAIT:
403
0
              return;
404
0
            case bob::Status::STATUS_EOS:
405
0
              stream_data->fin = 1;
406
0
          }
407
408
0
          stream_data->count = count;
409
410
0
          if (count > 0) {
411
0
            stream->Schedule(&stream_queue_);
412
0
            stream_data->remaining = get_length(data, count);
413
0
          } else {
414
0
            stream_data->remaining = 0;
415
0
          }
416
417
          // Not calling done here because we defer committing
418
          // the data until after we're sure it's written.
419
0
        };
420
421
0
    if (LIKELY(!stream->is_eos())) {
422
0
      int ret = stream->Pull(std::move(next),
423
0
                             bob::Options::OPTIONS_SYNC,
424
0
                             stream_data->data,
425
0
                             arraysize(stream_data->data),
426
0
                             kMaxVectorCount);
427
0
      if (ret == bob::Status::STATUS_EOS) {
428
0
        stream_data->fin = 1;
429
0
      }
430
0
    } else {
431
0
      stream_data->fin = 1;
432
0
    }
433
434
0
    return 0;
435
0
  }
436
437
0
  void ResumeStream(int64_t id) override {
438
0
    Debug(&session(), "Default application resuming stream %" PRIi64, id);
439
0
    ScheduleStream(id);
440
0
  }
441
442
0
  bool ShouldSetFin(const StreamData& stream_data) override {
443
0
    auto const is_empty = [](auto vec, size_t cnt) {
444
0
      size_t i;
445
0
      for (i = 0; i < cnt && vec[i].len == 0; ++i) {
446
0
      }
447
0
      return i == cnt;
448
0
    };
449
450
0
    return stream_data.stream && is_empty(stream_data.buf, stream_data.count);
451
0
  }
452
453
0
  bool StreamCommit(StreamData* stream_data, size_t datalen) override {
454
0
    Debug(&session(), "Default application committing stream data");
455
0
    DCHECK_NOT_NULL(stream_data);
456
0
    const auto consume = [](ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
457
0
      ngtcp2_vec* v = *pvec;
458
0
      size_t cnt = *pcnt;
459
460
0
      for (; cnt > 0; --cnt, ++v) {
461
0
        if (v->len > len) {
462
0
          v->len -= len;
463
0
          v->base += len;
464
0
          break;
465
0
        }
466
0
        len -= v->len;
467
0
      }
468
469
0
      *pvec = v;
470
0
      *pcnt = cnt;
471
0
    };
472
473
0
    CHECK(stream_data->stream);
474
0
    stream_data->remaining -= datalen;
475
0
    consume(&stream_data->buf, &stream_data->count, datalen);
476
0
    stream_data->stream->Commit(datalen);
477
0
    return true;
478
0
  }
479
480
  SET_SELF_SIZE(DefaultApplication)
481
  SET_MEMORY_INFO_NAME(DefaultApplication)
482
  SET_NO_MEMORY_INFO()
483
484
 private:
485
0
  void ScheduleStream(int64_t id) {
486
0
    Debug(&session(), "Default application scheduling stream %" PRIi64, id);
487
0
    auto stream = session().FindStream(id);
488
0
    if (stream && !stream->is_destroyed()) {
489
0
      stream->Schedule(&stream_queue_);
490
0
    }
491
0
  }
492
493
0
  void UnscheduleStream(int64_t id) {
494
0
    Debug(&session(), "Default application unscheduling stream %" PRIi64, id);
495
0
    auto stream = session().FindStream(id);
496
0
    if (stream && !stream->is_destroyed()) stream->Unschedule();
497
0
  }
498
499
  Stream::Queue stream_queue_;
500
};
501
502
0
std::unique_ptr<Session::Application> Session::select_application() {
503
  // In the future, we may end up supporting additional QUIC protocols. As they
504
  // are added, extend the cases here to create and return them.
505
506
0
  if (config_.options.tls_options.alpn == NGHTTP3_ALPN_H3) {
507
0
    Debug(this, "Selecting HTTP/3 application");
508
0
    return createHttp3Application(this, config_.options.application_options);
509
0
  }
510
511
0
  Debug(this, "Selecting default application");
512
0
  return std::make_unique<DefaultApplication>(
513
0
      this, config_.options.application_options);
514
0
}
515
516
}  // namespace quic
517
}  // namespace node
518
519
#endif  // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC