#pragma once

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "envoy/access_log/access_log.h"
#include "envoy/common/random_generator.h"
#include "envoy/event/timer.h"
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
#include "envoy/http/header_evaluator.h"
#include "envoy/network/connection.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/server/filter_config.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
#include "envoy/stats/timespan.h"
#include "envoy/stream_info/filter_state.h"
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/upstream.h"

#include "source/common/common/logger.h"
#include "source/common/formatter/substitution_format_string.h"
#include "source/common/http/header_map_impl.h"
#include "source/common/network/cidr_range.h"
#include "source/common/network/filter_impl.h"
#include "source/common/network/hash_policy.h"
#include "source/common/network/utility.h"
#include "source/common/stream_info/stream_info_impl.h"
#include "source/common/tcp_proxy/upstream.h"
#include "source/common/upstream/load_balancer_impl.h"

#include "absl/container/node_hash_map.h"

namespace Envoy {
namespace TcpProxy {

/**
 * All tcp proxy stats. @see stats_macros.h
 */
#define ALL_TCP_PROXY_STATS(COUNTER, GAUGE)                                                        \
  COUNTER(downstream_cx_no_route)                                                                  \
  COUNTER(downstream_cx_rx_bytes_total)                                                            \
  COUNTER(downstream_cx_total)                                                                     \
  COUNTER(downstream_cx_tx_bytes_total)                                                            \
  COUNTER(downstream_flow_control_paused_reading_total)                                            \
  COUNTER(downstream_flow_control_resumed_reading_total)                                           \
  COUNTER(idle_timeout)                                                                            \
  COUNTER(max_downstream_connection_duration)                                                      \
  COUNTER(upstream_flush_total)                                                                    \
  GAUGE(downstream_cx_rx_bytes_buffered, Accumulate)                                               \
  GAUGE(downstream_cx_tx_bytes_buffered, Accumulate)                                               \
  GAUGE(upstream_flush_active, Accumulate)

/**
 * Tcp proxy stats for on-demand. These stats are generated only if the tcp proxy enables
 * on-demand.
 */
#define ON_DEMAND_TCP_PROXY_STATS(COUNTER)                                                         \
  COUNTER(on_demand_cluster_attempt)                                                               \
  COUNTER(on_demand_cluster_missing)                                                               \
  COUNTER(on_demand_cluster_timeout)                                                               \
  COUNTER(on_demand_cluster_success)

/**
 * Struct definition for all tcp proxy stats. @see stats_macros.h
 */
struct TcpProxyStats {
  ALL_TCP_PROXY_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};
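
// Illustrative sketch (not part of this header): the struct above is typically materialized with
// the pool macros from stats_macros.h against a scope that already carries the stat prefix, e.g.:
//
//   TcpProxyStats generateStats(Stats::Scope& scope) {
//     return {ALL_TCP_PROXY_STATS(POOL_COUNTER(scope), POOL_GAUGE(scope))};
//   }
//
// The exact prefix handling is an implementation detail of Config::SharedConfig::generateStats().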

/**
 * Struct definition for on-demand related tcp proxy stats. @see stats_macros.h
 * These stats are available if and only if the tcp proxy enables on-demand.
 * Note that these stats have the same prefix as `TcpProxyStats`.
 */
struct OnDemandStats {
  ON_DEMAND_TCP_PROXY_STATS(GENERATE_COUNTER_STRUCT)
};

class Drainer;
class UpstreamDrainManager;

/**
 * Route is an individual resolved route for a connection.
 */
class Route {
public:
  virtual ~Route() = default;

  /**
   * Check whether this route matches a given connection.
   * @param connection supplies the connection to test against.
   * @return bool true if this route matches a given connection.
   */
  virtual bool matches(Network::Connection& connection) const PURE;

  /**
   * @return const std::string& the upstream cluster that owns the route.
   */
  virtual const std::string& clusterName() const PURE;

  /**
   * @return MetadataMatchCriteria* the metadata that a subset load balancer should match when
   * selecting an upstream host.
   */
  virtual const Router::MetadataMatchCriteria* metadataMatchCriteria() const PURE;
};

using RouteConstSharedPtr = std::shared_ptr<const Route>;
using TunnelingConfig =
    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

/**
 * Base class for both tunnel response headers and trailers.
 */
class TunnelResponseHeadersOrTrailers : public StreamInfo::FilterState::Object {
public:
  ProtobufTypes::MessagePtr serializeAsProto() const override;
  virtual const Http::HeaderMap& value() const PURE;
};

/**
 * Response headers for the tunneling connections.
 */
class TunnelResponseHeaders : public TunnelResponseHeadersOrTrailers {
public:
  TunnelResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers)
      : response_headers_(std::move(response_headers)) {}
  const Http::HeaderMap& value() const override { return *response_headers_; }
  static const std::string& key();

private:
  const Http::ResponseHeaderMapPtr response_headers_;
};

/**
 * Response trailers for the tunneling connections.
 */
class TunnelResponseTrailers : public TunnelResponseHeadersOrTrailers {
public:
  TunnelResponseTrailers(Http::ResponseTrailerMapPtr&& response_trailers)
      : response_trailers_(std::move(response_trailers)) {}
  const Http::HeaderMap& value() const override { return *response_trailers_; }
  static const std::string& key();

private:
  const Http::ResponseTrailerMapPtr response_trailers_;
};
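
// Illustrative sketch (assumed usage, not part of this header): once propagated via the
// tunneling config helper below, these objects can be read back from filter state, e.g. by an
// access logger:
//
//   const auto* tunnel_headers =
//       stream_info.filterState()->getDataReadOnly<TunnelResponseHeaders>(
//           TunnelResponseHeaders::key());
//   if (tunnel_headers != nullptr) {
//     // Inspect tunnel_headers->value() here.
//   }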

class TunnelingConfigHelperImpl : public TunnelingConfigHelper,
                                  protected Logger::Loggable<Logger::Id::filter> {
public:
  TunnelingConfigHelperImpl(
      const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig&
          config_message,
      Server::Configuration::FactoryContext& context);
  std::string host(const StreamInfo::StreamInfo& stream_info) const override;
  bool usePost() const override { return use_post_; }
  const std::string& postPath() const override { return post_path_; }
  Envoy::Http::HeaderEvaluator& headerEvaluator() const override { return *header_parser_; }
  void
  propagateResponseHeaders(Http::ResponseHeaderMapPtr&& headers,
                           const StreamInfo::FilterStateSharedPtr& filter_state) const override;
  void
  propagateResponseTrailers(Http::ResponseTrailerMapPtr&& trailers,
                            const StreamInfo::FilterStateSharedPtr& filter_state) const override;

private:
  const bool use_post_;
  std::unique_ptr<Envoy::Router::HeaderParser> header_parser_;
  Formatter::FormatterPtr hostname_fmt_;
  const bool propagate_response_headers_;
  const bool propagate_response_trailers_;
  std::string post_path_;
};

/**
 * On demand configurations.
 */
class OnDemandConfig {
public:
  OnDemandConfig(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_OnDemand&
                     on_demand_message,
                 Server::Configuration::FactoryContext& context, Stats::Scope& scope)
      : odcds_(context.serverFactoryContext().clusterManager().allocateOdCdsApi(
            on_demand_message.odcds_config(), OptRef<xds::core::v3::ResourceLocator>(),
            context.messageValidationVisitor())),
        lookup_timeout_(std::chrono::milliseconds(
            PROTOBUF_GET_MS_OR_DEFAULT(on_demand_message, timeout, 60000))),
        stats_(generateStats(scope)) {}
  Upstream::OdCdsApiHandle& onDemandCds() const { return *odcds_; }
  std::chrono::milliseconds timeout() const { return lookup_timeout_; }
  const OnDemandStats& stats() const { return stats_; }

private:
  static OnDemandStats generateStats(Stats::Scope& scope);
  Upstream::OdCdsApiHandlePtr odcds_;
  // The timeout of looking up the on-demand cluster.
  std::chrono::milliseconds lookup_timeout_;
  // On demand stats.
  OnDemandStats stats_;
};
using OnDemandConfigOptConstRef = OptRef<const OnDemandConfig>;

/**
 * Filter configuration.
 *
 * This configuration holds a TLS slot, and therefore it must be destructed
 * on the main thread.
 */
class Config {
public:
  /**
   * Configuration that can be shared and have an arbitrary lifetime safely.
   */
  class SharedConfig {
  public:
    SharedConfig(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
                 Server::Configuration::FactoryContext& context);
    const TcpProxyStats& stats() { return stats_; }
    const absl::optional<std::chrono::milliseconds>& idleTimeout() { return idle_timeout_; }
    bool flushAccessLogOnConnected() const { return flush_access_log_on_connected_; }
    const absl::optional<std::chrono::milliseconds>& maxDownstreamConnectionDuration() const {
      return max_downstream_connection_duration_;
    }
    const absl::optional<std::chrono::milliseconds>& accessLogFlushInterval() const {
      return access_log_flush_interval_;
    }
    TunnelingConfigHelperOptConstRef tunnelingConfigHelper() {
      if (tunneling_config_helper_) {
        return {*tunneling_config_helper_};
      } else {
        return {};
      }
    }
    OnDemandConfigOptConstRef onDemandConfig() {
      if (on_demand_config_) {
        return {*on_demand_config_};
      } else {
        return {};
      }
    }

  private:
    static TcpProxyStats generateStats(Stats::Scope& scope);

    // Hold a Scope for the lifetime of the configuration because connections in
    // the UpstreamDrainManager can live longer than the listener.
    const Stats::ScopeSharedPtr stats_scope_;

    const TcpProxyStats stats_;
    bool flush_access_log_on_connected_;
    absl::optional<std::chrono::milliseconds> idle_timeout_;
    absl::optional<std::chrono::milliseconds> max_downstream_connection_duration_;
    absl::optional<std::chrono::milliseconds> access_log_flush_interval_;
    std::unique_ptr<TunnelingConfigHelper> tunneling_config_helper_;
    std::unique_ptr<OnDemandConfig> on_demand_config_;
  };

  using SharedConfigSharedPtr = std::shared_ptr<SharedConfig>;

  Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config,
         Server::Configuration::FactoryContext& context);

  /**
   * Find out which cluster an upstream connection should be opened to based on the
   * parameters of a downstream connection.
   * @param connection supplies the parameters of the downstream connection for
   * which the proxy needs to open the corresponding upstream.
   * @return the route to be used for the upstream connection.
   * If no route applies, returns nullptr.
   */
  RouteConstSharedPtr getRouteFromEntries(Network::Connection& connection);
  RouteConstSharedPtr getRegularRouteFromEntries(Network::Connection& connection);

  const TcpProxyStats& stats() { return shared_config_->stats(); }
  const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() { return access_logs_; }
  uint32_t maxConnectAttempts() const { return max_connect_attempts_; }
  const absl::optional<std::chrono::milliseconds>& idleTimeout() {
    return shared_config_->idleTimeout();
  }
  const absl::optional<std::chrono::milliseconds>& maxDownstreamConnectionDuration() const {
    return shared_config_->maxDownstreamConnectionDuration();
  }
  const absl::optional<std::chrono::milliseconds>& accessLogFlushInterval() const {
    return shared_config_->accessLogFlushInterval();
  }
  // Returns an empty optional if there is no tunneling config.
  TunnelingConfigHelperOptConstRef tunnelingConfigHelper() {
    return shared_config_->tunnelingConfigHelper();
  }
  UpstreamDrainManager& drainManager();
  SharedConfigSharedPtr sharedConfig() { return shared_config_; }
  const Router::MetadataMatchCriteria* metadataMatchCriteria() const {
    return cluster_metadata_match_criteria_.get();
  }
  const Network::HashPolicy* hashPolicy() { return hash_policy_.get(); }
  OptRef<Upstream::OdCdsApiHandle> onDemandCds() const {
    auto on_demand_config = shared_config_->onDemandConfig();
    return on_demand_config.has_value() ? makeOptRef(on_demand_config->onDemandCds())
                                        : OptRef<Upstream::OdCdsApiHandle>();
  }
  // This function must not be called if on demand is disabled.
  std::chrono::milliseconds odcdsTimeout() const {
    return shared_config_->onDemandConfig()->timeout();
  }
  // This function must not be called if on demand is disabled.
  const OnDemandStats& onDemandStats() const { return shared_config_->onDemandConfig()->stats(); }
  Random::RandomGenerator& randomGenerator() { return random_generator_; }
  bool flushAccessLogOnConnected() const { return shared_config_->flushAccessLogOnConnected(); }

private:
  struct SimpleRouteImpl : public Route {
    SimpleRouteImpl(const Config& parent, absl::string_view cluster_name);

    // Route
    bool matches(Network::Connection&) const override { return true; }
    const std::string& clusterName() const override { return cluster_name_; }
    const Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
      return parent_.metadataMatchCriteria();
    }

    const Config& parent_;
    std::string cluster_name_;
  };

  class WeightedClusterEntry : public Route {
  public:
    WeightedClusterEntry(const Config& parent,
                         const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::
                             WeightedCluster::ClusterWeight& config);

    uint64_t clusterWeight() const { return cluster_weight_; }

    // Route
    bool matches(Network::Connection&) const override { return false; }
    const std::string& clusterName() const override { return cluster_name_; }
    const Router::MetadataMatchCriteria* metadataMatchCriteria() const override {
      if (metadata_match_criteria_) {
        return metadata_match_criteria_.get();
      }
      return parent_.metadataMatchCriteria();
    }

  private:
    const Config& parent_;
    const std::string cluster_name_;
    const uint64_t cluster_weight_;
    Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
  };
  using WeightedClusterEntryConstSharedPtr = std::shared_ptr<const WeightedClusterEntry>;

  RouteConstSharedPtr default_route_;
  std::vector<WeightedClusterEntryConstSharedPtr> weighted_clusters_;
  uint64_t total_cluster_weight_;
  std::vector<AccessLog::InstanceSharedPtr> access_logs_;
  const uint32_t max_connect_attempts_;
  ThreadLocal::SlotPtr upstream_drain_manager_slot_;
  SharedConfigSharedPtr shared_config_;
  std::unique_ptr<const Router::MetadataMatchCriteria> cluster_metadata_match_criteria_;
  Random::RandomGenerator& random_generator_;
  std::unique_ptr<const Network::HashPolicyImpl> hash_policy_;
};

using ConfigSharedPtr = std::shared_ptr<Config>;

/**
 * Per-connection TCP Proxy Cluster configuration.
 */
class PerConnectionCluster : public StreamInfo::FilterState::Object {
public:
  PerConnectionCluster(absl::string_view cluster) : cluster_(cluster) {}
  const std::string& value() const { return cluster_; }
  absl::optional<std::string> serializeAsString() const override { return cluster_; }
  static const std::string& key();

private:
  const std::string cluster_;
};
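
// Illustrative sketch (assumed usage, not part of this header): a filter running earlier in the
// filter chain can select the upstream cluster for a single connection by populating filter
// state before tcp_proxy runs, e.g.:
//
//   stream_info.filterState()->setData(
//       PerConnectionCluster::key(), std::make_shared<PerConnectionCluster>("some_cluster"),
//       StreamInfo::FilterState::StateType::ReadOnly,
//       StreamInfo::FilterState::LifeSpan::Connection);
//
// Here "some_cluster" is a hypothetical cluster name used only for illustration.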

/**
 * An implementation of a TCP (L3/L4) proxy. This filter will instantiate a new outgoing TCP
 * connection using the defined load balancing policy for the configured cluster. All data will
 * be proxied back and forth between the two connections.
 */
class Filter : public Network::ReadFilter,
               public Upstream::LoadBalancerContextBase,
               protected Logger::Loggable<Logger::Id::filter>,
               public GenericConnectionPoolCallbacks {
public:
  Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
  ~Filter() override;

  // Network::ReadFilter
  Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
  Network::FilterStatus onNewConnection() override;
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
  bool startUpstreamSecureTransport() override;

  // GenericConnectionPoolCallbacks
  void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
                          Upstream::HostDescriptionConstSharedPtr& host,
                          const Network::ConnectionInfoProvider& address_provider,
                          Ssl::ConnectionInfoConstSharedPtr ssl_info) override;
  void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
                            absl::string_view failure_reason,
                            Upstream::HostDescriptionConstSharedPtr host) override;

  // Upstream::LoadBalancerContext
  const Router::MetadataMatchCriteria* metadataMatchCriteria() override;
  absl::optional<uint64_t> computeHashKey() override {
    auto hash_policy = config_->hashPolicy();
    if (hash_policy) {
      return hash_policy->generateHash(*downstreamConnection());
    }

    return {};
  }
  const Network::Connection* downstreamConnection() const override {
    return &read_callbacks_->connection();
  }

  const StreamInfo::StreamInfo* requestStreamInfo() const override {
    return &read_callbacks_->connection().streamInfo();
  }

  Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const override {
    return transport_socket_options_;
  }

  Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override {
    return upstream_options_;
  }

  // These two functions allow enabling/disabling reads on the upstream and downstream connections.
  // They are called by the Downstream/Upstream Watermark callbacks to limit buffering.
  void readDisableUpstream(bool disable);
  void readDisableDownstream(bool disable);

  struct UpstreamCallbacks : public Tcp::ConnectionPool::UpstreamCallbacks {
    UpstreamCallbacks(Filter* parent) : parent_(parent) {}

    // Tcp::ConnectionPool::UpstreamCallbacks
    void onUpstreamData(Buffer::Instance& data, bool end_stream) override;
    void onEvent(Network::ConnectionEvent event) override;
    void onAboveWriteBufferHighWatermark() override;
    void onBelowWriteBufferLowWatermark() override;

    void onBytesSent();
    void onIdleTimeout();
    void drain(Drainer& drainer);

    // Either parent_ or drainer_ will be non-NULL, but never both. This could logically be
    // represented as a union, but saving one pointer of memory is outweighed by more type
    // safety/better error handling.
    //
    // parent_ starts out as non-NULL. If the downstream connection is closed while
    // the upstream connection still has buffered data to flush, drainer_ becomes
    // non-NULL and parent_ is set to NULL.
    Filter* parent_{};
    Drainer* drainer_{};

    bool on_high_watermark_called_{false};
  };

  StreamInfo::StreamInfo& getStreamInfo();

protected:
  struct DownstreamCallbacks : public Network::ConnectionCallbacks {
    DownstreamCallbacks(Filter& parent) : parent_(parent) {}

    // Network::ConnectionCallbacks
    void onEvent(Network::ConnectionEvent event) override { parent_.onDownstreamEvent(event); }
    void onAboveWriteBufferHighWatermark() override;
    void onBelowWriteBufferLowWatermark() override;

    Filter& parent_;
    bool on_high_watermark_called_{false};
  };

  enum class UpstreamFailureReason {
    ConnectFailed,
    NoHealthyUpstream,
    ResourceLimitExceeded,
    NoRoute,
  };

  // Callbacks for different error and success states during connection establishment.
  virtual RouteConstSharedPtr pickRoute() {
    return config_->getRouteFromEntries(read_callbacks_->connection());
  }

  virtual void onInitFailure(UpstreamFailureReason reason);
  void initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats);

  // Create a connection to the upstream cluster. This function may be called repeatedly on
  // upstream connection failure.
  Network::FilterStatus establishUpstreamConnection();

  // The callback upon on demand cluster discovery response.
  void onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status);

  bool maybeTunnel(Upstream::ThreadLocalCluster& cluster);
  void onConnectTimeout();
  void onDownstreamEvent(Network::ConnectionEvent event);
  void onUpstreamData(Buffer::Instance& data, bool end_stream);
  void onUpstreamEvent(Network::ConnectionEvent event);
  void onUpstreamConnection();
  void onIdleTimeout();
  void resetIdleTimer();
  void disableIdleTimer();
  void onMaxDownstreamConnectionDuration();
  void onAccessLogFlushInterval();
  void resetAccessLogFlushTimer();
  void flushAccessLog(AccessLog::AccessLogType access_log_type);
  void disableAccessLogFlushTimer();

  const ConfigSharedPtr config_;
  Upstream::ClusterManager& cluster_manager_;
  Network::ReadFilterCallbacks* read_callbacks_{};

  DownstreamCallbacks downstream_callbacks_;
  Event::TimerPtr idle_timer_;
  Event::TimerPtr connection_duration_timer_;
  Event::TimerPtr access_log_flush_timer_;

  // A pointer to the on demand cluster lookup when lookup is in flight.
  Upstream::ClusterDiscoveryCallbackHandlePtr cluster_discovery_handle_;

  std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
                                                          // read filter.
  // The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist
  // until either the upstream or downstream connection is terminated.
  std::unique_ptr<GenericUpstream> upstream_;
  // The connection pool used to set up |upstream_|.
  // This will be non-null from when an upstream connection is attempted until
  // it either succeeds or fails.
  std::unique_ptr<GenericConnPool> generic_conn_pool_;
  // Time the filter first attempted to connect to the upstream after the
  // cluster is discovered. Capture the first time as the filter may try multiple times to connect
  // to the upstream.
  absl::optional<MonotonicTime> initial_upstream_connection_start_time_;
  RouteConstSharedPtr route_;
  Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
  Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
  Network::Socket::OptionsSharedPtr upstream_options_;
  uint32_t connect_attempts_{};
  bool connecting_{};
  bool downstream_closed_{};
};

// This class deals with an upstream connection that needs to finish flushing after the downstream
// connection has been closed. The TcpProxy filter is destroyed when the downstream connection is
// closed, so handling the upstream connection here allows it to finish draining or time out.
class Drainer : public Event::DeferredDeletable, protected Logger::Loggable<Logger::Id::filter> {
public:
  Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
          const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
          Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Event::TimerPtr&& idle_timer,
          const Upstream::HostDescriptionConstSharedPtr& upstream_host);

  void onEvent(Network::ConnectionEvent event);
  void onData(Buffer::Instance& data, bool end_stream);
  void onIdleTimeout();
  void onBytesSent();
  void cancelDrain();
  Event::Dispatcher& dispatcher();

private:
  UpstreamDrainManager& parent_;
  std::shared_ptr<Filter::UpstreamCallbacks> callbacks_;
  Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_;
  Event::TimerPtr timer_;
  Upstream::HostDescriptionConstSharedPtr upstream_host_;
  Config::SharedConfigSharedPtr config_;
};

using DrainerPtr = std::unique_ptr<Drainer>;

class UpstreamDrainManager : public ThreadLocal::ThreadLocalObject {
public:
  ~UpstreamDrainManager() override;
  void add(const Config::SharedConfigSharedPtr& config,
           Tcp::ConnectionPool::ConnectionDataPtr&& upstream_conn_data,
           const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
           Event::TimerPtr&& idle_timer,
           const Upstream::HostDescriptionConstSharedPtr& upstream_host);
  void remove(Drainer& drainer, Event::Dispatcher& dispatcher);

private:
  // This must be a map instead of set because there is no way to move elements
  // out of a set, and these elements get passed to deferredDelete() instead of
  // being deleted in-place. The key and value will always be equal.
  absl::node_hash_map<Drainer*, DrainerPtr> drainers_;
};

} // namespace TcpProxy
} // namespace Envoy