1
#include "source/common/http/codec_client.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/http/codec.h"
7

            
8
#include "source/common/common/enum_to_int.h"
9
#include "source/common/config/utility.h"
10
#include "source/common/http/exception.h"
11
#include "source/common/http/http1/codec_impl.h"
12
#include "source/common/http/http2/codec_impl.h"
13
#include "source/common/http/status.h"
14
#include "source/common/http/utility.h"
15
#include "source/common/runtime/runtime_features.h"
16

            
17
#ifdef ENVOY_ENABLE_QUIC
18
#include "source/common/quic/client_codec_impl.h"
19
#endif
20

            
21
namespace Envoy {
22
namespace Http {
23

            
24
CodecClient::CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
25
                         Upstream::HostDescriptionConstSharedPtr host,
26
                         Event::Dispatcher& dispatcher)
27
51234
    : type_(type), host_(host), connection_(std::move(connection)),
28
51234
      idle_timeout_(host_->cluster().idleTimeout()),
29
51234
      enable_idle_timer_only_when_connected_(Runtime::runtimeFeatureEnabled(
30
51234
          "envoy.reloadable_features.codec_client_enable_idle_timer_only_when_connected")) {
31
51234
  if (type_ != CodecType::HTTP3) {
32
    // Make sure upstream connections process data and then the FIN, rather than processing
33
    // TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
34
    // details)
35
48291
    connection_->detectEarlyCloseWhenReadDisabled(false);
36
48291
  }
37
51234
  connection_->addConnectionCallbacks(*this);
38
51234
  connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});
39

            
40
51234
  if (idle_timeout_) {
41
28892
    idle_timer_ = dispatcher.createTimer([this]() -> void { onIdleTimeout(); });
42
    // If the runtime flag is disabled, start the idle timer immediately even when connection
43
    // is not yet established, restoring the old behavior.
44
28892
    if (!enable_idle_timer_only_when_connected_) {
45
2
      enableIdleTimer();
46
2
    }
47
28892
  }
48

            
49
  // We just universally set no delay on connections. Theoretically we might at some point want
50
  // to make this configurable.
51
51234
  connection_->noDelay(true);
52
51234
}
53

            
54
51233
void CodecClient::connect() {
55
51233
  ASSERT(!connect_called_);
56
51233
  connect_called_ = true;
57
51233
  ASSERT(codec_ != nullptr);
58
  // In general, codecs are handed new not-yet-connected connections, but in the
59
  // case of ALPN, the codec may be handed an already connected connection.
60
51233
  if (!connection_->connecting()) {
61
403
    ASSERT(connection_->state() == Network::Connection::State::Open);
62
403
    connected_ = true;
63
403
    enableIdleTimer();
64
51171
  } else {
65
50830
    ENVOY_CONN_LOG(debug, "connecting", *connection_);
66
50830
    connection_->connect();
67
50830
  }
68
51233
}
69

            
70
45242
void CodecClient::close(Network::ConnectionCloseType type, absl::string_view details) {
71
45242
  connection_->close(type, details);
72
45242
}
73

            
74
98029
void CodecClient::deleteRequest(ActiveRequest& request) {
75
98029
  connection_->dispatcher().deferredDelete(request.removeFromList(active_requests_));
76
98029
  if (codec_client_callbacks_) {
77
67436
    codec_client_callbacks_->onStreamDestroy();
78
67436
  }
79
98029
  if (numActiveRequests() == 0) {
80
70371
    enableIdleTimer();
81
70371
  }
82
98029
}
83

            
84
21205
RequestEncoder& CodecClient::newStream(ResponseDecoderHandlePtr response_decoder_handle) {
85
21205
  return enlistAndCreateEncoder(
86
21205
      std::make_unique<ActiveRequest>(*this, std::move(response_decoder_handle)));
87
21205
}
88

            
89
76824
RequestEncoder& CodecClient::newStream(ResponseDecoder& response_decoder) {
90
76824
  return enlistAndCreateEncoder(std::make_unique<ActiveRequest>(*this, response_decoder));
91
76824
}
92

            
93
98029
RequestEncoder& CodecClient::enlistAndCreateEncoder(ActiveRequestPtr request) {
94
98029
  request->setEncoder(codec_->newStream(*request));
95
98029
  LinkedList::moveIntoList(std::move(request), active_requests_);
96

            
97
98029
  auto upstream_info = connection_->streamInfo().upstreamInfo();
98
98029
  upstream_info->setUpstreamNumStreams(upstream_info->upstreamNumStreams() + 1);
99

            
100
98029
  disableIdleTimer();
101
98029
  return *active_requests_.front();
102
98029
}
103

            
104
102992
void CodecClient::onEvent(Network::ConnectionEvent event) {
105
102992
  if (event == Network::ConnectionEvent::Connected) {
106
50629
    ENVOY_CONN_LOG(debug, "connected", *connection_);
107
50629
    connected_ = true;
108
50629
    enableIdleTimer();
109
50629
    return;
110
50629
  }
111

            
112
52363
  if (event == Network::ConnectionEvent::RemoteClose) {
113
13228
    remote_closed_ = true;
114
13228
  }
115

            
116
  // HTTP/1 can signal end of response by disconnecting. We need to handle that case.
117
52363
  if (type_ == CodecType::HTTP1 && event == Network::ConnectionEvent::RemoteClose &&
118
52363
      !active_requests_.empty()) {
119
197
    Buffer::OwnedImpl empty;
120
197
    onData(empty);
121
197
  }
122

            
123
52363
  if (event == Network::ConnectionEvent::RemoteClose ||
124
52363
      event == Network::ConnectionEvent::LocalClose) {
125
51221
    ENVOY_CONN_LOG(debug, "disconnect. resetting {} pending requests", *connection_,
126
51221
                   active_requests_.size());
127
51221
    disableIdleTimer();
128
51221
    idle_timer_.reset();
129
51221
    StreamResetReason reason = event == Network::ConnectionEvent::RemoteClose
130
51221
                                   ? StreamResetReason::RemoteConnectionFailure
131
51221
                                   : StreamResetReason::LocalConnectionFailure;
132
51221
    if (connected_) {
133
50881
      reason = StreamResetReason::ConnectionTermination;
134
50881
      if (protocol_error_) {
135
176
        reason = StreamResetReason::ProtocolError;
136
176
        connection_->streamInfo().setResponseFlag(
137
176
            StreamInfo::CoreResponseFlag::UpstreamProtocolError);
138
176
      }
139
50881
    }
140
57602
    while (!active_requests_.empty()) {
141
      // Fake resetting all active streams so that reset() callbacks get invoked.
142
6381
      active_requests_.front()->getStream().resetStream(reason);
143
6381
    }
144
51221
  }
145
52363
}
146

            
147
82515
void CodecClient::responsePreDecodeComplete(ActiveRequest& request) {
148
82515
  ENVOY_CONN_LOG(debug, "response complete", *connection_);
149
82515
  if (codec_client_callbacks_) {
150
53212
    codec_client_callbacks_->onStreamPreDecodeComplete();
151
53212
  }
152
82515
  request.decode_complete_ = true;
153
82515
  if (request.encode_complete_ || !request.wait_encode_complete_) {
154
81577
    completeRequest(request);
155
81695
  } else {
156
938
    ENVOY_CONN_LOG(debug, "waiting for encode to complete", *connection_);
157
938
  }
158
82515
}
159

            
160
84942
void CodecClient::requestEncodeComplete(ActiveRequest& request) {
161
84942
  ENVOY_CONN_LOG(debug, "encode complete", *connection_);
162
84942
  request.encode_complete_ = true;
163
84942
  if (request.decode_complete_) {
164
93
    completeRequest(request);
165
93
  }
166
84942
}
167

            
168
81670
void CodecClient::completeRequest(ActiveRequest& request) {
169
81670
  deleteRequest(request);
170

            
171
  // HTTP/2 can send us a reset after a complete response if the request was not complete. Users
172
  // of CodecClient will deal with the premature response case and we should not handle any
173
  // further reset notification.
174
81670
  request.removeEncoderCallbacks();
175
81670
}
176

            
177
16359
void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) {
178
16359
  ENVOY_CONN_LOG(debug, "Request reset. Reason {}", *connection_, static_cast<int>(reason));
179
16359
  if (codec_client_callbacks_) {
180
15069
    codec_client_callbacks_->onStreamReset(reason);
181
15069
  }
182

            
183
16359
  deleteRequest(request);
184
16359
}
185

            
186
124323
void CodecClient::onData(Buffer::Instance& data) {
187
124323
  const Status status = codec_->dispatch(data);
188

            
189
124323
  if (!status.ok()) {
190
605
    ENVOY_CONN_LOG(debug, "Error dispatching received data: {}", *connection_, status.message());
191

            
192
    // Don't count 408 responses where we have no active requests as protocol errors.
193
    // Don't count graceful GOAWAY closes.
194
605
    const bool not_408 =
195
605
        !isPrematureResponseError(status) ||
196
605
        (!active_requests_.empty() || getPrematureResponseHttpCode(status) != Code::RequestTimeout);
197
605
    const bool is_goaway = isGoAwayGracefulCloseError(status);
198
605
    if (not_408 && !is_goaway) {
199
578
      host_->cluster().trafficStats()->upstream_cx_protocol_error_.inc();
200
578
      protocol_error_ = true;
201
578
    }
202
605
    close();
203
605
  }
204

            
205
  // All data should be consumed at this point if the connection remains open.
206
124323
  ASSERT(data.length() == 0 || connection_->state() != Network::Connection::State::Open,
207
124323
         absl::StrCat("extraneous bytes after response complete: ", data.length()));
208
124323
}
209

            
210
97396
Status CodecClient::ActiveRequest::encodeHeaders(const RequestHeaderMap& headers, bool end_stream) {
211
#ifdef ENVOY_ENABLE_UHV
212
  if (header_validator_) {
213
    bool failure = false;
214
    std::string failure_details;
215
    auto transformation_result = header_validator_->transformRequestHeaders(headers);
216
    if (!transformation_result.status.ok()) {
217
      ENVOY_CONN_LOG(debug, "Request header transformation failed: {}\n{}", *parent_.connection_,
218
                     transformation_result.status.details(), headers);
219
      failure = true;
220
      failure_details = std::string(transformation_result.status.details());
221
    } else {
222
      // Validate header map after request encoder transformations
223
      const ::Envoy::Http::HeaderValidator::ValidationResult validation_result =
224
          header_validator_->validateRequestHeaders(
225
              transformation_result.new_headers ? *transformation_result.new_headers : headers);
226
      if (!validation_result.ok()) {
227
        ENVOY_CONN_LOG(debug, "Request header validation failed: {}\n{}", *parent_.connection_,
228
                       validation_result.details(),
229
                       transformation_result.new_headers ? *transformation_result.new_headers
230
                                                         : headers);
231
        failure = true;
232
        failure_details = std::string(validation_result.details());
233
      }
234
    }
235
    if (failure) {
236
      return absl::InvalidArgumentError(
237
          absl::StrCat("header validation failed: ", failure_details));
238
    }
239
    return RequestEncoderWrapper::encodeHeaders(
240
        transformation_result.new_headers ? *transformation_result.new_headers : headers,
241
        end_stream);
242
  }
243
#endif
244
97396
  return RequestEncoderWrapper::encodeHeaders(headers, end_stream);
245
97396
}
246

            
247
85575
void CodecClient::ActiveRequest::decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) {
248
#ifdef ENVOY_ENABLE_UHV
249
  if (header_validator_) {
250
    const ::Envoy::Http::HeaderValidator::ValidationResult validation_result =
251
        header_validator_->validateResponseHeaders(*headers);
252
    bool failure = !validation_result.ok();
253
    std::string failure_details(validation_result.details());
254
    if (!failure) {
255
      const ::Envoy::Http::ClientHeaderValidator::TransformationResult transformation_result =
256
          header_validator_->transformResponseHeaders(*headers);
257
      failure = !transformation_result.ok();
258
      failure_details = std::string(validation_result.details());
259
    }
260
    if (failure) {
261
      ENVOY_CONN_LOG(debug, "Response header validation failed: {}\n{}", *parent_.connection_,
262
                     failure_details, *headers);
263
      if ((parent_.codec_->protocol() == Protocol::Http2 &&
264
           !parent_.host_->cluster()
265
                .httpProtocolOptions()
266
                .http2Options()
267
                .override_stream_error_on_invalid_http_message()
268
                .value()) ||
269
          (parent_.codec_->protocol() == Protocol::Http3 &&
270
           !parent_.host_->cluster()
271
                .httpProtocolOptions()
272
                .http3Options()
273
                .override_stream_error_on_invalid_http_message()
274
                .value())) {
275
        parent_.protocol_error_ = true;
276
        parent_.close();
277
      } else {
278
        inner_encoder_->getStream().resetStream(StreamResetReason::ProtocolError);
279
      }
280
      return;
281
    }
282
  }
283
#endif
284
85575
  ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
285
85575
}
286

            
287
CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
288
                                 Upstream::HostDescriptionConstSharedPtr host,
289
                                 Event::Dispatcher& dispatcher,
290
                                 Random::RandomGenerator& random_generator,
291
                                 const Network::TransportSocketOptionsConstSharedPtr& options,
292
                                 bool should_connect)
293
50926
    : CodecClient(type, std::move(connection), host, dispatcher) {
294
50926
  switch (type) {
295
34089
  case CodecType::HTTP1: {
296
    // If the transport socket indicates this is being proxied, inform the HTTP/1.1 codec. It will
297
    // send fully qualified URLs iff the underlying transport is plaintext.
298
34089
    bool proxied = false;
299
34089
    if (options && options->http11ProxyInfo().has_value()) {
300
7
      proxied = true;
301
7
    }
302
34089
    codec_ = std::make_unique<Http1::ClientConnectionImpl>(
303
34089
        *connection_, host->cluster().http1CodecStats(), *this,
304
34089
        host->cluster().httpProtocolOptions().http1Settings(),
305
34089
        host->cluster().maxResponseHeadersKb(), host->cluster().maxResponseHeadersCount(), proxied);
306
34089
    break;
307
  }
308
13895
  case CodecType::HTTP2:
309
13895
    codec_ = std::make_unique<Http2::ClientConnectionImpl>(
310
13895
        *connection_, *this, host->cluster().http2CodecStats(), random_generator,
311
13895
        host->cluster().httpProtocolOptions().http2Options(),
312
13895
        host->cluster().maxResponseHeadersKb().value_or(Http::DEFAULT_MAX_REQUEST_HEADERS_KB),
313
13895
        host->cluster().maxResponseHeadersCount(), Http2::ProdNghttp2SessionFactory::get());
314
13895
    break;
315
2942
  case CodecType::HTTP3: {
316
2942
#ifdef ENVOY_ENABLE_QUIC
317
2942
    auto& quic_session = dynamic_cast<Quic::EnvoyQuicClientSession&>(*connection_);
318
2942
    codec_ = std::make_unique<Quic::QuicHttpClientConnectionImpl>(
319
2942
        quic_session, *this, host->cluster().http3CodecStats(),
320
2942
        host->cluster().httpProtocolOptions().http3Options(),
321
2942
        host->cluster().maxResponseHeadersKb().value_or(Http::DEFAULT_MAX_REQUEST_HEADERS_KB),
322
2942
        host->cluster().maxResponseHeadersCount());
323
    // Initialize the session after max request header size is changed in above http client
324
    // connection creation.
325
2942
    quic_session.Initialize();
326
2942
    break;
327
#else
328
    // Should be blocked by configuration checking at an earlier point.
329
    PANIC("unexpected");
330
#endif
331
  }
332
50926
  }
333
50926
  if (should_connect) {
334
49983
    connect();
335
49983
  }
336
50926
}
337

            
338
} // namespace Http
339
} // namespace Envoy