Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/http/codec_client.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/http/codec_client.h"
2
3
#include <cstdint>
4
#include <memory>
5
6
#include "envoy/http/codec.h"
7
8
#include "source/common/common/enum_to_int.h"
9
#include "source/common/config/utility.h"
10
#include "source/common/http/exception.h"
11
#include "source/common/http/http1/codec_impl.h"
12
#include "source/common/http/http2/codec_impl.h"
13
#include "source/common/http/status.h"
14
#include "source/common/http/utility.h"
15
16
#ifdef ENVOY_ENABLE_QUIC
17
#include "source/common/quic/client_codec_impl.h"
18
#endif
19
20
namespace Envoy {
21
namespace Http {
22
23
CodecClient::CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
24
                         Upstream::HostDescriptionConstSharedPtr host,
25
                         Event::Dispatcher& dispatcher)
26
    : type_(type), host_(host), connection_(std::move(connection)),
27
22.1k
      idle_timeout_(host_->cluster().idleTimeout()) {
28
22.1k
  if (type_ != CodecType::HTTP3) {
29
    // Make sure upstream connections process data and then the FIN, rather than processing
30
    // TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
31
    // details)
32
22.1k
    connection_->detectEarlyCloseWhenReadDisabled(false);
33
22.1k
  }
34
22.1k
  connection_->addConnectionCallbacks(*this);
35
22.1k
  connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});
36
37
22.1k
  if (idle_timeout_) {
38
911
    idle_timer_ = dispatcher.createTimer([this]() -> void { onIdleTimeout(); });
39
911
    enableIdleTimer();
40
911
  }
41
42
  // We just universally set no delay on connections. Theoretically we might at some point want
43
  // to make this configurable.
44
22.1k
  connection_->noDelay(true);
45
22.1k
}
46
47
22.1k
void CodecClient::connect() {
48
22.1k
  ASSERT(!connect_called_);
49
22.1k
  connect_called_ = true;
50
22.1k
  ASSERT(codec_ != nullptr);
51
  // In general, codecs are handed new not-yet-connected connections, but in the
52
  // case of ALPN, the codec may be handed an already connected connection.
53
22.1k
  if (!connection_->connecting()) {
54
20.4k
    ASSERT(connection_->state() == Network::Connection::State::Open);
55
20.4k
    connected_ = true;
56
20.4k
  } else {
57
1.72k
    ENVOY_CONN_LOG(debug, "connecting", *connection_);
58
1.72k
    connection_->connect();
59
1.72k
  }
60
22.1k
}
61
62
19.6k
void CodecClient::close(Network::ConnectionCloseType type) { connection_->close(type); }
63
64
24.9k
void CodecClient::deleteRequest(ActiveRequest& request) {
65
24.9k
  connection_->dispatcher().deferredDelete(request.removeFromList(active_requests_));
66
24.9k
  if (codec_client_callbacks_) {
67
2.55k
    codec_client_callbacks_->onStreamDestroy();
68
2.55k
  }
69
24.9k
  if (numActiveRequests() == 0) {
70
24.8k
    enableIdleTimer();
71
24.8k
  }
72
24.9k
}
73
74
24.9k
RequestEncoder& CodecClient::newStream(ResponseDecoder& response_decoder) {
75
24.9k
  ActiveRequestPtr request(new ActiveRequest(*this, response_decoder));
76
24.9k
  request->setEncoder(codec_->newStream(*request));
77
24.9k
  LinkedList::moveIntoList(std::move(request), active_requests_);
78
79
24.9k
  auto upstream_info = connection_->streamInfo().upstreamInfo();
80
24.9k
  upstream_info->setUpstreamNumStreams(upstream_info->upstreamNumStreams() + 1);
81
82
24.9k
  disableIdleTimer();
83
24.9k
  return *active_requests_.front();
84
24.9k
}
85
86
24.8k
void CodecClient::onEvent(Network::ConnectionEvent event) {
87
24.8k
  if (event == Network::ConnectionEvent::Connected) {
88
2.62k
    ENVOY_CONN_LOG(debug, "connected", *connection_);
89
2.62k
    connected_ = true;
90
2.62k
    return;
91
2.62k
  }
92
93
22.1k
  if (event == Network::ConnectionEvent::RemoteClose) {
94
969
    remote_closed_ = true;
95
969
  }
96
97
  // HTTP/1 can signal end of response by disconnecting. We need to handle that case.
98
22.1k
  if (type_ == CodecType::HTTP1 && event == Network::ConnectionEvent::RemoteClose &&
99
22.1k
      !active_requests_.empty()) {
100
514
    Buffer::OwnedImpl empty;
101
514
    onData(empty);
102
514
  }
103
104
22.1k
  if (event == Network::ConnectionEvent::RemoteClose ||
105
22.1k
      event == Network::ConnectionEvent::LocalClose) {
106
22.1k
    ENVOY_CONN_LOG(debug, "disconnect. resetting {} pending requests", *connection_,
107
22.1k
                   active_requests_.size());
108
22.1k
    disableIdleTimer();
109
22.1k
    idle_timer_.reset();
110
22.1k
    StreamResetReason reason = event == Network::ConnectionEvent::RemoteClose
111
22.1k
                                   ? StreamResetReason::RemoteConnectionFailure
112
22.1k
                                   : StreamResetReason::LocalConnectionFailure;
113
22.1k
    if (connected_) {
114
22.1k
      reason = StreamResetReason::ConnectionTermination;
115
22.1k
      if (protocol_error_) {
116
68
        reason = StreamResetReason::ProtocolError;
117
68
        connection_->streamInfo().setResponseFlag(
118
68
            StreamInfo::CoreResponseFlag::UpstreamProtocolError);
119
68
      }
120
22.1k
    }
121
27.6k
    while (!active_requests_.empty()) {
122
      // Fake resetting all active streams so that reset() callbacks get invoked.
123
5.49k
      active_requests_.front()->getStream().resetStream(reason);
124
5.49k
    }
125
22.1k
  }
126
22.1k
}
127
128
5.23k
void CodecClient::responsePreDecodeComplete(ActiveRequest& request) {
129
5.23k
  ENVOY_CONN_LOG(debug, "response complete", *connection_);
130
5.23k
  if (codec_client_callbacks_) {
131
1.70k
    codec_client_callbacks_->onStreamPreDecodeComplete();
132
1.70k
  }
133
5.23k
  request.decode_complete_ = true;
134
5.23k
  if (request.encode_complete_ || !request.wait_encode_complete_) {
135
5.13k
    completeRequest(request);
136
5.13k
  } else {
137
102
    ENVOY_CONN_LOG(debug, "waiting for encode to complete", *connection_);
138
102
  }
139
5.23k
}
140
141
24.8k
void CodecClient::requestEncodeComplete(ActiveRequest& request) {
142
24.8k
  ENVOY_CONN_LOG(debug, "encode complete", *connection_);
143
24.8k
  request.encode_complete_ = true;
144
24.8k
  if (request.decode_complete_) {
145
0
    completeRequest(request);
146
0
  }
147
24.8k
}
148
149
5.13k
void CodecClient::completeRequest(ActiveRequest& request) {
150
5.13k
  deleteRequest(request);
151
152
  // HTTP/2 can send us a reset after a complete response if the request was not complete. Users
153
  // of CodecClient will deal with the premature response case and we should not handle any
154
  // further reset notification.
155
5.13k
  request.removeEncoderCallbacks();
156
5.13k
}
157
158
19.8k
void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) {
159
19.8k
  ENVOY_CONN_LOG(debug, "request reset", *connection_);
160
19.8k
  if (codec_client_callbacks_) {
161
953
    codec_client_callbacks_->onStreamReset(reason);
162
953
  }
163
164
19.8k
  deleteRequest(request);
165
19.8k
}
166
167
7.30k
void CodecClient::onData(Buffer::Instance& data) {
168
7.30k
  const Status status = codec_->dispatch(data);
169
170
7.30k
  if (!status.ok()) {
171
486
    ENVOY_CONN_LOG(debug, "Error dispatching received data: {}", *connection_, status.message());
172
173
    // Don't count 408 responses where we have no active requests as protocol errors
174
486
    if (!isPrematureResponseError(status) ||
175
486
        (!active_requests_.empty() ||
176
486
         getPrematureResponseHttpCode(status) != Code::RequestTimeout)) {
177
486
      host_->cluster().trafficStats()->upstream_cx_protocol_error_.inc();
178
486
      protocol_error_ = true;
179
486
    }
180
486
    close();
181
486
  }
182
183
  // All data should be consumed at this point if the connection remains open.
184
7.30k
  ASSERT(data.length() == 0 || connection_->state() != Network::Connection::State::Open,
185
7.30k
         absl::StrCat("extraneous bytes after response complete: ", data.length()));
186
7.30k
}
187
188
24.9k
Status CodecClient::ActiveRequest::encodeHeaders(const RequestHeaderMap& headers, bool end_stream) {
189
#ifdef ENVOY_ENABLE_UHV
190
  if (header_validator_) {
191
    bool failure = false;
192
    std::string failure_details;
193
    auto transformation_result = header_validator_->transformRequestHeaders(headers);
194
    if (!transformation_result.status.ok()) {
195
      ENVOY_CONN_LOG(debug, "Request header transformation failed: {}\n{}", *parent_.connection_,
196
                     transformation_result.status.details(), headers);
197
      failure = true;
198
      failure_details = std::string(transformation_result.status.details());
199
    } else {
200
      // Validate header map after request encoder transformations
201
      const ::Envoy::Http::HeaderValidator::ValidationResult validation_result =
202
          header_validator_->validateRequestHeaders(
203
              transformation_result.new_headers ? *transformation_result.new_headers : headers);
204
      if (!validation_result.ok()) {
205
        ENVOY_CONN_LOG(debug, "Request header validation failed: {}\n{}", *parent_.connection_,
206
                       validation_result.details(),
207
                       transformation_result.new_headers ? *transformation_result.new_headers
208
                                                         : headers);
209
        failure = true;
210
        failure_details = std::string(validation_result.details());
211
      }
212
    }
213
    if (failure) {
214
      return absl::InvalidArgumentError(
215
          absl::StrCat("header validation failed: ", failure_details));
216
    }
217
    return RequestEncoderWrapper::encodeHeaders(
218
        transformation_result.new_headers ? *transformation_result.new_headers : headers,
219
        end_stream);
220
  }
221
#endif
222
24.9k
  return RequestEncoderWrapper::encodeHeaders(headers, end_stream);
223
24.9k
}
224
225
7.14k
void CodecClient::ActiveRequest::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
226
#ifdef ENVOY_ENABLE_UHV
227
  if (header_validator_) {
228
    const ::Envoy::Http::HeaderValidator::ValidationResult validation_result =
229
        header_validator_->validateResponseHeaders(*headers);
230
    bool failure = !validation_result.ok();
231
    std::string failure_details(validation_result.details());
232
    if (!failure) {
233
      const ::Envoy::Http::ClientHeaderValidator::TransformationResult transformation_result =
234
          header_validator_->transformResponseHeaders(*headers);
235
      failure = !transformation_result.ok();
236
      failure_details = std::string(validation_result.details());
237
    }
238
    if (failure) {
239
      ENVOY_CONN_LOG(debug, "Response header validation failed: {}\n{}", *parent_.connection_,
240
                     failure_details, *headers);
241
      if ((parent_.codec_->protocol() == Protocol::Http2 &&
242
           !parent_.host_->cluster()
243
                .http2Options()
244
                .override_stream_error_on_invalid_http_message()
245
                .value()) ||
246
          (parent_.codec_->protocol() == Protocol::Http3 &&
247
           !parent_.host_->cluster()
248
                .http3Options()
249
                .override_stream_error_on_invalid_http_message()
250
                .value())) {
251
        parent_.protocol_error_ = true;
252
        parent_.close();
253
      } else {
254
        inner_encoder_->getStream().resetStream(StreamResetReason::ProtocolError);
255
      }
256
      return;
257
    }
258
  }
259
#endif
260
7.14k
  ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
261
7.14k
}
262
263
CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
264
                                 Upstream::HostDescriptionConstSharedPtr host,
265
                                 Event::Dispatcher& dispatcher,
266
                                 Random::RandomGenerator& random_generator,
267
                                 const Network::TransportSocketOptionsConstSharedPtr& options,
268
                                 bool should_connect)
269
1.72k
    : CodecClient(type, std::move(connection), host, dispatcher) {
270
1.72k
  switch (type) {
271
30
  case CodecType::HTTP1: {
272
    // If the transport socket indicates this is being proxied, inform the HTTP/1.1 codec. It will
273
    // send fully qualified URLs iff the underlying transport is plaintext.
274
30
    bool proxied = false;
275
30
    if (options && options->http11ProxyInfo().has_value()) {
276
0
      proxied = true;
277
0
    }
278
30
    codec_ = std::make_unique<Http1::ClientConnectionImpl>(
279
30
        *connection_, host->cluster().http1CodecStats(), *this, host->cluster().http1Settings(),
280
30
        host->cluster().maxResponseHeadersCount(), proxied);
281
30
    break;
282
0
  }
283
1.69k
  case CodecType::HTTP2:
284
1.69k
    codec_ = std::make_unique<Http2::ClientConnectionImpl>(
285
1.69k
        *connection_, *this, host->cluster().http2CodecStats(), random_generator,
286
1.69k
        host->cluster().http2Options(), Http::DEFAULT_MAX_REQUEST_HEADERS_KB,
287
1.69k
        host->cluster().maxResponseHeadersCount(), Http2::ProdNghttp2SessionFactory::get());
288
1.69k
    break;
289
0
  case CodecType::HTTP3: {
290
0
#ifdef ENVOY_ENABLE_QUIC
291
0
    auto& quic_session = dynamic_cast<Quic::EnvoyQuicClientSession&>(*connection_);
292
0
    codec_ = std::make_unique<Quic::QuicHttpClientConnectionImpl>(
293
0
        quic_session, *this, host->cluster().http3CodecStats(), host->cluster().http3Options(),
294
0
        Http::DEFAULT_MAX_REQUEST_HEADERS_KB, host->cluster().maxResponseHeadersCount());
295
    // Initialize the session after max request header size is changed in above http client
296
    // connection creation.
297
0
    quic_session.Initialize();
298
0
    break;
299
#else
300
    // Should be blocked by configuration checking at an earlier point.
301
    PANIC("unexpected");
302
#endif
303
0
  }
304
1.72k
  }
305
1.72k
  if (should_connect) {
306
1.72k
    connect();
307
1.72k
  }
308
1.72k
}
309
310
} // namespace Http
311
} // namespace Envoy