1
#include "source/common/quic/envoy_quic_stream.h"
2

            
3
#include "source/common/http/utility.h"
4

            
5
#include "quiche/quic/core/http/http_encoder.h"
6
#include "quiche/quic/core/qpack/qpack_encoder.h"
7
#include "quiche/quic/core/qpack/qpack_instruction_encoder.h"
8

            
9
namespace Envoy {
10
namespace Quic {
11

            
12
12528
// Writes a chunk of body data to the underlying QUIC stream and, when `end_stream` is set, marks
// the local side of the stream as finished.
//
// @param data body bytes to send. In the non-datagram path every slice is moved out of `data`.
// @param end_stream true if this is the last body write from the local side.
//
// The call is a no-op when there is neither data nor an end-of-stream signal. Calling it on a
// write-closed stream is reported as an Envoy bug and otherwise ignored.
void EnvoyQuicStream::encodeData(Buffer::Instance& data, bool end_stream) {
  ENVOY_STREAM_LOG(debug, "encodeData (end_stream={}) of {} bytes.", *this, end_stream,
                   data.length());
  const bool has_data = data.length() != 0;
  if (!(has_data || end_stream)) {
    // Nothing to write and nothing to signal.
    return;
  }
  if (quic_stream_.write_side_closed()) {
    IS_ENVOY_BUG(fmt::format("encodeData is called on write-closed stream. {}", quicStreamState()));
    return;
  }
  ASSERT(!local_end_stream_);
  local_end_stream_ = end_stream;
  // Lives until the end of the function, i.e. it outlives every write issued below.
  SendBufferMonitor::ScopedWatermarkBufferUpdater updater(&quic_stream_, this);
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
  if (http_datagram_handler_) {
    // With a datagram handler installed the body is handed to it as a capsule fragment instead of
    // being written as regular body slices.
    IncrementalBytesSentTracker tracker(quic_stream_, *mutableBytesMeter(), false);
    const bool fragment_encoded =
        http_datagram_handler_->encodeCapsuleFragment(data.toString(), end_stream);
    if (!fragment_encoded) {
      quic_stream_.Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
      return;
    }
  } else {
#endif
    const Buffer::RawSliceVector raw_slices = data.getRawSlices();
    absl::InlinedVector<quiche::QuicheMemSlice, 4> quic_slices;
    quic_slices.reserve(raw_slices.size());
    for (const Buffer::RawSlice& slice : raw_slices) {
      ASSERT(slice.len_ != 0);
      // Give every slice its own owning buffer so that it can be released independently.
      // TODO(danzh): investigate the cost of allocating one buffer per slice.
      // If it turns out to be expensive, add a new function to free data in the middle in buffer
      // interface and re-design QuicheMemSliceImpl.
      // NOTE(review): this relies on move() handing the slice over without relocating the memory
      // that `slice.mem_` points at -- confirm against the Buffer implementation.
      auto slice_owner = std::make_unique<Buffer::OwnedImpl>();
      slice_owner->move(data, slice.len_);
      quic_slices.emplace_back(static_cast<char*>(slice.mem_), slice.len_,
                               [slice_owner = std::move(slice_owner)](absl::string_view) mutable {
                                 // Drop the backing buffer as soon as the callback fires.
                                 slice_owner.reset();
                               });
    }
    quic::QuicConsumedData result{0, false};
    {
      // The tracker is scoped to the write itself.
      IncrementalBytesSentTracker tracker(quic_stream_, *mutableBytesMeter(), false);
      result = quic_stream_.WriteBodySlices(absl::Span<quiche::QuicheMemSlice>(quic_slices),
                                            end_stream);
      if (stats_gatherer_ != nullptr) {
        stats_gatherer_->addBytesSent(result.bytes_consumed, end_stream);
      }
    }
    // The QUIC stream is expected to accept everything; a write with data that consumed nothing
    // is treated as a bug and the stream is reset.
    if (has_data && result.bytes_consumed == 0) {
      IS_ENVOY_BUG(fmt::format("Send buffer didn't take all the data. Stream is write {} with {} "
                               "bytes in send buffer. Current write was rejected.",
                               quic_stream_.write_side_closed() ? "closed" : "open",
                               quic_stream_.BufferedDataBytes()));
      quic_stream_.Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
      return;
    }
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
  }
#endif
  if (!local_end_stream_) {
    return;
  }
  // The local side is done: notify the codec callbacks (if any) before the end-stream hook.
  if (codec_callbacks_ != nullptr) {
    codec_callbacks_->onCodecEncodeComplete();
  }
  onLocalEndStream();
}
81

            
82
182
// Writes `trailers` to the underlying QUIC stream and finishes the local side of the stream.
//
// @param trailers trailer block to send; it is moved into the QUIC stream.
//
// Calling this on a write-closed stream is reported as an Envoy bug and otherwise ignored.
void EnvoyQuicStream::encodeTrailersImpl(quiche::HttpHeaderBlock&& trailers) {
  if (quic_stream_.write_side_closed()) {
    IS_ENVOY_BUG("encodeTrailers is called on write-closed stream.");
    return;
  }
  // Trailers always terminate the local side of the stream.
  ASSERT(!local_end_stream_);
  local_end_stream_ = true;

  // Lives until the end of the function, so it outlives the trailer write and both callbacks.
  SendBufferMonitor::ScopedWatermarkBufferUpdater updater(&quic_stream_, this);
  {
    // The tracker is scoped to the trailer write only.
    IncrementalBytesSentTracker tracker(quic_stream_, *mutableBytesMeter(), true);
    const size_t bytes_sent = quic_stream_.WriteTrailers(std::move(trailers), nullptr);
    ENVOY_BUG(bytes_sent != 0, "Failed to encode trailers.");
    if (stats_gatherer_ != nullptr) {
      stats_gatherer_->addBytesSent(bytes_sent, true);
    }
  }
  if (codec_callbacks_ != nullptr) {
    codec_callbacks_->onCodecEncodeComplete();
  }
  onLocalEndStream();
}
104

            
105
std::unique_ptr<Http::MetadataMap>
106
1006
EnvoyQuicStream::metadataMapFromHeaderList(const quic::QuicHeaderList& header_list) {
107
1006
  auto metadata_map = std::make_unique<Http::MetadataMap>();
108
1262
  for (const auto& [key, value] : header_list) {
109
1262
    (*metadata_map)[key] = value;
110
1262
  }
111
1006
  return metadata_map;
112
1006
}
113

            
114
namespace {
115

            
116
// Returns a new `unique_ptr<char[]>` containing the characters copied from `str`.
//
// Exactly `str.length()` bytes are copied; no terminating NUL is appended, so the caller must
// carry the length separately. An empty `str` yields a valid zero-length array.
std::unique_ptr<char[]> dataFromString(const std::string& str) {
  auto data = std::make_unique<char[]>(str.length());
  // Use get() rather than &data[0]: indexing element 0 of a zero-length array violates the
  // precondition of unique_ptr<T[]>::operator[] when `str` is empty.
  memcpy(data.get(), str.data(), str.length()); // NOLINT(safe-memcpy)
  return data;
}
122

            
123
// Serializes `metadata` into a METADATA frame and appends it to `slices` as two memory slices:
// first the frame header, then the QPACK-encoded payload.
//
// @param metadata key/value pairs to encode.
// @param id stream id handed to the QPACK encoder.
// @param slices output vector that receives the header slice followed by the payload slice.
void serializeMetadata(const Http::MetadataMapPtr& metadata, quic::QuicStreamId id,
                       absl::InlinedVector<quiche::QuicheMemSlice, 2>& slices) {
  // A throw-away encoder with Huffman encoding and cookie crumbling both disabled.
  quic::NoopDecoderStreamErrorDelegate error_delegate;
  quic::QpackEncoder encoder(&error_delegate, quic::HuffmanEncoding::kDisabled,
                             quic::CookieCrumbling::kDisabled);

  quiche::HttpHeaderBlock headers;
  for (const auto& [name, content] : *metadata) {
    headers.AppendValueOrAddHeader(name, content);
  }

  // The frame header carries the payload length, so the payload has to be produced before the
  // header can be serialized.
  const std::string payload =
      encoder.EncodeHeaderList(id, headers, /* encoder_stream_sent_byte_count = */ nullptr);
  const std::string frame_header = quic::HttpEncoder::SerializeMetadataFrameHeader(payload.size());

  // On the wire the header precedes the payload.
  slices.emplace_back(dataFromString(frame_header), frame_header.length());
  slices.emplace_back(dataFromString(payload), payload.length());
}
146

            
147
} // namespace
148

            
149
1727
// Sends every metadata map in `metadata_map_vector` on the QUIC stream, one write per map and
// never with end_stream set.
//
// @param metadata_map_vector metadata maps to send, in order.
//
// Nothing is sent when METADATA is disabled by the HTTP/3 options (a stat is incremented instead)
// or when the write side of the stream is already closed. Sending stops early if a write is
// rejected (the stream is reset) or if the connection is no longer connected after a write.
void EnvoyQuicStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) {
  const bool metadata_allowed = http3_options_.allow_metadata();
  if (!metadata_allowed) {
    ENVOY_STREAM_LOG(debug, "METADATA not supported by config.", *this);
    stats_.metadata_not_supported_error_.inc();
    return;
  }
  if (quic_stream_.write_side_closed()) {
    // Unlike data and trailers, this case is dropped silently.
    return;
  }
  ASSERT(!local_end_stream_);

  for (const Http::MetadataMapPtr& metadata_map : metadata_map_vector) {
    // One slice for the frame header and one for the payload.
    absl::InlinedVector<quiche::QuicheMemSlice, 2> frame_slices;
    frame_slices.reserve(2);
    serializeMetadata(metadata_map, quic_stream_.id(), frame_slices);

    SendBufferMonitor::ScopedWatermarkBufferUpdater updater(&quic_stream_, this);
    quic::QuicConsumedData consumed{0, false};
    {
      // The tracker is scoped to the write itself.
      IncrementalBytesSentTracker tracker(quic_stream_, *mutableBytesMeter(), false);
      consumed = quic_stream_.WriteMemSlices(absl::Span<quiche::QuicheMemSlice>(frame_slices),
                                             /*end_stream=*/false);
    }
    // The QUIC stream is expected to accept everything; a write that consumed nothing is treated
    // as a bug and the stream is reset.
    if (consumed.bytes_consumed == 0) {
      IS_ENVOY_BUG(fmt::format("Send buffer didn't take all the data. Stream is write {} with {} "
                               "bytes in send buffer. Current write was rejected.",
                               quic_stream_.write_side_closed() ? "closed" : "open",
                               quic_stream_.BufferedDataBytes()));
      quic_stream_.Reset(quic::QUIC_ERROR_PROCESSING_STREAM);
      return;
    }
    const bool still_connected = quic_session_.connection()->connected();
    if (!still_connected) {
      // Sending METADATA caused the connection to close; stop here.
      return;
    }
  }
}
187

            
188
3
std::string EnvoyQuicStream::quicStreamState() {
189
3
  return fmt::format(
190
3
      "QUIC stream state: local_end_stream_ {}, rst_received "
191
3
      "{}, rst_sent {}, fin_received {}, fin_sent {}, fin_buffered {}, fin_outstanding {}, "
192
3
      "stream_error {}, connection_error {}, connection connected: {}.",
193
3
      local_end_stream_, quic_stream_.rst_received(), quic_stream_.rst_sent(),
194
3
      quic_stream_.fin_received(), quic_stream_.fin_sent(), quic_stream_.fin_buffered(),
195
3
      quic_stream_.fin_outstanding(),
196
3
      quic::QuicRstStreamErrorCodeToString(quic_stream_.stream_error()),
197
3
      quic::QuicErrorCodeToString(quic_stream_.connection_error()),
198
3
      quic_session_.connection()->connected());
199
3
}
200

            
201
} // namespace Quic
202
} // namespace Envoy