Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/tcp_proxy/upstream.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <memory>
4
5
#include "envoy/http/conn_pool.h"
6
#include "envoy/http/header_map.h"
7
#include "envoy/network/connection.h"
8
#include "envoy/router/router_ratelimit.h"
9
#include "envoy/tcp/conn_pool.h"
10
#include "envoy/tcp/upstream.h"
11
#include "envoy/upstream/load_balancer.h"
12
#include "envoy/upstream/thread_local_cluster.h"
13
#include "envoy/upstream/upstream.h"
14
15
#include "source/common/buffer/buffer_impl.h"
16
#include "source/common/common/dump_state_utils.h"
17
#include "source/common/http/codec_client.h"
18
#include "source/common/http/hash_policy.h"
19
#include "source/common/http/null_route_impl.h"
20
#include "source/common/network/utility.h"
21
#include "source/common/router/config_impl.h"
22
#include "source/common/router/header_parser.h"
23
#include "source/common/router/router.h"
24
#include "source/extensions/early_data/default_early_data_policy.h"
25
26
namespace Envoy {
27
namespace TcpProxy {
28
29
constexpr absl::string_view DisableTunnelingFilterStateKey = "envoy.tcp_proxy.disable_tunneling";
30
31
class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks {
32
public:
33
  TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
34
              Upstream::LoadBalancerContext* context,
35
              Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
36
              StreamInfo::StreamInfo& downstream_info);
37
  ~TcpConnPool() override;
38
39
0
  bool valid() const { return conn_pool_data_.has_value(); }
40
41
  // GenericConnPool
42
  void newStream(GenericConnectionPoolCallbacks& callbacks) override;
43
44
  // Tcp::ConnectionPool::Callbacks
45
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
46
                     absl::string_view transport_failure_reason,
47
                     Upstream::HostDescriptionConstSharedPtr host) override;
48
  void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
49
                   Upstream::HostDescriptionConstSharedPtr host) override;
50
51
private:
52
  absl::optional<Upstream::TcpPoolData> conn_pool_data_{};
53
  Tcp::ConnectionPool::Cancellable* upstream_handle_{};
54
  GenericConnectionPoolCallbacks* callbacks_{};
55
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
56
  StreamInfo::StreamInfo& downstream_info_;
57
};
58
59
class HttpUpstream;
60
class CombinedUpstream;
61
62
// This class is specific to TCP proxy connection pool and enables TCP proxying mode
63
// for HTTP upstreams. This is currently only needed for HTTP/1 client codec that half closes
64
// upstream network connection after encoding end_stream in TCP proxy (i.e. via CONNECT).
65
class RouterUpstreamRequest : public Router::UpstreamRequest {
66
public:
67
  using Router::UpstreamRequest::UpstreamRequest;
68
69
  void onPoolReady(std::unique_ptr<Router::GenericUpstream>&& upstream,
70
                   Upstream::HostDescriptionConstSharedPtr host,
71
                   const Network::ConnectionInfoProvider& address_provider,
72
0
                   StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override {
73
0
    upstream->enableTcpTunneling();
74
0
    Router::UpstreamRequest::onPoolReady(std::move(upstream), host, address_provider, info,
75
0
                                         protocol);
76
0
  }
77
};
78
79
class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks {
80
public:
81
  HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
82
               Upstream::LoadBalancerContext* context, const TunnelingConfigHelper& config,
83
               Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
84
               Http::StreamDecoderFilterCallbacks&, Http::CodecType type,
85
               StreamInfo::StreamInfo& downstream_info);
86
87
  using RouterUpstreamRequestPtr = std::unique_ptr<RouterUpstreamRequest>;
88
  ~HttpConnPool() override;
89
90
0
  bool valid() const { return conn_pool_data_.has_value() || generic_conn_pool_; }
91
0
  Http::CodecType codecType() const { return type_; }
92
  std::unique_ptr<Router::GenericConnPool> createConnPool(Upstream::ThreadLocalCluster&,
93
                                                          Upstream::LoadBalancerContext* context,
94
                                                          absl::optional<Http::Protocol> protocol);
95
96
  // GenericConnPool
97
  void newStream(GenericConnectionPoolCallbacks& callbacks) override;
98
99
  // Http::ConnectionPool::Callbacks,
100
  void onPoolFailure(ConnectionPool::PoolFailureReason reason,
101
                     absl::string_view transport_failure_reason,
102
                     Upstream::HostDescriptionConstSharedPtr host) override;
103
  void onPoolReady(Http::RequestEncoder& request_encoder,
104
                   Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
105
                   absl::optional<Http::Protocol>) override;
106
107
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr, bool);
108
  void onHttpPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
109
                       Ssl::ConnectionInfoConstSharedPtr ssl_info);
110
111
  class Callbacks {
112
  public:
113
    Callbacks(HttpConnPool& conn_pool, Upstream::HostDescriptionConstSharedPtr host,
114
              Ssl::ConnectionInfoConstSharedPtr ssl_info)
115
0
        : conn_pool_(&conn_pool), host_(host), ssl_info_(ssl_info) {}
116
0
    virtual ~Callbacks() = default;
117
0
    virtual void onSuccess(Http::RequestEncoder* request_encoder) {
118
0
      ASSERT(conn_pool_ != nullptr);
119
0
      if (!Runtime::runtimeFeatureEnabled(
120
0
              "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
121
0
        ASSERT(request_encoder != nullptr);
122
0
        conn_pool_->onGenericPoolReady(host_, request_encoder->getStream().connectionInfoProvider(),
123
0
                                       ssl_info_);
124
0
        return;
125
0
      }
126
127
0
      Network::ConnectionInfoProviderSharedPtr local_connection_info_provider(
128
0
          std::make_shared<Network::ConnectionInfoSetterImpl>(
129
0
              Network::Utility::getCanonicalIpv4LoopbackAddress(),
130
0
              Network::Utility::getCanonicalIpv4LoopbackAddress()));
131
132
0
      conn_pool_->onGenericPoolReady(host_, *local_connection_info_provider.get(), ssl_info_);
133
0
    }
134
0
    virtual void onFailure() {
135
0
      ASSERT(conn_pool_ != nullptr);
136
0
      conn_pool_->callbacks_->onGenericPoolFailure(
137
0
          ConnectionPool::PoolFailureReason::RemoteConnectionFailure, "", host_);
138
0
    }
139
140
  protected:
141
    Callbacks() = default;
142
143
  private:
144
    HttpConnPool* conn_pool_{};
145
    Upstream::HostDescriptionConstSharedPtr host_;
146
    Ssl::ConnectionInfoConstSharedPtr ssl_info_;
147
  };
148
149
private:
150
  void onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
151
                          const Network::ConnectionInfoProvider& address_provider,
152
                          Ssl::ConnectionInfoConstSharedPtr ssl_info);
153
  const TunnelingConfigHelper& config_;
154
  Http::CodecType type_;
155
  absl::optional<Upstream::HttpPoolData> conn_pool_data_{};
156
  Http::ConnectionPool::Cancellable* upstream_handle_{};
157
  GenericConnectionPoolCallbacks* callbacks_{};
158
  Http::StreamDecoderFilterCallbacks* decoder_filter_callbacks_;
159
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
160
  std::unique_ptr<HttpUpstream> upstream_;
161
  std::unique_ptr<CombinedUpstream> combined_upstream_;
162
  StreamInfo::StreamInfo& downstream_info_;
163
  std::unique_ptr<Router::GenericConnPool> generic_conn_pool_;
164
};
165
166
class TcpUpstream : public GenericUpstream {
167
public:
168
  TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
169
              Tcp::ConnectionPool::UpstreamCallbacks& callbacks);
170
171
  // GenericUpstream
172
  bool readDisable(bool disable) override;
173
  void encodeData(Buffer::Instance& data, bool end_stream) override;
174
  void addBytesSentCallback(Network::Connection::BytesSentCb cb) override;
175
  Tcp::ConnectionPool::ConnectionData* onDownstreamEvent(Network::ConnectionEvent event) override;
176
  bool startUpstreamSecureTransport() override;
177
  Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override;
178
179
private:
180
  Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
181
};
182
183
class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
184
public:
185
  using TunnelingConfig =
186
      envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
187
188
  HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
189
               const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info,
190
               Http::CodecType type);
191
  ~HttpUpstream() override;
192
  bool isValidResponse(const Http::ResponseHeaderMap&);
193
194
  void doneReading();
195
  void doneWriting();
196
0
  Http::ResponseDecoder& responseDecoder() { return response_decoder_; }
197
198
  // GenericUpstream
199
  bool readDisable(bool disable) override;
200
  void encodeData(Buffer::Instance& data, bool end_stream) override;
201
  void addBytesSentCallback(Network::Connection::BytesSentCb cb) override;
202
  Tcp::ConnectionPool::ConnectionData* onDownstreamEvent(Network::ConnectionEvent event) override;
203
  // HTTP upstream must not implement converting upstream transport
204
  // socket from non-secure to secure mode.
205
0
  bool startUpstreamSecureTransport() override { return false; }
206
207
  // Http::StreamCallbacks
208
  void onResetStream(Http::StreamResetReason reason,
209
                     absl::string_view transport_failure_reason) override;
210
  void onAboveWriteBufferHighWatermark() override;
211
  void onBelowWriteBufferLowWatermark() override;
212
213
  virtual void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl);
214
0
  void setConnPoolCallbacks(std::unique_ptr<HttpConnPool::Callbacks>&& callbacks) {
215
0
    conn_pool_callbacks_ = std::move(callbacks);
216
0
  }
217
0
  Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override { return nullptr; }
218
219
protected:
220
  void resetEncoder(Network::ConnectionEvent event, bool inform_downstream = true);
221
222
  // The encoder offered by the upstream http client.
223
  Http::RequestEncoder* request_encoder_{};
224
  // The config object that is owned by the downstream network filter chain factory.
225
  const TunnelingConfigHelper& config_;
226
  // The downstream info that is owned by the downstream connection.
227
  StreamInfo::StreamInfo& downstream_info_;
228
  std::unique_ptr<Http::RequestHeaderMapImpl> downstream_headers_;
229
230
private:
231
  Upstream::ClusterInfoConstSharedPtr cluster_;
232
  class DecoderShim : public Http::ResponseDecoder {
233
  public:
234
0
    DecoderShim(HttpUpstream& parent) : parent_(parent) {}
Unexecuted instantiation: Envoy::TcpProxy::HttpUpstream::DecoderShim::DecoderShim(Envoy::TcpProxy::HttpUpstream&)
Unexecuted instantiation: Envoy::TcpProxy::HttpUpstream::DecoderShim::DecoderShim(Envoy::TcpProxy::HttpUpstream&)
235
0
    void decode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {}
236
0
    void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override {
237
0
      bool is_valid_response = parent_.isValidResponse(*headers);
238
0
      parent_.config_.propagateResponseHeaders(std::move(headers),
239
0
                                               parent_.downstream_info_.filterState());
240
0
      if (!is_valid_response || end_stream) {
241
0
        parent_.resetEncoder(Network::ConnectionEvent::LocalClose);
242
0
      } else if (parent_.conn_pool_callbacks_ != nullptr) {
243
0
        parent_.conn_pool_callbacks_->onSuccess(parent_.request_encoder_);
244
0
        parent_.conn_pool_callbacks_.reset();
245
0
      }
246
0
    }
247
0
    void decodeData(Buffer::Instance& data, bool end_stream) override {
248
0
      parent_.upstream_callbacks_.onUpstreamData(data, end_stream);
249
0
      if (end_stream) {
250
0
        parent_.doneReading();
251
0
      }
252
0
    }
253
0
    void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override {
254
0
      parent_.config_.propagateResponseTrailers(std::move(trailers),
255
0
                                                parent_.downstream_info_.filterState());
256
0
      if (Runtime::runtimeFeatureEnabled(
257
0
              "envoy.reloadable_features.tcp_tunneling_send_downstream_fin_on_upstream_trailers")) {
258
0
        Buffer::OwnedImpl data;
259
0
        parent_.upstream_callbacks_.onUpstreamData(data, /* end_stream = */ true);
260
0
      }
261
262
0
      parent_.doneReading();
263
0
    }
264
0
    void decodeMetadata(Http::MetadataMapPtr&&) override {}
265
0
    void dumpState(std::ostream& os, int indent_level) const override {
266
0
      DUMP_STATE_UNIMPLEMENTED(DecoderShim);
267
0
    }
268
269
  private:
270
    HttpUpstream& parent_;
271
  };
272
  DecoderShim response_decoder_;
273
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
274
  const Http::CodecType type_;
275
  bool read_half_closed_{};
276
  bool write_half_closed_{};
277
278
  // Used to defer onGenericPoolReady and onGenericPoolFailure to the reception
279
  // of the CONNECT response or the resetEncoder.
280
  std::unique_ptr<HttpConnPool::Callbacks> conn_pool_callbacks_;
281
};
282
283
class CombinedUpstream : public GenericUpstream, public Envoy::Router::RouterFilterInterface {
284
public:
285
  CombinedUpstream(HttpConnPool& http_conn_pool, Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
286
                   Http::StreamDecoderFilterCallbacks& decoder_callbacks,
287
                   const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);
288
0
  ~CombinedUpstream() override = default;
289
  using UpstreamRequest = Router::UpstreamRequest;
290
0
  Http::ResponseDecoder& responseDecoder() { return response_decoder_; }
291
  void doneReading();
292
  void doneWriting();
293
  using UpstreamRequestPtr = std::unique_ptr<UpstreamRequest>;
294
  void setRouterUpstreamRequest(UpstreamRequestPtr);
295
  void newStream(GenericConnectionPoolCallbacks& callbacks);
296
  void encodeData(Buffer::Instance& data, bool end_stream) override;
297
  Tcp::ConnectionPool::ConnectionData* onDownstreamEvent(Network::ConnectionEvent event) override;
298
  bool isValidResponse(const Http::ResponseHeaderMap&);
299
  bool readDisable(bool disable) override;
300
0
  void setConnPoolCallbacks(std::unique_ptr<HttpConnPool::Callbacks>&& callbacks) {
301
0
    conn_pool_callbacks_ = std::move(callbacks);
302
0
  }
303
0
  void addBytesSentCallback(Network::Connection::BytesSentCb) override{};
304
  // HTTP upstream must not implement converting upstream transport
305
  // socket from non-secure to secure mode.
306
0
  bool startUpstreamSecureTransport() override { return false; }
307
0
  Ssl::ConnectionInfoConstSharedPtr getUpstreamConnectionSslInfo() override { return nullptr; }
308
309
  // Router::RouterFilterInterface
310
  void onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
311
                         UpstreamRequest& upstream_request, bool end_stream) override;
312
  void onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
313
                      bool end_stream) override;
314
0
  void onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&&, UpstreamRequest&) override {}
315
  void onUpstreamTrailers(Http::ResponseTrailerMapPtr&&, UpstreamRequest&) override;
316
0
  void onUpstreamMetadata(Http::MetadataMapPtr&&) override {}
317
  void onUpstreamReset(Http::StreamResetReason stream_reset_reason,
318
                       absl::string_view transport_failure_reason, UpstreamRequest&) override;
319
  void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
320
0
                              bool pool_success) override {
321
0
    parent_.onUpstreamHostSelected(host, pool_success);
322
0
  }
323
0
  void onPerTryTimeout(UpstreamRequest&) override {}
324
0
  void onPerTryIdleTimeout(UpstreamRequest&) override {}
325
0
  void onStreamMaxDurationReached(UpstreamRequest&) override {}
326
0
  Http::StreamDecoderFilterCallbacks* callbacks() override { return &decoder_filter_callbacks_; }
327
0
  Upstream::ClusterInfoConstSharedPtr cluster() override {
328
0
    return decoder_filter_callbacks_.clusterInfo();
329
0
  }
330
0
  Router::FilterConfig& config() override {
331
0
    return const_cast<Router::FilterConfig&>(config_.routerFilterConfig());
332
0
  }
333
0
  Router::TimeoutData timeout() override { return {}; }
334
0
  absl::optional<std::chrono::milliseconds> dynamicMaxStreamDuration() const override {
335
0
    return absl::nullopt;
336
0
  }
337
  Http::RequestHeaderMap* downstreamHeaders() override;
338
0
  Http::RequestTrailerMap* downstreamTrailers() override { return nullptr; }
339
0
  bool downstreamResponseStarted() const override { return false; }
340
0
  bool downstreamEndStream() const override { return false; }
341
0
  uint32_t attemptCount() const override { return 0; }
342
343
protected:
344
  void onResetEncoder(Network::ConnectionEvent event, bool inform_downstream = true);
345
346
  // The config object that is owned by the downstream network filter chain factory.
347
  const TunnelingConfigHelper& config_;
348
  // The downstream info that is owned by the downstream connection.
349
  StreamInfo::StreamInfo& downstream_info_;
350
  std::unique_ptr<Http::RequestHeaderMapImpl> downstream_headers_;
351
  HttpConnPool& parent_;
352
353
private:
354
  Http::StreamDecoderFilterCallbacks& decoder_filter_callbacks_;
355
  class DecoderShim : public Http::ResponseDecoder {
356
  public:
357
0
    DecoderShim(CombinedUpstream& parent) : parent_(parent) {}
Unexecuted instantiation: Envoy::TcpProxy::CombinedUpstream::DecoderShim::DecoderShim(Envoy::TcpProxy::CombinedUpstream&)
Unexecuted instantiation: Envoy::TcpProxy::CombinedUpstream::DecoderShim::DecoderShim(Envoy::TcpProxy::CombinedUpstream&)
358
    // Http::ResponseDecoder
359
0
    void decode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {}
360
0
    void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override {
361
0
      bool is_valid_response = parent_.isValidResponse(*headers);
362
0
      parent_.config_.propagateResponseHeaders(std::move(headers),
363
0
                                               parent_.downstream_info_.filterState());
364
0
      if (!is_valid_response || end_stream) {
365
0
        parent_.onResetEncoder(Network::ConnectionEvent::LocalClose);
366
0
      } else if (parent_.conn_pool_callbacks_ != nullptr) {
367
0
        parent_.conn_pool_callbacks_->onSuccess(nullptr /*parent_.request_encoder_*/);
368
0
        parent_.conn_pool_callbacks_.reset();
369
0
      }
370
0
    }
371
0
    void decodeData(Buffer::Instance& data, bool end_stream) override {
372
0
      parent_.upstream_callbacks_.onUpstreamData(data, end_stream);
373
0
      if (end_stream) {
374
0
        parent_.doneReading();
375
0
      }
376
0
    }
377
0
    void decodeTrailers(Http::ResponseTrailerMapPtr&& trailers) override {
378
0
      parent_.config_.propagateResponseTrailers(std::move(trailers),
379
0
                                                parent_.downstream_info_.filterState());
380
0
      if (Runtime::runtimeFeatureEnabled(
381
0
              "envoy.reloadable_features.tcp_tunneling_send_downstream_fin_on_upstream_trailers")) {
382
0
        Buffer::OwnedImpl data;
383
0
        parent_.upstream_callbacks_.onUpstreamData(data, /* end_stream = */ true);
384
0
      }
385
0
      parent_.doneReading();
386
0
    }
387
0
    void decodeMetadata(Http::MetadataMapPtr&&) override {}
388
0
    void dumpState(std::ostream& os, int indent_level) const override {
389
0
      DUMP_STATE_UNIMPLEMENTED(DecoderShim);
390
0
    }
391
392
  private:
393
    CombinedUpstream& parent_;
394
  };
395
  DecoderShim response_decoder_;
396
  Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
397
  std::unique_ptr<HttpConnPool::Callbacks> conn_pool_callbacks_;
398
  bool read_half_closed_{};
399
  bool write_half_closed_{};
400
  // upstream_request_ has to be destroyed first as they may use CombinedUpstream parent
401
  // during destruction.
402
  UpstreamRequestPtr upstream_request_;
403
};
404
405
} // namespace TcpProxy
406
} // namespace Envoy