LCOV - code coverage report
Current view: top level - source/common/tcp_proxy - tcp_proxy.cc (source / functions)
                                          Hit    Total    Coverage
Test: coverage.dat           Lines:         4      738       0.5 %
Date: 2024-01-05 06:35:25    Functions:     2       76       2.6 %

          Line data    Source code
       1             : #include "source/common/tcp_proxy/tcp_proxy.h"
       2             : 
       3             : #include <cstdint>
       4             : #include <memory>
       5             : #include <string>
       6             : 
       7             : #include "envoy/buffer/buffer.h"
       8             : #include "envoy/config/accesslog/v3/accesslog.pb.h"
       9             : #include "envoy/config/core/v3/base.pb.h"
      10             : #include "envoy/event/dispatcher.h"
      11             : #include "envoy/event/timer.h"
      12             : #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
      13             : #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h"
      14             : #include "envoy/registry/registry.h"
      15             : #include "envoy/stats/scope.h"
      16             : #include "envoy/upstream/cluster_manager.h"
      17             : #include "envoy/upstream/upstream.h"
      18             : 
      19             : #include "source/common/access_log/access_log_impl.h"
      20             : #include "source/common/common/assert.h"
      21             : #include "source/common/common/empty_string.h"
      22             : #include "source/common/common/enum_to_int.h"
      23             : #include "source/common/common/fmt.h"
      24             : #include "source/common/common/macros.h"
      25             : #include "source/common/common/utility.h"
      26             : #include "source/common/config/metadata.h"
      27             : #include "source/common/config/utility.h"
      28             : #include "source/common/config/well_known_names.h"
      29             : #include "source/common/network/application_protocol.h"
      30             : #include "source/common/network/proxy_protocol_filter_state.h"
      31             : #include "source/common/network/socket_option_factory.h"
      32             : #include "source/common/network/transport_socket_options_impl.h"
      33             : #include "source/common/network/upstream_server_name.h"
      34             : #include "source/common/network/upstream_socket_options_filter_state.h"
      35             : #include "source/common/router/metadatamatchcriteria_impl.h"
      36             : #include "source/common/stream_info/stream_id_provider_impl.h"
      37             : 
      38             : namespace Envoy {
      39             : namespace TcpProxy {
      40             : 
      41           2 : const std::string& PerConnectionCluster::key() {
      42           2 :   CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.cluster");
      43           2 : }
      44             : 
      45             : class PerConnectionClusterFactory : public StreamInfo::FilterState::ObjectFactory {
      46             : public:
      47           2 :   std::string name() const override { return PerConnectionCluster::key(); }
      48             :   std::unique_ptr<StreamInfo::FilterState::Object>
      49           0 :   createFromBytes(absl::string_view data) const override {
      50           0 :     return std::make_unique<PerConnectionCluster>(data);
      51           0 :   }
      52             : };
      53             : 
      54             : REGISTER_FACTORY(PerConnectionClusterFactory, StreamInfo::FilterState::ObjectFactory);
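
The filter-state override consumed by getRegularRouteFromEntries() further down can be set by
any filter that runs before tcp_proxy. A minimal sketch, assuming a hypothetical helper (the
key and object type are the real ones registered above):

    void setClusterOverride(Network::Connection& connection, absl::string_view cluster) {
      // Store the pre-selected cluster name in per-connection filter state so that
      // tcp_proxy routes to it instead of its configured cluster.
      connection.streamInfo().filterState()->setData(
          PerConnectionCluster::key(), std::make_shared<PerConnectionCluster>(cluster),
          StreamInfo::FilterState::StateType::ReadOnly,
          StreamInfo::FilterState::LifeSpan::Connection);
    }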
      55             : 
      56             : Config::SimpleRouteImpl::SimpleRouteImpl(const Config& parent, absl::string_view cluster_name)
      57           0 :     : parent_(parent), cluster_name_(cluster_name) {}
      58             : 
      59             : Config::WeightedClusterEntry::WeightedClusterEntry(
      60             :     const Config& parent, const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::
      61             :                               WeightedCluster::ClusterWeight& config)
      62           0 :     : parent_(parent), cluster_name_(config.name()), cluster_weight_(config.weight()) {
      63           0 :   if (config.has_metadata_match()) {
      64           0 :     const auto filter_it = config.metadata_match().filter_metadata().find(
      65           0 :         Envoy::Config::MetadataFilters::get().ENVOY_LB);
      66           0 :     if (filter_it != config.metadata_match().filter_metadata().end()) {
      67           0 :       if (parent.cluster_metadata_match_criteria_) {
      68           0 :         metadata_match_criteria_ =
      69           0 :             parent.cluster_metadata_match_criteria_->mergeMatchCriteria(filter_it->second);
      70           0 :       } else {
      71           0 :         metadata_match_criteria_ =
      72           0 :             std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
      73           0 :       }
      74           0 :     }
      75           0 :   }
      76           0 : }
      77             : 
      78           0 : OnDemandStats OnDemandConfig::generateStats(Stats::Scope& scope) {
      79           0 :   return {ON_DEMAND_TCP_PROXY_STATS(POOL_COUNTER(scope))};
      80           0 : }
      81             : 
      82             : Config::SharedConfig::SharedConfig(
      83             :     const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
      84             :     Server::Configuration::FactoryContext& context)
      85             :     : stats_scope_(context.scope().createScope(fmt::format("tcp.{}", config.stat_prefix()))),
      86           0 :       stats_(generateStats(*stats_scope_)) {
      87           0 :   if (config.has_idle_timeout()) {
      88           0 :     const uint64_t timeout = DurationUtil::durationToMilliseconds(config.idle_timeout());
      89           0 :     if (timeout > 0) {
      90           0 :       idle_timeout_ = std::chrono::milliseconds(timeout);
      91           0 :     }
      92           0 :   } else {
      93           0 :     idle_timeout_ = std::chrono::hours(1);
      94           0 :   }
      95           0 :   if (config.has_tunneling_config()) {
      96           0 :     tunneling_config_helper_ =
      97           0 :         std::make_unique<TunnelingConfigHelperImpl>(config.tunneling_config(), context);
      98           0 :   }
      99           0 :   if (config.has_max_downstream_connection_duration()) {
     100           0 :     const uint64_t connection_duration =
     101           0 :         DurationUtil::durationToMilliseconds(config.max_downstream_connection_duration());
     102           0 :     max_downstream_connection_duration_ = std::chrono::milliseconds(connection_duration);
     103           0 :   }
     104             : 
     105           0 :   if (config.has_access_log_options()) {
     106           0 :     if (config.flush_access_log_on_connected() /* deprecated */) {
     107           0 :       throw EnvoyException(
     108           0 :           "Only one of flush_access_log_on_connected or access_log_options can be specified.");
     109           0 :     }
     110             : 
     111           0 :     if (config.has_access_log_flush_interval() /* deprecated */) {
     112           0 :       throw EnvoyException(
     113           0 :           "Only one of access_log_flush_interval or access_log_options can be specified.");
     114           0 :     }
     115             : 
     116           0 :     flush_access_log_on_connected_ = config.access_log_options().flush_access_log_on_connected();
     117             : 
     118           0 :     if (config.access_log_options().has_access_log_flush_interval()) {
     119           0 :       const uint64_t flush_interval = DurationUtil::durationToMilliseconds(
     120           0 :           config.access_log_options().access_log_flush_interval());
     121           0 :       access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
     122           0 :     }
     123           0 :   } else {
     124           0 :     flush_access_log_on_connected_ = config.flush_access_log_on_connected();
     125             : 
     126           0 :     if (config.has_access_log_flush_interval()) {
     127           0 :       const uint64_t flush_interval =
     128           0 :           DurationUtil::durationToMilliseconds(config.access_log_flush_interval());
     129           0 :       access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
     130           0 :     }
     131           0 :   }
     132             : 
     133           0 :   if (config.has_on_demand() && config.on_demand().has_odcds_config()) {
     134           0 :     on_demand_config_ =
     135           0 :         std::make_unique<OnDemandConfig>(config.on_demand(), context, *stats_scope_);
     136           0 :   }
     137           0 : }
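
For reference, a minimal sketch of a TcpProxy proto that exercises the options parsed by the
constructor above; the field names come from tcp_proxy.proto, all values are illustrative:

    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy makeExampleProto() {
      envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config;
      config.set_stat_prefix("example");
      config.set_cluster("backend");
      config.mutable_idle_timeout()->set_seconds(300); // overrides the 1h default above
      config.mutable_max_downstream_connection_duration()->set_seconds(3600);
      auto* log_options = config.mutable_access_log_options();
      log_options->set_flush_access_log_on_connected(true);
      log_options->mutable_access_log_flush_interval()->set_seconds(10);
      // Also setting the deprecated top-level flush_access_log_on_connected or
      // access_log_flush_interval would make the constructor above throw.
      return config;
    }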
     138             : 
     139             : Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
     140             :                Server::Configuration::FactoryContext& context)
     141             :     : max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)),
     142             :       upstream_drain_manager_slot_(context.serverFactoryContext().threadLocal().allocateSlot()),
     143             :       shared_config_(std::make_shared<SharedConfig>(config, context)),
     144           0 :       random_generator_(context.serverFactoryContext().api().randomGenerator()) {
     145           0 :   upstream_drain_manager_slot_->set([](Event::Dispatcher&) {
     146           0 :     ThreadLocal::ThreadLocalObjectSharedPtr drain_manager =
     147           0 :         std::make_shared<UpstreamDrainManager>();
     148           0 :     return drain_manager;
     149           0 :   });
     150             : 
     151           0 :   if (!config.cluster().empty()) {
     152           0 :     default_route_ = std::make_shared<const SimpleRouteImpl>(*this, config.cluster());
     153           0 :   }
     154             : 
     155           0 :   if (config.has_metadata_match()) {
     156           0 :     const auto& filter_metadata = config.metadata_match().filter_metadata();
     157             : 
     158           0 :     const auto filter_it = filter_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
     159             : 
     160           0 :     if (filter_it != filter_metadata.end()) {
     161           0 :       cluster_metadata_match_criteria_ =
     162           0 :           std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
     163           0 :     }
     164           0 :   }
     165             : 
     166             :   // Weighted clusters will be enabled only if the default cluster is absent.
     167           0 :   if (default_route_ == nullptr && config.has_weighted_clusters()) {
     168           0 :     total_cluster_weight_ = 0;
     169           0 :     for (const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::WeightedCluster::
     170           0 :              ClusterWeight& cluster_desc : config.weighted_clusters().clusters()) {
     171           0 :       WeightedClusterEntryConstSharedPtr cluster_entry(
     172           0 :           std::make_shared<const WeightedClusterEntry>(*this, cluster_desc));
     173           0 :       weighted_clusters_.emplace_back(std::move(cluster_entry));
     174           0 :       total_cluster_weight_ += weighted_clusters_.back()->clusterWeight();
     175           0 :     }
     176           0 :   }
     177             : 
     178           0 :   for (const envoy::config::accesslog::v3::AccessLog& log_config : config.access_log()) {
     179           0 :     access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
     180           0 :   }
     181             : 
     182           0 :   if (!config.hash_policy().empty()) {
     183           0 :     hash_policy_ = std::make_unique<Network::HashPolicyImpl>(config.hash_policy());
     184           0 :   }
     185           0 : }
     186             : 
     187           0 : RouteConstSharedPtr Config::getRegularRouteFromEntries(Network::Connection& connection) {
      188             :   // First check the per-connection filter state to see if we need to route to a pre-selected cluster.
     189           0 :   if (const auto* per_connection_cluster =
     190           0 :           connection.streamInfo().filterState()->getDataReadOnly<PerConnectionCluster>(
     191           0 :               PerConnectionCluster::key());
     192           0 :       per_connection_cluster != nullptr) {
     193           0 :     return std::make_shared<const SimpleRouteImpl>(*this, per_connection_cluster->value());
     194           0 :   }
     195             : 
     196           0 :   if (default_route_ != nullptr) {
     197           0 :     return default_route_;
     198           0 :   }
     199             : 
      200             :   // No match and no more routes to try.
     201           0 :   return nullptr;
     202           0 : }
     203             : 
     204           0 : RouteConstSharedPtr Config::getRouteFromEntries(Network::Connection& connection) {
     205           0 :   if (weighted_clusters_.empty()) {
     206           0 :     return getRegularRouteFromEntries(connection);
     207           0 :   }
     208           0 :   return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_,
     209           0 :                                           random_generator_.random(), false);
     210           0 : }
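
WeightedClusterUtil::pickCluster() is defined elsewhere; a hedged sketch of the assumed
weighted pick, using the same inputs as the call above (entries, total weight, random value):

    template <class EntryPtr>
    const EntryPtr& pickByWeight(const std::vector<EntryPtr>& entries, uint64_t total_weight,
                                 uint64_t random_value) {
      // Map the random value into [0, total_weight) and walk the entries until the
      // cumulative weight covers it.
      const uint64_t selected = random_value % total_weight;
      uint64_t begin = 0;
      for (const auto& entry : entries) {
        const uint64_t end = begin + entry->clusterWeight();
        if (selected < end) {
          return entry;
        }
        begin = end;
      }
      return entries.back(); // unreachable if the weights sum to total_weight
    }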
     211             : 
     212           0 : UpstreamDrainManager& Config::drainManager() {
     213           0 :   return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
     214           0 : }
     215             : 
     216             : Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager)
     217             :     : config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
     218           0 :       upstream_callbacks_(new UpstreamCallbacks(this)) {
     219           0 :   ASSERT(config != nullptr);
     220           0 : }
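
A sketch of how Config and Filter are typically wired together by the filter's config factory
(the factory lives in config.cc, not in this file; the function name is illustrative):

    Network::FilterFactoryCb createFilterFactory(
        const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& proto,
        Server::Configuration::FactoryContext& context) {
      // One Config is shared by every connection on the listener; each connection
      // gets its own Filter instance.
      ConfigSharedPtr filter_config(std::make_shared<Config>(proto, context));
      return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
        filter_manager.addReadFilter(std::make_shared<Filter>(
            filter_config, context.serverFactoryContext().clusterManager()));
      };
    }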
     221             : 
     222           0 : Filter::~Filter() {
     223             :   // Disable access log flush timer if it is enabled.
     224           0 :   disableAccessLogFlushTimer();
     225             : 
     226             :   // Flush the final end stream access log entry.
     227           0 :   flushAccessLog(AccessLog::AccessLogType::TcpConnectionEnd);
     228             : 
     229           0 :   ASSERT(generic_conn_pool_ == nullptr);
     230           0 :   ASSERT(upstream_ == nullptr);
     231           0 : }
     232             : 
     233           0 : TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) {
     234           0 :   return {ALL_TCP_PROXY_STATS(POOL_COUNTER(scope), POOL_GAUGE(scope))};
     235           0 : }
     236             : 
     237           0 : void Filter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
     238           0 :   initialize(callbacks, true);
     239           0 : }
     240             : 
     241           0 : void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats) {
     242           0 :   read_callbacks_ = &callbacks;
     243           0 :   ENVOY_CONN_LOG(debug, "new tcp proxy session", read_callbacks_->connection());
     244             : 
     245           0 :   read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
     246           0 :   read_callbacks_->connection().enableHalfClose(true);
     247             : 
     248             :   // Check that we are generating only the byte meters we need.
     249             :   // The Downstream should be unset and the Upstream should be populated.
     250           0 :   ASSERT(getStreamInfo().getDownstreamBytesMeter() == nullptr);
     251           0 :   ASSERT(getStreamInfo().getUpstreamBytesMeter() != nullptr);
     252             : 
     253             :   // Need to disable reads so that we don't write to an upstream that might fail
     254             :   // in onData(). This will get re-enabled when the upstream connection is
     255             :   // established.
     256           0 :   read_callbacks_->connection().readDisable(true);
     257           0 :   getStreamInfo().setDownstreamBytesMeter(std::make_shared<StreamInfo::BytesMeter>());
     258           0 :   getStreamInfo().setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
     259             : 
     260           0 :   config_->stats().downstream_cx_total_.inc();
     261           0 :   if (set_connection_stats) {
     262           0 :     read_callbacks_->connection().setConnectionStats(
     263           0 :         {config_->stats().downstream_cx_rx_bytes_total_,
     264           0 :          config_->stats().downstream_cx_rx_bytes_buffered_,
     265           0 :          config_->stats().downstream_cx_tx_bytes_total_,
     266           0 :          config_->stats().downstream_cx_tx_bytes_buffered_, nullptr, nullptr});
     267           0 :   }
     268           0 : }
     269             : 
     270           0 : void Filter::onInitFailure(UpstreamFailureReason reason) {
      271             :   // If ODCDS fails, the filter does not have an assigned upstream and so it
      272             :   // never attempts to create an upstream connection. In that case no connect
      273             :   // attempt was started and there is no connection pool callback latency to
      274             :   // record.
     275           0 :   if (initial_upstream_connection_start_time_.has_value()) {
     276           0 :     getStreamInfo().upstreamInfo()->upstreamTiming().recordConnectionPoolCallbackLatency(
     277           0 :         initial_upstream_connection_start_time_.value(),
     278           0 :         read_callbacks_->connection().dispatcher().timeSource());
     279           0 :   }
     280           0 :   read_callbacks_->connection().close(
     281           0 :       Network::ConnectionCloseType::NoFlush,
     282           0 :       absl::StrCat(StreamInfo::LocalCloseReasons::get().TcpProxyInitializationFailure,
     283           0 :                    enumToInt(reason)));
     284           0 : }
     285             : 
     286           0 : void Filter::readDisableUpstream(bool disable) {
     287           0 :   bool success = false;
     288           0 :   if (upstream_) {
     289           0 :     success = upstream_->readDisable(disable);
     290           0 :   }
     291           0 :   if (!success) {
     292           0 :     return;
     293           0 :   }
     294           0 :   if (disable) {
     295           0 :     read_callbacks_->upstreamHost()
     296           0 :         ->cluster()
     297           0 :         .trafficStats()
     298           0 :         ->upstream_flow_control_paused_reading_total_.inc();
     299           0 :   } else {
     300           0 :     read_callbacks_->upstreamHost()
     301           0 :         ->cluster()
     302           0 :         .trafficStats()
     303           0 :         ->upstream_flow_control_resumed_reading_total_.inc();
     304           0 :   }
     305           0 : }
     306             : 
     307           0 : void Filter::readDisableDownstream(bool disable) {
     308           0 :   if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
     309             :     // During idle timeouts, we close both upstream and downstream with NoFlush.
      310             :     // Envoy still does a best-effort flush, which can cause readDisableDownstream to be called
     311             :     // despite the downstream connection being closed.
     312           0 :     return;
     313           0 :   }
     314             : 
     315           0 :   const Network::Connection::ReadDisableStatus read_disable_status =
     316           0 :       read_callbacks_->connection().readDisable(disable);
     317             : 
     318           0 :   if (read_disable_status == Network::Connection::ReadDisableStatus::TransitionedToReadDisabled) {
     319           0 :     config_->stats().downstream_flow_control_paused_reading_total_.inc();
     320           0 :   } else if (read_disable_status ==
     321           0 :              Network::Connection::ReadDisableStatus::TransitionedToReadEnabled) {
     322           0 :     config_->stats().downstream_flow_control_resumed_reading_total_.inc();
     323           0 :   }
     324           0 : }
     325             : 
     326           0 : StreamInfo::StreamInfo& Filter::getStreamInfo() {
     327           0 :   return read_callbacks_->connection().streamInfo();
     328           0 : }
     329             : 
     330           0 : void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
     331           0 :   ASSERT(!on_high_watermark_called_);
     332           0 :   on_high_watermark_called_ = true;
     333             :   // If downstream has too much data buffered, stop reading on the upstream connection.
     334           0 :   parent_.readDisableUpstream(true);
     335           0 : }
     336             : 
     337           0 : void Filter::DownstreamCallbacks::onBelowWriteBufferLowWatermark() {
     338           0 :   ASSERT(on_high_watermark_called_);
     339           0 :   on_high_watermark_called_ = false;
     340             :   // The downstream buffer has been drained. Resume reading from upstream.
     341           0 :   parent_.readDisableUpstream(false);
     342           0 : }
     343             : 
     344           0 : void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) {
     345           0 :   if (event == Network::ConnectionEvent::Connected ||
     346           0 :       event == Network::ConnectionEvent::ConnectedZeroRtt) {
     347           0 :     return;
     348           0 :   }
     349           0 :   if (drainer_ == nullptr) {
     350           0 :     parent_->onUpstreamEvent(event);
     351           0 :   } else {
     352           0 :     drainer_->onEvent(event);
     353           0 :   }
     354           0 : }
     355             : 
     356           0 : void Filter::UpstreamCallbacks::onAboveWriteBufferHighWatermark() {
      357             :   // TCP tunneling may trigger the high/low watermark callbacks multiple times.
     358           0 :   ASSERT(parent_->config_->tunnelingConfigHelper() || !on_high_watermark_called_);
     359           0 :   on_high_watermark_called_ = true;
     360             : 
     361           0 :   if (parent_ != nullptr) {
     362             :     // There's too much data buffered in the upstream write buffer, so stop reading.
     363           0 :     parent_->readDisableDownstream(true);
     364           0 :   }
     365           0 : }
     366             : 
     367           0 : void Filter::UpstreamCallbacks::onBelowWriteBufferLowWatermark() {
      368             :   // TCP tunneling may trigger the high/low watermark callbacks multiple times.
     369           0 :   ASSERT(parent_->config_->tunnelingConfigHelper() || on_high_watermark_called_);
     370           0 :   on_high_watermark_called_ = false;
     371             : 
     372           0 :   if (parent_ != nullptr) {
     373             :     // The upstream write buffer is drained. Resume reading.
     374           0 :     parent_->readDisableDownstream(false);
     375           0 :   }
     376           0 : }
     377             : 
     378           0 : void Filter::UpstreamCallbacks::onUpstreamData(Buffer::Instance& data, bool end_stream) {
     379           0 :   if (parent_) {
     380           0 :     parent_->onUpstreamData(data, end_stream);
     381           0 :   } else {
     382           0 :     drainer_->onData(data, end_stream);
     383           0 :   }
     384           0 : }
     385             : 
     386           0 : void Filter::UpstreamCallbacks::onBytesSent() {
     387           0 :   if (drainer_ == nullptr) {
     388           0 :     parent_->resetIdleTimer();
     389           0 :   } else {
     390           0 :     drainer_->onBytesSent();
     391           0 :   }
     392           0 : }
     393             : 
     394           0 : void Filter::UpstreamCallbacks::onIdleTimeout() {
     395           0 :   if (drainer_ == nullptr) {
     396           0 :     parent_->onIdleTimeout();
     397           0 :   } else {
     398           0 :     drainer_->onIdleTimeout();
     399           0 :   }
     400           0 : }
     401             : 
     402           0 : void Filter::UpstreamCallbacks::drain(Drainer& drainer) {
     403           0 :   ASSERT(drainer_ == nullptr); // This should only get set once.
     404           0 :   drainer_ = &drainer;
     405           0 :   parent_ = nullptr;
     406           0 : }
     407             : 
     408           0 : Network::FilterStatus Filter::establishUpstreamConnection() {
     409           0 :   const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;
     410           0 :   Upstream::ThreadLocalCluster* thread_local_cluster =
     411           0 :       cluster_manager_.getThreadLocalCluster(cluster_name);
     412             : 
     413           0 :   if (!thread_local_cluster) {
     414           0 :     auto odcds = config_->onDemandCds();
     415           0 :     if (!odcds.has_value()) {
     416             :       // No ODCDS? It means that on-demand discovery is disabled.
     417           0 :       ENVOY_CONN_LOG(debug, "Cluster not found {} and no on demand cluster set.",
     418           0 :                      read_callbacks_->connection(), cluster_name);
     419           0 :       config_->stats().downstream_cx_no_route_.inc();
     420           0 :       getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
     421           0 :       onInitFailure(UpstreamFailureReason::NoRoute);
     422           0 :     } else {
     423           0 :       ASSERT(!cluster_discovery_handle_);
     424           0 :       auto callback = std::make_unique<Upstream::ClusterDiscoveryCallback>(
     425           0 :           [this](Upstream::ClusterDiscoveryStatus cluster_status) {
     426           0 :             onClusterDiscoveryCompletion(cluster_status);
     427           0 :           });
     428           0 :       config_->onDemandStats().on_demand_cluster_attempt_.inc();
     429           0 :       cluster_discovery_handle_ = odcds->requestOnDemandClusterDiscovery(
     430           0 :           cluster_name, std::move(callback), config_->odcdsTimeout());
     431           0 :     }
     432           0 :     return Network::FilterStatus::StopIteration;
     433           0 :   }
     434             : 
     435           0 :   ENVOY_CONN_LOG(debug, "Creating connection to cluster {}", read_callbacks_->connection(),
     436           0 :                  cluster_name);
     437           0 :   if (!initial_upstream_connection_start_time_.has_value()) {
     438           0 :     initial_upstream_connection_start_time_.emplace(
     439           0 :         read_callbacks_->connection().dispatcher().timeSource().monotonicTime());
     440           0 :   }
     441             : 
     442           0 :   const Upstream::ClusterInfoConstSharedPtr& cluster = thread_local_cluster->info();
     443           0 :   getStreamInfo().setUpstreamClusterInfo(cluster);
     444             : 
     445             :   // Check this here because the TCP conn pool will queue our request waiting for a connection that
     446             :   // will never be released.
     447           0 :   if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) {
     448           0 :     getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
     449           0 :     cluster->trafficStats()->upstream_cx_overflow_.inc();
     450           0 :     onInitFailure(UpstreamFailureReason::ResourceLimitExceeded);
     451           0 :     return Network::FilterStatus::StopIteration;
     452           0 :   }
     453             : 
     454           0 :   const uint32_t max_connect_attempts = config_->maxConnectAttempts();
     455           0 :   if (connect_attempts_ >= max_connect_attempts) {
     456           0 :     getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
     457           0 :     cluster->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
     458           0 :     onInitFailure(UpstreamFailureReason::ConnectFailed);
     459           0 :     return Network::FilterStatus::StopIteration;
     460           0 :   }
     461             : 
     462           0 :   auto& downstream_connection = read_callbacks_->connection();
     463           0 :   auto& filter_state = downstream_connection.streamInfo().filterState();
     464           0 :   if (!filter_state->hasData<Network::ProxyProtocolFilterState>(
     465           0 :           Network::ProxyProtocolFilterState::key())) {
     466           0 :     filter_state->setData(
     467           0 :         Network::ProxyProtocolFilterState::key(),
     468           0 :         std::make_shared<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
     469           0 :             downstream_connection.connectionInfoProvider().remoteAddress(),
     470           0 :             downstream_connection.connectionInfoProvider().localAddress()}),
     471           0 :         StreamInfo::FilterState::StateType::ReadOnly,
     472           0 :         StreamInfo::FilterState::LifeSpan::Connection);
     473           0 :   }
     474           0 :   transport_socket_options_ =
     475           0 :       Network::TransportSocketOptionsUtility::fromFilterState(*filter_state);
     476             : 
     477           0 :   if (auto typed_state = filter_state->getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
     478           0 :           Network::UpstreamSocketOptionsFilterState::key());
     479           0 :       typed_state != nullptr) {
     480           0 :     auto downstream_options = typed_state->value();
     481           0 :     if (!upstream_options_) {
     482           0 :       upstream_options_ = std::make_shared<Network::Socket::Options>();
     483           0 :     }
     484           0 :     Network::Socket::appendOptions(upstream_options_, downstream_options);
     485           0 :   }
     486             : 
     487           0 :   if (!maybeTunnel(*thread_local_cluster)) {
     488             :     // Either cluster is unknown or there are no healthy hosts. tcpConnPool() increments
     489             :     // cluster->trafficStats()->upstream_cx_none_healthy in the latter case.
     490           0 :     getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
     491           0 :     onInitFailure(UpstreamFailureReason::NoHealthyUpstream);
     492           0 :   }
     493           0 :   return Network::FilterStatus::StopIteration;
     494           0 : }
     495             : 
     496           0 : void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) {
     497             :   // Clear the cluster_discovery_handle_ before calling establishUpstreamConnection since we may
      498             :   // request cluster discovery again.
     499           0 :   cluster_discovery_handle_.reset();
     500           0 :   const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;
     501           0 :   switch (cluster_status) {
     502           0 :   case Upstream::ClusterDiscoveryStatus::Missing:
     503           0 :     ENVOY_CONN_LOG(debug, "On demand cluster {} is missing", read_callbacks_->connection(),
     504           0 :                    cluster_name);
     505           0 :     config_->onDemandStats().on_demand_cluster_missing_.inc();
     506           0 :     break;
     507           0 :   case Upstream::ClusterDiscoveryStatus::Timeout:
     508           0 :     ENVOY_CONN_LOG(debug, "On demand cluster {} was not found before timeout.",
     509           0 :                    read_callbacks_->connection(), cluster_name);
     510           0 :     config_->onDemandStats().on_demand_cluster_timeout_.inc();
     511           0 :     break;
     512           0 :   case Upstream::ClusterDiscoveryStatus::Available:
     513             :     // cluster_discovery_handle_ would have been cancelled if the downstream were closed.
     514           0 :     ASSERT(!downstream_closed_);
     515           0 :     ENVOY_CONN_LOG(debug, "On demand cluster {} is found. Establishing connection.",
     516           0 :                    read_callbacks_->connection(), cluster_name);
     517           0 :     config_->onDemandStats().on_demand_cluster_success_.inc();
     518           0 :     establishUpstreamConnection();
     519           0 :     return;
     520           0 :   }
     521             :   // Failure path.
     522           0 :   config_->stats().downstream_cx_no_route_.inc();
     523           0 :   getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
     524           0 :   onInitFailure(UpstreamFailureReason::NoRoute);
     525           0 : }
     526             : 
     527           0 : bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
     528           0 :   GenericConnPoolFactory* factory = nullptr;
     529           0 :   if (cluster.info()->upstreamConfig().has_value()) {
     530           0 :     factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
     531           0 :         cluster.info()->upstreamConfig().ref());
     532           0 :   } else {
     533           0 :     factory = Envoy::Config::Utility::getFactoryByName<GenericConnPoolFactory>(
     534           0 :         "envoy.filters.connection_pools.tcp.generic");
     535           0 :   }
     536           0 :   if (!factory) {
     537           0 :     return false;
     538           0 :   }
     539             : 
     540           0 :   generic_conn_pool_ = factory->createGenericConnPool(cluster, config_->tunnelingConfigHelper(),
     541           0 :                                                       this, *upstream_callbacks_, getStreamInfo());
     542           0 :   if (generic_conn_pool_) {
     543           0 :     connecting_ = true;
     544           0 :     connect_attempts_++;
     545           0 :     getStreamInfo().setAttemptCount(connect_attempts_);
     546           0 :     generic_conn_pool_->newStream(*this);
     547             :     // Because we never return open connections to the pool, this either has a handle waiting on
     548             :     // connection completion, or onPoolFailure has been invoked. Either way, stop iteration.
     549           0 :     return true;
     550           0 :   }
     551           0 :   return false;
     552           0 : }
     553             : 
     554             : void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
     555             :                                   absl::string_view failure_reason,
     556           0 :                                   Upstream::HostDescriptionConstSharedPtr host) {
     557           0 :   generic_conn_pool_.reset();
     558           0 :   read_callbacks_->upstreamHost(host);
     559           0 :   getStreamInfo().upstreamInfo()->setUpstreamHost(host);
     560           0 :   getStreamInfo().upstreamInfo()->setUpstreamTransportFailureReason(failure_reason);
     561             : 
     562           0 :   switch (reason) {
     563           0 :   case ConnectionPool::PoolFailureReason::Overflow:
     564           0 :   case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
     565           0 :     upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
     566           0 :     break;
     567           0 :   case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
     568           0 :     upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);
     569           0 :     break;
     570           0 :   case ConnectionPool::PoolFailureReason::Timeout:
     571           0 :     onConnectTimeout();
     572           0 :     break;
     573           0 :   }
     574           0 : }
     575             : 
     576             : void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
     577             :                                 std::unique_ptr<GenericUpstream>&& upstream,
     578             :                                 Upstream::HostDescriptionConstSharedPtr& host,
     579             :                                 const Network::ConnectionInfoProvider& address_provider,
     580           0 :                                 Ssl::ConnectionInfoConstSharedPtr ssl_info) {
     581           0 :   upstream_ = std::move(upstream);
     582           0 :   generic_conn_pool_.reset();
     583           0 :   read_callbacks_->upstreamHost(host);
     584           0 :   StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
     585           0 :   upstream_info.upstreamTiming().recordConnectionPoolCallbackLatency(
     586           0 :       initial_upstream_connection_start_time_.value(),
     587           0 :       read_callbacks_->connection().dispatcher().timeSource());
     588           0 :   upstream_info.setUpstreamHost(host);
     589           0 :   upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
     590           0 :   upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
     591           0 :   upstream_info.setUpstreamSslConnection(ssl_info);
     592           0 :   onUpstreamConnection();
     593           0 :   read_callbacks_->continueReading();
     594           0 :   if (info) {
     595           0 :     upstream_info.setUpstreamFilterState(info->filterState());
     596           0 :   }
     597           0 : }
     598             : 
     599           0 : const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
     600           0 :   const Router::MetadataMatchCriteria* route_criteria =
     601           0 :       (route_ != nullptr) ? route_->metadataMatchCriteria() : nullptr;
     602             : 
     603           0 :   const auto& request_metadata = getStreamInfo().dynamicMetadata().filter_metadata();
     604           0 :   const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
     605             : 
     606           0 :   if (filter_it != request_metadata.end() && route_criteria != nullptr) {
     607           0 :     metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second);
     608           0 :     return metadata_match_criteria_.get();
     609           0 :   } else if (filter_it != request_metadata.end()) {
     610           0 :     metadata_match_criteria_ =
     611           0 :         std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
     612           0 :     return metadata_match_criteria_.get();
     613           0 :   } else {
     614           0 :     return route_criteria;
     615           0 :   }
     616           0 : }
     617             : 
     618           0 : ProtobufTypes::MessagePtr TunnelResponseHeadersOrTrailers::serializeAsProto() const {
     619           0 :   auto proto_out = std::make_unique<envoy::config::core::v3::HeaderMap>();
     620           0 :   value().iterate([&proto_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
     621           0 :     auto* new_header = proto_out->add_headers();
     622           0 :     new_header->set_key(std::string(e.key().getStringView()));
     623           0 :     new_header->set_value(std::string(e.value().getStringView()));
     624           0 :     return Http::HeaderMap::Iterate::Continue;
     625           0 :   });
     626           0 :   return proto_out;
     627           0 : }
     628             : 
     629           0 : const std::string& TunnelResponseHeaders::key() {
     630           0 :   CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.propagate_response_headers");
     631           0 : }
     632             : 
     633           0 : const std::string& TunnelResponseTrailers::key() {
     634           0 :   CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.propagate_response_trailers");
     635           0 : }
     636             : 
     637             : TunnelingConfigHelperImpl::TunnelingConfigHelperImpl(
     638             :     const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig&
     639             :         config_message,
     640             :     Server::Configuration::FactoryContext& context)
     641             :     : use_post_(config_message.use_post()),
     642             :       header_parser_(Envoy::Router::HeaderParser::configure(config_message.headers_to_add())),
     643             :       propagate_response_headers_(config_message.propagate_response_headers()),
     644             :       propagate_response_trailers_(config_message.propagate_response_trailers()),
     645           0 :       post_path_(config_message.post_path()) {
     646           0 :   if (!post_path_.empty() && !use_post_) {
     647           0 :     throw EnvoyException("Can't set a post path when POST method isn't used");
     648           0 :   }
     649           0 :   post_path_ = post_path_.empty() ? "/" : post_path_;
     650             : 
     651           0 :   envoy::config::core::v3::SubstitutionFormatString substitution_format_config;
     652           0 :   substitution_format_config.mutable_text_format_source()->set_inline_string(
     653           0 :       config_message.hostname());
     654           0 :   hostname_fmt_ = Formatter::SubstitutionFormatStringUtils::fromProtoConfig(
     655           0 :       substitution_format_config, context);
     656           0 : }
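
A minimal sketch of a tunneling_config this helper would accept; the field names come from
tcp_proxy.proto, the values are illustrative:

    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig tunneling;
    // hostname is a substitution-format string, formatted per stream by host() below.
    tunneling.set_hostname("%REQUESTED_SERVER_NAME%:443");
    tunneling.set_use_post(true);
    tunneling.set_post_path("/tunnel"); // rejected by the constructor above unless use_post is set
    tunneling.set_propagate_response_headers(true);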
     657             : 
     658           0 : std::string TunnelingConfigHelperImpl::host(const StreamInfo::StreamInfo& stream_info) const {
     659           0 :   return hostname_fmt_->formatWithContext({}, stream_info);
     660           0 : }
     661             : 
     662             : void TunnelingConfigHelperImpl::propagateResponseHeaders(
     663             :     Http::ResponseHeaderMapPtr&& headers,
     664           0 :     const StreamInfo::FilterStateSharedPtr& filter_state) const {
     665           0 :   if (!propagate_response_headers_) {
     666           0 :     return;
     667           0 :   }
     668           0 :   filter_state->setData(
     669           0 :       TunnelResponseHeaders::key(), std::make_shared<TunnelResponseHeaders>(std::move(headers)),
     670           0 :       StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
     671           0 : }
     672             : 
     673             : void TunnelingConfigHelperImpl::propagateResponseTrailers(
     674             :     Http::ResponseTrailerMapPtr&& trailers,
     675           0 :     const StreamInfo::FilterStateSharedPtr& filter_state) const {
     676           0 :   if (!propagate_response_trailers_) {
     677           0 :     return;
     678           0 :   }
     679           0 :   filter_state->setData(
     680           0 :       TunnelResponseTrailers::key(), std::make_shared<TunnelResponseTrailers>(std::move(trailers)),
     681           0 :       StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
     682           0 : }
     683             : 
     684           0 : void Filter::onConnectTimeout() {
     685           0 :   ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection());
     686           0 :   read_callbacks_->upstreamHost()->outlierDetector().putResult(
     687           0 :       Upstream::Outlier::Result::LocalOriginTimeout);
     688           0 :   getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionFailure);
     689             : 
     690             :   // Raise LocalClose, which will trigger a reconnect if needed/configured.
     691           0 :   upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
     692           0 : }
     693             : 
     694           0 : Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
     695           0 :   ENVOY_CONN_LOG(trace, "downstream connection received {} bytes, end_stream={}",
     696           0 :                  read_callbacks_->connection(), data.length(), end_stream);
     697           0 :   getStreamInfo().getDownstreamBytesMeter()->addWireBytesReceived(data.length());
     698           0 :   if (upstream_) {
     699           0 :     getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length());
     700           0 :     upstream_->encodeData(data, end_stream);
     701           0 :   }
     702             :   // The upstream should consume all of the data.
     703             :   // Before there is an upstream the connection should be readDisabled. If the upstream is
     704             :   // destroyed, there should be no further reads as well.
     705           0 :   ASSERT(0 == data.length());
     706           0 :   resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
     707           0 :   return Network::FilterStatus::StopIteration;
     708           0 : }
     709             : 
     710           0 : Network::FilterStatus Filter::onNewConnection() {
     711           0 :   if (config_->maxDownstreamConnectionDuration()) {
     712           0 :     connection_duration_timer_ = read_callbacks_->connection().dispatcher().createTimer(
     713           0 :         [this]() -> void { onMaxDownstreamConnectionDuration(); });
     714           0 :     connection_duration_timer_->enableTimer(config_->maxDownstreamConnectionDuration().value());
     715           0 :   }
     716             : 
     717           0 :   if (config_->accessLogFlushInterval().has_value()) {
     718           0 :     access_log_flush_timer_ = read_callbacks_->connection().dispatcher().createTimer(
     719           0 :         [this]() -> void { onAccessLogFlushInterval(); });
     720           0 :     resetAccessLogFlushTimer();
     721           0 :   }
     722             : 
     723             :   // Set UUID for the connection. This is used for logging and tracing.
     724           0 :   getStreamInfo().setStreamIdProvider(
     725           0 :       std::make_shared<StreamInfo::StreamIdProviderImpl>(config_->randomGenerator().uuid()));
     726             : 
     727           0 :   ASSERT(upstream_ == nullptr);
     728           0 :   route_ = pickRoute();
     729           0 :   return establishUpstreamConnection();
     730           0 : }
     731             : 
     732           0 : bool Filter::startUpstreamSecureTransport() {
     733           0 :   bool switched_to_tls = upstream_->startUpstreamSecureTransport();
     734           0 :   if (switched_to_tls) {
     735           0 :     StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
     736           0 :     upstream_info.setUpstreamSslConnection(upstream_->getUpstreamConnectionSslInfo());
     737           0 :   }
     738           0 :   return switched_to_tls;
     739           0 : }
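
A sketch of the downstream side of this handshake, assuming a hypothetical protocol filter
that has just negotiated STARTTLS and asks the connection's read-filter callbacks to convert
the upstream transport, which lands in the function above:

    Network::FilterStatus onStartTlsAgreed(Network::ReadFilterCallbacks& callbacks) {
      if (!callbacks.startUpstreamSecureTransport()) {
        // The upstream transport socket is not StartTLS-capable; stop and handle the error.
        return Network::FilterStatus::StopIteration;
      }
      return Network::FilterStatus::Continue;
    }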
     740             : 
     741           0 : void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
     742           0 :   if (event == Network::ConnectionEvent::LocalClose ||
     743           0 :       event == Network::ConnectionEvent::RemoteClose) {
     744           0 :     downstream_closed_ = true;
     745             :     // Cancel the potential odcds callback.
     746           0 :     cluster_discovery_handle_ = nullptr;
     747           0 :   }
     748             : 
     749           0 :   ENVOY_CONN_LOG(trace, "on downstream event {}, has upstream = {}", read_callbacks_->connection(),
     750           0 :                  static_cast<int>(event), upstream_ != nullptr);
     751             : 
     752           0 :   if (upstream_) {
     753           0 :     Tcp::ConnectionPool::ConnectionDataPtr conn_data(upstream_->onDownstreamEvent(event));
     754           0 :     if (conn_data != nullptr &&
     755           0 :         conn_data->connection().state() != Network::Connection::State::Closed) {
     756           0 :       config_->drainManager().add(config_->sharedConfig(), std::move(conn_data),
     757           0 :                                   std::move(upstream_callbacks_), std::move(idle_timer_),
     758           0 :                                   read_callbacks_->upstreamHost());
     759           0 :     }
     760           0 :     if (event == Network::ConnectionEvent::LocalClose ||
     761           0 :         event == Network::ConnectionEvent::RemoteClose) {
     762           0 :       upstream_.reset();
     763           0 :       disableIdleTimer();
     764           0 :     }
     765           0 :   }
     766             : 
     767           0 :   if (generic_conn_pool_) {
     768           0 :     if (event == Network::ConnectionEvent::LocalClose ||
     769           0 :         event == Network::ConnectionEvent::RemoteClose) {
     770             :       // Cancel the conn pool request and close any excess pending requests.
     771           0 :       generic_conn_pool_.reset();
     772           0 :     }
     773           0 :   }
     774           0 : }
     775             : 
     776           0 : void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
     777           0 :   ENVOY_CONN_LOG(trace, "upstream connection received {} bytes, end_stream={}",
     778           0 :                  read_callbacks_->connection(), data.length(), end_stream);
     779           0 :   getStreamInfo().getUpstreamBytesMeter()->addWireBytesReceived(data.length());
     780           0 :   getStreamInfo().getDownstreamBytesMeter()->addWireBytesSent(data.length());
     781           0 :   read_callbacks_->connection().write(data, end_stream);
     782           0 :   ASSERT(0 == data.length());
     783           0 :   resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
     784           0 : }
     785             : 
     786           0 : void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
     787           0 :   if (event == Network::ConnectionEvent::ConnectedZeroRtt) {
     788           0 :     return;
     789           0 :   }
     790             :   // Update the connecting flag before processing the event because we may start a new connection
     791             :   // attempt in establishUpstreamConnection.
     792           0 :   bool connecting = connecting_;
     793           0 :   connecting_ = false;
     794             : 
     795           0 :   if (event == Network::ConnectionEvent::RemoteClose ||
     796           0 :       event == Network::ConnectionEvent::LocalClose) {
     797           0 :     upstream_.reset();
     798           0 :     disableIdleTimer();
     799             : 
     800           0 :     if (connecting) {
     801           0 :       if (event == Network::ConnectionEvent::RemoteClose) {
     802           0 :         getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionFailure);
     803           0 :         read_callbacks_->upstreamHost()->outlierDetector().putResult(
     804           0 :             Upstream::Outlier::Result::LocalOriginConnectFailed);
     805           0 :       }
     806           0 :       if (!downstream_closed_) {
     807           0 :         route_ = pickRoute();
     808           0 :         establishUpstreamConnection();
     809           0 :       }
     810           0 :     } else {
     811             :       // TODO(botengyao): propagate RST back to downstream connection if RST is received.
     812           0 :       if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
     813           0 :         read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
     814           0 :       }
     815           0 :     }
     816           0 :   }
     817           0 : }
     818             : 
     819           0 : void Filter::onUpstreamConnection() {
     820           0 :   connecting_ = false;
     821             :   // Re-enable downstream reads now that the upstream connection is established
     822             :   // so we have a place to send downstream data to.
     823           0 :   read_callbacks_->connection().readDisable(false);
     824             : 
     825           0 :   read_callbacks_->upstreamHost()->outlierDetector().putResult(
     826           0 :       Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);
     827             : 
     828           0 :   ENVOY_CONN_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}",
     829           0 :                  read_callbacks_->connection(),
     830           0 :                  getStreamInfo().downstreamAddressProvider().requestedServerName());
     831             : 
     832           0 :   if (config_->idleTimeout()) {
     833             :     // The idle_timer_ can be moved to a Drainer, so related callbacks call into
     834             :     // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch
     835             :     // the call to either TcpProxy or to Drainer, depending on the current state.
     836           0 :     idle_timer_ = read_callbacks_->connection().dispatcher().createTimer(
     837           0 :         [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); });
     838           0 :     resetIdleTimer();
     839           0 :     read_callbacks_->connection().addBytesSentCallback([this](uint64_t) {
     840           0 :       resetIdleTimer();
     841           0 :       return true;
     842           0 :     });
     843           0 :     if (upstream_) {
     844           0 :       upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) -> bool {
     845           0 :         upstream_callbacks->onBytesSent();
     846           0 :         return true;
     847           0 :       });
     848           0 :     }
     849           0 :   }
     850             : 
     851           0 :   if (config_->flushAccessLogOnConnected()) {
     852           0 :     flushAccessLog(AccessLog::AccessLogType::TcpUpstreamConnected);
     853           0 :   }
     854           0 : }
     855             : 
     856           0 : void Filter::onIdleTimeout() {
     857           0 :   ENVOY_CONN_LOG(debug, "Session timed out", read_callbacks_->connection());
     858           0 :   config_->stats().idle_timeout_.inc();
     859             : 
     860             :   // This results in also closing the upstream connection.
     861           0 :   read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush,
     862           0 :                                       StreamInfo::LocalCloseReasons::get().TcpSessionIdleTimeout);
     863           0 : }
     864             : 
     865           0 : void Filter::onMaxDownstreamConnectionDuration() {
     866           0 :   ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
     867           0 :   getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::DurationTimeout);
     868           0 :   config_->stats().max_downstream_connection_duration_.inc();
     869           0 :   read_callbacks_->connection().close(
     870           0 :       Network::ConnectionCloseType::NoFlush,
     871           0 :       StreamInfo::LocalCloseReasons::get().MaxConnectionDurationReached);
     872           0 : }
     873             : 
     874           0 : void Filter::onAccessLogFlushInterval() {
     875           0 :   flushAccessLog(AccessLog::AccessLogType::TcpPeriodic);
     876           0 :   const SystemTime now = read_callbacks_->connection().dispatcher().timeSource().systemTime();
     877           0 :   getStreamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(now);
     878           0 :   if (getStreamInfo().getUpstreamBytesMeter()) {
     879           0 :     getStreamInfo().getUpstreamBytesMeter()->takeUpstreamPeriodicLoggingSnapshot(now);
     880           0 :   }
     881           0 :   resetAccessLogFlushTimer();
     882           0 : }
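// A minimal sketch (stand-in types, not the Envoy timer API) of the re-arming
// pattern above: the timers here are one-shot, so a periodic action does its
// work and then re-enables its own timer, which is exactly what
// resetAccessLogFlushTimer() does for the next flush interval.
#include <chrono>
#include <functional>

namespace flush_sketch {

struct OneShotTimer {
  std::function<void()> callback;
  void enableTimer(std::chrono::milliseconds) { /* schedule `callback` once */ }
};

struct PeriodicFlusher {
  OneShotTimer timer;
  std::chrono::milliseconds interval{1000};

  void start() {
    timer.callback = [this]() { onInterval(); };
    timer.enableTimer(interval); // arm the first period
  }
  void onInterval() {
    // ... flush access logs and snapshot byte meters here ...
    timer.enableTimer(interval); // re-arm: a one-shot timer does not repeat
  }
};

} // namespace flush_sketch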
     883             : 
     884           0 : void Filter::flushAccessLog(AccessLog::AccessLogType access_log_type) {
     885           0 :   const Formatter::HttpFormatterContext log_context{nullptr, nullptr, nullptr, {}, access_log_type};
     886             : 
     887           0 :   for (const auto& access_log : config_->accessLogs()) {
     888           0 :     access_log->log(log_context, getStreamInfo());
     889           0 :   }
     890           0 : }
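// A minimal sketch (hypothetical types) of the fan-out in flushAccessLog():
// one immutable context is built up front and handed to every configured sink.
#include <functional>
#include <string>
#include <vector>

namespace log_sketch {

struct LogContext {
  std::string access_log_type;
};
using AccessLogSink = std::function<void(const LogContext&)>;

inline void flushAll(const std::vector<AccessLogSink>& sinks, const std::string& type) {
  const LogContext context{type}; // constructed once, shared by all sinks
  for (const auto& sink : sinks) {
    sink(context);
  }
}

} // namespace log_sketch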
     891             : 
     892           0 : void Filter::resetAccessLogFlushTimer() {
     893           0 :   if (access_log_flush_timer_ != nullptr) {
     894           0 :     ASSERT(config_->accessLogFlushInterval().has_value());
     895           0 :     access_log_flush_timer_->enableTimer(config_->accessLogFlushInterval().value());
     896           0 :   }
     897           0 : }
     898             : 
     899           0 : void Filter::disableAccessLogFlushTimer() {
     900           0 :   if (access_log_flush_timer_ != nullptr) {
     901           0 :     access_log_flush_timer_->disableTimer();
     902           0 :     access_log_flush_timer_.reset();
     903           0 :   }
     904           0 : }
     905             : 
     906           0 : void Filter::resetIdleTimer() {
     907           0 :   if (idle_timer_ != nullptr) {
     908           0 :     ASSERT(config_->idleTimeout());
     909           0 :     idle_timer_->enableTimer(config_->idleTimeout().value());
     910           0 :   }
     911           0 : }
     912             : 
     913           0 : void Filter::disableIdleTimer() {
     914           0 :   if (idle_timer_ != nullptr) {
     915           0 :     idle_timer_->disableTimer();
     916           0 :     idle_timer_.reset();
     917           0 :   }
     918           0 : }
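// A minimal sketch (stand-in types) of the guard idiom shared by the four
// helpers above: a null timer pointer doubles as the "feature disabled" flag,
// and the ASSERT records the invariant that a live timer implies configuration.
#include <cassert>
#include <chrono>
#include <memory>
#include <optional>

namespace guard_sketch {

struct Timer {
  void enableTimer(std::chrono::milliseconds) {}
  void disableTimer() {}
};

struct Session {
  std::unique_ptr<Timer> idle_timer;
  std::optional<std::chrono::milliseconds> idle_timeout;

  void resetIdleTimer() {
    if (idle_timer != nullptr) {
      assert(idle_timeout.has_value()); // a live timer implies a configured value
      idle_timer->enableTimer(*idle_timeout);
    }
  }
  void disableIdleTimer() {
    if (idle_timer != nullptr) {
      idle_timer->disableTimer();
      idle_timer.reset(); // later resets become harmless no-ops
    }
  }
};

} // namespace guard_sketch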
     919             : 
     920           0 : UpstreamDrainManager::~UpstreamDrainManager() {
     921             :   // If connections aren't closed before they are destructed, an ASSERT fires,
     922             :   // so cancel all pending drains, which causes the connections to be closed.
     923           0 :   if (!drainers_.empty()) {
     924           0 :     auto& dispatcher = drainers_.begin()->second->dispatcher();
     925           0 :     while (!drainers_.empty()) {
     926           0 :       auto begin = drainers_.begin();
     927           0 :       Drainer* key = begin->first;
     928           0 :       begin->second->cancelDrain();
     929             : 
     930             :       // cancelDrain() should cause that drainer to be removed from drainers_.
     931             :       // ASSERT so that we don't end up in an infinite loop.
     932           0 :       ASSERT(drainers_.find(key) == drainers_.end());
     933           0 :     }
     934             : 
     935             :     // This destructor is run when shutting down `ThreadLocal`. The destructors of some objects
     936             :     // use earlier `ThreadLocal` slots (for accessing the runtime snapshot), so they must run
     937             :     // before those slots are destructed. Clear the list to enforce that ordering.
     938           0 :     dispatcher.clearDeferredDeleteList();
     939           0 :   }
     940           0 : }
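// A minimal sketch (hypothetical types, not Envoy code) of the destructor's
// shape: each close() indirectly erases its entry from the owning map, so the
// loop re-reads begin() on every pass and asserts forward progress, while
// ownership is parked on a deferred list (mirroring deferredDelete()) so the
// object is not destroyed while its own close() is still on the stack.
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <vector>

namespace drain_sketch {

struct Conn {
  std::function<void(Conn*)> on_close; // wired to Owner::remove()
  void close() { on_close(this); }
};

struct Owner {
  std::map<Conn*, std::unique_ptr<Conn>> conns;
  std::vector<std::unique_ptr<Conn>> deferred_delete;

  void remove(Conn* conn) {
    auto it = conns.find(conn);
    assert(it != conns.end());
    deferred_delete.push_back(std::move(it->second)); // keep alive for now
    conns.erase(it);
  }

  ~Owner() {
    while (!conns.empty()) {
      Conn* key = conns.begin()->first;
      key->close();                           // must end up calling remove(key)
      assert(conns.find(key) == conns.end()); // guards against an infinite loop
    }
    deferred_delete.clear(); // analogous to clearDeferredDeleteList()
  }
};

} // namespace drain_sketch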
     941             : 
     942             : void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config,
     943             :                                Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
     944             :                                const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
     945             :                                Event::TimerPtr&& idle_timer,
     946           0 :                                const Upstream::HostDescriptionConstSharedPtr& upstream_host) {
     947           0 :   DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_conn_data),
     948           0 :                                  std::move(idle_timer), upstream_host));
     949           0 :   callbacks->drain(*drainer);
     950             : 
     951             :   // Use a temporary to ensure we get the raw pointer before drainer is moved from (see the sketch below).
     952           0 :   Drainer* ptr = drainer.get();
     953           0 :   drainers_[ptr] = std::move(drainer);
     954           0 : }
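// A minimal sketch of why add() reads the raw pointer into a temporary first:
// writing `drainers_[drainer.get()] = std::move(drainer);` risks evaluating
// the two sides in an order that reads `drainer` after it was moved from
// (unspecified before C++17). Taking the key first is well defined everywhere.
#include <map>
#include <memory>

namespace key_sketch {

struct Widget {};

inline void addOwned(std::map<Widget*, std::unique_ptr<Widget>>& owners,
                     std::unique_ptr<Widget> widget) {
  Widget* key = widget.get();      // take the key before the move...
  owners[key] = std::move(widget); // ...so it can never see a moved-from object
}

} // namespace key_sketch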
     955             : 
     956           0 : void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatcher) {
     957           0 :   auto it = drainers_.find(&drainer);
     958           0 :   ASSERT(it != drainers_.end());
     959           0 :   dispatcher.deferredDelete(std::move(it->second));
     960           0 :   drainers_.erase(it);
     961           0 : }
     962             : 
     963             : Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
     964             :                  const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
     965             :                  Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
     966             :                  const Upstream::HostDescriptionConstSharedPtr& upstream_host)
     967             :     : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)),
     968           0 :       timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) {
     969           0 :   ENVOY_CONN_LOG(trace, "draining the upstream connection", upstream_conn_data_->connection());
     970           0 :   config_->stats().upstream_flush_total_.inc();
     971           0 :   config_->stats().upstream_flush_active_.inc();
     972           0 : }
     973             : 
     974           0 : void Drainer::onEvent(Network::ConnectionEvent event) {
     975           0 :   if (event == Network::ConnectionEvent::RemoteClose ||
     976           0 :       event == Network::ConnectionEvent::LocalClose) {
     977           0 :     if (timer_ != nullptr) {
     978           0 :       timer_->disableTimer();
     979           0 :     }
     980           0 :     config_->stats().upstream_flush_active_.dec();
     981           0 :     parent_.remove(*this, upstream_conn_data_->connection().dispatcher());
     982           0 :   }
     983           0 : }
     984             : 
     985           0 : void Drainer::onData(Buffer::Instance& data, bool) {
     986           0 :   if (data.length() > 0) {
     987             :     // There is no downstream connection to send any data to, but the upstream
     988             :     // sent some data anyway. Try to behave similarly to what the kernel does when
     989             :     // it receives data on a connection where the application has closed the socket
     990             :     // or called ::shutdown(fd, SHUT_RD): close/reset the connection (sketch below).
     991           0 :     cancelDrain();
     992           0 :   }
     993           0 : }
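// A hedged POSIX sketch of the kernel behavior the comment above emulates:
// closing a TCP socket while received data is still unread typically makes the
// kernel answer the peer with a reset rather than a graceful FIN (details vary
// by OS). Drainer::onData() mimics this with a NoFlush close.
#include <sys/socket.h>
#include <unistd.h>

namespace reset_sketch {

inline void abortiveClose(int fd) {
  // Stop reading: any data the peer sends from here on has no consumer.
  ::shutdown(fd, SHUT_RD);
  // close() with unread inbound data usually elicits a TCP RST to the peer.
  ::close(fd);
}

} // namespace reset_sketch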
     994             : 
     995           0 : void Drainer::onIdleTimeout() {
     996           0 :   config_->stats().idle_timeout_.inc();
     997           0 :   cancelDrain();
     998           0 : }
     999             : 
    1000           0 : void Drainer::onBytesSent() {
    1001           0 :   if (timer_ != nullptr) {
    1002           0 :     timer_->enableTimer(config_->idleTimeout().value());
    1003           0 :   }
    1004           0 : }
    1005             : 
    1006           0 : void Drainer::cancelDrain() {
    1007             :   // This sends onEvent(LocalClose).
    1008           0 :   upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
    1009           0 : }
    1010             : 
    1011           0 : Event::Dispatcher& Drainer::dispatcher() { return upstream_conn_data_->connection().dispatcher(); }
    1012             : 
    1013             : } // namespace TcpProxy
    1014             : } // namespace Envoy

Generated by: LCOV version 1.15