1
#pragma once
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
#include <vector>
8

            
9
#include "envoy/access_log/access_log.h"
10
#include "envoy/common/random_generator.h"
11
#include "envoy/event/timer.h"
12
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
13
#include "envoy/http/codec.h"
14
#include "envoy/http/header_evaluator.h"
15
#include "envoy/http/request_id_extension.h"
16
#include "envoy/network/connection.h"
17
#include "envoy/network/filter.h"
18
#include "envoy/runtime/runtime.h"
19
#include "envoy/server/filter_config.h"
20
#include "envoy/stats/scope.h"
21
#include "envoy/stats/stats_macros.h"
22
#include "envoy/stats/timespan.h"
23
#include "envoy/stream_info/filter_state.h"
24
#include "envoy/upstream/cluster_manager.h"
25
#include "envoy/upstream/upstream.h"
26

            
27
#include "source/common/buffer/buffer_impl.h"
28
#include "source/common/common/assert.h"
29
#include "source/common/common/logger.h"
30
#include "source/common/formatter/substitution_format_string.h"
31
#include "source/common/http/header_map_impl.h"
32
#include "source/common/network/cidr_range.h"
33
#include "source/common/network/filter_impl.h"
34
#include "source/common/network/hash_policy.h"
35
#include "source/common/network/utility.h"
36
#include "source/common/stream_info/stream_info_impl.h"
37
#include "source/common/tcp_proxy/upstream.h"
38
#include "source/common/upstream/load_balancer_context_base.h"
39
#include "source/common/upstream/od_cds_api_impl.h"
40

            
41
#include "absl/container/node_hash_map.h"
42

            
43
namespace Envoy {
44
namespace TcpProxy {
45

            
46
// Filter state key whose value overrides the configured per-connection idle
// timeout, expressed in milliseconds.
constexpr absl::string_view PerConnectionIdleTimeoutMs =
    "envoy.tcp_proxy.per_connection_idle_timeout_ms";
/**
 * ReceiveBeforeConnectKey is the key for the receive_before_connect filter state. The
 * filter state value is a ``StreamInfo::BoolAccessor`` indicating whether the
 * receive_before_connect functionality should be enabled. Network filters setting this filter
 * state should return `StopIteration` in their `onNewConnection` and `onData` methods until they
 * have read the data they need before the upstream connection establishment, and only then allow
 * the filter chain to proceed to the TCP_PROXY filter.
 */
constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_before_connect";
57

            
58
/**
 * All tcp proxy stats. @see stats_macros.h
 *
 * Counters track totals (bytes, connections, flow-control pauses/resumes,
 * timeouts, upstream flushes); the gauges track currently-buffered byte counts
 * and the number of upstream connections still being flushed after downstream
 * close. (Comments cannot be placed inside the macro body because of the line
 * continuations.)
 */
#define ALL_TCP_PROXY_STATS(COUNTER, GAUGE)                                                        \
  COUNTER(downstream_cx_no_route)                                                                  \
  COUNTER(downstream_cx_rx_bytes_total)                                                            \
  COUNTER(downstream_cx_total)                                                                     \
  COUNTER(downstream_cx_tx_bytes_total)                                                            \
  COUNTER(downstream_flow_control_paused_reading_total)                                            \
  COUNTER(downstream_flow_control_resumed_reading_total)                                           \
  COUNTER(early_data_received_count_total)                                                         \
  COUNTER(idle_timeout)                                                                            \
  COUNTER(max_downstream_connection_duration)                                                      \
  COUNTER(upstream_flush_total)                                                                    \
  GAUGE(downstream_cx_rx_bytes_buffered, Accumulate)                                               \
  GAUGE(downstream_cx_tx_bytes_buffered, Accumulate)                                               \
  GAUGE(upstream_flush_active, Accumulate)
75

            
76
/**
 * Tcp proxy stats for on-demand. These stats are generated only if the tcp proxy enables on demand.
 * They count on-demand cluster discovery attempts and their outcomes
 * (missing / timed out / succeeded).
 */
#define ON_DEMAND_TCP_PROXY_STATS(COUNTER)                                                         \
  COUNTER(on_demand_cluster_attempt)                                                               \
  COUNTER(on_demand_cluster_missing)                                                               \
  COUNTER(on_demand_cluster_timeout)                                                               \
  COUNTER(on_demand_cluster_success)
84

            
85
/**
 * Struct definition for all tcp proxy stats. @see stats_macros.h
 */
struct TcpProxyStats {
  ALL_TCP_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};
91

            
92
/**
 * Struct definition for on-demand related tcp proxy stats. @see stats_macros.h
 * These stats are available if and only if the tcp proxy enables on-demand.
 * Note that these stats has the same prefix as `TcpProxyStats`.
 */
struct OnDemandStats {
  ON_DEMAND_TCP_PROXY_STATS(GENERATE_COUNTER_STRUCT)
};
100

            
101
class Drainer;
102
class UpstreamDrainManager;
103

            
104
/**
 * Route is an individual resolved route for a connection.
 */
class Route {
public:
  virtual ~Route() = default;

  /**
   * Check whether this route matches a given connection.
   * @param connection supplies the connection to test against.
   * @return bool true if this route matches a given connection.
   */
  virtual bool matches(Network::Connection& connection) const PURE;

  /**
   * @return const std::string& the upstream cluster that owns the route.
   */
  virtual const std::string& clusterName() const PURE;

  /**
   * @return MetadataMatchCriteria* the metadata that a subset load balancer should match when
   * selecting an upstream host. May be nullptr when no criteria are configured.
   */
  virtual const Router::MetadataMatchCriteria* metadataMatchCriteria() const PURE;
};
129

            
130
using RouteConstSharedPtr = std::shared_ptr<const Route>;
// Shorthand for the proto tunneling configuration message.
using TunnelingConfig =
    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
133

            
134
/**
 * Response headers for the tunneling connections.
 */
class TunnelResponseHeaders : public Http::TunnelResponseHeadersOrTrailersImpl {
public:
  TunnelResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers)
      : response_headers_(std::move(response_headers)) {}
  // Returns the wrapped response header map.
  const Http::HeaderMap& value() const override { return *response_headers_; }
  // Static lookup key for objects of this type.
  static const std::string& key();

private:
  const Http::ResponseHeaderMapPtr response_headers_;
};
147

            
148
/**
 * Response trailers for the tunneling connections.
 */
class TunnelResponseTrailers : public Http::TunnelResponseHeadersOrTrailersImpl {
public:
  TunnelResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers)
      : response_trailers_(std::move(response_trailers)) {}
  // Returns the wrapped response trailer map.
  const Http::HeaderMap& value() const override { return *response_trailers_; }
  // Static lookup key for objects of this type.
  static const std::string& key();

private:
  const Http::ResponseTrailerMapPtr response_trailers_;
};
161

            
162
class Config;

/**
 * Concrete TunnelingConfigHelper. Parses the tunneling section of the TcpProxy
 * proto config once at construction and exposes the derived pieces (hostname
 * formatter, request header evaluator, request ID handling, router config) to
 * the tunneling code path.
 */
class TunnelingConfigHelperImpl : public TunnelingConfigHelper,
                                  protected Logger::Loggable<Logger::Id::filter> {
public:
  TunnelingConfigHelperImpl(
      Stats::Scope& scope,
      const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config_message,
      Server::Configuration::FactoryContext& context);
  // Computes the tunnel host for a given stream (defined in the .cc).
  std::string host(const StreamInfo::StreamInfo& stream_info) const override;
  // POST is used if and only if a post path was configured.
  bool usePost() const override { return !post_path_.empty(); }
  const std::string& postPath() const override { return post_path_; }
  Envoy::Http::HeaderEvaluator& headerEvaluator() const override { return *header_parser_; }
  const Envoy::Http::RequestIDExtensionSharedPtr& requestIDExtension() const override {
    return request_id_extension_;
  }

  const Envoy::Router::FilterConfig& routerFilterConfig() const override { return router_config_; }
  // Store response headers/trailers into filter state (defined in the .cc).
  void
  propagateResponseHeaders(Http::ResponseHeaderMapPtr&& headers,
                           const StreamInfo::FilterStateSharedPtr& filter_state) const override;
  void
  propagateResponseTrailers(Http::ResponseTrailerMapPtr&& trailers,
                            const StreamInfo::FilterStateSharedPtr& filter_state) const override;
  Server::Configuration::ServerFactoryContext& serverFactoryContext() const override {
    return server_factory_context_;
  }
  const std::string& requestIDHeader() const override { return request_id_header_; }
  const std::string& requestIDMetadataKey() const override { return request_id_metadata_key_; }

private:
  std::unique_ptr<Envoy::Router::HeaderParser> header_parser_;
  Formatter::FormatterPtr hostname_fmt_;
  const bool propagate_response_headers_;
  const bool propagate_response_trailers_;
  std::string post_path_;
  // Request ID extension for tunneling requests. If null, no request ID is generated.
  Envoy::Http::RequestIDExtensionSharedPtr request_id_extension_;
  // Optional overrides for request ID header name and metadata key.
  std::string request_id_header_;
  std::string request_id_metadata_key_;
  Stats::StatNameManagedStorage route_stat_name_storage_;
  const Router::FilterConfig router_config_;
  Server::Configuration::ServerFactoryContext& server_factory_context_;
};
206

            
207
/**
 * On demand configurations. Wraps the on-demand CDS handle, the cluster lookup
 * timeout, and the on-demand stats scope parsed from the TcpProxy proto config.
 */
class OnDemandConfig {
public:
  OnDemandConfig(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_OnDemand&
                     on_demand_message,
                 Server::Configuration::FactoryContext& context, Stats::Scope& scope);
  Upstream::OdCdsApiHandle& onDemandCds() const { return *odcds_; }
  std::chrono::milliseconds timeout() const { return lookup_timeout_; }
  const OnDemandStats& stats() const { return stats_; }

private:
  static OnDemandStats generateStats(Stats::Scope& scope);
  Upstream::OdCdsApiHandlePtr odcds_;
  // The timeout of looking up the on-demand cluster.
  std::chrono::milliseconds lookup_timeout_;
  // On demand stats.
  OnDemandStats stats_;
};
using OnDemandConfigOptConstRef = OptRef<const OnDemandConfig>;
228

            
229
/**
230
 * Filter configuration.
231
 *
232
 * This configuration holds a TLS slot, and therefore it must be destructed
233
 * on the main thread.
234
 */
235
class Config {
236
public:
237
  /**
238
   * Configuration that can be shared and have an arbitrary lifetime safely.
239
   */
240
  class SharedConfig {
241
  public:
242
    SharedConfig(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
243
                 Server::Configuration::FactoryContext& context);
244
48645
    const TcpProxyStats& stats() { return stats_; }
245
2504
    const absl::optional<std::chrono::milliseconds>& idleTimeout() { return idle_timeout_; }
246
2218
    bool flushAccessLogOnConnected() const { return flush_access_log_on_connected_; }
247
2503
    bool flushAccessLogOnStart() const { return flush_access_log_on_start_; }
248
2512
    const absl::optional<std::chrono::milliseconds>& maxDownstreamConnectionDuration() const {
249
2512
      return max_downstream_connection_duration_;
250
2512
    }
251
16
    const absl::optional<double>& maxDownstreamConnectionDurationJitterPercentage() const {
252
16
      return max_downstream_connection_duration_jitter_percentage_;
253
16
    }
254
2519
    const absl::optional<std::chrono::milliseconds>& accessLogFlushInterval() const {
255
2519
      return access_log_flush_interval_;
256
2519
    }
257
2568
    TunnelingConfigHelperOptConstRef tunnelingConfigHelper() {
258
2568
      return makeOptRefFromPtr<const TunnelingConfigHelper>(tunneling_config_helper_.get());
259
2568
    }
260
222
    OnDemandConfigOptConstRef onDemandConfig() {
261
222
      return makeOptRefFromPtr<const OnDemandConfig>(on_demand_config_.get());
262
222
    }
263
148
    const BackOffStrategyPtr& backoffStrategy() const { return backoff_strategy_; };
264
    const Network::ProxyProtocolTLVVector& proxyProtocolTLVs() const {
265
      return proxy_protocol_tlvs_;
266
    }
267
    envoy::extensions::filters::network::tcp_proxy::v3::ProxyProtocolTlvMergePolicy
268
131
    proxyProtocolTlvMergePolicy() const {
269
131
      return proxy_protocol_tlv_merge_policy_;
270
131
    }
271

            
272
    // Evaluate dynamic TLV formatters and combine with static TLVs.
273
    Network::ProxyProtocolTLVVector
274
    evaluateDynamicTLVs(const StreamInfo::StreamInfo& stream_info) const;
275

            
276
  private:
277
    // Structure to hold TLV formatter information.
278
    struct TlvFormatter {
279
      uint8_t type;
280
      Formatter::FormatterPtr formatter;
281
    };
282

            
283
    static TcpProxyStats generateStats(Stats::Scope& scope);
284

            
285
    static Network::ProxyProtocolTLVVector
286
    parseTLVs(absl::Span<const envoy::config::core::v3::TlvEntry* const> tlvs,
287
              Server::Configuration::GenericFactoryContext& context,
288
              std::vector<TlvFormatter>& dynamic_tlvs);
289

            
290
    // Hold a Scope for the lifetime of the configuration because connections in
291
    // the UpstreamDrainManager can live longer than the listener.
292
    const Stats::ScopeSharedPtr stats_scope_;
293

            
294
    const TcpProxyStats stats_;
295
    bool flush_access_log_on_connected_ : 1;
296
    const bool flush_access_log_on_start_ : 1;
297
    absl::optional<std::chrono::milliseconds> idle_timeout_;
298
    absl::optional<std::chrono::milliseconds> max_downstream_connection_duration_;
299
    absl::optional<double> max_downstream_connection_duration_jitter_percentage_;
300
    absl::optional<std::chrono::milliseconds> access_log_flush_interval_;
301
    std::unique_ptr<TunnelingConfigHelper> tunneling_config_helper_;
302
    std::unique_ptr<OnDemandConfig> on_demand_config_;
303
    BackOffStrategyPtr backoff_strategy_;
304
    Network::ProxyProtocolTLVVector proxy_protocol_tlvs_;
305
    std::vector<TlvFormatter> dynamic_tlv_formatters_;
306
    envoy::extensions::filters::network::tcp_proxy::v3::ProxyProtocolTlvMergePolicy
307
        proxy_protocol_tlv_merge_policy_{
308
            envoy::extensions::filters::network::tcp_proxy::v3::ADD_IF_ABSENT};
309
  };
310

            
311
  using SharedConfigSharedPtr = std::shared_ptr<SharedConfig>;
312

            
313
  Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
314
         Server::Configuration::FactoryContext& context);
315

            
316
  /**
317
   * Find out which cluster an upstream connection should be opened to based on the
318
   * parameters of a downstream connection.
319
   * @param connection supplies the parameters of the downstream connection for
320
   * which the proxy needs to open the corresponding upstream.
321
   * @return the route to be used for the upstream connection.
322
   * If no route applies, returns nullptr.
323
   */
324
  RouteConstSharedPtr getRouteFromEntries(Network::Connection& connection);
325
  RouteConstSharedPtr getRegularRouteFromEntries(Network::Connection& connection);
326

            
327
48607
  const TcpProxyStats& stats() { return shared_config_->stats(); }
328
2724
  const AccessLog::InstanceSharedPtrVector& accessLogs() { return access_logs_; }
329
257
  uint32_t maxConnectAttempts() const { return max_connect_attempts_; }
330
2501
  const absl::optional<std::chrono::milliseconds>& idleTimeout() {
331
2501
    return shared_config_->idleTimeout();
332
2501
  }
333
2512
  const absl::optional<std::chrono::milliseconds>& maxDownstreamConnectionDuration() const {
334
2512
    return shared_config_->maxDownstreamConnectionDuration();
335
2512
  }
336
16
  const absl::optional<double>& maxDownstreamConnectionDurationJitterPercentage() const {
337
16
    return shared_config_->maxDownstreamConnectionDurationJitterPercentage();
338
16
  }
339
  const absl::optional<std::chrono::milliseconds>
340
  calculateMaxDownstreamConnectionDurationWithJitter();
341
2514
  const absl::optional<std::chrono::milliseconds>& accessLogFlushInterval() const {
342
2514
    return shared_config_->accessLogFlushInterval();
343
2514
  }
344
  // Return nullptr if there is no tunneling config.
345
2568
  TunnelingConfigHelperOptConstRef tunnelingConfigHelper() {
346
2568
    return shared_config_->tunnelingConfigHelper();
347
2568
  }
348
  UpstreamDrainManager& drainManager();
349
2623
  SharedConfigSharedPtr sharedConfig() { return shared_config_; }
350
167
  const Router::MetadataMatchCriteria* metadataMatchCriteria() const {
351
167
    return cluster_metadata_match_criteria_.get();
352
167
  }
353
157
  const Network::HashPolicy* hashPolicy() { return hash_policy_.get(); }
354
58
  OptRef<Upstream::OdCdsApiHandle> onDemandCds() const {
355
58
    auto on_demand_config = shared_config_->onDemandConfig();
356
58
    return on_demand_config.has_value() ? makeOptRef(on_demand_config->onDemandCds())
357
58
                                        : OptRef<Upstream::OdCdsApiHandle>();
358
58
  }
359
  // This function must not be called if on demand is disabled.
360
56
  std::chrono::milliseconds odcdsTimeout() const {
361
56
    return shared_config_->onDemandConfig()->timeout();
362
56
  }
363
  // This function must not be called if on demand is disabled.
364
108
  const OnDemandStats& onDemandStats() const { return shared_config_->onDemandConfig()->stats(); }
365
2512
  Random::RandomGenerator& randomGenerator() { return random_generator_; }
366
2213
  bool flushAccessLogOnConnected() const { return shared_config_->flushAccessLogOnConnected(); }
367
2501
  bool flushAccessLogOnStart() const { return shared_config_->flushAccessLogOnStart(); }
368
533
  Regex::Engine& regexEngine() const { return regex_engine_; }
369
148
  const BackOffStrategyPtr& backoffStrategy() const { return shared_config_->backoffStrategy(); };
370
  const Network::ProxyProtocolTLVVector& proxyProtocolTLVs() const {
371
    return shared_config_->proxyProtocolTLVs();
372
  }
373

            
374
  envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode
375
2530
  upstreamConnectMode() const {
376
2530
    return upstream_connect_mode_;
377
2530
  }
378

            
379
2590
  const absl::optional<uint32_t>& maxEarlyDataBytes() const { return max_early_data_bytes_; }
380

            
381
private:
382
  struct SimpleRouteImpl : public Route {
383
    SimpleRouteImpl(const Config& parent, absl::string_view cluster_name);
384

            
385
    // Route
386
    bool matches(Network::Connection&) const override { return true; }
387
21892
    const std::string& clusterName() const override { return cluster_name_; }
388
151
    const Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
389
151
      return parent_.metadataMatchCriteria();
390
151
    }
391

            
392
    const Config& parent_;
393
    std::string cluster_name_;
394
  };
395

            
396
  class WeightedClusterEntry : public Route {
397
  public:
398
    WeightedClusterEntry(const Config& parent,
399
                         const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::
400
                             WeightedCluster::ClusterWeight& config);
401

            
402
87
    uint64_t clusterWeight() const { return cluster_weight_; }
403

            
404
    // Route
405
    bool matches(Network::Connection&) const override { return false; }
406
49
    const std::string& clusterName() const override { return cluster_name_; }
407
31
    const Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
408
31
      if (metadata_match_criteria_) {
409
16
        return metadata_match_criteria_.get();
410
16
      }
411
15
      return parent_.metadataMatchCriteria();
412
31
    }
413

            
414
  private:
415
    const Config& parent_;
416
    const std::string cluster_name_;
417
    const uint64_t cluster_weight_;
418
    Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
419
  };
420
  using WeightedClusterEntryConstSharedPtr = std::shared_ptr<const WeightedClusterEntry>;
421

            
422
  RouteConstSharedPtr default_route_;
423
  std::vector<WeightedClusterEntryConstSharedPtr> weighted_clusters_;
424
  uint64_t total_cluster_weight_;
425
  AccessLog::InstanceSharedPtrVector access_logs_;
426
  const uint32_t max_connect_attempts_;
427
  ThreadLocal::SlotPtr upstream_drain_manager_slot_;
428
  SharedConfigSharedPtr shared_config_;
429
  std::unique_ptr<const Router::MetadataMatchCriteria> cluster_metadata_match_criteria_;
430
  Random::RandomGenerator& random_generator_;
431
  std::unique_ptr<const Network::HashPolicyImpl> hash_policy_;
432
  Regex::Engine& regex_engine_; // Static lifetime object, safe to store as a reference
433
  envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode upstream_connect_mode_{
434
      envoy::extensions::filters::network::tcp_proxy::v3::IMMEDIATE};
435
  absl::optional<uint32_t> max_early_data_bytes_;
436
};
437

            
438
using ConfigSharedPtr = std::shared_ptr<Config>;
439

            
440
/**
 * Per-connection TCP Proxy Cluster configuration. Stored in filter state to
 * override the cluster used for a specific connection.
 */
class PerConnectionCluster : public StreamInfo::FilterState::Object {
public:
  PerConnectionCluster(absl::string_view cluster) : cluster_(cluster) {}
  // The overriding cluster name.
  const std::string& value() const { return cluster_; }
  absl::optional<std::string> serializeAsString() const override { return cluster_; }
  // Filter state key for this object.
  static const std::string& key();

private:
  const std::string cluster_;
};
453

            
454
/**
455
 * An implementation of a TCP (L3/L4) proxy. This filter will instantiate a new outgoing TCP
456
 * connection using the defined load balancing proxy for the configured cluster. All data will
457
 * be proxied back and forth between the two connections.
458
 */
459
class Filter : public Network::ReadFilter,
460
               public Upstream::LoadBalancerContextBase,
461
               protected Logger::Loggable<Logger::Id::filter>,
462
               public GenericConnectionPoolCallbacks {
463
public:
464
  Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
  ~Filter() override;

  // Network::ReadFilter
  Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
  Network::FilterStatus onNewConnection() override;
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
  bool startUpstreamSecureTransport() override;

  // GenericConnectionPoolCallbacks
  void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
                          Upstream::HostDescriptionConstSharedPtr& host,
                          const Network::ConnectionInfoProvider& address_provider,
                          Ssl::ConnectionInfoConstSharedPtr ssl_info) override;
  void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
                            absl::view failure_reason placeholder
                            Upstream::HostDescriptionConstSharedPtr host) override;

  // Upstream::LoadBalancerContext
  const Router::MetadataMatchCriteria* metadataMatchCriteria() override;
  // Hashes the downstream connection when a hash policy is configured; returns
  // empty otherwise so the load balancer uses its default selection.
  absl::optional<uint64_t> computeHashKey() override {
    auto hash_policy = config_->hashPolicy();
    if (hash_policy) {
      return hash_policy->generateHash(*downstreamConnection());
    }

    return {};
  }
  const Network::Connection* downstreamConnection() const override {
    return &read_callbacks_->connection();
  }

  StreamInfo::StreamInfo* requestStreamInfo() const override {
    return &read_callbacks_->connection().streamInfo();
  }

  Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const override {
    return transport_socket_options_;
  }

  Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override {
    return upstream_options_;
  }

  // These two functions allow enabling/disabling reads on the upstream and downstream connections.
  // They are called by the Downstream/Upstream Watermark callbacks to limit buffering.
  void readDisableUpstream(bool disable);
  void readDisableDownstream(bool disable);
512

            
513
  // Receives events from the upstream connection pool and dispatches them to
  // either the owning filter or, after downstream close, the drainer.
  struct UpstreamCallbacks : public Tcp::ConnectionPool::UpstreamCallbacks {
    UpstreamCallbacks(Filter* parent) : parent_(parent) {}

    // Tcp::ConnectionPool::UpstreamCallbacks
    void onUpstreamData(Buffer::Instance& data, bool end_stream) override;
    void onEvent(Network::ConnectionEvent event) override;
    void onAboveWriteBufferHighWatermark() override;
    void onBelowWriteBufferLowWatermark() override;

    void onBytesSent();
    void onIdleTimeout();
    void drain(Drainer& drainer);

    // Either parent_ or drainer_ will be non-NULL, but never both. This could be
    // logically be represented as a union, but saving one pointer of memory is
    // outweighed by more type safety/better error handling.
    //
    // Parent starts out as non-NULL. If the downstream connection is closed while
    // the upstream connection still has buffered data to flush, drainer_ becomes
    // non-NULL and parent_ is set to NULL.
    Filter* parent_{};
    Drainer* drainer_{};

    bool on_high_watermark_called_{false};
  };

  // Stream info for this proxied session (defined in the .cc).
  StreamInfo::StreamInfo& getStreamInfo();
540
  class HttpStreamDecoderFilterCallbacks : public Http::StreamDecoderFilterCallbacks,
541
                                           public ScopeTrackedObject {
542
  public:
543
    HttpStreamDecoderFilterCallbacks(Filter* parent);
544
    // Http::StreamDecoderFilterCallbacks
545
1265
    OptRef<const Network::Connection> connection() override {
546
1265
      return parent_->read_callbacks_->connection();
547
1265
    }
548
4099
    StreamInfo::StreamInfo& streamInfo() override { return parent_->getStreamInfo(); }
549
211880
    const ScopeTrackedObject& scope() override { return *this; }
550
213896
    Event::Dispatcher& dispatcher() override {
551
213896
      return parent_->read_callbacks_->connection().dispatcher();
552
213896
    }
553
2
    void resetStream(Http::StreamResetReason, absl::string_view) override {
554
2
      IS_ENVOY_BUG("Not implemented. Unexpected call to resetStream()");
555
2
    };
556
1265
    Router::RouteConstSharedPtr route() override { return route_; }
557
19089
    Upstream::ClusterInfoConstSharedPtr clusterInfo() override {
558
19089
      return parent_->cluster_manager_.getThreadLocalCluster(parent_->route_->clusterName())
559
19089
          ->info();
560
19089
    }
561
425
    uint64_t streamId() const override {
562
425
      auto sip = parent_->getStreamInfo().getStreamIdProvider();
563
425
      if (sip) {
564
421
        return sip->toInteger().value();
565
421
      }
566
4
      return 0;
567
425
    }
568
423
    Tracing::Span& activeSpan() override { return parent_->active_span_; }
569
423
    OptRef<const Tracing::Config> tracingConfig() const override {
570
423
      return makeOptRef<const Tracing::Config>(parent_->tracing_config_);
571
423
    }
572
2
    void continueDecoding() override {}
573
4
    void addDecodedData(Buffer::Instance&, bool) override {}
574
2
    void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {}
575
2
    Http::RequestTrailerMap& addDecodedTrailers() override { return *request_trailer_map_; }
576
2
    Http::MetadataMapVector& addDecodedMetadata() override {
577
2
      static Http::MetadataMapVector metadata_map_vector;
578
2
      return metadata_map_vector;
579
2
    }
580
2
    const Buffer::Instance* decodingBuffer() override { return nullptr; }
581
2
    void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
582
    void sendLocalReply(Http::Code, absl::string_view,
583
                        std::function<void(Http::ResponseHeaderMap& headers)>,
584
                        const absl::optional<Grpc::Status::GrpcStatus>,
585
                        absl::string_view) override {}
586
    void sendGoAwayAndClose(bool graceful [[maybe_unused]] = false) override {}
587
2
    void encode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {}
588
2
    Http::ResponseHeaderMapOptRef informationalHeaders() override { return {}; }
589
2
    void encodeHeaders(Http::ResponseHeaderMapPtr&&, bool, absl::string_view) override {}
590
2
    Http::ResponseHeaderMapOptRef responseHeaders() override { return {}; }
591
2
    void encodeData(Buffer::Instance&, bool) override {}
592
2
    Http::RequestHeaderMapOptRef requestHeaders() override { return {}; }
593
2
    Http::RequestTrailerMapOptRef requestTrailers() override { return {}; }
594
2
    void encodeTrailers(Http::ResponseTrailerMapPtr&&) override {}
595
2
    Http::ResponseTrailerMapOptRef responseTrailers() override { return {}; }
596
2
    void encodeMetadata(Http::MetadataMapPtr&&) override {}
597
8915
    void onDecoderFilterAboveWriteBufferHighWatermark() override {
598
8915
      parent_->upstream_callbacks_->onAboveWriteBufferHighWatermark();
599
8915
    }
600
8915
    void onDecoderFilterBelowWriteBufferLowWatermark() override {
601
8915
      parent_->upstream_callbacks_->onBelowWriteBufferLowWatermark();
602
8915
    }
603
421
    void addDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks&) override {}
604
421
    void removeDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks&) override {}
605
4
    void setBufferLimit(uint64_t) override {}
606
423
    uint64_t bufferLimit() override { return 0; }
607
2
    bool recreateStream(const Http::ResponseHeaderMap*) override { return false; }
608
2
    void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
609
2
    Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return nullptr; }
610
2
    const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override {
611
2
      return nullptr;
612
2
    }
613
    Router::RouteSpecificFilterConfigs perFilterConfigs() const override { return {}; }
614
844
    Buffer::BufferMemoryAccountSharedPtr account() const override { return nullptr; }
615
2
    void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override {}
616
    absl::optional<Upstream::LoadBalancerContext::OverrideHost>
617
2
    upstreamOverrideHost() const override {
618
2
      return absl::nullopt;
619
2
    }
620
    bool shouldLoadShed() const override { return false; }
621
    void restoreContextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) override {
622
      tracked_object_stack.add(*this);
623
    }
624
2
    Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override { return {}; }
625
2
    OptRef<Http::DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return {}; }
626
2
    OptRef<Http::UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {}; }
627
2
    void resetIdleTimer() override {}
628
    // absl::optional<absl::string_view> upstreamOverrideHost() const override {
629
    //   return absl::nullopt;
630
    // }
631
2
    absl::string_view filterConfigName() const override { return ""; }
632

            
633
    // ScopeTrackedObject
634
2
    void dumpState(std::ostream& os, int indent_level) const override {
635
2
      const char* spaces = spacesForLevel(indent_level);
636
2
      os << spaces << "TcpProxy " << this << DUMP_MEMBER(streamId()) << "\n";
637
2
      DUMP_DETAILS(parent_->getStreamInfo().upstreamInfo());
638
2
    }
639
    Filter* parent_{};
640
    Http::RequestTrailerMapPtr request_trailer_map_;
641
    std::shared_ptr<Http::NullRouteImpl> route_;
642
  };
643
  Tracing::NullSpan active_span_;
644
  const Tracing::Config& tracing_config_;
645

            
646
protected:
647
  struct DownstreamCallbacks : public Network::ConnectionCallbacks {
648
2522
    DownstreamCallbacks(Filter& parent) : parent_(parent) {}
649

            
650
    // Network::ConnectionCallbacks
651
4778
    void onEvent(Network::ConnectionEvent event) override { parent_.onDownstreamEvent(event); }
652
    void onAboveWriteBufferHighWatermark() override;
653
    void onBelowWriteBufferLowWatermark() override;
654

            
655
    Filter& parent_;
656
    bool on_high_watermark_called_{false};
657
  };
658

            
659
  // Reasons an upstream connection attempt can fail; passed to onInitFailure().
  enum class UpstreamFailureReason {
    ConnectFailed,
    NoHealthyUpstream,
    ResourceLimitExceeded,
    NoRoute,
  };

  // Callbacks for different error and success states during connection establishment
667
2607
  virtual RouteConstSharedPtr pickRoute() {
668
2607
    return config_->getRouteFromEntries(read_callbacks_->connection());
669
2607
  }
670

            
671
  virtual void onInitFailure(UpstreamFailureReason reason);
672
  void initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats);
673

            
674
  // Create connection to the upstream cluster. This function can be repeatedly called on upstream
675
  // connection failure.
676
  Network::FilterStatus establishUpstreamConnection();
677

            
678
  // The callback upon on demand cluster discovery response.
679
  void onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status);
680

            
681
  bool maybeTunnel(Upstream::ThreadLocalCluster& cluster);
682
  void onConnectMaxAttempts();
683
  void onConnectTimeout();
684
  void onDownstreamEvent(Network::ConnectionEvent event);
685
  void onUpstreamData(Buffer::Instance& data, bool end_stream);
686
  void onUpstreamEvent(Network::ConnectionEvent event);
687
  void onUpstreamConnection();
688
  void onIdleTimeout();
689
  void resetIdleTimer();
690
  void disableIdleTimer();
691
  void onMaxDownstreamConnectionDuration();
692
  void onAccessLogFlushInterval();
693
  void resetAccessLogFlushTimer();
694
  void flushAccessLog(AccessLog::AccessLogType access_log_type);
695
  void disableAccessLogFlushTimer();
696
  void onRetryTimer();
697
  void enableRetryTimer();
698
  void disableRetryTimer();
699

            
700
public:
701
  // Public for testing purposes
702
  void onDownstreamTlsHandshakeComplete();
703

            
704
protected:
705
  const ConfigSharedPtr config_;
706
  Upstream::ClusterManager& cluster_manager_;
707
  Network::ReadFilterCallbacks* read_callbacks_{};
708

            
709
  DownstreamCallbacks downstream_callbacks_;
710
  Event::TimerPtr idle_timer_;
711
  Event::TimerPtr connection_duration_timer_;
712
  Event::TimerPtr access_log_flush_timer_;
713
  Event::TimerPtr retry_timer_;
714

            
715
  // A pointer to the on demand cluster lookup when lookup is in flight.
716
  Upstream::ClusterDiscoveryCallbackHandlePtr cluster_discovery_handle_;
717

            
718
  std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
719
                                                          // read filter.
720
  // The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist
721
  // until either the upstream or downstream connection is terminated.
722
  std::unique_ptr<GenericUpstream> upstream_;
723
  // The connection pool used to set up |upstream_|.
724
  // This will be non-null from when an upstream connection is attempted until
725
  // it either succeeds or fails.
726
  std::unique_ptr<GenericConnPool> generic_conn_pool_;
727
  // Time the filter first attempted to connect to the upstream after the
728
  // cluster is discovered. Capture the first time as the filter may try multiple times to connect
729
  // to the upstream.
730
  absl::optional<MonotonicTime> initial_upstream_connection_start_time_;
731
  RouteConstSharedPtr route_;
732
  Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
733
  Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
734
  Network::Socket::OptionsSharedPtr upstream_options_;
735
  absl::optional<std::chrono::milliseconds> idle_timeout_;
736
  uint32_t connect_attempts_{};
737
  bool connecting_{};
738
  bool downstream_closed_{};
739
  // Stores the ReceiveBeforeConnect filter state value which can be set by preceding
740
  // filters in the filter chain. When the filter state is set, TCP_PROXY doesn't disable
741
  // downstream read during initialization. This feature can hence be used by preceding filters
742
  // in the filter chain to read data from the downstream connection (for eg: to parse SNI) before
743
  // the upstream connection is established.
744
  bool receive_before_connect_{false};
745
  bool early_data_end_stream_{false};
746
  Buffer::OwnedImpl early_data_buffer_{};
747
  HttpStreamDecoderFilterCallbacks upstream_decoder_filter_callbacks_;
748

            
749
  // Connection establishment mode configuration.
750
  envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode connect_mode_{
751
      envoy::extensions::filters::network::tcp_proxy::v3::IMMEDIATE};
752
  bool waiting_for_tls_handshake_{false};
753
  bool tls_handshake_complete_{false};
754
  bool initial_data_received_{false};
755
  bool read_disabled_due_to_buffer_{false}; // Track if we disabled reading due to buffer overflow.
756
  uint32_t max_buffered_bytes_{65536};      // Default 64KB.
757
};
758

            
759
// This class deals with an upstream connection that needs to finish flushing, when the downstream
760
// connection has been closed. The TcpProxy is destroyed when the downstream connection is closed,
761
// so handling the upstream connection here allows it to finish draining or timeout.
762
class Drainer : public Event::DeferredDeletable, protected Logger::Loggable<Logger::Id::filter> {
763
public:
764
  Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
765
          const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
766
          Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
767
          absl::optional<std::chrono::milliseconds> idle_timeout,
768
          const Upstream::HostDescriptionConstSharedPtr& upstream_host);
769

            
770
  void onEvent(Network::ConnectionEvent event);
771
  void onData(Buffer::Instance& data, bool end_stream);
772
  void onIdleTimeout();
773
  void onBytesSent();
774
  void cancelDrain();
775
  Event::Dispatcher& dispatcher();
776

            
777
private:
778
  UpstreamDrainManager& parent_;
779
  std::shared_ptr<Filter::UpstreamCallbacks> callbacks_;
780
  Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
781
  Event::TimerPtr idle_timer_;
782
  absl::optional<std::chrono::milliseconds> idle_timeout_;
783
  Upstream::HostDescriptionConstSharedPtr upstream_host_;
784
  Config::SharedConfigSharedPtr config_;
785
};
786

            
787
using DrainerPtr = std::unique_ptr<Drainer>;
788

            
789
class UpstreamDrainManager : public ThreadLocal::ThreadLocalObject {
790
public:
791
  ~UpstreamDrainManager() override;
792
  void add(const Config::SharedConfigSharedPtr& config,
793
           Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
794
           const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
795
           Event::TimerPtr&& idle_timer, absl::optional<std::chrono::milliseconds> idle_timeout,
796
           const Upstream::HostDescriptionConstSharedPtr& upstream_host);
797
  void remove(Drainer& drainer, Event::Dispatcher& dispatcher);
798

            
799
private:
800
  // This must be a map instead of set because there is no way to move elements
801
  // out of a set, and these elements get passed to deferredDelete() instead of
802
  // being deleted in-place. The key and value will always be equal.
803
  absl::node_hash_map<Drainer*, DrainerPtr> drainers_;
804
};
805

            
806
} // namespace TcpProxy
807
} // namespace Envoy