Line data Source code
1 : #include "source/common/tcp_proxy/tcp_proxy.h"
2 :
3 : #include <cstdint>
4 : #include <memory>
5 : #include <string>
6 :
7 : #include "envoy/buffer/buffer.h"
8 : #include "envoy/config/accesslog/v3/accesslog.pb.h"
9 : #include "envoy/config/core/v3/base.pb.h"
10 : #include "envoy/event/dispatcher.h"
11 : #include "envoy/event/timer.h"
12 : #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
13 : #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h"
14 : #include "envoy/registry/registry.h"
15 : #include "envoy/stats/scope.h"
16 : #include "envoy/upstream/cluster_manager.h"
17 : #include "envoy/upstream/upstream.h"
18 :
19 : #include "source/common/access_log/access_log_impl.h"
20 : #include "source/common/common/assert.h"
21 : #include "source/common/common/empty_string.h"
22 : #include "source/common/common/enum_to_int.h"
23 : #include "source/common/common/fmt.h"
24 : #include "source/common/common/macros.h"
25 : #include "source/common/common/utility.h"
26 : #include "source/common/config/metadata.h"
27 : #include "source/common/config/utility.h"
28 : #include "source/common/config/well_known_names.h"
29 : #include "source/common/network/application_protocol.h"
30 : #include "source/common/network/proxy_protocol_filter_state.h"
31 : #include "source/common/network/socket_option_factory.h"
32 : #include "source/common/network/transport_socket_options_impl.h"
33 : #include "source/common/network/upstream_server_name.h"
34 : #include "source/common/network/upstream_socket_options_filter_state.h"
35 : #include "source/common/router/metadatamatchcriteria_impl.h"
36 : #include "source/common/stream_info/stream_id_provider_impl.h"
37 :
38 : namespace Envoy {
39 : namespace TcpProxy {
40 :
41 2 : const std::string& PerConnectionCluster::key() {
42 2 : CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.cluster");
43 2 : }
44 :
45 : class PerConnectionClusterFactory : public StreamInfo::FilterState::ObjectFactory {
46 : public:
47 2 : std::string name() const override { return PerConnectionCluster::key(); }
48 : std::unique_ptr<StreamInfo::FilterState::Object>
49 0 : createFromBytes(absl::string_view data) const override {
50 0 : return std::make_unique<PerConnectionCluster>(data);
51 0 : }
52 : };
53 :
54 : REGISTER_FACTORY(PerConnectionClusterFactory, StreamInfo::FilterState::ObjectFactory);
55 :
56 : Config::SimpleRouteImpl::SimpleRouteImpl(const Config& parent, absl::string_view cluster_name)
57 0 : : parent_(parent), cluster_name_(cluster_name) {}
58 :
59 : Config::WeightedClusterEntry::WeightedClusterEntry(
60 : const Config& parent, const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::
61 : WeightedCluster::ClusterWeight& config)
62 0 : : parent_(parent), cluster_name_(config.name()), cluster_weight_(config.weight()) {
63 0 : if (config.has_metadata_match()) {
64 0 : const auto filter_it = config.metadata_match().filter_metadata().find(
65 0 : Envoy::Config::MetadataFilters::get().ENVOY_LB);
66 0 : if (filter_it != config.metadata_match().filter_metadata().end()) {
67 0 : if (parent.cluster_metadata_match_criteria_) {
68 0 : metadata_match_criteria_ =
69 0 : parent.cluster_metadata_match_criteria_->mergeMatchCriteria(filter_it->second);
70 0 : } else {
71 0 : metadata_match_criteria_ =
72 0 : std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
73 0 : }
74 0 : }
75 0 : }
76 0 : }
77 :
78 0 : OnDemandStats OnDemandConfig::generateStats(Stats::Scope& scope) {
79 0 : return {ON_DEMAND_TCP_PROXY_STATS(POOL_COUNTER(scope))};
80 0 : }
81 :
82 : Config::SharedConfig::SharedConfig(
83 : const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
84 : Server::Configuration::FactoryContext& context)
85 : : stats_scope_(context.scope().createScope(fmt::format("tcp.{}", config.stat_prefix()))),
86 0 : stats_(generateStats(*stats_scope_)) {
87 0 : if (config.has_idle_timeout()) {
88 0 : const uint64_t timeout = DurationUtil::durationToMilliseconds(config.idle_timeout());
89 0 : if (timeout > 0) {
90 0 : idle_timeout_ = std::chrono::milliseconds(timeout);
91 0 : }
92 0 : } else {
93 0 : idle_timeout_ = std::chrono::hours(1);
94 0 : }
95 0 : if (config.has_tunneling_config()) {
96 0 : tunneling_config_helper_ =
97 0 : std::make_unique<TunnelingConfigHelperImpl>(config.tunneling_config(), context);
98 0 : }
99 0 : if (config.has_max_downstream_connection_duration()) {
100 0 : const uint64_t connection_duration =
101 0 : DurationUtil::durationToMilliseconds(config.max_downstream_connection_duration());
102 0 : max_downstream_connection_duration_ = std::chrono::milliseconds(connection_duration);
103 0 : }
104 :
105 0 : if (config.has_access_log_options()) {
106 0 : if (config.flush_access_log_on_connected() /* deprecated */) {
107 0 : throw EnvoyException(
108 0 : "Only one of flush_access_log_on_connected or access_log_options can be specified.");
109 0 : }
110 :
111 0 : if (config.has_access_log_flush_interval() /* deprecated */) {
112 0 : throw EnvoyException(
113 0 : "Only one of access_log_flush_interval or access_log_options can be specified.");
114 0 : }
115 :
116 0 : flush_access_log_on_connected_ = config.access_log_options().flush_access_log_on_connected();
117 :
118 0 : if (config.access_log_options().has_access_log_flush_interval()) {
119 0 : const uint64_t flush_interval = DurationUtil::durationToMilliseconds(
120 0 : config.access_log_options().access_log_flush_interval());
121 0 : access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
122 0 : }
123 0 : } else {
124 0 : flush_access_log_on_connected_ = config.flush_access_log_on_connected();
125 :
126 0 : if (config.has_access_log_flush_interval()) {
127 0 : const uint64_t flush_interval =
128 0 : DurationUtil::durationToMilliseconds(config.access_log_flush_interval());
129 0 : access_log_flush_interval_ = std::chrono::milliseconds(flush_interval);
130 0 : }
131 0 : }
132 :
133 0 : if (config.has_on_demand() && config.on_demand().has_odcds_config()) {
134 0 : on_demand_config_ =
135 0 : std::make_unique<OnDemandConfig>(config.on_demand(), context, *stats_scope_);
136 0 : }
137 0 : }
138 :
139 : Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
140 : Server::Configuration::FactoryContext& context)
141 : : max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)),
142 : upstream_drain_manager_slot_(context.serverFactoryContext().threadLocal().allocateSlot()),
143 : shared_config_(std::make_shared<SharedConfig>(config, context)),
144 0 : random_generator_(context.serverFactoryContext().api().randomGenerator()) {
145 0 : upstream_drain_manager_slot_->set([](Event::Dispatcher&) {
146 0 : ThreadLocal::ThreadLocalObjectSharedPtr drain_manager =
147 0 : std::make_shared<UpstreamDrainManager>();
148 0 : return drain_manager;
149 0 : });
150 :
151 0 : if (!config.cluster().empty()) {
152 0 : default_route_ = std::make_shared<const SimpleRouteImpl>(*this, config.cluster());
153 0 : }
154 :
155 0 : if (config.has_metadata_match()) {
156 0 : const auto& filter_metadata = config.metadata_match().filter_metadata();
157 :
158 0 : const auto filter_it = filter_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
159 :
160 0 : if (filter_it != filter_metadata.end()) {
161 0 : cluster_metadata_match_criteria_ =
162 0 : std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
163 0 : }
164 0 : }
165 :
166 : // Weighted clusters will be enabled only if the default cluster is absent.
167 0 : if (default_route_ == nullptr && config.has_weighted_clusters()) {
168 0 : total_cluster_weight_ = 0;
169 0 : for (const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::WeightedCluster::
170 0 : ClusterWeight& cluster_desc : config.weighted_clusters().clusters()) {
171 0 : WeightedClusterEntryConstSharedPtr cluster_entry(
172 0 : std::make_shared<const WeightedClusterEntry>(*this, cluster_desc));
173 0 : weighted_clusters_.emplace_back(std::move(cluster_entry));
174 0 : total_cluster_weight_ += weighted_clusters_.back()->clusterWeight();
175 0 : }
176 0 : }
177 :
178 0 : for (const envoy::config::accesslog::v3::AccessLog& log_config : config.access_log()) {
179 0 : access_logs_.emplace_back(AccessLog::AccessLogFactory::fromProto(log_config, context));
180 0 : }
181 :
182 0 : if (!config.hash_policy().empty()) {
183 0 : hash_policy_ = std::make_unique<Network::HashPolicyImpl>(config.hash_policy());
184 0 : }
185 0 : }
186 :
187 0 : RouteConstSharedPtr Config::getRegularRouteFromEntries(Network::Connection& connection) {
188 : // First check if the per-connection state to see if we need to route to a pre-selected cluster
189 0 : if (const auto* per_connection_cluster =
190 0 : connection.streamInfo().filterState()->getDataReadOnly<PerConnectionCluster>(
191 0 : PerConnectionCluster::key());
192 0 : per_connection_cluster != nullptr) {
193 0 : return std::make_shared<const SimpleRouteImpl>(*this, per_connection_cluster->value());
194 0 : }
195 :
196 0 : if (default_route_ != nullptr) {
197 0 : return default_route_;
198 0 : }
199 :
200 : // no match, no more routes to try
201 0 : return nullptr;
202 0 : }
203 :
204 0 : RouteConstSharedPtr Config::getRouteFromEntries(Network::Connection& connection) {
205 0 : if (weighted_clusters_.empty()) {
206 0 : return getRegularRouteFromEntries(connection);
207 0 : }
208 0 : return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_,
209 0 : random_generator_.random(), false);
210 0 : }
211 :
212 0 : UpstreamDrainManager& Config::drainManager() {
213 0 : return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
214 0 : }
215 :
216 : Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager)
217 : : config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
218 0 : upstream_callbacks_(new UpstreamCallbacks(this)) {
219 0 : ASSERT(config != nullptr);
220 0 : }
221 :
222 0 : Filter::~Filter() {
223 : // Disable access log flush timer if it is enabled.
224 0 : disableAccessLogFlushTimer();
225 :
226 : // Flush the final end stream access log entry.
227 0 : flushAccessLog(AccessLog::AccessLogType::TcpConnectionEnd);
228 :
229 0 : ASSERT(generic_conn_pool_ == nullptr);
230 0 : ASSERT(upstream_ == nullptr);
231 0 : }
232 :
233 0 : TcpProxyStats Config::SharedConfig::generateStats(Stats::Scope& scope) {
234 0 : return {ALL_TCP_PROXY_STATS(POOL_COUNTER(scope), POOL_GAUGE(scope))};
235 0 : }
236 :
237 0 : void Filter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
238 0 : initialize(callbacks, true);
239 0 : }
240 :
241 0 : void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats) {
242 0 : read_callbacks_ = &callbacks;
243 0 : ENVOY_CONN_LOG(debug, "new tcp proxy session", read_callbacks_->connection());
244 :
245 0 : read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
246 0 : read_callbacks_->connection().enableHalfClose(true);
247 :
248 : // Check that we are generating only the byte meters we need.
249 : // The Downstream should be unset and the Upstream should be populated.
250 0 : ASSERT(getStreamInfo().getDownstreamBytesMeter() == nullptr);
251 0 : ASSERT(getStreamInfo().getUpstreamBytesMeter() != nullptr);
252 :
253 : // Need to disable reads so that we don't write to an upstream that might fail
254 : // in onData(). This will get re-enabled when the upstream connection is
255 : // established.
256 0 : read_callbacks_->connection().readDisable(true);
257 0 : getStreamInfo().setDownstreamBytesMeter(std::make_shared<StreamInfo::BytesMeter>());
258 0 : getStreamInfo().setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
259 :
260 0 : config_->stats().downstream_cx_total_.inc();
261 0 : if (set_connection_stats) {
262 0 : read_callbacks_->connection().setConnectionStats(
263 0 : {config_->stats().downstream_cx_rx_bytes_total_,
264 0 : config_->stats().downstream_cx_rx_bytes_buffered_,
265 0 : config_->stats().downstream_cx_tx_bytes_total_,
266 0 : config_->stats().downstream_cx_tx_bytes_buffered_, nullptr, nullptr});
267 0 : }
268 0 : }
269 :
270 0 : void Filter::onInitFailure(UpstreamFailureReason reason) {
271 : // If ODCDS fails, the filter will not attempt to create a connection to
272 : // upstream, as it does not have an assigned upstream. As such the filter will
273 : // not have started attempting to connect to an upstream and there is no
274 : // connection pool callback latency to record.
275 0 : if (initial_upstream_connection_start_time_.has_value()) {
276 0 : getStreamInfo().upstreamInfo()->upstreamTiming().recordConnectionPoolCallbackLatency(
277 0 : initial_upstream_connection_start_time_.value(),
278 0 : read_callbacks_->connection().dispatcher().timeSource());
279 0 : }
280 0 : read_callbacks_->connection().close(
281 0 : Network::ConnectionCloseType::NoFlush,
282 0 : absl::StrCat(StreamInfo::LocalCloseReasons::get().TcpProxyInitializationFailure,
283 0 : enumToInt(reason)));
284 0 : }
285 :
286 0 : void Filter::readDisableUpstream(bool disable) {
287 0 : bool success = false;
288 0 : if (upstream_) {
289 0 : success = upstream_->readDisable(disable);
290 0 : }
291 0 : if (!success) {
292 0 : return;
293 0 : }
294 0 : if (disable) {
295 0 : read_callbacks_->upstreamHost()
296 0 : ->cluster()
297 0 : .trafficStats()
298 0 : ->upstream_flow_control_paused_reading_total_.inc();
299 0 : } else {
300 0 : read_callbacks_->upstreamHost()
301 0 : ->cluster()
302 0 : .trafficStats()
303 0 : ->upstream_flow_control_resumed_reading_total_.inc();
304 0 : }
305 0 : }
306 :
307 0 : void Filter::readDisableDownstream(bool disable) {
308 0 : if (read_callbacks_->connection().state() != Network::Connection::State::Open) {
309 : // During idle timeouts, we close both upstream and downstream with NoFlush.
310 : // Envoy still does a best-effort flush which can case readDisableDownstream to be called
311 : // despite the downstream connection being closed.
312 0 : return;
313 0 : }
314 :
315 0 : const Network::Connection::ReadDisableStatus read_disable_status =
316 0 : read_callbacks_->connection().readDisable(disable);
317 :
318 0 : if (read_disable_status == Network::Connection::ReadDisableStatus::TransitionedToReadDisabled) {
319 0 : config_->stats().downstream_flow_control_paused_reading_total_.inc();
320 0 : } else if (read_disable_status ==
321 0 : Network::Connection::ReadDisableStatus::TransitionedToReadEnabled) {
322 0 : config_->stats().downstream_flow_control_resumed_reading_total_.inc();
323 0 : }
324 0 : }
325 :
326 0 : StreamInfo::StreamInfo& Filter::getStreamInfo() {
327 0 : return read_callbacks_->connection().streamInfo();
328 0 : }
329 :
330 0 : void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
331 0 : ASSERT(!on_high_watermark_called_);
332 0 : on_high_watermark_called_ = true;
333 : // If downstream has too much data buffered, stop reading on the upstream connection.
334 0 : parent_.readDisableUpstream(true);
335 0 : }
336 :
337 0 : void Filter::DownstreamCallbacks::onBelowWriteBufferLowWatermark() {
338 0 : ASSERT(on_high_watermark_called_);
339 0 : on_high_watermark_called_ = false;
340 : // The downstream buffer has been drained. Resume reading from upstream.
341 0 : parent_.readDisableUpstream(false);
342 0 : }
343 :
344 0 : void Filter::UpstreamCallbacks::onEvent(Network::ConnectionEvent event) {
345 0 : if (event == Network::ConnectionEvent::Connected ||
346 0 : event == Network::ConnectionEvent::ConnectedZeroRtt) {
347 0 : return;
348 0 : }
349 0 : if (drainer_ == nullptr) {
350 0 : parent_->onUpstreamEvent(event);
351 0 : } else {
352 0 : drainer_->onEvent(event);
353 0 : }
354 0 : }
355 :
356 0 : void Filter::UpstreamCallbacks::onAboveWriteBufferHighWatermark() {
357 : // TCP Tunneling may call on high/low watermark multiple times.
358 0 : ASSERT(parent_->config_->tunnelingConfigHelper() || !on_high_watermark_called_);
359 0 : on_high_watermark_called_ = true;
360 :
361 0 : if (parent_ != nullptr) {
362 : // There's too much data buffered in the upstream write buffer, so stop reading.
363 0 : parent_->readDisableDownstream(true);
364 0 : }
365 0 : }
366 :
367 0 : void Filter::UpstreamCallbacks::onBelowWriteBufferLowWatermark() {
368 : // TCP Tunneling may call on high/low watermark multiple times.
369 0 : ASSERT(parent_->config_->tunnelingConfigHelper() || on_high_watermark_called_);
370 0 : on_high_watermark_called_ = false;
371 :
372 0 : if (parent_ != nullptr) {
373 : // The upstream write buffer is drained. Resume reading.
374 0 : parent_->readDisableDownstream(false);
375 0 : }
376 0 : }
377 :
378 0 : void Filter::UpstreamCallbacks::onUpstreamData(Buffer::Instance& data, bool end_stream) {
379 0 : if (parent_) {
380 0 : parent_->onUpstreamData(data, end_stream);
381 0 : } else {
382 0 : drainer_->onData(data, end_stream);
383 0 : }
384 0 : }
385 :
386 0 : void Filter::UpstreamCallbacks::onBytesSent() {
387 0 : if (drainer_ == nullptr) {
388 0 : parent_->resetIdleTimer();
389 0 : } else {
390 0 : drainer_->onBytesSent();
391 0 : }
392 0 : }
393 :
394 0 : void Filter::UpstreamCallbacks::onIdleTimeout() {
395 0 : if (drainer_ == nullptr) {
396 0 : parent_->onIdleTimeout();
397 0 : } else {
398 0 : drainer_->onIdleTimeout();
399 0 : }
400 0 : }
401 :
402 0 : void Filter::UpstreamCallbacks::drain(Drainer& drainer) {
403 0 : ASSERT(drainer_ == nullptr); // This should only get set once.
404 0 : drainer_ = &drainer;
405 0 : parent_ = nullptr;
406 0 : }
407 :
408 0 : Network::FilterStatus Filter::establishUpstreamConnection() {
409 0 : const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;
410 0 : Upstream::ThreadLocalCluster* thread_local_cluster =
411 0 : cluster_manager_.getThreadLocalCluster(cluster_name);
412 :
413 0 : if (!thread_local_cluster) {
414 0 : auto odcds = config_->onDemandCds();
415 0 : if (!odcds.has_value()) {
416 : // No ODCDS? It means that on-demand discovery is disabled.
417 0 : ENVOY_CONN_LOG(debug, "Cluster not found {} and no on demand cluster set.",
418 0 : read_callbacks_->connection(), cluster_name);
419 0 : config_->stats().downstream_cx_no_route_.inc();
420 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
421 0 : onInitFailure(UpstreamFailureReason::NoRoute);
422 0 : } else {
423 0 : ASSERT(!cluster_discovery_handle_);
424 0 : auto callback = std::make_unique<Upstream::ClusterDiscoveryCallback>(
425 0 : [this](Upstream::ClusterDiscoveryStatus cluster_status) {
426 0 : onClusterDiscoveryCompletion(cluster_status);
427 0 : });
428 0 : config_->onDemandStats().on_demand_cluster_attempt_.inc();
429 0 : cluster_discovery_handle_ = odcds->requestOnDemandClusterDiscovery(
430 0 : cluster_name, std::move(callback), config_->odcdsTimeout());
431 0 : }
432 0 : return Network::FilterStatus::StopIteration;
433 0 : }
434 :
435 0 : ENVOY_CONN_LOG(debug, "Creating connection to cluster {}", read_callbacks_->connection(),
436 0 : cluster_name);
437 0 : if (!initial_upstream_connection_start_time_.has_value()) {
438 0 : initial_upstream_connection_start_time_.emplace(
439 0 : read_callbacks_->connection().dispatcher().timeSource().monotonicTime());
440 0 : }
441 :
442 0 : const Upstream::ClusterInfoConstSharedPtr& cluster = thread_local_cluster->info();
443 0 : getStreamInfo().setUpstreamClusterInfo(cluster);
444 :
445 : // Check this here because the TCP conn pool will queue our request waiting for a connection that
446 : // will never be released.
447 0 : if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) {
448 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
449 0 : cluster->trafficStats()->upstream_cx_overflow_.inc();
450 0 : onInitFailure(UpstreamFailureReason::ResourceLimitExceeded);
451 0 : return Network::FilterStatus::StopIteration;
452 0 : }
453 :
454 0 : const uint32_t max_connect_attempts = config_->maxConnectAttempts();
455 0 : if (connect_attempts_ >= max_connect_attempts) {
456 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded);
457 0 : cluster->trafficStats()->upstream_cx_connect_attempts_exceeded_.inc();
458 0 : onInitFailure(UpstreamFailureReason::ConnectFailed);
459 0 : return Network::FilterStatus::StopIteration;
460 0 : }
461 :
462 0 : auto& downstream_connection = read_callbacks_->connection();
463 0 : auto& filter_state = downstream_connection.streamInfo().filterState();
464 0 : if (!filter_state->hasData<Network::ProxyProtocolFilterState>(
465 0 : Network::ProxyProtocolFilterState::key())) {
466 0 : filter_state->setData(
467 0 : Network::ProxyProtocolFilterState::key(),
468 0 : std::make_shared<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
469 0 : downstream_connection.connectionInfoProvider().remoteAddress(),
470 0 : downstream_connection.connectionInfoProvider().localAddress()}),
471 0 : StreamInfo::FilterState::StateType::ReadOnly,
472 0 : StreamInfo::FilterState::LifeSpan::Connection);
473 0 : }
474 0 : transport_socket_options_ =
475 0 : Network::TransportSocketOptionsUtility::fromFilterState(*filter_state);
476 :
477 0 : if (auto typed_state = filter_state->getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
478 0 : Network::UpstreamSocketOptionsFilterState::key());
479 0 : typed_state != nullptr) {
480 0 : auto downstream_options = typed_state->value();
481 0 : if (!upstream_options_) {
482 0 : upstream_options_ = std::make_shared<Network::Socket::Options>();
483 0 : }
484 0 : Network::Socket::appendOptions(upstream_options_, downstream_options);
485 0 : }
486 :
487 0 : if (!maybeTunnel(*thread_local_cluster)) {
488 : // Either cluster is unknown or there are no healthy hosts. tcpConnPool() increments
489 : // cluster->trafficStats()->upstream_cx_none_healthy in the latter case.
490 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
491 0 : onInitFailure(UpstreamFailureReason::NoHealthyUpstream);
492 0 : }
493 0 : return Network::FilterStatus::StopIteration;
494 0 : }
495 :
496 0 : void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) {
497 : // Clear the cluster_discovery_handle_ before calling establishUpstreamConnection since we may
498 : // request cluster again.
499 0 : cluster_discovery_handle_.reset();
500 0 : const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;
501 0 : switch (cluster_status) {
502 0 : case Upstream::ClusterDiscoveryStatus::Missing:
503 0 : ENVOY_CONN_LOG(debug, "On demand cluster {} is missing", read_callbacks_->connection(),
504 0 : cluster_name);
505 0 : config_->onDemandStats().on_demand_cluster_missing_.inc();
506 0 : break;
507 0 : case Upstream::ClusterDiscoveryStatus::Timeout:
508 0 : ENVOY_CONN_LOG(debug, "On demand cluster {} was not found before timeout.",
509 0 : read_callbacks_->connection(), cluster_name);
510 0 : config_->onDemandStats().on_demand_cluster_timeout_.inc();
511 0 : break;
512 0 : case Upstream::ClusterDiscoveryStatus::Available:
513 : // cluster_discovery_handle_ would have been cancelled if the downstream were closed.
514 0 : ASSERT(!downstream_closed_);
515 0 : ENVOY_CONN_LOG(debug, "On demand cluster {} is found. Establishing connection.",
516 0 : read_callbacks_->connection(), cluster_name);
517 0 : config_->onDemandStats().on_demand_cluster_success_.inc();
518 0 : establishUpstreamConnection();
519 0 : return;
520 0 : }
521 : // Failure path.
522 0 : config_->stats().downstream_cx_no_route_.inc();
523 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
524 0 : onInitFailure(UpstreamFailureReason::NoRoute);
525 0 : }
526 :
527 0 : bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
528 0 : GenericConnPoolFactory* factory = nullptr;
529 0 : if (cluster.info()->upstreamConfig().has_value()) {
530 0 : factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
531 0 : cluster.info()->upstreamConfig().ref());
532 0 : } else {
533 0 : factory = Envoy::Config::Utility::getFactoryByName<GenericConnPoolFactory>(
534 0 : "envoy.filters.connection_pools.tcp.generic");
535 0 : }
536 0 : if (!factory) {
537 0 : return false;
538 0 : }
539 :
540 0 : generic_conn_pool_ = factory->createGenericConnPool(cluster, config_->tunnelingConfigHelper(),
541 0 : this, *upstream_callbacks_, getStreamInfo());
542 0 : if (generic_conn_pool_) {
543 0 : connecting_ = true;
544 0 : connect_attempts_++;
545 0 : getStreamInfo().setAttemptCount(connect_attempts_);
546 0 : generic_conn_pool_->newStream(*this);
547 : // Because we never return open connections to the pool, this either has a handle waiting on
548 : // connection completion, or onPoolFailure has been invoked. Either way, stop iteration.
549 0 : return true;
550 0 : }
551 0 : return false;
552 0 : }
553 :
554 : void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
555 : absl::string_view failure_reason,
556 0 : Upstream::HostDescriptionConstSharedPtr host) {
557 0 : generic_conn_pool_.reset();
558 0 : read_callbacks_->upstreamHost(host);
559 0 : getStreamInfo().upstreamInfo()->setUpstreamHost(host);
560 0 : getStreamInfo().upstreamInfo()->setUpstreamTransportFailureReason(failure_reason);
561 :
562 0 : switch (reason) {
563 0 : case ConnectionPool::PoolFailureReason::Overflow:
564 0 : case ConnectionPool::PoolFailureReason::LocalConnectionFailure:
565 0 : upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
566 0 : break;
567 0 : case ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
568 0 : upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);
569 0 : break;
570 0 : case ConnectionPool::PoolFailureReason::Timeout:
571 0 : onConnectTimeout();
572 0 : break;
573 0 : }
574 0 : }
575 :
576 : void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
577 : std::unique_ptr<GenericUpstream>&& upstream,
578 : Upstream::HostDescriptionConstSharedPtr& host,
579 : const Network::ConnectionInfoProvider& address_provider,
580 0 : Ssl::ConnectionInfoConstSharedPtr ssl_info) {
581 0 : upstream_ = std::move(upstream);
582 0 : generic_conn_pool_.reset();
583 0 : read_callbacks_->upstreamHost(host);
584 0 : StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
585 0 : upstream_info.upstreamTiming().recordConnectionPoolCallbackLatency(
586 0 : initial_upstream_connection_start_time_.value(),
587 0 : read_callbacks_->connection().dispatcher().timeSource());
588 0 : upstream_info.setUpstreamHost(host);
589 0 : upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
590 0 : upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
591 0 : upstream_info.setUpstreamSslConnection(ssl_info);
592 0 : onUpstreamConnection();
593 0 : read_callbacks_->continueReading();
594 0 : if (info) {
595 0 : upstream_info.setUpstreamFilterState(info->filterState());
596 0 : }
597 0 : }
598 :
599 0 : const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
600 0 : const Router::MetadataMatchCriteria* route_criteria =
601 0 : (route_ != nullptr) ? route_->metadataMatchCriteria() : nullptr;
602 :
603 0 : const auto& request_metadata = getStreamInfo().dynamicMetadata().filter_metadata();
604 0 : const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
605 :
606 0 : if (filter_it != request_metadata.end() && route_criteria != nullptr) {
607 0 : metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second);
608 0 : return metadata_match_criteria_.get();
609 0 : } else if (filter_it != request_metadata.end()) {
610 0 : metadata_match_criteria_ =
611 0 : std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
612 0 : return metadata_match_criteria_.get();
613 0 : } else {
614 0 : return route_criteria;
615 0 : }
616 0 : }
617 :
618 0 : ProtobufTypes::MessagePtr TunnelResponseHeadersOrTrailers::serializeAsProto() const {
619 0 : auto proto_out = std::make_unique<envoy::config::core::v3::HeaderMap>();
620 0 : value().iterate([&proto_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
621 0 : auto* new_header = proto_out->add_headers();
622 0 : new_header->set_key(std::string(e.key().getStringView()));
623 0 : new_header->set_value(std::string(e.value().getStringView()));
624 0 : return Http::HeaderMap::Iterate::Continue;
625 0 : });
626 0 : return proto_out;
627 0 : }
628 :
629 0 : const std::string& TunnelResponseHeaders::key() {
630 0 : CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.propagate_response_headers");
631 0 : }
632 :
633 0 : const std::string& TunnelResponseTrailers::key() {
634 0 : CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.propagate_response_trailers");
635 0 : }
636 :
637 : TunnelingConfigHelperImpl::TunnelingConfigHelperImpl(
638 : const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig&
639 : config_message,
640 : Server::Configuration::FactoryContext& context)
641 : : use_post_(config_message.use_post()),
642 : header_parser_(Envoy::Router::HeaderParser::configure(config_message.headers_to_add())),
643 : propagate_response_headers_(config_message.propagate_response_headers()),
644 : propagate_response_trailers_(config_message.propagate_response_trailers()),
645 0 : post_path_(config_message.post_path()) {
646 0 : if (!post_path_.empty() && !use_post_) {
647 0 : throw EnvoyException("Can't set a post path when POST method isn't used");
648 0 : }
649 0 : post_path_ = post_path_.empty() ? "/" : post_path_;
650 :
651 0 : envoy::config::core::v3::SubstitutionFormatString substitution_format_config;
652 0 : substitution_format_config.mutable_text_format_source()->set_inline_string(
653 0 : config_message.hostname());
654 0 : hostname_fmt_ = Formatter::SubstitutionFormatStringUtils::fromProtoConfig(
655 0 : substitution_format_config, context);
656 0 : }
657 :
658 0 : std::string TunnelingConfigHelperImpl::host(const StreamInfo::StreamInfo& stream_info) const {
659 0 : return hostname_fmt_->formatWithContext({}, stream_info);
660 0 : }
661 :
662 : void TunnelingConfigHelperImpl::propagateResponseHeaders(
663 : Http::ResponseHeaderMapPtr&& headers,
664 0 : const StreamInfo::FilterStateSharedPtr& filter_state) const {
665 0 : if (!propagate_response_headers_) {
666 0 : return;
667 0 : }
668 0 : filter_state->setData(
669 0 : TunnelResponseHeaders::key(), std::make_shared<TunnelResponseHeaders>(std::move(headers)),
670 0 : StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
671 0 : }
672 :
673 : void TunnelingConfigHelperImpl::propagateResponseTrailers(
674 : Http::ResponseTrailerMapPtr&& trailers,
675 0 : const StreamInfo::FilterStateSharedPtr& filter_state) const {
676 0 : if (!propagate_response_trailers_) {
677 0 : return;
678 0 : }
679 0 : filter_state->setData(
680 0 : TunnelResponseTrailers::key(), std::make_shared<TunnelResponseTrailers>(std::move(trailers)),
681 0 : StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
682 0 : }
683 :
684 0 : void Filter::onConnectTimeout() {
685 0 : ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection());
686 0 : read_callbacks_->upstreamHost()->outlierDetector().putResult(
687 0 : Upstream::Outlier::Result::LocalOriginTimeout);
688 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionFailure);
689 :
690 : // Raise LocalClose, which will trigger a reconnect if needed/configured.
691 0 : upstream_callbacks_->onEvent(Network::ConnectionEvent::LocalClose);
692 0 : }
693 :
694 0 : Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
695 0 : ENVOY_CONN_LOG(trace, "downstream connection received {} bytes, end_stream={}",
696 0 : read_callbacks_->connection(), data.length(), end_stream);
697 0 : getStreamInfo().getDownstreamBytesMeter()->addWireBytesReceived(data.length());
698 0 : if (upstream_) {
699 0 : getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length());
700 0 : upstream_->encodeData(data, end_stream);
701 0 : }
702 : // The upstream should consume all of the data.
703 : // Before there is an upstream the connection should be readDisabled. If the upstream is
704 : // destroyed, there should be no further reads as well.
705 0 : ASSERT(0 == data.length());
706 0 : resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
707 0 : return Network::FilterStatus::StopIteration;
708 0 : }
709 :
710 0 : Network::FilterStatus Filter::onNewConnection() {
711 0 : if (config_->maxDownstreamConnectionDuration()) {
712 0 : connection_duration_timer_ = read_callbacks_->connection().dispatcher().createTimer(
713 0 : [this]() -> void { onMaxDownstreamConnectionDuration(); });
714 0 : connection_duration_timer_->enableTimer(config_->maxDownstreamConnectionDuration().value());
715 0 : }
716 :
717 0 : if (config_->accessLogFlushInterval().has_value()) {
718 0 : access_log_flush_timer_ = read_callbacks_->connection().dispatcher().createTimer(
719 0 : [this]() -> void { onAccessLogFlushInterval(); });
720 0 : resetAccessLogFlushTimer();
721 0 : }
722 :
723 : // Set UUID for the connection. This is used for logging and tracing.
724 0 : getStreamInfo().setStreamIdProvider(
725 0 : std::make_shared<StreamInfo::StreamIdProviderImpl>(config_->randomGenerator().uuid()));
726 :
727 0 : ASSERT(upstream_ == nullptr);
728 0 : route_ = pickRoute();
729 0 : return establishUpstreamConnection();
730 0 : }
731 :
732 0 : bool Filter::startUpstreamSecureTransport() {
733 0 : bool switched_to_tls = upstream_->startUpstreamSecureTransport();
734 0 : if (switched_to_tls) {
735 0 : StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
736 0 : upstream_info.setUpstreamSslConnection(upstream_->getUpstreamConnectionSslInfo());
737 0 : }
738 0 : return switched_to_tls;
739 0 : }
740 :
741 0 : void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
742 0 : if (event == Network::ConnectionEvent::LocalClose ||
743 0 : event == Network::ConnectionEvent::RemoteClose) {
744 0 : downstream_closed_ = true;
745 : // Cancel the potential odcds callback.
746 0 : cluster_discovery_handle_ = nullptr;
747 0 : }
748 :
749 0 : ENVOY_CONN_LOG(trace, "on downstream event {}, has upstream = {}", read_callbacks_->connection(),
750 0 : static_cast<int>(event), upstream_ != nullptr);
751 :
752 0 : if (upstream_) {
753 0 : Tcp::ConnectionPool::ConnectionDataPtr conn_data(upstream_->onDownstreamEvent(event));
754 0 : if (conn_data != nullptr &&
755 0 : conn_data->connection().state() != Network::Connection::State::Closed) {
756 0 : config_->drainManager().add(config_->sharedConfig(), std::move(conn_data),
757 0 : std::move(upstream_callbacks_), std::move(idle_timer_),
758 0 : read_callbacks_->upstreamHost());
759 0 : }
760 0 : if (event == Network::ConnectionEvent::LocalClose ||
761 0 : event == Network::ConnectionEvent::RemoteClose) {
762 0 : upstream_.reset();
763 0 : disableIdleTimer();
764 0 : }
765 0 : }
766 :
767 0 : if (generic_conn_pool_) {
768 0 : if (event == Network::ConnectionEvent::LocalClose ||
769 0 : event == Network::ConnectionEvent::RemoteClose) {
770 : // Cancel the conn pool request and close any excess pending requests.
771 0 : generic_conn_pool_.reset();
772 0 : }
773 0 : }
774 0 : }
775 :
776 0 : void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
777 0 : ENVOY_CONN_LOG(trace, "upstream connection received {} bytes, end_stream={}",
778 0 : read_callbacks_->connection(), data.length(), end_stream);
779 0 : getStreamInfo().getUpstreamBytesMeter()->addWireBytesReceived(data.length());
780 0 : getStreamInfo().getDownstreamBytesMeter()->addWireBytesSent(data.length());
781 0 : read_callbacks_->connection().write(data, end_stream);
782 0 : ASSERT(0 == data.length());
783 0 : resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
784 0 : }
785 :
786 0 : void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
787 0 : if (event == Network::ConnectionEvent::ConnectedZeroRtt) {
788 0 : return;
789 0 : }
790 : // Update the connecting flag before processing the event because we may start a new connection
791 : // attempt in establishUpstreamConnection.
792 0 : bool connecting = connecting_;
793 0 : connecting_ = false;
794 :
795 0 : if (event == Network::ConnectionEvent::RemoteClose ||
796 0 : event == Network::ConnectionEvent::LocalClose) {
797 0 : upstream_.reset();
798 0 : disableIdleTimer();
799 :
800 0 : if (connecting) {
801 0 : if (event == Network::ConnectionEvent::RemoteClose) {
802 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionFailure);
803 0 : read_callbacks_->upstreamHost()->outlierDetector().putResult(
804 0 : Upstream::Outlier::Result::LocalOriginConnectFailed);
805 0 : }
806 0 : if (!downstream_closed_) {
807 0 : route_ = pickRoute();
808 0 : establishUpstreamConnection();
809 0 : }
810 0 : } else {
811 : // TODO(botengyao): propagate RST back to downstream connection if RST is received.
812 0 : if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
813 0 : read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
814 0 : }
815 0 : }
816 0 : }
817 0 : }
818 :
819 0 : void Filter::onUpstreamConnection() {
820 0 : connecting_ = false;
821 : // Re-enable downstream reads now that the upstream connection is established
822 : // so we have a place to send downstream data to.
823 0 : read_callbacks_->connection().readDisable(false);
824 :
825 0 : read_callbacks_->upstreamHost()->outlierDetector().putResult(
826 0 : Upstream::Outlier::Result::LocalOriginConnectSuccessFinal);
827 :
828 0 : ENVOY_CONN_LOG(debug, "TCP:onUpstreamEvent(), requestedServerName: {}",
829 0 : read_callbacks_->connection(),
830 0 : getStreamInfo().downstreamAddressProvider().requestedServerName());
831 :
832 0 : if (config_->idleTimeout()) {
833 : // The idle_timer_ can be moved to a Drainer, so related callbacks call into
834 : // the UpstreamCallbacks, which has the same lifetime as the timer, and can dispatch
835 : // the call to either TcpProxy or to Drainer, depending on the current state.
836 0 : idle_timer_ = read_callbacks_->connection().dispatcher().createTimer(
837 0 : [upstream_callbacks = upstream_callbacks_]() { upstream_callbacks->onIdleTimeout(); });
838 0 : resetIdleTimer();
839 0 : read_callbacks_->connection().addBytesSentCallback([this](uint64_t) {
840 0 : resetIdleTimer();
841 0 : return true;
842 0 : });
843 0 : if (upstream_) {
844 0 : upstream_->addBytesSentCallback([upstream_callbacks = upstream_callbacks_](uint64_t) -> bool {
845 0 : upstream_callbacks->onBytesSent();
846 0 : return true;
847 0 : });
848 0 : }
849 0 : }
850 :
851 0 : if (config_->flushAccessLogOnConnected()) {
852 0 : flushAccessLog(AccessLog::AccessLogType::TcpUpstreamConnected);
853 0 : }
854 0 : }
855 :
856 0 : void Filter::onIdleTimeout() {
857 0 : ENVOY_CONN_LOG(debug, "Session timed out", read_callbacks_->connection());
858 0 : config_->stats().idle_timeout_.inc();
859 :
860 : // This results in also closing the upstream connection.
861 0 : read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush,
862 0 : StreamInfo::LocalCloseReasons::get().TcpSessionIdleTimeout);
863 0 : }
864 :
865 0 : void Filter::onMaxDownstreamConnectionDuration() {
866 0 : ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
867 0 : getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::DurationTimeout);
868 0 : config_->stats().max_downstream_connection_duration_.inc();
869 0 : read_callbacks_->connection().close(
870 0 : Network::ConnectionCloseType::NoFlush,
871 0 : StreamInfo::LocalCloseReasons::get().MaxConnectionDurationReached);
872 0 : }
873 :
874 0 : void Filter::onAccessLogFlushInterval() {
875 0 : flushAccessLog(AccessLog::AccessLogType::TcpPeriodic);
876 0 : const SystemTime now = read_callbacks_->connection().dispatcher().timeSource().systemTime();
877 0 : getStreamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(now);
878 0 : if (getStreamInfo().getUpstreamBytesMeter()) {
879 0 : getStreamInfo().getUpstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(now);
880 0 : }
881 0 : resetAccessLogFlushTimer();
882 0 : }
883 :
884 0 : void Filter::flushAccessLog(AccessLog::AccessLogType access_log_type) {
885 0 : const Formatter::HttpFormatterContext log_context{nullptr, nullptr, nullptr, {}, access_log_type};
886 :
887 0 : for (const auto& access_log : config_->accessLogs()) {
888 0 : access_log->log(log_context, getStreamInfo());
889 0 : }
890 0 : }
891 :
892 0 : void Filter::resetAccessLogFlushTimer() {
893 0 : if (access_log_flush_timer_ != nullptr) {
894 0 : ASSERT(config_->accessLogFlushInterval().has_value());
895 0 : access_log_flush_timer_->enableTimer(config_->accessLogFlushInterval().value());
896 0 : }
897 0 : }
898 :
899 0 : void Filter::disableAccessLogFlushTimer() {
900 0 : if (access_log_flush_timer_ != nullptr) {
901 0 : access_log_flush_timer_->disableTimer();
902 0 : access_log_flush_timer_.reset();
903 0 : }
904 0 : }
905 :
906 0 : void Filter::resetIdleTimer() {
907 0 : if (idle_timer_ != nullptr) {
908 0 : ASSERT(config_->idleTimeout());
909 0 : idle_timer_->enableTimer(config_->idleTimeout().value());
910 0 : }
911 0 : }
912 :
913 0 : void Filter::disableIdleTimer() {
914 0 : if (idle_timer_ != nullptr) {
915 0 : idle_timer_->disableTimer();
916 0 : idle_timer_.reset();
917 0 : }
918 0 : }
919 :
920 0 : UpstreamDrainManager::~UpstreamDrainManager() {
921 : // If connections aren't closed before they are destructed an ASSERT fires,
922 : // so cancel all pending drains, which causes the connections to be closed.
923 0 : if (!drainers_.empty()) {
924 0 : auto& dispatcher = drainers_.begin()->second->dispatcher();
925 0 : while (!drainers_.empty()) {
926 0 : auto begin = drainers_.begin();
927 0 : Drainer* key = begin->first;
928 0 : begin->second->cancelDrain();
929 :
930 : // cancelDrain() should cause that drainer to be removed from drainers_.
931 : // ASSERT so that we don't end up in an infinite loop.
932 0 : ASSERT(drainers_.find(key) == drainers_.end());
933 0 : }
934 :
935 : // This destructor is run when shutting down `ThreadLocal`. The destructor of some objects use
936 : // earlier `ThreadLocal` slots (for accessing the runtime snapshot) so they must run before that
937 : // slot is destructed. Clear the list to enforce that ordering.
938 0 : dispatcher.clearDeferredDeleteList();
939 0 : }
940 0 : }
941 :
942 : void UpstreamDrainManager::add(const Config::SharedConfigSharedPtr& config,
943 : Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
944 : const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
945 : Event::TimerPtr&& idle_timer,
946 0 : const Upstream::HostDescriptionConstSharedPtr& upstream_host) {
947 0 : DrainerPtr drainer(new Drainer(*this, config, callbacks, std::move(upstream_conn_data),
948 0 : std::move(idle_timer), upstream_host));
949 0 : callbacks->drain(*drainer);
950 :
951 : // Use temporary to ensure we get the pointer before we move it out of drainer
952 0 : Drainer* ptr = drainer.get();
953 0 : drainers_[ptr] = std::move(drainer);
954 0 : }
955 :
956 0 : void UpstreamDrainManager::remove(Drainer& drainer, Event::Dispatcher& dispatcher) {
957 0 : auto it = drainers_.find(&drainer);
958 0 : ASSERT(it != drainers_.end());
959 0 : dispatcher.deferredDelete(std::move(it->second));
960 0 : drainers_.erase(it);
961 0 : }
962 :
963 : Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
964 : const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
965 : Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
966 : const Upstream::HostDescriptionConstSharedPtr& upstream_host)
967 : : parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)),
968 0 : timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) {
969 0 : ENVOY_CONN_LOG(trace, "draining the upstream connection", upstream_conn_data_->connection());
970 0 : config_->stats().upstream_flush_total_.inc();
971 0 : config_->stats().upstream_flush_active_.inc();
972 0 : }
973 :
974 0 : void Drainer::onEvent(Network::ConnectionEvent event) {
975 0 : if (event == Network::ConnectionEvent::RemoteClose ||
976 0 : event == Network::ConnectionEvent::LocalClose) {
977 0 : if (timer_ != nullptr) {
978 0 : timer_->disableTimer();
979 0 : }
980 0 : config_->stats().upstream_flush_active_.dec();
981 0 : parent_.remove(*this, upstream_conn_data_->connection().dispatcher());
982 0 : }
983 0 : }
984 :
985 0 : void Drainer::onData(Buffer::Instance& data, bool) {
986 0 : if (data.length() > 0) {
987 : // There is no downstream connection to send any data to, but the upstream
988 : // sent some data. Try to behave similar to what the kernel would do
989 : // when it receives data on a connection where the application has closed
990 : // the socket or ::shutdown(fd, SHUT_RD), and close/reset the connection.
991 0 : cancelDrain();
992 0 : }
993 0 : }
994 :
995 0 : void Drainer::onIdleTimeout() {
996 0 : config_->stats().idle_timeout_.inc();
997 0 : cancelDrain();
998 0 : }
999 :
1000 0 : void Drainer::onBytesSent() {
1001 0 : if (timer_ != nullptr) {
1002 0 : timer_->enableTimer(config_->idleTimeout().value());
1003 0 : }
1004 0 : }
1005 :
1006 0 : void Drainer::cancelDrain() {
1007 : // This sends onEvent(LocalClose).
1008 0 : upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush);
1009 0 : }
1010 :
1011 0 : Event::Dispatcher& Drainer::dispatcher() { return upstream_conn_data_->connection().dispatcher(); }
1012 :
1013 : } // namespace TcpProxy
1014 : } // namespace Envoy
|