/src/node/src/quic/application.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC |
2 | | |
3 | | #include "application.h" |
4 | | #include <async_wrap-inl.h> |
5 | | #include <debug_utils-inl.h> |
6 | | #include <node_bob.h> |
7 | | #include <node_sockaddr-inl.h> |
8 | | #include <uv.h> |
9 | | #include <v8.h> |
10 | | #include "defs.h" |
11 | | #include "endpoint.h" |
12 | | #include "http3.h" |
13 | | #include "packet.h" |
14 | | #include "session.h" |
15 | | |
16 | | namespace node { |
17 | | |
18 | | using v8::Just; |
19 | | using v8::Local; |
20 | | using v8::Maybe; |
21 | | using v8::Nothing; |
22 | | using v8::Object; |
23 | | using v8::Value; |
24 | | |
25 | | namespace quic { |
26 | | |
27 | | // ============================================================================ |
28 | | // Session::Application_Options |
29 | | const Session::Application_Options Session::Application_Options::kDefault = {}; |
30 | | |
31 | 0 | Session::Application_Options::operator const nghttp3_settings() const { |
32 | | // In theory, Application_Options might contain options for more than just |
33 | | // HTTP/3. Here we extract only the properties that are relevant to HTTP/3. |
34 | 0 | return nghttp3_settings{ |
35 | 0 | max_field_section_size, |
36 | 0 | static_cast<size_t>(qpack_max_dtable_capacity), |
37 | 0 | static_cast<size_t>(qpack_encoder_max_dtable_capacity), |
38 | 0 | static_cast<size_t>(qpack_blocked_streams), |
39 | 0 | enable_connect_protocol, |
40 | 0 | enable_datagrams, |
41 | 0 | }; |
42 | 0 | } |
43 | | |
44 | 0 | std::string Session::Application_Options::ToString() const { |
45 | 0 | DebugIndentScope indent; |
46 | 0 | auto prefix = indent.Prefix(); |
47 | 0 | std::string res("{"); |
48 | 0 | res += prefix + "max header pairs: " + std::to_string(max_header_pairs); |
49 | 0 | res += prefix + "max header length: " + std::to_string(max_header_length); |
50 | 0 | res += prefix + |
51 | 0 | "max field section size: " + std::to_string(max_field_section_size); |
52 | 0 | res += prefix + "qpack max dtable capacity: " + |
53 | 0 | std::to_string(qpack_max_dtable_capacity); |
54 | 0 | res += prefix + "qpack encoder max dtable capacity: " + |
55 | 0 | std::to_string(qpack_encoder_max_dtable_capacity); |
56 | 0 | res += prefix + |
57 | 0 | "qpack blocked streams: " + std::to_string(qpack_blocked_streams); |
58 | 0 | res += prefix + "enable connect protocol: " + |
59 | 0 | (enable_connect_protocol ? std::string("yes") : std::string("no")); |
60 | 0 | res += prefix + "enable datagrams: " + |
61 | 0 | (enable_datagrams ? std::string("yes") : std::string("no")); |
62 | 0 | res += indent.Close(); |
63 | 0 | return res; |
64 | 0 | } |
65 | | |
66 | | Maybe<Session::Application_Options> Session::Application_Options::From( |
67 | 0 | Environment* env, Local<Value> value) { |
68 | 0 | if (value.IsEmpty() || (!value->IsUndefined() && !value->IsObject())) { |
69 | 0 | THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object"); |
70 | 0 | return Nothing<Application_Options>(); |
71 | 0 | } |
72 | | |
73 | 0 | Application_Options options; |
74 | 0 | auto& state = BindingData::Get(env); |
75 | 0 | if (value->IsUndefined()) { |
76 | 0 | return Just<Application_Options>(options); |
77 | 0 | } |
78 | | |
79 | 0 | auto params = value.As<Object>(); |
80 | |
|
81 | 0 | #define SET(name) \ |
82 | 0 | SetOption<Session::Application_Options, \ |
83 | 0 | &Session::Application_Options::name>( \ |
84 | 0 | env, &options, params, state.name##_string()) |
85 | |
|
86 | 0 | if (!SET(max_header_pairs) || !SET(max_header_length) || |
87 | 0 | !SET(max_field_section_size) || !SET(qpack_max_dtable_capacity) || |
88 | 0 | !SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams) || |
89 | 0 | !SET(enable_connect_protocol) || !SET(enable_datagrams)) { |
90 | 0 | return Nothing<Application_Options>(); |
91 | 0 | } |
92 | | |
93 | 0 | #undef SET |
94 | | |
95 | 0 | return Just<Application_Options>(options); |
96 | 0 | } |
97 | | |
98 | | Session::Application::Application(Session* session, const Options& options) |
99 | 0 | : session_(session) {} |
100 | | |
101 | 0 | bool Session::Application::Start() { |
102 | | // By default there is nothing to do. Specific implementations may |
103 | | // override to perform more actions. |
104 | 0 | Debug(session_, "Session application started"); |
105 | 0 | return true; |
106 | 0 | } |
107 | | |
108 | | void Session::Application::AcknowledgeStreamData(Stream* stream, |
109 | 0 | size_t datalen) { |
110 | 0 | Debug(session_, |
111 | 0 | "Application acknowledging stream %" PRIi64 " data: %zu", |
112 | 0 | stream->id(), |
113 | 0 | datalen); |
114 | 0 | DCHECK_NOT_NULL(stream); |
115 | 0 | stream->Acknowledge(datalen); |
116 | 0 | } |
117 | | |
118 | 0 | void Session::Application::BlockStream(int64_t id) { |
119 | 0 | Debug(session_, "Application blocking stream %" PRIi64, id); |
120 | 0 | auto stream = session().FindStream(id); |
121 | 0 | if (stream) stream->EmitBlocked(); |
122 | 0 | } |
123 | | |
124 | | bool Session::Application::CanAddHeader(size_t current_count, |
125 | | size_t current_headers_length, |
126 | 0 | size_t this_header_length) { |
127 | | // By default headers are not supported. |
128 | 0 | Debug(session_, "Application cannot add header"); |
129 | 0 | return false; |
130 | 0 | } |
131 | | |
132 | | bool Session::Application::SendHeaders(const Stream& stream, |
133 | | HeadersKind kind, |
134 | | const v8::Local<v8::Array>& headers, |
135 | 0 | HeadersFlags flags) { |
136 | | // By default do nothing. |
137 | 0 | Debug(session_, "Application cannot send headers"); |
138 | 0 | return false; |
139 | 0 | } |
140 | | |
141 | 0 | void Session::Application::ResumeStream(int64_t id) { |
142 | 0 | Debug(session_, "Application resuming stream %" PRIi64, id); |
143 | | // By default do nothing. |
144 | 0 | } |
145 | | |
146 | | void Session::Application::ExtendMaxStreams(EndpointLabel label, |
147 | | Direction direction, |
148 | 0 | uint64_t max_streams) { |
149 | 0 | Debug(session_, "Application extending max streams"); |
150 | | // By default do nothing. |
151 | 0 | } |
152 | | |
153 | | void Session::Application::ExtendMaxStreamData(Stream* stream, |
154 | 0 | uint64_t max_data) { |
155 | 0 | Debug(session_, "Application extending max stream data"); |
156 | | // By default do nothing. |
157 | 0 | } |
158 | | |
159 | | void Session::Application::CollectSessionTicketAppData( |
160 | 0 | SessionTicket::AppData* app_data) const { |
161 | 0 | Debug(session_, "Application collecting session ticket app data"); |
162 | | // By default do nothing. |
163 | 0 | } |
164 | | |
165 | | SessionTicket::AppData::Status |
166 | | Session::Application::ExtractSessionTicketAppData( |
167 | | const SessionTicket::AppData& app_data, |
168 | 0 | SessionTicket::AppData::Source::Flag flag) { |
169 | 0 | Debug(session_, "Application extracting session ticket app data"); |
170 | | // By default we do not have any application data to retrieve. |
171 | 0 | return flag == SessionTicket::AppData::Source::Flag::STATUS_RENEW |
172 | 0 | ? SessionTicket::AppData::Status::TICKET_USE_RENEW |
173 | 0 | : SessionTicket::AppData::Status::TICKET_USE; |
174 | 0 | } |
175 | | |
176 | | void Session::Application::SetStreamPriority(const Stream& stream, |
177 | | StreamPriority priority, |
178 | 0 | StreamPriorityFlags flags) { |
179 | 0 | Debug( |
180 | 0 | session_, "Application setting stream %" PRIi64 " priority", stream.id()); |
181 | | // By default do nothing. |
182 | 0 | } |
183 | | |
184 | 0 | StreamPriority Session::Application::GetStreamPriority(const Stream& stream) { |
185 | 0 | return StreamPriority::DEFAULT; |
186 | 0 | } |
187 | | |
188 | 0 | Packet* Session::Application::CreateStreamDataPacket() { |
189 | 0 | return Packet::Create(env(), |
190 | 0 | session_->endpoint_.get(), |
191 | 0 | session_->remote_address_, |
192 | 0 | ngtcp2_conn_get_max_tx_udp_payload_size(*session_), |
193 | 0 | "stream data"); |
194 | 0 | } |
195 | | |
196 | 0 | void Session::Application::StreamClose(Stream* stream, QuicError error) { |
197 | 0 | Debug(session_, |
198 | 0 | "Application closing stream %" PRIi64 " with error %s", |
199 | 0 | stream->id(), |
200 | 0 | error); |
201 | 0 | stream->Destroy(error); |
202 | 0 | } |
203 | | |
204 | 0 | void Session::Application::StreamStopSending(Stream* stream, QuicError error) { |
205 | 0 | Debug(session_, |
206 | 0 | "Application stopping sending on stream %" PRIi64 " with error %s", |
207 | 0 | stream->id(), |
208 | 0 | error); |
209 | 0 | DCHECK_NOT_NULL(stream); |
210 | 0 | stream->ReceiveStopSending(error); |
211 | 0 | } |
212 | | |
213 | | void Session::Application::StreamReset(Stream* stream, |
214 | | uint64_t final_size, |
215 | 0 | QuicError error) { |
216 | 0 | Debug(session_, |
217 | 0 | "Application resetting stream %" PRIi64 " with error %s", |
218 | 0 | stream->id(), |
219 | 0 | error); |
220 | 0 | stream->ReceiveStreamReset(final_size, error); |
221 | 0 | } |
222 | | |
223 | 0 | void Session::Application::SendPendingData() { |
224 | 0 | Debug(session_, "Application sending pending data"); |
225 | 0 | PathStorage path; |
226 | |
|
227 | 0 | Packet* packet = nullptr; |
228 | 0 | uint8_t* pos = nullptr; |
229 | 0 | int err = 0; |
230 | |
|
231 | 0 | size_t maxPacketCount = std::min(static_cast<size_t>(64000), |
232 | 0 | ngtcp2_conn_get_send_quantum(*session_)); |
233 | 0 | size_t packetSendCount = 0; |
234 | |
|
235 | 0 | const auto updateTimer = [&] { |
236 | 0 | Debug(session_, "Application updating the session timer"); |
237 | 0 | ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime()); |
238 | 0 | session_->UpdateTimer(); |
239 | 0 | }; |
240 | |
|
241 | 0 | const auto congestionLimited = [&](auto packet) { |
242 | 0 | auto len = pos - ngtcp2_vec(*packet).base; |
243 | | // We are either congestion limited or done. |
244 | 0 | if (len) { |
245 | | // Some data was serialized into the packet. We need to send it. |
246 | 0 | packet->Truncate(len); |
247 | 0 | session_->Send(std::move(packet), path); |
248 | 0 | } |
249 | |
|
250 | 0 | updateTimer(); |
251 | 0 | }; |
252 | |
|
253 | 0 | for (;;) { |
254 | 0 | ssize_t ndatalen; |
255 | 0 | StreamData stream_data; |
256 | |
|
257 | 0 | err = GetStreamData(&stream_data); |
258 | |
|
259 | 0 | if (err < 0) { |
260 | 0 | session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
261 | 0 | return session_->Close(Session::CloseMethod::SILENT); |
262 | 0 | } |
263 | | |
264 | 0 | if (packet == nullptr) { |
265 | 0 | packet = CreateStreamDataPacket(); |
266 | 0 | if (packet == nullptr) { |
267 | 0 | session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
268 | 0 | return session_->Close(Session::CloseMethod::SILENT); |
269 | 0 | } |
270 | 0 | pos = ngtcp2_vec(*packet).base; |
271 | 0 | } |
272 | | |
273 | 0 | ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); |
274 | |
|
275 | 0 | if (nwrite <= 0) { |
276 | 0 | switch (nwrite) { |
277 | 0 | case 0: |
278 | 0 | if (stream_data.id >= 0) ResumeStream(stream_data.id); |
279 | 0 | return congestionLimited(std::move(packet)); |
280 | 0 | case NGTCP2_ERR_STREAM_DATA_BLOCKED: { |
281 | 0 | session().StreamDataBlocked(stream_data.id); |
282 | 0 | if (session().max_data_left() == 0) { |
283 | 0 | if (stream_data.id >= 0) ResumeStream(stream_data.id); |
284 | 0 | return congestionLimited(std::move(packet)); |
285 | 0 | } |
286 | 0 | CHECK_LE(ndatalen, 0); |
287 | 0 | continue; |
288 | 0 | } |
289 | 0 | case NGTCP2_ERR_STREAM_SHUT_WR: { |
290 | | // Indicates that the writable side of the stream has been closed |
291 | | // locally or the stream is being reset. In either case, we can't send |
292 | | // any stream data! |
293 | 0 | CHECK_GE(stream_data.id, 0); |
294 | | // We need to notify the stream that the writable side has been closed |
295 | | // and no more outbound data can be sent. |
296 | 0 | CHECK_LE(ndatalen, 0); |
297 | 0 | auto stream = session_->FindStream(stream_data.id); |
298 | 0 | if (stream) stream->EndWritable(); |
299 | 0 | continue; |
300 | 0 | } |
301 | 0 | case NGTCP2_ERR_WRITE_MORE: { |
302 | 0 | CHECK_GT(ndatalen, 0); |
303 | 0 | if (!StreamCommit(&stream_data, ndatalen)) return session_->Close(); |
304 | 0 | pos += ndatalen; |
305 | 0 | continue; |
306 | 0 | } |
307 | 0 | } |
308 | | |
309 | 0 | packet->Done(UV_ECANCELED); |
310 | 0 | session_->last_error_ = QuicError::ForNgtcp2Error(nwrite); |
311 | 0 | return session_->Close(Session::CloseMethod::SILENT); |
312 | 0 | } |
313 | | |
314 | 0 | pos += nwrite; |
315 | 0 | if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) { |
316 | | // Since we are closing the session here, we don't worry about updating |
317 | | // the pkt tx time. The failed StreamCommit should have updated the |
318 | | // last_error_ appropriately. |
319 | 0 | packet->Done(UV_ECANCELED); |
320 | 0 | return session_->Close(Session::CloseMethod::SILENT); |
321 | 0 | } |
322 | | |
323 | 0 | if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id); |
324 | |
|
325 | 0 | packet->Truncate(nwrite); |
326 | 0 | session_->Send(std::move(packet), path); |
327 | |
|
328 | 0 | pos = nullptr; |
329 | |
|
330 | 0 | if (++packetSendCount == maxPacketCount) { |
331 | 0 | break; |
332 | 0 | } |
333 | 0 | } |
334 | | |
335 | 0 | updateTimer(); |
336 | 0 | } |
337 | | |
338 | | ssize_t Session::Application::WriteVStream(PathStorage* path, |
339 | | uint8_t* buf, |
340 | | ssize_t* ndatalen, |
341 | 0 | const StreamData& stream_data) { |
342 | 0 | CHECK_LE(stream_data.count, kMaxVectorCount); |
343 | 0 | uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE; |
344 | 0 | if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE; |
345 | 0 | if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; |
346 | 0 | ssize_t ret = ngtcp2_conn_writev_stream( |
347 | 0 | *session_, |
348 | 0 | &path->path, |
349 | 0 | nullptr, |
350 | 0 | buf, |
351 | 0 | ngtcp2_conn_get_max_tx_udp_payload_size(*session_), |
352 | 0 | ndatalen, |
353 | 0 | flags, |
354 | 0 | stream_data.id, |
355 | 0 | stream_data.buf, |
356 | 0 | stream_data.count, |
357 | 0 | uv_hrtime()); |
358 | 0 | return ret; |
359 | 0 | } |
360 | | |
361 | | // The DefaultApplication is the default implementation of Session::Application |
362 | | // that is used for all unrecognized ALPN identifiers. |
363 | | class DefaultApplication final : public Session::Application { |
364 | | public: |
365 | | // Marked NOLINT because the cpp linter gets confused about this using |
366 | | // statement not being sorted with the using v8 statements at the top |
367 | | // of the namespace. |
368 | | using Application::Application; // NOLINT |
369 | | |
370 | | bool ReceiveStreamData(Stream* stream, |
371 | | const uint8_t* data, |
372 | | size_t datalen, |
373 | 0 | Stream::ReceiveDataFlags flags) override { |
374 | 0 | Debug(&session(), "Default application receiving stream data"); |
375 | 0 | DCHECK_NOT_NULL(stream); |
376 | 0 | if (!stream->is_destroyed()) stream->ReceiveData(data, datalen, flags); |
377 | 0 | return true; |
378 | 0 | } |
379 | | |
380 | 0 | int GetStreamData(StreamData* stream_data) override { |
381 | 0 | Debug(&session(), "Default application getting stream data"); |
382 | 0 | DCHECK_NOT_NULL(stream_data); |
383 | | // If the queue is empty, there aren't any streams with data yet |
384 | 0 | if (stream_queue_.IsEmpty()) return 0; |
385 | | |
386 | 0 | const auto get_length = [](auto vec, size_t count) { |
387 | 0 | CHECK_NOT_NULL(vec); |
388 | 0 | size_t len = 0; |
389 | 0 | for (size_t n = 0; n < count; n++) len += vec[n].len; |
390 | 0 | return len; |
391 | 0 | }; |
392 | |
|
393 | 0 | Stream* stream = stream_queue_.PopFront(); |
394 | 0 | CHECK_NOT_NULL(stream); |
395 | 0 | stream_data->stream.reset(stream); |
396 | 0 | stream_data->id = stream->id(); |
397 | 0 | auto next = |
398 | 0 | [&](int status, const ngtcp2_vec* data, size_t count, bob::Done done) { |
399 | 0 | switch (status) { |
400 | 0 | case bob::Status::STATUS_BLOCK: |
401 | | // Fall through |
402 | 0 | case bob::Status::STATUS_WAIT: |
403 | 0 | return; |
404 | 0 | case bob::Status::STATUS_EOS: |
405 | 0 | stream_data->fin = 1; |
406 | 0 | } |
407 | | |
408 | 0 | stream_data->count = count; |
409 | |
|
410 | 0 | if (count > 0) { |
411 | 0 | stream->Schedule(&stream_queue_); |
412 | 0 | stream_data->remaining = get_length(data, count); |
413 | 0 | } else { |
414 | 0 | stream_data->remaining = 0; |
415 | 0 | } |
416 | | |
417 | | // Not calling done here because we defer committing |
418 | | // the data until after we're sure it's written. |
419 | 0 | }; |
420 | |
|
421 | 0 | if (LIKELY(!stream->is_eos())) { |
422 | 0 | int ret = stream->Pull(std::move(next), |
423 | 0 | bob::Options::OPTIONS_SYNC, |
424 | 0 | stream_data->data, |
425 | 0 | arraysize(stream_data->data), |
426 | 0 | kMaxVectorCount); |
427 | 0 | if (ret == bob::Status::STATUS_EOS) { |
428 | 0 | stream_data->fin = 1; |
429 | 0 | } |
430 | 0 | } else { |
431 | 0 | stream_data->fin = 1; |
432 | 0 | } |
433 | |
|
434 | 0 | return 0; |
435 | 0 | } |
436 | | |
437 | 0 | void ResumeStream(int64_t id) override { |
438 | 0 | Debug(&session(), "Default application resuming stream %" PRIi64, id); |
439 | 0 | ScheduleStream(id); |
440 | 0 | } |
441 | | |
442 | 0 | bool ShouldSetFin(const StreamData& stream_data) override { |
443 | 0 | auto const is_empty = [](auto vec, size_t cnt) { |
444 | 0 | size_t i; |
445 | 0 | for (i = 0; i < cnt && vec[i].len == 0; ++i) { |
446 | 0 | } |
447 | 0 | return i == cnt; |
448 | 0 | }; |
449 | |
|
450 | 0 | return stream_data.stream && is_empty(stream_data.buf, stream_data.count); |
451 | 0 | } |
452 | | |
453 | 0 | bool StreamCommit(StreamData* stream_data, size_t datalen) override { |
454 | 0 | Debug(&session(), "Default application committing stream data"); |
455 | 0 | DCHECK_NOT_NULL(stream_data); |
456 | 0 | const auto consume = [](ngtcp2_vec** pvec, size_t* pcnt, size_t len) { |
457 | 0 | ngtcp2_vec* v = *pvec; |
458 | 0 | size_t cnt = *pcnt; |
459 | |
|
460 | 0 | for (; cnt > 0; --cnt, ++v) { |
461 | 0 | if (v->len > len) { |
462 | 0 | v->len -= len; |
463 | 0 | v->base += len; |
464 | 0 | break; |
465 | 0 | } |
466 | 0 | len -= v->len; |
467 | 0 | } |
468 | |
|
469 | 0 | *pvec = v; |
470 | 0 | *pcnt = cnt; |
471 | 0 | }; |
472 | |
|
473 | 0 | CHECK(stream_data->stream); |
474 | 0 | stream_data->remaining -= datalen; |
475 | 0 | consume(&stream_data->buf, &stream_data->count, datalen); |
476 | 0 | stream_data->stream->Commit(datalen); |
477 | 0 | return true; |
478 | 0 | } |
479 | | |
480 | | SET_SELF_SIZE(DefaultApplication) |
481 | | SET_MEMORY_INFO_NAME(DefaultApplication) |
482 | | SET_NO_MEMORY_INFO() |
483 | | |
484 | | private: |
485 | 0 | void ScheduleStream(int64_t id) { |
486 | 0 | Debug(&session(), "Default application scheduling stream %" PRIi64, id); |
487 | 0 | auto stream = session().FindStream(id); |
488 | 0 | if (stream && !stream->is_destroyed()) { |
489 | 0 | stream->Schedule(&stream_queue_); |
490 | 0 | } |
491 | 0 | } |
492 | | |
493 | 0 | void UnscheduleStream(int64_t id) { |
494 | 0 | Debug(&session(), "Default application unscheduling stream %" PRIi64, id); |
495 | 0 | auto stream = session().FindStream(id); |
496 | 0 | if (stream && !stream->is_destroyed()) stream->Unschedule(); |
497 | 0 | } |
498 | | |
499 | | Stream::Queue stream_queue_; |
500 | | }; |
501 | | |
502 | 0 | std::unique_ptr<Session::Application> Session::select_application() { |
503 | | // In the future, we may end up supporting additional QUIC protocols. As they |
504 | | // are added, extend the cases here to create and return them. |
505 | |
|
506 | 0 | if (config_.options.tls_options.alpn == NGHTTP3_ALPN_H3) { |
507 | 0 | Debug(this, "Selecting HTTP/3 application"); |
508 | 0 | return createHttp3Application(this, config_.options.application_options); |
509 | 0 | } |
510 | | |
511 | 0 | Debug(this, "Selecting default application"); |
512 | 0 | return std::make_unique<DefaultApplication>( |
513 | 0 | this, config_.options.application_options); |
514 | 0 | } |
515 | | |
516 | | } // namespace quic |
517 | | } // namespace node |
518 | | |
519 | | #endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC |