1
#include "source/common/tcp_proxy/tcp_proxy.h"
2

            
3
#include <cstdint>
4
#include <memory>
5
#include <string>
6

            
7
#include "envoy/buffer/buffer.h"
8
#include "envoy/config/accesslog/v3/accesslog.pb.h"
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
12
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h"
13
#include "envoy/extensions/request_id/uuid/v3/uuid.pb.h"
14
#include "envoy/registry/registry.h"
15
#include "envoy/stats/scope.h"
16
#include "envoy/stream_info/bool_accessor.h"
17
#include "envoy/upstream/cluster_manager.h"
18
#include "envoy/upstream/upstream.h"
19

            
20
#include "source/common/access_log/access_log_impl.h"
21
#include "source/common/common/assert.h"
22
#include "source/common/common/empty_string.h"
23
#include "source/common/common/enum_to_int.h"
24
#include "source/common/common/fmt.h"
25
#include "source/common/common/macros.h"
26
#include "source/common/common/utility.h"
27
#include "source/common/config/metadata.h"
28
#include "source/common/config/utility.h"
29
#include "source/common/config/well_known_names.h"
30
#include "source/common/http/request_id_extension_impl.h"
31
#include "source/common/network/application_protocol.h"
32
#include "source/common/network/proxy_protocol_filter_state.h"
33
#include "source/common/network/socket_option_factory.h"
34
#include "source/common/network/transport_socket_options_impl.h"
35
#include "source/common/network/upstream_server_name.h"
36
#include "source/common/network/upstream_socket_options_filter_state.h"
37
#include "source/common/router/metadatamatchcriteria_impl.h"
38
#include "source/common/router/shadow_writer_impl.h"
39
#include "source/common/stream_info/stream_id_provider_impl.h"
40
#include "source/common/stream_info/uint64_accessor_impl.h"
41
#include "source/common/tracing/http_tracer_impl.h"
42

            
43
#include "absl/container/flat_hash_set.h"
44

            
45
namespace Envoy {
46
namespace TcpProxy {
47

            
48
// Type alias for UpstreamConnectMode to simplify usage throughout this file.
49
using UpstreamConnectMode = envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode;
50

            
51
2673
// Returns the filter-state key under which a pre-selected upstream cluster
// name is stored for a downstream connection. The key string is constructed
// once and intentionally never destroyed (CONSTRUCT_ON_FIRST_USE) to avoid
// static-destruction-order issues.
const std::string& PerConnectionCluster::key() {
  CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.cluster");
}
54

            
55
class PerConnectionClusterFactory : public StreamInfo::FilterState::ObjectFactory {
56
public:
57
86
  std::string name() const override { return PerConnectionCluster::key(); }
58
  std::unique_ptr<StreamInfo::FilterState::Object>
59
2
  createFromBytes(absl::string_view data) const override {
60
2
    return std::make_unique<PerConnectionCluster>(data);
61
2
  }
62
};
63

            
64
REGISTER_FACTORY(PerConnectionClusterFactory, StreamInfo::FilterState::ObjectFactory);
65

            
66
class PerConnectionIdleTimeoutMsObjectFactory : public StreamInfo::FilterState::ObjectFactory {
67
public:
68
89
  std::string name() const override { return std::string(PerConnectionIdleTimeoutMs); }
69
  std::unique_ptr<StreamInfo::FilterState::Object>
70
4
  createFromBytes(absl::string_view data) const override {
71
4
    uint64_t duration_in_milliseconds = 0;
72
4
    if (absl::SimpleAtoi(data, &duration_in_milliseconds)) {
73
2
      return std::make_unique<StreamInfo::UInt64AccessorImpl>(duration_in_milliseconds);
74
2
    }
75

            
76
2
    return nullptr;
77
4
  }
78
};
79

            
80
REGISTER_FACTORY(PerConnectionIdleTimeoutMsObjectFactory, StreamInfo::FilterState::ObjectFactory);
81

            
82
// A trivial route that always resolves to a single, fixed cluster name.
Config::SimpleRouteImpl::SimpleRouteImpl(const Config& parent, absl::string_view cluster_name)
    : parent_(parent), cluster_name_(cluster_name) {}
84

            
85
// One entry of the weighted-cluster route table: captures the cluster name,
// its weight, and any per-cluster "envoy.lb" metadata match criteria.
Config::WeightedClusterEntry::WeightedClusterEntry(
    const Config& parent, const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::
                              WeightedCluster::ClusterWeight& config)
    : parent_(parent), cluster_name_(config.name()), cluster_weight_(config.weight()) {
  if (!config.has_metadata_match()) {
    return;
  }

  const auto& filter_metadata = config.metadata_match().filter_metadata();
  const auto filter_it = filter_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
  if (filter_it == filter_metadata.end()) {
    // No "envoy.lb" section; nothing to match on for this cluster.
    return;
  }

  if (parent.cluster_metadata_match_criteria_ != nullptr) {
    // Fold the per-cluster criteria into the config-level criteria so both
    // sets of constraints apply.
    metadata_match_criteria_ =
        parent.cluster_metadata_match_criteria_->mergeMatchCriteria(filter_it->second);
  } else {
    metadata_match_criteria_ =
        std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
  }
}
103

            
104
// Builds the on-demand CDS (ODCDS) configuration: allocates the ODCDS API
// handle, the discovery lookup timeout, and the on-demand stats.
OnDemandConfig::OnDemandConfig(
    const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_OnDemand& on_demand_message,
    Server::Configuration::FactoryContext& context, Stats::Scope& scope)
    : odcds_([&]() -> Upstream::OdCdsApiHandlePtr {
        // When the runtime guard is on and the config source is ADS, use the
        // xdstp-based ODCDS implementation; otherwise fall back to the
        // classic implementation.
        if (Runtime::runtimeFeatureEnabled(
                "envoy.reloadable_features.tcp_proxy_odcds_over_ads_fix") &&
            on_demand_message.odcds_config().config_source_specifier_case() ==
                envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds) {
          return THROW_OR_RETURN_VALUE(
              context.serverFactoryContext().clusterManager().allocateOdCdsApi(
                  &Upstream::XdstpOdCdsApiImpl::create, on_demand_message.odcds_config(),
                  OptRef<xds::core::v3::ResourceLocator>(), context.messageValidationVisitor()),
              Upstream::OdCdsApiHandlePtr);
        }
        return THROW_OR_RETURN_VALUE(
            context.serverFactoryContext().clusterManager().allocateOdCdsApi(
                &Upstream::OdCdsApiImpl::create, on_demand_message.odcds_config(),
                OptRef<xds::core::v3::ResourceLocator>(), context.messageValidationVisitor()),
            Upstream::OdCdsApiHandlePtr);
      }()),
      // Discovery lookup timeout defaults to 60s when not configured.
      lookup_timeout_(
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(on_demand_message, timeout, 60000))),
      stats_(generateStats(scope)) {}
127

            
128
32
// Instantiates the on-demand CDS counters in the given stats scope.
OnDemandStats OnDemandConfig::generateStats(Stats::Scope& scope) {
  return {ON_DEMAND_TCP_PROXY_STATS(POOL_COUNTER(scope))};
}
131

            
132
// Parses the parts of the TcpProxy proto config that are shared across all
// filter instances created from it: stats scope, timeouts, tunneling config,
// access-log flush behavior, on-demand CDS, retry backoff, and proxy protocol
// TLVs. Throws EnvoyException on invalid/conflicting configuration.
Config::SharedConfig::SharedConfig(
    const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
    Server::Configuration::FactoryContext& context)
    : stats_scope_(context.scope().createScope(fmt::format("tcp.{}", config.stat_prefix()))),
      stats_(generateStats(*stats_scope_)),
      flush_access_log_on_start_(config.access_log_options().flush_access_log_on_start()),
      proxy_protocol_tlv_merge_policy_(config.proxy_protocol_tlv_merge_policy()) {
  if (config.has_idle_timeout()) {
    const uint64_t timeout = DurationUtil::durationToMilliseconds(config.idle_timeout());
    // An explicit zero disables the idle timeout entirely.
    if (timeout > 0) {
      idle_timeout_ = std::chrono::milliseconds(timeout);
    }
  } else {
    // Default idle timeout when the field is absent.
    idle_timeout_ = std::chrono::hours(1);
  }
  if (config.has_tunneling_config()) {
    tunneling_config_helper_ =
        std::make_unique<TunnelingConfigHelperImpl>(*stats_scope_.get(), config, context);
  }
  if (config.has_max_downstream_connection_duration()) {
    const uint64_t connection_duration =
        DurationUtil::durationToMilliseconds(config.max_downstream_connection_duration());
    max_downstream_connection_duration_ = std::chrono::milliseconds(connection_duration);
  }
  if (config.has_max_downstream_connection_duration_jitter_percentage()) {
    max_downstream_connection_duration_jitter_percentage_ =
        config.max_downstream_connection_duration_jitter_percentage().value();
  }

  // access_log_options supersedes the two deprecated top-level fields; using
  // both forms at once is a configuration error.
  if (config.has_access_log_options()) {
    if (config.flush_access_log_on_connected() /* deprecated */) {
      throw EnvoyException(
          "Only one of flush_access_log_on_connected or access_log_options can be specified.");
    }

    if (config.has_access_log_flush_interval() /* deprecated */) {
      throw EnvoyException(
          "Only one of access_log_flush_interval or access_log_options can be specified.");
    }

    flush_access_log_on_connected_ = config.access_log_options().flush_access_log_on_connected();

    if (config.access_log_options().has_access_log_flush_interval()) {
      const uint64_t flush_interval = DurationUtil::durationToMilliseconds(
          config.access_log_options().access_log_flush_interval());
      access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
    }
  } else {
    // Legacy (deprecated) fields path.
    flush_access_log_on_connected_ = config.flush_access_log_on_connected();

    if (config.has_access_log_flush_interval()) {
      const uint64_t flush_interval =
          DurationUtil::durationToMilliseconds(config.access_log_flush_interval());
      access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
    }
  }

  if (config.has_on_demand() && config.on_demand().has_odcds_config()) {
    on_demand_config_ =
        std::make_unique<OnDemandConfig>(config.on_demand(), context, *stats_scope_);
  }

  if (config.has_backoff_options()) {
    const uint64_t base_interval_ms =
        PROTOBUF_GET_MS_REQUIRED(config.backoff_options(), base_interval);
    // max_interval defaults to 10x base_interval when not set.
    const uint64_t max_interval_ms =
        PROTOBUF_GET_MS_OR_DEFAULT(config.backoff_options(), max_interval, base_interval_ms * 10);

    if (max_interval_ms < base_interval_ms) {
      throw EnvoyException(
          "max_backoff_interval must be greater or equal to base_backoff_interval");
    }

    backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
        base_interval_ms, max_interval_ms, context.serverFactoryContext().api().randomGenerator());
  }

  if (!config.proxy_protocol_tlvs().empty()) {
    // Splits configured TLVs into static values and dynamic (formatter-based)
    // entries; the latter are stored in dynamic_tlv_formatters_.
    proxy_protocol_tlvs_ =
        parseTLVs(config.proxy_protocol_tlvs(), context, dynamic_tlv_formatters_);
  }
}
214

            
215
// Per-filter-chain configuration. Builds the routing table (single default
// cluster, or weighted clusters when no default cluster is set), access logs,
// hash policy, and upstream connection-establishment settings.
// Throws EnvoyException on invalid configuration.
Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
               Server::Configuration::FactoryContext& context)
    : max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)),
      upstream_drain_manager_slot_(context.serverFactoryContext().threadLocal().allocateSlot()),
      shared_config_(std::make_shared<SharedConfig>(config, context)),
      random_generator_(context.serverFactoryContext().api().randomGenerator()),
      regex_engine_(context.serverFactoryContext().regexEngine()) {
  // Each worker thread gets its own drain manager instance.
  upstream_drain_manager_slot_->set([](Event::Dispatcher&) {
    ThreadLocal::ThreadLocalObjectSharedPtr drain_manager =
        std::make_shared<UpstreamDrainManager>();
    return drain_manager;
  });

  if (!config.cluster().empty()) {
    default_route_ = std::make_shared<const SimpleRouteImpl>(*this, config.cluster());
  }

  if (config.has_metadata_match()) {
    const auto& filter_metadata = config.metadata_match().filter_metadata();

    // Only the "envoy.lb" section of the metadata is used for LB matching.
    const auto filter_it = filter_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);

    if (filter_it != filter_metadata.end()) {
      cluster_metadata_match_criteria_ =
          std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
    }
  }

  // Weighted clusters will be enabled only if the default cluster is absent.
  if (default_route_ == nullptr && config.has_weighted_clusters()) {
    total_cluster_weight_ = 0;
    for (const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::WeightedCluster::
             ClusterWeight& cluster_desc : config.weighted_clusters().clusters()) {
      WeightedClusterEntryConstSharedPtr cluster_entry(
          std::make_shared<const WeightedClusterEntry>(*this, cluster_desc));
      weighted_clusters_.emplace_back(std::move(cluster_entry));
      total_cluster_weight_ += weighted_clusters_.back()->clusterWeight();
    }
  }

  for (const envoy::config::accesslog::v3::AccessLog& log_config : config.access_log()) {
    access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
  }

  if (!config.hash_policy().empty()) {
    hash_policy_ = std::make_unique<Network::HashPolicyImpl>(config.hash_policy());
  }

  // Parse upstream connection establishment configuration.
  upstream_connect_mode_ = config.upstream_connect_mode();

  if (config.has_max_early_data_bytes()) {
    max_early_data_bytes_ = config.max_early_data_bytes().value();
  }

  // Validate: Non-IMMEDIATE modes require max_early_data_bytes to be set.
  // Setting it to zero is allowed and will disable early data buffering.
  if (upstream_connect_mode_ != UpstreamConnectMode::IMMEDIATE &&
      !max_early_data_bytes_.has_value()) {
    throw EnvoyException(
        "max_early_data_bytes must be set when upstream_connect_mode is not IMMEDIATE");
  }
}
278

            
279
2581
// Resolves a route for the connection when weighted clusters are not in use.
// Precedence: a cluster pre-selected via per-connection filter state wins over
// the statically configured default route. Returns nullptr when no route
// applies.
RouteConstSharedPtr Config::getRegularRouteFromEntries(Network::Connection& connection) {
  const auto* per_connection_cluster =
      connection.streamInfo().filterState()->getDataReadOnly<PerConnectionCluster>(
          PerConnectionCluster::key());
  if (per_connection_cluster != nullptr) {
    // Route to the cluster chosen earlier in the filter chain.
    return std::make_shared<const SimpleRouteImpl>(*this, per_connection_cluster->value());
  }

  // Fall back to the default route; this is an empty pointer (no route) when
  // no default cluster was configured.
  return default_route_;
}
295

            
296
2618
// Top-level route selection: weighted-cluster pick when weighted clusters are
// configured, otherwise the regular (filter-state/default) lookup.
RouteConstSharedPtr Config::getRouteFromEntries(Network::Connection& connection) {
  if (!weighted_clusters_.empty()) {
    return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_,
                                            random_generator_.random(), false);
  }
  return getRegularRouteFromEntries(connection);
}
303

            
304
// Returns the configured max downstream connection duration with a random
// additive jitter applied, or nullopt when no max duration is configured.
// The jitter added is uniform in [0, ceil(duration * jitter_percentage/100)),
// so the result is >= the configured duration.
const absl::optional<std::chrono::milliseconds>
Config::calculateMaxDownstreamConnectionDurationWithJitter() {
  const auto& max_downstream_connection_duration = maxDownstreamConnectionDuration();
  if (!max_downstream_connection_duration) {
    // No max duration configured: nothing to jitter.
    return max_downstream_connection_duration;
  }

  const auto& jitter_percentage = maxDownstreamConnectionDurationJitterPercentage();
  if (!jitter_percentage) {
    // No jitter configured: return the base duration unchanged.
    return max_downstream_connection_duration;
  }

  // Apply jitter: base_duration * (1 + random(0, jitter_factor));
  const uint64_t max_jitter_ms = std::ceil(max_downstream_connection_duration.value().count() *
                                           (jitter_percentage.value() / 100.0));

  if (max_jitter_ms == 0) {
    // Percentage rounds to zero milliseconds of jitter; avoid a mod-by-zero.
    return max_downstream_connection_duration;
  }

  const uint64_t jitter_ms = randomGenerator().random() % max_jitter_ms;
  return std::chrono::milliseconds(
      static_cast<uint64_t>(max_downstream_connection_duration.value().count() + jitter_ms));
}
328

            
329
12
// Returns the drain manager belonging to the current worker thread.
UpstreamDrainManager& Config::drainManager() {
  return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}
332

            
333
// Constructs a TCP proxy filter instance bound to the given shared config and
// cluster manager. Upstream callbacks are held via shared_ptr because they
// may outlive the filter while an upstream connection drains.
Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager)
    : tracing_config_(Tracing::EgressConfig::get()), config_(config),
      cluster_manager_(cluster_manager), downstream_callbacks_(*this),
      upstream_callbacks_(new UpstreamCallbacks(this)),
      upstream_decoder_filter_callbacks_(HttpStreamDecoderFilterCallbacks(this)) {
  ASSERT(config != nullptr);
}
340

            
341
2522
// Tears down the filter: stops timers, emits the final access-log entry, and
// asserts that the upstream resources were already released.
Filter::~Filter() {
  // Disable access log flush timer if it is enabled.
  disableAccessLogFlushTimer();

  disableRetryTimer();

  // Flush the final end stream access log entry.
  flushAccessLog(AccessLog::AccessLogType::TcpConnectionEnd);

  // Upstream resources must have been released before destruction.
  ASSERT(generic_conn_pool_ == nullptr);
  ASSERT(upstream_ == nullptr);
}
353

            
354
1425
// Instantiates the TCP proxy counters and gauges in the given stats scope.
TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) {
  return {ALL_TCP_PROXY_STATS(POOL_COUNTER(scope), POOL_GAUGE(scope))};
}
357

            
358
// Parses the configured proxy protocol TLVs into a vector of static TLVs and
// a set of dynamic (substitution-format) TLV formatters.
//
// @param tlvs configured TLV entries.
// @param context factory context used to build substitution formatters.
// @param dynamic_tlvs output vector receiving formatters for TLVs whose value
//        is rendered per-stream from a format string.
// @return the vector of TLVs with static (literal) values.
// @throw EnvoyException if an entry sets both or neither of 'value' and
//        'format_string', or if a format string fails to parse.
Network::ProxyProtocolTLVVector
Config::SharedConfig::parseTLVs(absl::Span<const envoy::config::core::v3::TlvEntry* const> tlvs,
                                Server::Configuration::GenericFactoryContext& context,
                                std::vector<TlvFormatter>& dynamic_tlvs) {
  Network::ProxyProtocolTLVVector tlv_vector;
  for (const auto& tlv : tlvs) {
    // NOTE(review): the TLV type is an 8-bit field on the wire; values above
    // 255 are silently truncated here — confirm upstream proto validation.
    const uint8_t tlv_type = static_cast<uint8_t>(tlv->type());

    // Validate that exactly one of value or format_string is set.
    const bool has_value = !tlv->value().empty();
    const bool has_format_string = tlv->has_format_string();

    if (has_value && has_format_string) {
      throw EnvoyException(
          "Invalid TLV configuration: only one of 'value' or 'format_string' may be set.");
    }

    if (!has_value && !has_format_string) {
      throw EnvoyException(
          "Invalid TLV configuration: one of 'value' or 'format_string' must be set.");
    }

    if (has_value) {
      // Static TLV value. `has_value` already guarantees the value is
      // non-empty, so no additional length check is needed (the previous
      // `size() < 1` throw was unreachable and has been removed).
      tlv_vector.push_back(
          {tlv_type, std::vector<unsigned char>(tlv->value().begin(), tlv->value().end())});
    } else {
      // Dynamic TLV value using formatter, evaluated later per stream.
      auto formatter_or_error =
          Formatter::SubstitutionFormatStringUtils::fromProtoConfig(tlv->format_string(), context);
      if (!formatter_or_error.ok()) {
        throw EnvoyException(absl::StrCat("Failed to parse TLV format string: ",
                                          formatter_or_error.status().ToString()));
      }
      dynamic_tlvs.push_back({tlv_type, std::move(*formatter_or_error)});
    }
  }
  return tlv_vector;
}
400

            
401
// Produces the per-stream TLV list: the static TLVs plus the dynamic TLVs
// rendered from their substitution formatters against the given stream info.
//
// @param stream_info the stream used to evaluate dynamic TLV format strings.
// @return the combined static + dynamic TLV vector.
Network::ProxyProtocolTLVVector
Config::SharedConfig::evaluateDynamicTLVs(const StreamInfo::StreamInfo& stream_info) const {
  Network::ProxyProtocolTLVVector result = proxy_protocol_tlvs_;
  // The final size is known up front; reserve to avoid incremental
  // reallocation while appending the dynamic entries.
  result.reserve(proxy_protocol_tlvs_.size() + dynamic_tlv_formatters_.size());

  // Evaluate dynamic TLV formatters.
  for (const auto& tlv_formatter : dynamic_tlv_formatters_) {
    const std::string formatted_value = tlv_formatter.formatter->format({}, stream_info);

    // Convert formatted string to bytes and add to result.
    result.push_back({tlv_formatter.type,
                      std::vector<unsigned char>(formatted_value.begin(), formatted_value.end())});
  }

  return result;
}
416

            
417
2522
// Network::ReadFilter entry point; delegates to initialize() and opts into
// installing connection-level byte stats.
void Filter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
  initialize(callbacks, true);
}
420

            
421
2522
// Performs per-connection setup: registers callbacks, enables half-close,
// resolves the connect mode and early-data buffering settings, and installs
// byte meters and connection stats.
//
// @param callbacks the read filter callbacks for the downstream connection.
// @param set_connection_stats whether to attach rx/tx byte counters to the
//        downstream connection.
void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats) {
  read_callbacks_ = &callbacks;
  ENVOY_CONN_LOG(debug, "new tcp proxy session", read_callbacks_->connection());

  read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
  read_callbacks_->connection().enableHalfClose(true);

  // Check that we are generating only the byte meters we need.
  // The Downstream should be unset and the Upstream should be populated.
  ASSERT(getStreamInfo().getDownstreamBytesMeter() == nullptr);
  ASSERT(getStreamInfo().getUpstreamBytesMeter() != nullptr);

  // Initialize connection establishment mode.
  connect_mode_ = config_->upstreamConnectMode();

  // Check if early data buffering is enabled.
  if (config_->maxEarlyDataBytes().has_value()) {
    receive_before_connect_ = true;
    max_buffered_bytes_ = config_->maxEarlyDataBytes().value();
    ENVOY_CONN_LOG(debug, "receive_before_connect enabled with max buffer size: {}",
                   read_callbacks_->connection(), max_buffered_bytes_);
  } else {
    // Legacy behavior: check filter state for receive_before_connect.
    const StreamInfo::BoolAccessor* receive_before_connect =
        read_callbacks_->connection()
            .streamInfo()
            .filterState()
            ->getDataReadOnly<StreamInfo::BoolAccessor>(ReceiveBeforeConnectKey);

    if (receive_before_connect != nullptr && receive_before_connect->value()) {
      ENVOY_CONN_LOG(debug, "receive_before_connect is enabled (legacy)",
                     read_callbacks_->connection());
      receive_before_connect_ = true;
      // Use 0 buffer size for legacy mode to always read-disable immediately.
      max_buffered_bytes_ = 0;
    }
  }

  // Handle TLS handshake wait mode.
  if (connect_mode_ == UpstreamConnectMode::ON_DOWNSTREAM_TLS_HANDSHAKE) {
    const auto ssl_connection = read_callbacks_->connection().ssl();
    if (ssl_connection != nullptr) {
      waiting_for_tls_handshake_ = true;
      ENVOY_CONN_LOG(debug, "waiting for downstream TLS handshake before connecting",
                     read_callbacks_->connection());
      // TODO: Register callback for TLS handshake completion.
    } else {
      // Non-TLS connection - TLS handshake mode behaves as IMMEDIATE.
      ENVOY_CONN_LOG(debug,
                     "downstream connection is not TLS, treating TLS handshake mode as IMMEDIATE",
                     read_callbacks_->connection());
      connect_mode_ = UpstreamConnectMode::IMMEDIATE;
    }
  }

  if (!receive_before_connect_) {
    // Need to disable reads so that we don't write to an upstream that might fail
    // in onData(). This will get re-enabled when the upstream connection is
    // established.
    read_callbacks_->connection().readDisable(true);
  }

  getStreamInfo().setDownstreamBytesMeter(std::make_shared<StreamInfo::BytesMeter>());
  getStreamInfo().setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());

  config_->stats().downstream_cx_total_.inc();
  if (set_connection_stats) {
    read_callbacks_->connection().setConnectionStats(
        {config_->stats().downstream_cx_rx_bytes_total_,
         config_->stats().downstream_cx_rx_bytes_buffered_,
         config_->stats().downstream_cx_tx_bytes_total_,
         config_->stats().downstream_cx_tx_bytes_buffered_, nullptr, nullptr});
  }
}
495

            
496
200
// Handles failure to set up an upstream (no route, cluster not found, etc.):
// records the connection-pool callback latency when a connect attempt had
// actually begun, then closes the downstream connection without flushing.
void Filter::onInitFailure(UpstreamFailureReason reason) {
  // If ODCDS fails, the filter will not attempt to create a connection to
  // upstream, as it does not have an assigned upstream. As such the filter will
  // not have started attempting to connect to an upstream and there is no
  // connection pool callback latency to record.
  if (initial_upstream_connection_start_time_.has_value()) {
    if (!getStreamInfo().upstreamInfo()->upstreamTiming().connectionPoolCallbackLatency()) {
      getStreamInfo().upstreamInfo()->upstreamTiming().recordConnectionPoolCallbackLatency(
          initial_upstream_connection_start_time_.value(),
          read_callbacks_->connection().dispatcher().timeSource());
    }
  }
  // The failure reason is appended to the close reason for observability.
  read_callbacks_->connection().close(
      Network::ConnectionCloseType::NoFlush,
      absl::StrCat(StreamInfo::LocalCloseReasons::get().TcpProxyInitializationFailure,
                   enumToInt(reason)));
}
513

            
514
20084
void Filter::readDisableUpstream(bool disable) {
515
20084
  bool success = false;
516
20084
  if (upstream_) {
517
20069
    success = upstream_->readDisable(disable);
518
20069
  }
519
20084
  if (!success) {
520
15
    return;
521
15
  }
522
20069
  if (disable) {
523
10042
    read_callbacks_->upstreamHost()
524
10042
        ->cluster()
525
10042
        .trafficStats()
526
10042
        ->upstream_flow_control_paused_reading_total_.inc();
527
10042
  } else {
528
10027
    read_callbacks_->upstreamHost()
529
10027
        ->cluster()
530
10027
        .trafficStats()
531
10027
        ->upstream_flow_control_resumed_reading_total_.inc();
532
10027
  }
533
20069
}
534

            
535
35882
void Filter::readDisableDownstream(bool disable) {
536
35882
  if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
537
    // During idle timeouts, we close both upstream and downstream with NoFlush.
538
    // Envoy still does a best-effort flush which can case readDisableDownstream to be called
539
    // despite the downstream connection being closed.
540
2
    return;
541
2
  }
542

            
543
35880
  const Network::Connection::ReadDisableStatus read_disable_status =
544
35880
      read_callbacks_->connection().readDisable(disable);
545

            
546
35880
  if (read_disable_status == Network::Connection::ReadDisableStatus::TransitionedToReadDisabled) {
547
17928
    config_->stats().downstream_flow_control_paused_reading_total_.inc();
548
17952
  } else if (read_disable_status ==
549
17952
             Network::Connection::ReadDisableStatus::TransitionedToReadEnabled) {
550
17929
    config_->stats().downstream_flow_control_resumed_reading_total_.inc();
551
17929
  }
552
35880
}
553

            
554
893420
// Returns the stream info of the downstream connection, which this filter
// uses as the canonical stream info for the session.
StreamInfo::StreamInfo& Filter::getStreamInfo() {
  return read_callbacks_->connection().streamInfo();
}
557

            
558
10042
// Downstream write buffer crossed the high watermark: apply backpressure by
// pausing reads from upstream.
void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
  ASSERT(!on_high_watermark_called_);
  on_high_watermark_called_ = true;
  // If downstream has too much data buffered, stop reading on the upstream connection.
  parent_.readDisableUpstream(true);
}
564

            
565
10042
// Downstream write buffer drained below the low watermark: release
// backpressure by resuming reads from upstream.
void Filter::DownstreamCallbacks::onBelowWriteBufferLowWatermark() {
  ASSERT(on_high_watermark_called_);
  on_high_watermark_called_ = false;
  // The downstream buffer has been drained. Resume reading from upstream.
  parent_.readDisableUpstream(false);
}
571

            
572
2456
// Routes upstream connection events either to the owning filter or, once the
// connection is draining, to the drainer. Connect notifications are ignored
// here (connection establishment is observed elsewhere).
void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) {
  const bool is_connect_event = event == Network::ConnectionEvent::Connected ||
                                event == Network::ConnectionEvent::ConnectedZeroRtt;
  if (is_connect_event) {
    return;
  }

  if (drainer_ != nullptr) {
    drainer_->onEvent(event);
  } else {
    parent_->onUpstreamEvent(event);
  }
}
583

            
584
17941
// Upstream write buffer crossed the high watermark: apply backpressure by
// pausing reads from downstream (when the filter is still attached).
void Filter::UpstreamCallbacks::onAboveWriteBufferHighWatermark() {
  // In case when upstream connection is draining `parent_` will be set to nullptr.
  // TCP Tunneling may call on high/low watermark multiple times.
  ASSERT(parent_ == nullptr || parent_->config_->tunnelingConfigHelper() ||
         !on_high_watermark_called_);
  on_high_watermark_called_ = true;

  if (parent_ != nullptr) {
    // There's too much data buffered in the upstream write buffer, so stop reading.
    parent_->readDisableDownstream(true);
  }
}
596

            
597
17941
// Upstream write buffer drained below the low watermark: release backpressure
// by resuming reads from downstream (when the filter is still attached).
void Filter::UpstreamCallbacks::onBelowWriteBufferLowWatermark() {
  // In case when upstream connection is draining `parent_` will be set to nullptr.
  // TCP Tunneling may call on high/low watermark multiple times.
  ASSERT(parent_ == nullptr || parent_->config_->tunnelingConfigHelper() ||
         on_high_watermark_called_);
  on_high_watermark_called_ = false;

  if (parent_ != nullptr) {
    // The upstream write buffer is drained. Resume reading.
    parent_->readDisableDownstream(false);
  }
}
609

            
610
409427
// Forwards upstream data to the owning filter while it is attached; after the
// filter detaches (drain), the drainer consumes the remaining upstream data.
void Filter::UpstreamCallbacks::onUpstreamData(Buffer::Instance& data, bool end_stream) {
  if (parent_ == nullptr) {
    drainer_->onData(data, end_stream);
    return;
  }
  parent_->onUpstreamData(data, end_stream);
}
617

            
618
1500
// Bytes were written to the upstream socket: reset the session idle timer, or
// notify the drainer when the connection is draining.
void Filter::UpstreamCallbacks::onBytesSent() {
  if (drainer_ == nullptr) {
    parent_->resetIdleTimer();
  } else {
    drainer_->onBytesSent();
  }
}
625

            
626
36
// Idle timeout fired on the upstream side: dispatch to the filter, or to the
// drainer when the connection is draining.
void Filter::UpstreamCallbacks::onIdleTimeout() {
  if (drainer_ == nullptr) {
    parent_->onIdleTimeout();
  } else {
    drainer_->onIdleTimeout();
  }
}
633

            
634
12
void Filter::UpstreamCallbacks::drain(Drainer& drainer) {
  // Transfer ownership of callbacks from the filter to the drainer. From here on,
  // events route to `drainer_` (see the null checks in the callbacks above).
  ASSERT(drainer_ == nullptr); // This should only get set once.
  parent_ = nullptr;
  drainer_ = &drainer;
}
639

            
640
2638
// Resolves the route's cluster and starts an upstream connection attempt.
// Handles: (1) unknown cluster -> optional on-demand CDS discovery; (2) circuit
// breaker overflow; (3) PROXY protocol TLV filter-state setup/merge; (4) upstream
// socket options inherited from downstream filter state; (5) conn-pool creation
// via maybeTunnel(). Returns Continue only when early data may be read before the
// upstream connection is up (receive_before_connect_), else StopIteration.
Network::FilterStatus Filter::establishUpstreamConnection() {
  const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;
  ENVOY_CONN_LOG(debug, "establishUpstreamConnection called: cluster_name={}, route_={}",
                 read_callbacks_->connection(), cluster_name, route_ != nullptr);
  Upstream::ThreadLocalCluster* thread_local_cluster =
      cluster_manager_.getThreadLocalCluster(cluster_name);

  ENVOY_CONN_LOG(debug, "establishUpstreamConnection: thread_local_cluster={}",
                 read_callbacks_->connection(), thread_local_cluster != nullptr);

  if (!thread_local_cluster) {
    auto odcds = config_->onDemandCds();
    if (!odcds.has_value()) {
      // No ODCDS? It means that on-demand discovery is disabled.
      ENVOY_CONN_LOG(debug, "Cluster not found {} and no on demand cluster set.",
                     read_callbacks_->connection(), cluster_name);
      config_->stats().downstream_cx_no_route_.inc();
      getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound);
      onInitFailure(UpstreamFailureReason::NoRoute);
    } else {
      // Kick off asynchronous on-demand discovery; onClusterDiscoveryCompletion()
      // re-enters establishUpstreamConnection() on success.
      ASSERT(!cluster_discovery_handle_);
      auto callback = std::make_unique<Upstream::ClusterDiscoveryCallback>(
          [this](Upstream::ClusterDiscoveryStatus cluster_status) {
            onClusterDiscoveryCompletion(cluster_status);
          });
      config_->onDemandStats().on_demand_cluster_attempt_.inc();
      cluster_discovery_handle_ = odcds->requestOnDemandClusterDiscovery(
          cluster_name, std::move(callback), config_->odcdsTimeout());
    }
    return Network::FilterStatus::StopIteration;
  }

  ENVOY_CONN_LOG(debug, "Creating connection to cluster {}", read_callbacks_->connection(),
                 cluster_name);
  // Record the start of the very first connection attempt only; retries keep the
  // original timestamp so connection-pool latency spans all attempts.
  if (!initial_upstream_connection_start_time_.has_value()) {
    initial_upstream_connection_start_time_.emplace(
        read_callbacks_->connection().dispatcher().timeSource().monotonicTime());
  }

  const Upstream::ClusterInfoConstSharedPtr& cluster = thread_local_cluster->info();
  getStreamInfo().setUpstreamClusterInfo(cluster);

  // Check this here because the TCP conn pool will queue our request waiting for a connection that
  // will never be released.
  if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) {
    getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
    cluster->trafficStats()->upstream_cx_overflow_.inc();
    onInitFailure(UpstreamFailureReason::ResourceLimitExceeded);
    return Network::FilterStatus::StopIteration;
  }

  auto& downstream_connection = read_callbacks_->connection();
  auto& filter_state = downstream_connection.streamInfo().filterState();

  // PROXY protocol state may already exist (e.g. set by a listener filter).
  auto* existing_state = filter_state->getDataMutable<Network::ProxyProtocolFilterState>(
      Network::ProxyProtocolFilterState::key());

  if (existing_state == nullptr) {
    // No downstream proxy protocol state exists - create new state with tcp_proxy TLVs.
    const auto tlvs = config_->sharedConfig()->evaluateDynamicTLVs(getStreamInfo());
    filter_state->setData(
        Network::ProxyProtocolFilterState::key(),
        std::make_shared<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
            downstream_connection.connectionInfoProvider().remoteAddress(),
            downstream_connection.connectionInfoProvider().localAddress(), tlvs}),
        StreamInfo::FilterState::StateType::Mutable, StreamInfo::FilterState::LifeSpan::Connection);
  } else if (config_->sharedConfig()->proxyProtocolTlvMergePolicy() !=
             envoy::extensions::filters::network::tcp_proxy::v3::ADD_IF_ABSENT) {
    // Existing state found and merge policy is not ADD_IF_ABSENT - merge TLVs.
    const auto& existing_data = existing_state->value();
    const auto configured_tlvs = config_->sharedConfig()->evaluateDynamicTLVs(getStreamInfo());

    Network::ProxyProtocolTLVVector merged_tlvs;

    if (config_->sharedConfig()->proxyProtocolTlvMergePolicy() ==
        envoy::extensions::filters::network::tcp_proxy::v3::OVERWRITE_BY_TYPE_IF_EXISTS_OR_ADD) {
      // Overwrite by type: configured TLVs take precedence for matching types.
      absl::flat_hash_set<uint8_t> configured_tlv_types;
      for (const auto& tlv : configured_tlvs) {
        configured_tlv_types.insert(tlv.type);
      }

      // Configured TLVs go first, then existing TLVs whose type was not overridden.
      for (const auto& tlv : configured_tlvs) {
        merged_tlvs.push_back(tlv);
      }

      for (const auto& tlv : existing_data.tlv_vector_) {
        if (!configured_tlv_types.contains(tlv.type)) {
          merged_tlvs.push_back(tlv);
        }
      }
    } else if (config_->sharedConfig()->proxyProtocolTlvMergePolicy() ==
               envoy::extensions::filters::network::tcp_proxy::v3::APPEND_IF_EXISTS_OR_ADD) {
      // Append: preserve all existing TLVs, then add configured TLVs.
      for (const auto& tlv : existing_data.tlv_vector_) {
        merged_tlvs.push_back(tlv);
      }

      for (const auto& tlv : configured_tlvs) {
        merged_tlvs.push_back(tlv);
      }
    }

    // Update filter state with merged TLVs, preserving existing addresses and version.
    filter_state->setData(
        Network::ProxyProtocolFilterState::key(),
        std::make_shared<Network::ProxyProtocolFilterState>(Network::ProxyProtocolDataWithVersion{
            {existing_data.src_addr_, existing_data.dst_addr_, merged_tlvs},
            existing_data.version_}),
        StreamInfo::FilterState::StateType::Mutable, StreamInfo::FilterState::LifeSpan::Connection);
  }
  // else: ADD_IF_ABSENT policy with existing state - keep existing state as-is.
  transport_socket_options_ =
      Network::TransportSocketOptionsUtility::fromFilterState(*filter_state);

  // Append any upstream socket options published into downstream filter state.
  if (auto typed_state = filter_state->getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
          Network::UpstreamSocketOptionsFilterState::key());
      typed_state != nullptr) {
    auto downstream_options = typed_state->value();
    if (!upstream_options_) {
      upstream_options_ = std::make_shared<Network::Socket::Options>();
    }
    Network::Socket::appendOptions(upstream_options_, downstream_options);
  }

  if (!maybeTunnel(*thread_local_cluster)) {
    // Either cluster is unknown, factory doesn't exist, there are no healthy hosts, or
    // createGenericConnPool returned nullptr. Handle this gracefully.
    getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
    onInitFailure(UpstreamFailureReason::NoHealthyUpstream);
    return Network::FilterStatus::StopIteration;
  }
  // Determine the return status based on whether we can receive data before connection.
  // Return Continue if we're allowing data to be read (either for buffering or to trigger
  // connection). Return StopIteration if we need to wait for the upstream connection first.
  return receive_before_connect_ ? Network::FilterStatus::Continue
                                 : Network::FilterStatus::StopIteration;
}
778

            
779
52
void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) {
  // Clear the cluster_discovery_handle_ before calling establishUpstreamConnection since we may
  // request cluster again.
  cluster_discovery_handle_.reset();
  const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;

  // Success: the cluster arrived, so resume the connection attempt and return.
  if (cluster_status == Upstream::ClusterDiscoveryStatus::Available) {
    // cluster_discovery_handle_ would have been cancelled if the downstream were closed.
    ASSERT(!downstream_closed_);
    ENVOY_CONN_LOG(debug, "On demand cluster {} is found. Establishing connection.",
                   read_callbacks_->connection(), cluster_name);
    config_->onDemandStats().on_demand_cluster_success_.inc();
    establishUpstreamConnection();
    return;
  }

  // Account for the specific discovery failure before the common failure handling.
  if (cluster_status == Upstream::ClusterDiscoveryStatus::Missing) {
    ENVOY_CONN_LOG(debug, "On demand cluster {} is missing", read_callbacks_->connection(),
                   cluster_name);
    config_->onDemandStats().on_demand_cluster_missing_.inc();
  } else if (cluster_status == Upstream::ClusterDiscoveryStatus::Timeout) {
    ENVOY_CONN_LOG(debug, "On demand cluster {} was not found before timeout.",
                   read_callbacks_->connection(), cluster_name);
    config_->onDemandStats().on_demand_cluster_timeout_.inc();
  }

  // Failure path.
  config_->stats().downstream_cx_no_route_.inc();
  getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound);
  onInitFailure(UpstreamFailureReason::NoRoute);
}
809

            
810
2578
// Resolves a generic connection-pool factory for the cluster, picks a host, and
// starts a new upstream stream. Returns true when a conn-pool attempt is in
// flight (or has already failed via onPoolFailure); false when no factory, no
// host, or no pool could be created, so the caller can report NoHealthyUpstream.
bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
  GenericConnPoolFactory* factory = nullptr;
  if (cluster.info()->upstreamConfig().has_value()) {
    // The cluster overrides the conn-pool implementation via its typed upstream config.
    factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
        cluster.info()->upstreamConfig().ref());
  } else {
    // Default TCP conn-pool implementation.
    factory = Envoy::Config::Utility::getFactoryByName<GenericConnPoolFactory>(
        "envoy.filters.connection_pools.tcp.generic");
  }
  if (!factory) {
    return false;
  }
  if (Runtime::runtimeFeatureEnabled(
          "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
    // TODO(vikaschoudhary16): Initialize route_ once per cluster.
    upstream_decoder_filter_callbacks_.route_ = THROW_OR_RETURN_VALUE(
        Http::NullRouteImpl::create(cluster.info()->name(),
                                    Router::RetryPolicyImpl::DefaultRetryPolicy,
                                    config_->regexEngine()),
        std::unique_ptr<Http::NullRouteImpl>);
  }
  // TCP proxy only supports synchronous (non-async) host selection.
  Upstream::HostConstSharedPtr host =
      Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(cluster.chooseHost(this));
  if (host) {
    // Track attempted hosts for access logging
    getStreamInfo().upstreamInfo()->addUpstreamHostAttempted(host);
    generic_conn_pool_ = factory->createGenericConnPool(
        host, cluster, config_->tunnelingConfigHelper(), this, *upstream_callbacks_,
        upstream_decoder_filter_callbacks_, getStreamInfo());
  }
  if (generic_conn_pool_) {
    connecting_ = true;
    connect_attempts_++;
    getStreamInfo().setAttemptCount(connect_attempts_);
    generic_conn_pool_->newStream(*this);
    // Because we never return open connections to the pool, this either has a handle waiting on
    // connection completion, or onPoolFailure has been invoked. Either way, stop iteration.
    return true;
  }
  return false;
}
851

            
852
// Conn-pool failure callback: tears down the pending pool, records the failed
// host and failure reason on the stream info, then surfaces the failure as a
// connection event (which may trigger a retry, see onUpstreamEvent).
void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
                                  absl::string_view failure_reason,
                                  Upstream::HostDescriptionConstSharedPtr host) {
  if (Runtime::runtimeFeatureEnabled(
          "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
    // generic_conn_pool_ is an instance of TcpProxy::HttpConnPool.
    // generic_conn_pool_->newStream() is called in maybeTunnel() which initializes an instance of
    // Router::UpstreamRequest. If Router::UpstreamRequest receives headers from the upstream which
    // results in end_stream=true, then via callbacks passed to Router::UpstreamRequest,
    // TcpProxy::Filter::onGenericPoolFailure() gets invoked. If we do not do deferredDelete here,
    // then the same instance of UpstreamRequest which is under execution will go out of scope.
    read_callbacks_->connection().dispatcher().deferredDelete(std::move(generic_conn_pool_));
  } else {
    generic_conn_pool_.reset();
  }

  read_callbacks_->upstreamHost(host);
  getStreamInfo().upstreamInfo()->setUpstreamHost(host);
  getStreamInfo().upstreamInfo()->setUpstreamTransportFailureReason(failure_reason);

  switch (reason) {
  // Overflow and local failures are both reported as a local close.
  case ConnectionPool::PoolFailureReason::Overflow:
  case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
    upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
    break;
  case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
    upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);
    break;
  case ConnectionPool::PoolFailureReason::Timeout:
    // onConnectTimeout() records the outlier result and raises LocalClose itself.
    onConnectTimeout();
    break;
  }
}
885

            
886
// Conn-pool success callback: adopts the new generic upstream, records timing
// and address/SSL info on the stream info, then resumes downstream reads.
// `info` is non-null when routing goes via Router::UpstreamRequest; its filter
// state is then propagated to the upstream info.
void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
                                std::unique_ptr<GenericUpstream>&& upstream,
                                Upstream::HostDescriptionConstSharedPtr& host,
                                const Network::ConnectionInfoProvider& address_provider,
                                Ssl::ConnectionInfoConstSharedPtr ssl_info) {
  StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
  // Latency is measured from the *first* connection attempt; record only once.
  if (!upstream_info.upstreamTiming().connectionPoolCallbackLatency()) {
    upstream_info.upstreamTiming().recordConnectionPoolCallbackLatency(
        initial_upstream_connection_start_time_.value(),
        read_callbacks_->connection().dispatcher().timeSource());
  }
  upstream_ = std::move(upstream);
  generic_conn_pool_.reset();
  read_callbacks_->upstreamHost(host);
  // No need to set information using address_provider in case routing via Router::UpstreamRequest
  // because in that case, information is already set by the
  // Router::UpstreamRequest::onPoolReady() method before reaching here.
  if (upstream_info.upstreamLocalAddress() == nullptr) {
    upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
    upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
  }
  upstream_info.setUpstreamHost(host);
  upstream_info.setUpstreamSslConnection(ssl_info);

  onUpstreamConnection();
  read_callbacks_->continueReading();
  if (info) {
    upstream_info.setUpstreamFilterState(info->filterState());
  }
}
916

            
917
175
const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
  // Criteria configured on the route, if any.
  const Router::MetadataMatchCriteria* route_criteria =
      (route_ != nullptr) ? route_->metadataMatchCriteria() : nullptr;

  // Per-stream criteria under the well-known `envoy.lb` dynamic metadata key.
  const auto& request_metadata = getStreamInfo().dynamicMetadata().filter_metadata();
  const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);

  if (filter_it == request_metadata.end()) {
    // No per-stream metadata: fall back to the route's criteria (possibly null).
    return route_criteria;
  }

  if (route_criteria != nullptr) {
    // Merge per-stream metadata on top of the route criteria.
    metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second);
  } else {
    // Only per-stream metadata is present; build criteria from it alone.
    metadata_match_criteria_ =
        std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
  }
  return metadata_match_criteria_.get();
}
935

            
936
88
// Filter-state key under which propagated tunnel response headers are stored
// (see TunnelingConfigHelperImpl::propagateResponseHeaders).
const std::string& TunnelResponseHeaders::key() {
  CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.propagate_response_headers");
}
939

            
940
14
// Filter-state key under which propagated tunnel response trailers are stored
// (see TunnelingConfigHelperImpl::propagateResponseTrailers).
const std::string& TunnelResponseTrailers::key() {
  CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.propagate_response_trailers");
}
943

            
944
// Builds the tunneling (HTTP CONNECT/POST) helper from the tcp_proxy proto
// config: header parser, hostname formatter, POST path, router config used for
// upstream HTTP filters, and optional request-ID customization.
// Throws EnvoyException on invalid config (post_path without use_post, bad
// request-ID extension config).
TunnelingConfigHelperImpl::TunnelingConfigHelperImpl(
    Stats::Scope& stats_scope,
    const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config_message,
    Server::Configuration::FactoryContext& context)
    : header_parser_(THROW_OR_RETURN_VALUE(Envoy::Router::HeaderParser::configure(
                                               config_message.tunneling_config().headers_to_add()),
                                           Router::HeaderParserPtr)),
      propagate_response_headers_(config_message.tunneling_config().propagate_response_headers()),
      propagate_response_trailers_(config_message.tunneling_config().propagate_response_trailers()),
      post_path_(config_message.tunneling_config().post_path()),
      route_stat_name_storage_("tcpproxy_tunneling", context.scope().symbolTable()),
      // TODO(vikaschoudhary16): figure out which of the following router_config_ members are
      // not required by tcp_proxy and move them to a different class
      router_config_(context.serverFactoryContext(), route_stat_name_storage_.statName(),
                     stats_scope, context.serverFactoryContext().clusterManager(),
                     context.serverFactoryContext().runtime(),
                     context.serverFactoryContext().api().randomGenerator(),
                     std::make_unique<Router::ShadowWriterImpl>(
                         context.serverFactoryContext().clusterManager()),
                     true, false, false, false, false, false, false, {},
                     context.serverFactoryContext().api().timeSource(),
                     context.serverFactoryContext().httpContext(),
                     context.serverFactoryContext().routerContext()),
      server_factory_context_(context.serverFactoryContext()) {
  // A POST path only makes sense when tunneling with the POST method.
  if (!post_path_.empty() && !config_message.tunneling_config().use_post()) {
    throw EnvoyException("Can't set a post path when POST method isn't used");
  }
  if (config_message.tunneling_config().use_post()) {
    // Default the POST path to "/" when unset.
    post_path_ = post_path_.empty() ? "/" : post_path_;
  }

  // The tunnel hostname supports substitution-format placeholders; compile the
  // format string once here.
  envoy::config::core::v3::SubstitutionFormatString substitution_format_config;
  substitution_format_config.mutable_text_format_source()->set_inline_string(
      config_message.tunneling_config().hostname());
  hostname_fmt_ = THROW_OR_RETURN_VALUE(Formatter::SubstitutionFormatStringUtils::fromProtoConfig(
                                            substitution_format_config, context),
                                        Formatter::FormatterPtr);

  // Initialize request ID extension if explicitly configured.
  const auto& rid_config = config_message.tunneling_config().request_id_extension();
  if (rid_config.has_typed_config()) {
    auto extension_or_error = Http::RequestIDExtensionFactory::fromProto(rid_config, context);
    if (!extension_or_error.ok()) {
      throw EnvoyException(absl::StrCat("Failed to create request ID extension: ",
                                        extension_or_error.status().ToString()));
    }
    request_id_extension_ = extension_or_error.value();
  }

  // Populate optional request ID customization fields if provided.
  if (!config_message.tunneling_config().request_id_header().empty()) {
    request_id_header_ = config_message.tunneling_config().request_id_header();
  }
  if (!config_message.tunneling_config().request_id_metadata_key().empty()) {
    request_id_metadata_key_ = config_message.tunneling_config().request_id_metadata_key();
  }
}
928
std::string TunnelingConfigHelperImpl::host(const StreamInfo::StreamInfo& stream_info) const {
  // Render the configured hostname substitution-format string for this stream.
  std::string rendered_host = hostname_fmt_->format({}, stream_info);
  return rendered_host;
}
void TunnelingConfigHelperImpl::propagateResponseHeaders(
    Http::ResponseHeaderMapPtr&& headers,
716
    const StreamInfo::FilterStateSharedPtr& filter_state) const {
716
  if (!propagate_response_headers_) {
628
    return;
628
  }
88
  filter_state->setData(
88
      TunnelResponseHeaders::key(), std::make_shared<TunnelResponseHeaders>(std::move(headers)),
88
      StreamInfo::FilterState::StateType::Mutable, StreamInfo::FilterState::LifeSpan::Connection);
88
}
void TunnelingConfigHelperImpl::propagateResponseTrailers(
    Http::ResponseTrailerMapPtr&& trailers,
35
    const StreamInfo::FilterStateSharedPtr& filter_state) const {
35
  if (!propagate_response_trailers_) {
21
    return;
21
  }
14
  filter_state->setData(
14
      TunnelResponseTrailers::key(), std::make_shared<TunnelResponseTrailers>(std::move(trailers)),
14
      StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
14
}
14
// Handles an upstream connect timeout: reports the timeout to outlier detection,
// flags the stream, and raises LocalClose so retry logic (if configured) runs.
void Filter::onConnectTimeout() {
  ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection());
  // Feed outlier detection before tearing anything down.
  read_callbacks_->upstreamHost()->outlierDetector().putResult(
      Upstream::Outlier::Result::LocalOriginTimeout);
  getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamConnectionFailure);
  // Raise LocalClose, which will trigger a reconnect if needed/configured.
  upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
}
19567
// Downstream read callback. With an upstream in place, bytes are forwarded
// directly. Otherwise, when receive_before_connect_ is set, bytes are buffered
// in early_data_buffer_ (possibly triggering the upstream connection in
// ON_DOWNSTREAM_DATA mode) and downstream reads are disabled once the buffer
// reaches max_buffered_bytes_. Always returns StopIteration.
Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
  ENVOY_CONN_LOG(debug,
                 "onData: received {} bytes, end_stream={}, has upstream {}, "
                 "receive_before_connect_={}, connect_mode_={}",
                 read_callbacks_->connection(), data.length(), end_stream, upstream_ != nullptr,
                 receive_before_connect_, static_cast<int>(connect_mode_));
  getStreamInfo().getDownstreamBytesMeter()->addWireBytesReceived(data.length());
  if (upstream_) {
    // Fast path: forward directly to the established upstream.
    getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length());
    upstream_->encodeData(data, end_stream);
    resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
  } else if (receive_before_connect_) {
    // Buffer data received before upstream connection exists.
    early_data_buffer_.move(data);
    // Track end_stream even if buffer is empty so we can propagate it to upstream later.
    if (!early_data_end_stream_) {
      early_data_end_stream_ = end_stream;
    }
    // Mark that we've received initial data and trigger connection if needed.
    // Don't trigger connection if downstream is closing without sending any data.
    if (!initial_data_received_ && !(end_stream && early_data_buffer_.length() == 0)) {
      initial_data_received_ = true;
      // For ON_DOWNSTREAM_DATA mode, establish the upstream connection now.
      if (connect_mode_ == UpstreamConnectMode::ON_DOWNSTREAM_DATA) {
        ENVOY_CONN_LOG(debug,
                       "Initial data received, establishing upstream connection. "
                       "early_data_buffer_.length()={}",
                       read_callbacks_->connection(), early_data_buffer_.length());
        // Route should already be set in onNewConnection().
        ASSERT(route_ != nullptr);
        establishUpstreamConnection();
      }
    }
    // Read-disable downstream when receiving early data to prevent excessive buffering.
    // For legacy mode (max_buffered_bytes_ == 0), always read-disable (backward compatibility).
    // For new API, read-disable only when buffer exceeds limit to prevent excessive memory usage.
    // Note: We track read_disabled_due_to_buffer_ to know whether to re-enable reading
    // when the upstream connection is established.
    if (early_data_buffer_.length() >= max_buffered_bytes_) {
      // Read-disable when buffer exceeds limit to prevent excessive memory usage.
      // Note: For legacy mode (max_buffered_bytes_ == 0), this will always trigger.
      ENVOY_CONN_LOG(debug, "Early data buffer exceeded max size {}, read-disabling downstream",
                     read_callbacks_->connection(), max_buffered_bytes_);
      read_callbacks_->connection().readDisable(true);
      read_disabled_due_to_buffer_ = true;
    }
    config_->stats().early_data_received_count_total_.inc();
  }
  // The upstream should consume all of the data.
  // Before there is an upstream the connection should be readDisabled. If the upstream is
  // destroyed, there should be no further reads as well.
  ASSERT(0 == data.length());
  return Network::FilterStatus::StopIteration;
}
2501
// New downstream connection: arms the max-duration, access-log-flush, and idle
// timers; assigns the stream UUID; then either connects upstream immediately
// (IMMEDIATE mode) or defers until data / TLS handshake per connect_mode_.
Network::FilterStatus Filter::onNewConnection() {
  const auto& max_downstream_connection_duration =
      config_->calculateMaxDownstreamConnectionDurationWithJitter();
  if (max_downstream_connection_duration) {
    connection_duration_timer_ = read_callbacks_->connection().dispatcher().createTimer(
        [this]() -> void { onMaxDownstreamConnectionDuration(); });
    connection_duration_timer_->enableTimer(max_downstream_connection_duration.value());
  }
  if (config_->accessLogFlushInterval().has_value()) {
    access_log_flush_timer_ = read_callbacks_->connection().dispatcher().createTimer(
        [this]() -> void { onAccessLogFlushInterval(); });
    resetAccessLogFlushTimer();
  }
  // The configured idle timeout can be overridden per-connection via filter state.
  idle_timeout_ = config_->idleTimeout();
  if (const auto* per_connection_idle_timeout =
          getStreamInfo().filterState()->getDataReadOnly<StreamInfo::UInt64Accessor>(
              PerConnectionIdleTimeoutMs);
      per_connection_idle_timeout != nullptr) {
    idle_timeout_ = std::chrono::milliseconds(per_connection_idle_timeout->value());
  }
  if (idle_timeout_) {
    // The idle_timer_ can be moved to a Drainer, so related callbacks call into
    // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch
    // the call to either TcpProxy or to Drainer, depending on the current state.
    idle_timer_ = read_callbacks_->connection().dispatcher().createTimer(
        [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); });
    if (Runtime::runtimeFeatureEnabled(
            "envoy.reloadable_features.tcp_proxy_set_idle_timer_immediately_on_new_connection")) {
      // Start the idle timer immediately so that if no response is received from the upstream,
      // the downstream connection will time out.
      resetIdleTimer();
    }
  }
  // Set UUID for the connection. This is used for logging and tracing.
  getStreamInfo().setStreamIdProvider(
      std::make_shared<StreamInfo::StreamIdProviderImpl>(config_->randomGenerator().uuid()));
  if (config_->flushAccessLogOnStart()) {
    flushAccessLog(AccessLog::AccessLogType::TcpConnectionStart);
  }
  ASSERT(upstream_ == nullptr);
  // Check if we should delay upstream connection establishment.
  if (connect_mode_ == UpstreamConnectMode::IMMEDIATE) {
    // Immediate connection establishment. This is the default behavior.
    route_ = pickRoute();
    return establishUpstreamConnection();
  }
  // For ON_DOWNSTREAM_DATA or ON_DOWNSTREAM_TLS_HANDSHAKE modes, delay the connection.
  // Pre-pick the route so it's available when connection is triggered.
  route_ = pickRoute();
  // Log the specific delay reason.
  if (connect_mode_ == UpstreamConnectMode::ON_DOWNSTREAM_DATA) {
    ENVOY_CONN_LOG(debug, "Delaying upstream connection establishment until initial data received",
                   read_callbacks_->connection());
  } else if (connect_mode_ == UpstreamConnectMode::ON_DOWNSTREAM_TLS_HANDSHAKE) {
    ENVOY_CONN_LOG(debug,
                   "Delaying upstream connection establishment until TLS handshake completes",
                   read_callbacks_->connection());
  }
  // Use receive_before_connect_ to determine whether to continue reading or stop iteration.
  return receive_before_connect_ ? Network::FilterStatus::Continue
                                 : Network::FilterStatus::StopIteration;
}
2
bool Filter::startUpstreamSecureTransport() {
2
  bool switched_to_tls = upstream_->startUpstreamSecureTransport();
2
  if (switched_to_tls) {
    StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
    upstream_info.setUpstreamSslConnection(upstream_->getUpstreamConnectionSslInfo());
  }
2
  return switched_to_tls;
2
}
4778
// Downstream connection event handler. Handles deferred-connect TLS handshake
// completion, notes downstream closure (cancelling pending ODCDS), forwards the
// event to the upstream (possibly handing the upstream connection to the drain
// manager for reuse), and cancels any pending conn-pool request on close.
void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
  // Handle TLS handshake completion for connections where we're waiting for it.
  if (event == Network::ConnectionEvent::Connected && waiting_for_tls_handshake_) {
    // The Connected event for SSL connections is fired after TLS handshake completes.
    ENVOY_CONN_LOG(debug, "downstream TLS handshake completed via Connected event",
                   read_callbacks_->connection());
    onDownstreamTlsHandshakeComplete();
    return;
  }
  if (event == Network::ConnectionEvent::LocalClose ||
      event == Network::ConnectionEvent::RemoteClose) {
    downstream_closed_ = true;
    // Cancel the potential odcds callback.
    cluster_discovery_handle_ = nullptr;
  }
  ENVOY_CONN_LOG(trace, "on downstream event {}, has upstream = {}", read_callbacks_->connection(),
                 static_cast<int>(event), upstream_ != nullptr);
  if (upstream_) {
    absl::string_view downstream_close_details = read_callbacks_->connection().localCloseReason();
    Tcp::ConnectionPool::ConnectionDataPtr conn_data(
        upstream_->onDownstreamEvent(event, downstream_close_details));
    // A still-open upstream connection is handed to the drain manager together
    // with the callbacks and idle timer so it can finish flushing.
    if (conn_data != nullptr &&
        conn_data->connection().state() != Network::Connection::State::Closed) {
      config_->drainManager().add(config_->sharedConfig(), std::move(conn_data),
                                  std::move(upstream_callbacks_), std::move(idle_timer_),
                                  idle_timeout_, read_callbacks_->upstreamHost());
    }
    if (event == Network::ConnectionEvent::LocalClose ||
        event == Network::ConnectionEvent::RemoteClose) {
      upstream_.reset();
      disableIdleTimer();
    }
  }
  if (generic_conn_pool_) {
    if (event == Network::ConnectionEvent::LocalClose ||
        event == Network::ConnectionEvent::RemoteClose) {
      // Cancel the conn pool request and close any excess pending requests.
      generic_conn_pool_.reset();
    }
  }
}
409425
// Proxy data arriving from the upstream connection down to the client.
void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
  const uint64_t bytes_received = data.length();
  ENVOY_CONN_LOG(trace, "upstream connection received {} bytes, end_stream={}",
                 read_callbacks_->connection(), bytes_received, end_stream);
  // Account for the transfer on both directions' wire-byte meters.
  getStreamInfo().getUpstreamBytesMeter()->addWireBytesReceived(bytes_received);
  getStreamInfo().getDownstreamBytesMeter()->addWireBytesSent(bytes_received);
  // write() drains `data` into the downstream connection's buffer.
  read_callbacks_->connection().write(data, end_stream);
  ASSERT(data.length() == 0);
  resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
}
2442
// Handles connection events from the upstream. On close: propagates close
// reason/close type into the stream info, tears down upstream state, and either
// schedules a connect retry (if the close happened while still connecting and
// the downstream is alive) or flush-closes the downstream connection.
void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
  // A 0-RTT "connected" notification is not a terminal state change; ignore it.
  if (event == Network::ConnectionEvent::ConnectedZeroRtt) {
    return;
  }
  // Update the connecting flag before processing the event because we may start a new connection
  // attempt in establishUpstreamConnection.
  bool connecting = connecting_;
  connecting_ = false;
  if (event == Network::ConnectionEvent::RemoteClose ||
      event == Network::ConnectionEvent::LocalClose) {
    // Propagate the upstream local close reason to the downstream stream info's upstreamInfo.
    if (upstream_) {
      getStreamInfo().upstreamInfo()->setUpstreamLocalCloseReason(upstream_->localCloseReason());
    }
    if (Runtime::runtimeFeatureEnabled(
            "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
      read_callbacks_->connection().dispatcher().deferredDelete(std::move(upstream_));
    } else if (upstream_) {
      getStreamInfo().upstreamInfo()->setUpstreamDetectedCloseType(upstream_->detectedCloseType());
      upstream_.reset();
    }
    disableIdleTimer();
    if (connecting) {
      if (event == Network::ConnectionEvent::RemoteClose) {
        // Fully qualify the enum for consistency with the other setResponseFlag()
        // call sites in this file (e.g. onConnectMaxAttempts()).
        getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamConnectionFailure);
        // upstreamHost can be nullptr if we received a disconnect from the upstream before
        // receiving any response.
        if (read_callbacks_->upstreamHost() != nullptr) {
          read_callbacks_->upstreamHost()->outlierDetector().putResult(
              Upstream::Outlier::Result::LocalOriginConnectFailed);
        }
      }
      if (!downstream_closed_) {
        // Always defer retry to a different event loop iteration via the retry timer.
        if (connect_attempts_ >= config_->maxConnectAttempts()) {
          onConnectMaxAttempts();
          return;
        }
        enableRetryTimer();
      }
    } else {
      // TODO(botengyao): propagate RST back to downstream connection if RST is received.
      if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
        read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
      }
    }
  }
}
151
void Filter::onConnectMaxAttempts() {
151
  getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
151
  const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;
151
  Upstream::ThreadLocalCluster* thread_local_cluster =
151
      cluster_manager_.getThreadLocalCluster(cluster_name);
151
  if (thread_local_cluster) {
151
    thread_local_cluster->info()->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
151
  }
151
  onInitFailure(UpstreamFailureReason::ConnectFailed);
151
}
2213
// Invoked once the upstream connection is established. Flushes any buffered
// early data, re-enables downstream reads that were previously disabled, records
// a connect success for outlier detection, and arms idle-timer bookkeeping.
void Filter::onUpstreamConnection() {
  connecting_ = false;
  // If we have received any data before upstream connection is established, or if the downstream
  // has indicated end of stream, send the data and/or end_stream to the upstream connection.
  if (early_data_buffer_.length() > 0 || early_data_end_stream_) {
    // Early data should only happen when receive_before_connect is enabled.
    ASSERT(receive_before_connect_);
    if (early_data_buffer_.length() > 0) {
      getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(early_data_buffer_.length());
    }
    // encodeData() drains early_data_buffer_, as verified by the ASSERT below.
    upstream_->encodeData(early_data_buffer_, early_data_end_stream_);
    ASSERT(0 == early_data_buffer_.length());
  }
  // Re-enable downstream reads if we disabled reading.
  // Reading can be disabled in two cases:
  // 1. Buffer overflow when receive_before_connect is enabled (tracked by
  // read_disabled_due_to_buffer_)
  // 2. In establishUpstreamConnection() when receive_before_connect is disabled
  if (read_disabled_due_to_buffer_) {
    read_callbacks_->connection().readDisable(false);
    read_disabled_due_to_buffer_ = false;
  } else if (!receive_before_connect_) {
    // Re-enable downstream reads that were disabled in establishUpstreamConnection()
    // when early data reception was NOT enabled.
    read_callbacks_->connection().readDisable(false);
  }
  // Successful connect: report "success final" to the host's outlier detector.
  read_callbacks_->upstreamHost()->outlierDetector().putResult(
      Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);
  ENVOY_CONN_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}",
                 read_callbacks_->connection(),
                 getStreamInfo().downstreamAddressProvider().requestedServerName());
  if (idle_timeout_) {
    resetIdleTimer();
    // Also reset the idle timer whenever bytes are written to the downstream
    // connection; resetIdleTimer() is additionally called from onUpstreamData().
    read_callbacks_->connection().addBytesSentCallback([this](uint64_t) {
      resetIdleTimer();
      return true;
    });
    if (upstream_) {
      // Capture the shared upstream_callbacks_ (not `this`) so the callback stays
      // valid independent of this filter's lifetime.
      upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) -> bool {
        upstream_callbacks->onBytesSent();
        return true;
      });
    }
  }
  if (config_->flushAccessLogOnConnected()) {
    flushAccessLog(AccessLog::AccessLogType::TcpUpstreamConnected);
  }
}
34
// Session idle timer fired: count the timeout and close the downstream
// connection with the dedicated local close reason.
void Filter::onIdleTimeout() {
  ENVOY_CONN_LOG(debug, "Session timed out", read_callbacks_->connection());
  config_->stats().idle_timeout_.inc();
  // This results in also closing the upstream connection.
  read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush,
                                      StreamInfo::LocalCloseReasons::get().TcpSessionIdleTimeout);
}
3
// Max downstream connection duration reached: record the DurationTimeout
// response flag and stat, then close the downstream connection without flushing.
void Filter::onMaxDownstreamConnectionDuration() {
  ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
  getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::DurationTimeout);
  config_->stats().max_downstream_connection_duration_.inc();
  read_callbacks_->connection().close(
      Network::ConnectionCloseType::NoFlush,
      StreamInfo::LocalCloseReasons::get().MaxConnectionDurationReached);
}
8
void Filter::onAccessLogFlushInterval() {
8
  flushAccessLog(AccessLog::AccessLogType::TcpPeriodic);
8
  const SystemTime now = read_callbacks_->connection().dispatcher().timeSource().systemTime();
8
  getStreamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(now);
8
  if (getStreamInfo().getUpstreamBytesMeter()) {
8
    getStreamInfo().getUpstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(now);
8
  }
8
  resetAccessLogFlushTimer();
8
}
2537
// Emit one entry, tagged with the given access-log type, to every configured logger.
void Filter::flushAccessLog(AccessLog::AccessLogType access_log_type) {
  const Formatter::Context log_context{nullptr, nullptr, nullptr, {}, access_log_type};
  for (const auto& logger : config_->accessLogs()) {
    logger->log(log_context, getStreamInfo());
  }
}
13
void Filter::resetAccessLogFlushTimer() {
13
  if (access_log_flush_timer_ != nullptr) {
13
    ASSERT(config_->accessLogFlushInterval().has_value());
13
    access_log_flush_timer_->enableTimer(config_->accessLogFlushInterval().value());
13
  }
13
}
2522
void Filter::disableAccessLogFlushTimer() {
2522
  if (access_log_flush_timer_ != nullptr) {
5
    access_log_flush_timer_->disableTimer();
5
    access_log_flush_timer_.reset();
5
  }
2522
}
529765
void Filter::resetIdleTimer() {
529765
  if (idle_timer_ != nullptr) {
529440
    ASSERT(idle_timeout_);
529440
    idle_timer_->enableTimer(idle_timeout_.value());
529440
  }
529765
}
3793
void Filter::disableIdleTimer() {
3793
  if (idle_timer_ != nullptr) {
2343
    idle_timer_->disableTimer();
2343
    idle_timer_.reset();
2343
  }
3793
}
106
// Retry timer fired: re-pick the route (it may have changed since the failed
// attempt) and try to establish a new upstream connection.
void Filter::onRetryTimer() {
  route_ = pickRoute();
  establishUpstreamConnection();
}
106
void Filter::enableRetryTimer() {
  // Create the retry timer on the first retry.
106
  if (!retry_timer_) {
98
    retry_timer_ =
106
        read_callbacks_->connection().dispatcher().createTimer([this] { onRetryTimer(); });
98
  }
  // If the backoff strategy is not configured, the next backoff time will be 0.
  // This will allow the retry to happen on different event loop iteration, which
  // will allow the connection pool to be cleaned up from the previous closed connection.
106
  uint64_t next_backoff_ms = 0;
106
  if (config_->backoffStrategy()) {
42
    next_backoff_ms = config_->backoffStrategy()->nextBackOffMs();
42
  }
106
  retry_timer_->enableTimer(std::chrono::milliseconds(next_backoff_ms));
106
}
2522
void Filter::disableRetryTimer() {
2522
  if (retry_timer_ != nullptr) {
98
    retry_timer_->disableTimer();
98
    retry_timer_.reset();
98
  }
2522
}
6
void Filter::onDownstreamTlsHandshakeComplete() {
6
  ENVOY_CONN_LOG(debug, "downstream TLS handshake complete", read_callbacks_->connection());
6
  tls_handshake_complete_ = true;
6
  waiting_for_tls_handshake_ = false;
  // For ON_DOWNSTREAM_TLS_HANDSHAKE mode, establish the upstream connection now.
6
  if (connect_mode_ == UpstreamConnectMode::ON_DOWNSTREAM_TLS_HANDSHAKE) {
    // Route should already be set in onNewConnection().
6
    ASSERT(route_ != nullptr);
6
    establishUpstreamConnection();
6
  }
6
}
// Callbacks adapter owned by the TCP proxy filter; the request trailer map is
// allocated eagerly at construction.
Filter::HttpStreamDecoderFilterCallbacks::HttpStreamDecoderFilterCallbacks(Filter* parent)
    : parent_(parent), request_trailer_map_(Http::RequestTrailerMapImpl::create()) {}
2531
// Tears down any drains still in flight at thread shutdown by force-closing
// their connections, then flushes the dispatcher's deferred delete list.
UpstreamDrainManager::~UpstreamDrainManager() {
  // If connections aren't closed before they are destructed an ASSERT fires,
  // so cancel all pending drains, which causes the connections to be closed.
  if (!drainers_.empty()) {
    // Grab a dispatcher reference before the map empties; all drainers on this
    // thread-local manager share it.
    auto& dispatcher = drainers_.begin()->second->dispatcher();
    while (!drainers_.empty()) {
      auto begin = drainers_.begin();
      Drainer* key = begin->first;
      // cancelDrain() closes the connection, which synchronously delivers
      // onEvent(LocalClose) and removes the entry from drainers_.
      begin->second->cancelDrain();
      // cancelDrain() should cause that drainer to be removed from drainers_.
      // ASSERT so that we don't end up in an infinite loop.
      ASSERT(drainers_.find(key) == drainers_.end());
    }
    // This destructor is run when shutting down `ThreadLocal`. The destructor of some objects use
    // earlier `ThreadLocal` slots (for accessing the runtime snapshot) so they must run before that
    // slot is destructed. Clear the list to enforce that ordering.
    dispatcher.clearDeferredDeleteList();
  }
}
// Begin draining an upstream connection whose downstream peer has gone away:
// wrap it in a Drainer, redirect the upstream callbacks to the drainer, and
// track it by raw pointer until it closes.
void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config,
                               Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
                               const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
                               Event::TimerPtr&& idle_timer,
                               absl::optional<std::chrono::milliseconds> idle_timeout,
                               const Upstream::HostDescriptionConstSharedPtr& upstream_host) {
  // Prefer std::make_unique over a naked `new`.
  DrainerPtr drainer =
      std::make_unique<Drainer>(*this, config, callbacks, std::move(upstream_conn_data),
                                std::move(idle_timer), idle_timeout, upstream_host);
  callbacks->drain(*drainer);
  // Take the raw pointer before moving ownership into the map; it is the lookup
  // key used later by remove().
  Drainer* key = drainer.get();
  drainers_[key] = std::move(drainer);
}
12
// Detach a finished drainer: move its ownership to the dispatcher's deferred
// deletion list (so destruction happens outside the current call stack) and
// drop it from the tracking map.
void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatcher) {
  const auto entry = drainers_.find(&drainer);
  ASSERT(entry != drainers_.end());
  dispatcher.deferredDelete(std::move(entry->second));
  drainers_.erase(entry);
}
// Takes ownership of an upstream connection that must be flushed after its
// downstream is gone, and records the flush in the config's stats.
Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
                 const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
                 Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
                 absl::optional<std::chrono::milliseconds> idle_timeout,
                 const Upstream::HostDescriptionConstSharedPtr& upstream_host)
    : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)),
      idle_timer_(std::move(idle_timer)), idle_timeout_(idle_timeout),
      upstream_host_(upstream_host), config_(config) {
  ENVOY_CONN_LOG(trace, "draining the upstream connection", upstream_conn_data_->connection());
  config_->stats().upstream_flush_total_.inc();
  // upstream_flush_active_ is decremented in onEvent() when the drain finishes.
  config_->stats().upstream_flush_active_.inc();
}
12
// Connection event callback for the draining upstream connection. Once the
// connection closes (either side), stop the idle timer, update the active-flush
// gauge, and unregister from the manager (which schedules deferred deletion).
void Drainer::onEvent(Network::ConnectionEvent event) {
  const bool closed = event == Network::ConnectionEvent::RemoteClose ||
                      event == Network::ConnectionEvent::LocalClose;
  if (!closed) {
    return;
  }
  if (idle_timer_ != nullptr) {
    idle_timer_->disableTimer();
  }
  config_->stats().upstream_flush_active_.dec();
  parent_.remove(*this, upstream_conn_data_->connection().dispatcher());
}
2
// Data arrived from the upstream while draining. There is no downstream
// connection to send any data to, but the upstream sent some data. Try to
// behave similar to what the kernel would do when it receives data on a
// connection where the application has closed the socket or
// ::shutdown(fd, SHUT_RD), and close/reset the connection.
void Drainer::onData(Buffer::Instance& data, bool) {
  if (data.length() == 0) {
    return;
  }
  cancelDrain();
}
2
// Idle timeout fired while draining: count it and abort the drain by closing
// the connection.
void Drainer::onIdleTimeout() {
  config_->stats().idle_timeout_.inc();
  cancelDrain();
}
25
void Drainer::onBytesSent() {
25
  if (idle_timer_ != nullptr) {
25
    idle_timer_->enableTimer(idle_timeout_.value());
25
  }
25
}
5
// Abort the drain by closing the upstream connection without flushing.
void Drainer::cancelDrain() {
  // This sends onEvent(LocalClose), which unregisters this drainer.
  upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
}
1
// Dispatcher of the drained connection; used by ~UpstreamDrainManager() to flush
// the deferred delete list at shutdown.
Event::Dispatcher& Drainer::dispatcher() { return upstream_conn_data_->connection().dispatcher(); }
} // namespace TcpProxy
} // namespace Envoy