1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/http/conn_pool.h"
6
#include "envoy/http/header_map.h"
7
#include "envoy/network/connection.h"
8
#include "envoy/router/router_ratelimit.h"
9
#include "envoy/tcp/conn_pool.h"
10
#include "envoy/tcp/upstream.h"
11
#include "envoy/upstream/load_balancer.h"
12
#include "envoy/upstream/thread_local_cluster.h"
13
#include "envoy/upstream/upstream.h"
14

            
15
#include "source/common/buffer/buffer_impl.h"
16
#include "source/common/common/dump_state_utils.h"
17
#include "source/common/http/codec_client.h"
18
#include "source/common/http/hash_policy.h"
19
#include "source/common/http/null_route_impl.h"
20
#include "source/common/http/response_decoder_impl_base.h"
21
#include "source/common/network/utility.h"
22
#include "source/common/router/config_impl.h"
23
#include "source/common/router/header_parser.h"
24
#include "source/common/router/router.h"
25
#include "source/extensions/early_data/default_early_data_policy.h"
26

            
27
namespace Envoy {
28
namespace TcpProxy {
29

            
30
constexpr absl::string_view DisableTunnelingFilterStateKey = "envoy.tcp_proxy.disable_tunneling";
31

            
32
class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks {
33
public:
34
  TcpConnPool(Upstream::HostConstSharedPtr host, Upstream::ThreadLocalCluster& thread_local_cluster,
35
              Upstream::LoadBalancerContext* context,
36
              Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
37
              StreamInfo::StreamInfo& downstream_info);
38
  ~TcpConnPool() override;
39

            
40
1758
  bool valid() const { return conn_pool_data_.has_value(); }
41

            
42
  // GenericConnPool
43
  void newStream(GenericConnectionPoolCallbacks& callbacks) override;
44

            
45
  // Tcp::ConnectionPool::Callbacks
46
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
47
                     absl::string_view transport_failure_reason,
48
                     Upstream::HostDescriptionConstSharedPtr host) override;
49
  void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
50
                   Upstream::HostDescriptionConstSharedPtr host) override;
51

            
52
private:
53
  absl::optional<Upstream::TcpPoolData> conn_pool_data_{};
54
  Tcp::ConnectionPool::Cancellable* upstream_handle_{};
55
  GenericConnectionPoolCallbacks* callbacks_{};
56
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
57
  StreamInfo::StreamInfo& downstream_info_;
58
};
59

            
60
class HttpUpstream;
61
class CombinedUpstream;
62

            
63
// This class is specific to TCP proxy connection pool and enables TCP proxying mode
64
// for HTTP upstreams. This is currently only needed for HTTP/1 client codec that half closes
65
// upstream network connection after encoding end_stream in TCP proxy (i.e. via CONNECT).
66
class RouterUpstreamRequest : public Router::UpstreamRequest {
67
public:
68
  using Router::UpstreamRequest::UpstreamRequest;
69

            
70
  void onPoolReady(std::unique_ptr<Router::GenericUpstream>&& upstream,
71
                   Upstream::HostDescriptionConstSharedPtr host,
72
                   const Network::ConnectionInfoProvider& address_provider,
73
421
                   StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override {
74
421
    upstream->enableTcpTunneling();
75
421
    Router::UpstreamRequest::onPoolReady(std::move(upstream), host, address_provider, info,
76
421
                                         protocol);
77
421
  }
78
};
79

            
80
class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks {
81
public:
82
  HttpConnPool(Upstream::HostConstSharedPtr host,
83
               Upstream::ThreadLocalCluster& thread_local_cluster,
84
               Upstream::LoadBalancerContext* context, const TunnelingConfigHelper& config,
85
               Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
86
               Http::StreamDecoderFilterCallbacks&, Http::CodecType type,
87
               StreamInfo::StreamInfo& downstream_info);
88

            
89
  using RouterUpstreamRequestPtr = std::unique_ptr<RouterUpstreamRequest>;
90
  ~HttpConnPool() override;
91

            
92
822
  bool valid() const { return conn_pool_data_.has_value() || generic_conn_pool_; }
93
436
  Http::CodecType codecType() const { return type_; }
94
  std::unique_ptr<Router::GenericConnPool> createConnPool(Upstream::HostConstSharedPtr host,
95
                                                          Upstream::ThreadLocalCluster&,
96
                                                          Upstream::LoadBalancerContext* context,
97
                                                          absl::optional<Http::Protocol> protocol);
98

            
99
  // GenericConnPool
100
  void newStream(GenericConnectionPoolCallbacks& callbacks) override;
101

            
102
  // Http::ConnectionPool::Callbacks,
103
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
104
                     absl::string_view transport_failure_reason,
105
                     Upstream::HostDescriptionConstSharedPtr host) override;
106
  void onPoolReady(Http::RequestEncoder& request_encoder,
107
                   Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
108
                   absl::optional<Http::Protocol>) override;
109

            
110
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr, bool);
111
  void onHttpPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
112
                       Ssl::ConnectionInfoConstSharedPtr ssl_info);
113

            
114
  class Callbacks {
115
  public:
116
    Callbacks(HttpConnPool& conn_pool, Upstream::HostDescriptionConstSharedPtr host,
117
              Ssl::ConnectionInfoConstSharedPtr ssl_info)
118
814
        : conn_pool_(&conn_pool), host_(host), ssl_info_(ssl_info) {}
119
825
    virtual ~Callbacks() = default;
120
590
    virtual void onSuccess(Http::RequestEncoder* request_encoder) {
121
590
      ASSERT(conn_pool_ != nullptr);
122
590
      if (!Runtime::runtimeFeatureEnabled(
123
590
              "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
124
281
        ASSERT(request_encoder != nullptr);
125
281
        conn_pool_->onGenericPoolReady(host_, request_encoder->getStream().connectionInfoProvider(),
126
281
                                       ssl_info_);
127
281
        return;
128
281
      }
129

            
130
309
      Network::ConnectionInfoProviderSharedPtr local_connection_info_provider(
131
309
          std::make_shared<Network::ConnectionInfoSetterImpl>(
132
309
              Network::Utility::getCanonicalIpv4LoopbackAddress(),
133
309
              Network::Utility::getCanonicalIpv4LoopbackAddress()));
134

            
135
309
      conn_pool_->onGenericPoolReady(host_, *local_connection_info_provider.get(), ssl_info_);
136
309
    }
137
167
    virtual void onFailure() {
138
167
      ASSERT(conn_pool_ != nullptr);
139
167
      conn_pool_->callbacks_->onGenericPoolFailure(
140
167
          ConnectionPool::PoolFailureReason::RemoteConnectionFailure, "", host_);
141
167
    }
142

            
143
  protected:
144
11
    Callbacks() = default;
145

            
146
  private:
147
    HttpConnPool* conn_pool_{};
148
    Upstream::HostDescriptionConstSharedPtr host_;
149
    Ssl::ConnectionInfoConstSharedPtr ssl_info_;
150
  };
151

            
152
private:
153
  void onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
154
                          const Network::ConnectionInfoProvider& address_provider,
155
                          Ssl::ConnectionInfoConstSharedPtr ssl_info);
156
  const TunnelingConfigHelper& config_;
157
  Http::CodecType type_;
158
  absl::optional<Upstream::HttpPoolData> conn_pool_data_{};
159
  Http::ConnectionPool::Cancellable* upstream_handle_{};
160
  GenericConnectionPoolCallbacks* callbacks_{};
161
  Http::StreamDecoderFilterCallbacks* decoder_filter_callbacks_;
162
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
163
  std::unique_ptr<HttpUpstream> upstream_;
164
  std::unique_ptr<CombinedUpstream> combined_upstream_;
165
  StreamInfo::StreamInfo& downstream_info_;
166
  std::unique_ptr<Router::GenericConnPool> generic_conn_pool_;
167
};
168

            
169
class TcpUpstream : public GenericUpstream {
170
public:
171
  TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
172
              Tcp::ConnectionPool::UpstreamCallbacks& callbacks);
173

            
174
  // GenericUpstream
175
  bool readDisable(bool disable) override;
176
  void encodeData(Buffer::Instance& data, bool end_stream) override;
177
  void addBytesSentCallback(Network::Connection::BytesSentCb cb) override;
178
  Tcp::ConnectionPool::ConnectionData* onDownstreamEvent(Network::ConnectionEvent event,
179
                                                         absl::string_view details = "") override;
180
  bool startUpstreamSecureTransport() override;
181
  Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override;
182
  StreamInfo::DetectedCloseType detectedCloseType() const override;
183
  absl::string_view localCloseReason() const override;
184

            
185
private:
186
  Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
187
};
188

            
189
class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
190
public:
191
  using TunnelingConfig =
192
      envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
193

            
194
  HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
195
               const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info,
196
               Http::CodecType type);
197
  ~HttpUpstream() override;
198
  bool isValidResponse(const Http::ResponseHeaderMap&);
199

            
200
  void doneReading();
201
  void doneWriting();
202
420
  Http::ResponseDecoder& responseDecoder() { return response_decoder_; }
203

            
204
  // GenericUpstream
205
  bool readDisable(bool disable) override;
206
  void encodeData(Buffer::Instance& data, bool end_stream) override;
207
  void addBytesSentCallback(Network::Connection::BytesSentCb cb) override;
208
  Tcp::ConnectionPool::ConnectionData* onDownstreamEvent(Network::ConnectionEvent event,
209
                                                         absl::string_view details = "") override;
210
  // HTTP upstream must not implement converting upstream transport
211
  // socket from non-secure to secure mode.
212
  bool startUpstreamSecureTransport() override { return false; }
213

            
214
  // Http::StreamCallbacks
215
  void onResetStream(Http::StreamResetReason reason,
216
                     absl::string_view transport_failure_reason) override;
217
  void onAboveWriteBufferHighWatermark() override;
218
  void onBelowWriteBufferLowWatermark() override;
219

            
220
  virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl);
221
402
  void setConnPoolCallbacks(std::unique_ptr<HttpConnPool::Callbacks>&& callbacks) {
222
402
    conn_pool_callbacks_ = std::move(callbacks);
223
402
  }
224
  Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override { return nullptr; }
225
  StreamInfo::DetectedCloseType detectedCloseType() const override;
226

            
227
protected:
228
  void resetEncoder(Network::ConnectionEvent event, bool inform_downstream = true);
229
  // The encoder offered by the upstream http client.
230
  Http::RequestEncoder* request_encoder_{};
231
  // The config object that is owned by the downstream network filter chain factory.
232
  const TunnelingConfigHelper& config_;
233
  // The downstream info that is owned by the downstream connection.
234
  StreamInfo::StreamInfo& downstream_info_;
235
  std::unique_ptr<Http::RequestHeaderMapImpl> downstream_headers_;
236

            
237
private:
238
  class DecoderShim : public Http::ResponseDecoderImplBase {
239
  public:
240
480
    DecoderShim(HttpUpstream& parent) : parent_(parent) {}
241
    void decode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {}
242
348
    void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override {
243
348
      bool is_valid_response = parent_.isValidResponse(*headers);
244
348
      parent_.config_.propagateResponseHeaders(std::move(headers),
245
348
                                               parent_.downstream_info_.filterState());
246
348
      if (!is_valid_response || end_stream) {
247
64
        parent_.resetEncoder(Network::ConnectionEvent::LocalClose);
248
290
      } else if (parent_.conn_pool_callbacks_ != nullptr) {
249
284
        parent_.conn_pool_callbacks_->onSuccess(parent_.request_encoder_);
250
284
        parent_.conn_pool_callbacks_.reset();
251
284
      }
252
348
    }
253
196037
    void decodeData(Buffer::Instance& data, bool end_stream) override {
254
196037
      parent_.upstream_callbacks_.onUpstreamData(data, end_stream);
255
196037
      if (end_stream) {
256
145
        parent_.doneReading();
257
145
      }
258
196037
    }
259
20
    void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override {
260
20
      parent_.config_.propagateResponseTrailers(std::move(trailers),
261
20
                                                parent_.downstream_info_.filterState());
262
20
      Buffer::OwnedImpl data;
263
20
      parent_.upstream_callbacks_.onUpstreamData(data, /* end_stream = */ true);
264
20
      parent_.doneReading();
265
20
    }
266
    void decodeMetadata(Http::MetadataMapPtr&&) override {}
267
3
    void dumpState(std::ostream& os, int indent_level) const override {
268
3
      DUMP_STATE_UNIMPLEMENTED(DecoderShim);
269
3
    }
270

            
271
  private:
272
    HttpUpstream& parent_;
273
  };
274
  DecoderShim response_decoder_;
275
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
276
  const Http::CodecType type_;
277
  bool read_half_closed_{};
278
  bool write_half_closed_{};
279

            
280
  // Used to defer onGenericPoolReady and onGenericPoolFailure to the reception
281
  // of the CONNECT response or the resetEncoder.
282
  std::unique_ptr<HttpConnPool::Callbacks> conn_pool_callbacks_;
283
};
284

            
285
class CombinedUpstream : public GenericUpstream, public Envoy::Router::RouterFilterInterface {
286
public:
287
  CombinedUpstream(HttpConnPool& http_conn_pool, Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
288
                   Http::StreamDecoderFilterCallbacks& decoder_callbacks,
289
                   const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);
290
436
  ~CombinedUpstream() override = default;
291
  using UpstreamRequest = Router::UpstreamRequest;
292
211195
  Http::ResponseDecoder& responseDecoder() { return response_decoder_; }
293
  void doneReading();
294
  void doneWriting();
295
  using UpstreamRequestPtr = std::unique_ptr<UpstreamRequest>;
296
  void setRouterUpstreamRequest(UpstreamRequestPtr);
297
  void newStream(GenericConnectionPoolCallbacks& callbacks);
298
  void encodeData(Buffer::Instance& data, bool end_stream) override;
299
  Tcp::ConnectionPool::ConnectionData* onDownstreamEvent(Network::ConnectionEvent event,
300
                                                         absl::string_view details = "") override;
301
  bool isValidResponse(const Http::ResponseHeaderMap&);
302
  bool readDisable(bool disable) override;
303
423
  void setConnPoolCallbacks(std::unique_ptr<HttpConnPool::Callbacks>&& callbacks) {
304
423
    conn_pool_callbacks_ = std::move(callbacks);
305
423
  }
306
  void recordUpstreamSslConnection();
307
310
  void addBytesSentCallback(Network::Connection::BytesSentCb) override{};
308
  // HTTP upstream must not implement converting upstream transport
309
  // socket from non-secure to secure mode.
310
1
  bool startUpstreamSecureTransport() override { return false; }
311
1
  Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override { return nullptr; }
312
  StreamInfo::DetectedCloseType detectedCloseType() const override;
313

            
314
  // Router::RouterFilterInterface
315
  void onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
316
                         UpstreamRequest& upstream_request, bool end_stream) override;
317
  void onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
318
                      bool end_stream) override;
319
1
  void onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&&, UpstreamRequest&) override {}
320
  void onUpstreamTrailers(Http::ResponseTrailerMapPtr&&, UpstreamRequest&) override;
321
1
  void onUpstreamMetadata(Http::MetadataMapPtr&&) override {}
322
  void onUpstreamReset(Http::StreamResetReason stream_reset_reason,
323
                       absl::string_view transport_failure_reason, UpstreamRequest&) override;
324
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
325
421
                              bool pool_success) override {
326
421
    parent_.onUpstreamHostSelected(host, pool_success);
327
421
  }
328
1
  void onPerTryTimeout(UpstreamRequest&) override {}
329
1
  void onPerTryIdleTimeout(UpstreamRequest&) override {}
330
1
  void onStreamMaxDurationReached(UpstreamRequest&) override {}
331
  void setupRouteTimeoutForWebsocketUpgrade() override {}
332
  void disableRouteTimeoutForWebsocketUpgrade() override {}
333
453448
  Http::StreamDecoderFilterCallbacks* callbacks() override { return &decoder_filter_callbacks_; }
334
19145
  Upstream::ClusterInfoConstSharedPtr cluster() override {
335
19145
    return decoder_filter_callbacks_.clusterInfo();
336
19145
  }
337
2497
  Router::FilterConfig& config() override {
338
2497
    return const_cast<Router::FilterConfig&>(config_.routerFilterConfig());
339
2497
  }
340
14
  Router::TimeoutData timeout() override { return {}; }
341
422
  absl::optional<std::chrono::milliseconds> dynamicMaxStreamDuration() const override {
342
422
    return absl::nullopt;
343
422
  }
344
  Http::RequestHeaderMap* downstreamHeaders() override;
345
16657
  Http::RequestTrailerMap* downstreamTrailers() override { return nullptr; }
346
3988
  bool downstreamResponseStarted() const override { return false; }
347
422
  bool downstreamEndStream() const override { return false; }
348
1
  uint32_t attemptCount() const override { return 0; }
349

            
350
protected:
351
  void onResetEncoder(Network::ConnectionEvent event, bool inform_downstream = true);
352

            
353
  // The config object that is owned by the downstream network filter chain factory.
354
  const TunnelingConfigHelper& config_;
355
  // The downstream info that is owned by the downstream connection.
356
  StreamInfo::StreamInfo& downstream_info_;
357
  std::unique_ptr<Http::RequestHeaderMapImpl> downstream_headers_;
358
  HttpConnPool& parent_;
359

            
360
private:
361
  Http::StreamDecoderFilterCallbacks& decoder_filter_callbacks_;
362
  class DecoderShim : public Http::ResponseDecoderImplBase {
363
  public:
364
436
    DecoderShim(CombinedUpstream& parent) : parent_(parent) {}
365
    // Http::ResponseDecoder
366
    void decode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {}
367
368
    void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override {
368
368
      bool is_valid_response = parent_.isValidResponse(*headers);
369
368
      parent_.config_.propagateResponseHeaders(std::move(headers),
370
368
                                               parent_.downstream_info_.filterState());
371
368
      if (!is_valid_response || end_stream) {
372
58
        parent_.onResetEncoder(Network::ConnectionEvent::LocalClose);
373
312
      } else if (parent_.conn_pool_callbacks_ != nullptr) {
374
310
        parent_.conn_pool_callbacks_->onSuccess(nullptr /*parent_.request_encoder_*/);
375
310
        parent_.conn_pool_callbacks_.reset();
376
310
      }
377
368
    }
378
210811
    void decodeData(Buffer::Instance& data, bool end_stream) override {
379
210811
      parent_.upstream_callbacks_.onUpstreamData(data, end_stream);
380
210811
      if (end_stream) {
381
153
        parent_.doneReading();
382
153
      }
383
210811
    }
384
15
    void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override {
385
15
      parent_.config_.propagateResponseTrailers(std::move(trailers),
386
15
                                                parent_.downstream_info_.filterState());
387
15
      Buffer::OwnedImpl data;
388
15
      parent_.upstream_callbacks_.onUpstreamData(data, /* end_stream = */ true);
389
15
      parent_.doneReading();
390
15
    }
391
    void decodeMetadata(Http::MetadataMapPtr&&) override {}
392
1
    void dumpState(std::ostream& os, int indent_level) const override {
393
1
      DUMP_STATE_UNIMPLEMENTED(DecoderShim);
394
1
    }
395

            
396
  private:
397
    CombinedUpstream& parent_;
398
  };
399
  DecoderShim response_decoder_;
400
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
401
  std::unique_ptr<HttpConnPool::Callbacks> conn_pool_callbacks_;
402
  bool read_half_closed_{};
403
  bool write_half_closed_{};
404
  // upstream_request_ has to be destroyed first as they may use CombinedUpstream parent
405
  // during destruction.
406
  UpstreamRequestPtr upstream_request_;
407
  Http::CodecType type_;
408
};
409

            
410
} // namespace TcpProxy
411
} // namespace Envoy