Line data Source code
1 : #include "source/common/http/conn_pool_grid.h"
2 :
3 : #include <cstdint>
4 :
5 : #include "source/common/http/http3_status_tracker_impl.h"
6 : #include "source/common/http/mixed_conn_pool.h"
7 :
8 : #include "quiche/quic/core/http/spdy_utils.h"
9 : #include "quiche/quic/core/quic_versions.h"
10 :
11 : namespace Envoy {
12 : namespace Http {
13 :
14 : namespace {
15 0 : absl::string_view describePool(const ConnectionPool::Instance& pool) {
16 0 : return pool.protocolDescription();
17 0 : }
18 :
// Default delay, in milliseconds, before the grid starts another connection attempt. The grid
// constructor replaces it with twice the origin's smoothed RTT when a non-zero RTT is known.
constexpr uint32_t kDefaultTimeoutMs = 300;
20 :
21 : std::string getSni(const Network::TransportSocketOptionsConstSharedPtr& options,
22 0 : Network::UpstreamTransportSocketFactory& transport_socket_factory) {
23 0 : if (options && options->serverNameOverride().has_value()) {
24 0 : return options->serverNameOverride().value();
25 0 : }
26 0 : return std::string(transport_socket_factory.defaultServerNameIndication());
27 0 : }
28 :
29 : } // namespace
30 :
31 : ConnectivityGrid::WrapperCallbacks::WrapperCallbacks(ConnectivityGrid& grid,
32 : Http::ResponseDecoder& decoder,
33 : PoolIterator pool_it,
34 : ConnectionPool::Callbacks& callbacks,
35 : const Instance::StreamOptions& options)
36 : : grid_(grid), decoder_(decoder), inner_callbacks_(&callbacks),
37 : next_attempt_timer_(
38 0 : grid_.dispatcher_.createTimer([this]() -> void { tryAnotherConnection(); })),
39 0 : current_(pool_it), stream_options_(options) {
40 0 : if (!stream_options_.can_use_http3_) {
41 : // If alternate protocols are explicitly disabled, there must have been a failed request over
42 : // HTTP/3 and the failure must be post-handshake. So disable HTTP/3 for this request.
43 0 : http3_attempt_failed_ = true;
44 0 : }
45 0 : }
46 :
47 : ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::ConnectionAttemptCallbacks(
48 : WrapperCallbacks& parent, PoolIterator it)
49 0 : : parent_(parent), pool_it_(it) {}
50 :
51 0 : ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::~ConnectionAttemptCallbacks() {
52 0 : if (cancellable_ != nullptr) {
53 0 : cancel(Envoy::ConnectionPool::CancelPolicy::Default);
54 0 : }
55 0 : }
56 :
57 : ConnectivityGrid::StreamCreationResult
58 0 : ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::newStream() {
59 0 : ASSERT(!parent_.grid_.isPoolHttp3(pool()) || parent_.stream_options_.can_use_http3_);
60 0 : auto* cancellable = pool().newStream(parent_.decoder_, *this, parent_.stream_options_);
61 0 : if (cancellable == nullptr) {
62 0 : return StreamCreationResult::ImmediateResult;
63 0 : }
64 0 : cancellable_ = cancellable;
65 0 : return StreamCreationResult::StreamCreationPending;
66 0 : }
67 :
68 : void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolFailure(
69 : ConnectionPool::PoolFailureReason reason, absl::string_view transport_failure_reason,
70 0 : Upstream::HostDescriptionConstSharedPtr host) {
71 0 : cancellable_ = nullptr; // Attempt failed and can no longer be cancelled.
72 0 : parent_.onConnectionAttemptFailed(this, reason, transport_failure_reason, host);
73 0 : }
74 :
75 : void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptFailed(
76 : ConnectionAttemptCallbacks* attempt, ConnectionPool::PoolFailureReason reason,
77 0 : absl::string_view transport_failure_reason, Upstream::HostDescriptionConstSharedPtr host) {
78 0 : ENVOY_LOG(trace, "{} pool failed to create connection to host '{}'.",
79 0 : describePool(attempt->pool()), host->hostname());
80 0 : if (grid_.isPoolHttp3(attempt->pool())) {
81 0 : http3_attempt_failed_ = true;
82 0 : }
83 0 : maybeMarkHttp3Broken();
84 :
85 0 : auto delete_this_on_return = attempt->removeFromList(connection_attempts_);
86 :
87 : // If there is another connection attempt in flight then let that proceed.
88 0 : if (!connection_attempts_.empty()) {
89 0 : return;
90 0 : }
91 :
92 : // If the next connection attempt does not immediately fail, let it proceed.
93 0 : if (tryAnotherConnection()) {
94 0 : return;
95 0 : }
96 :
97 : // If this point is reached, all pools have been tried. Pass the pool failure up to the
98 : // original caller, if the caller hasn't already been notified.
99 0 : ConnectionPool::Callbacks* callbacks = inner_callbacks_;
100 0 : inner_callbacks_ = nullptr;
101 0 : deleteThis();
102 0 : if (callbacks != nullptr) {
103 0 : ENVOY_LOG(trace, "Passing pool failure up to caller.", describePool(attempt->pool()),
104 0 : host->hostname());
105 0 : callbacks->onPoolFailure(reason, transport_failure_reason, host);
106 0 : }
107 0 : }
108 :
109 0 : void ConnectivityGrid::WrapperCallbacks::deleteThis() {
110 : // By removing the entry from the list, it will be deleted.
111 0 : removeFromList(grid_.wrapped_callbacks_);
112 0 : }
113 :
114 0 : ConnectivityGrid::StreamCreationResult ConnectivityGrid::WrapperCallbacks::newStream() {
115 0 : ENVOY_LOG(trace, "{} pool attempting to create a new stream to host '{}'.",
116 0 : describePool(**current_), grid_.origin_.hostname_);
117 0 : auto attempt = std::make_unique<ConnectionAttemptCallbacks>(*this, current_);
118 0 : LinkedList::moveIntoList(std::move(attempt), connection_attempts_);
119 0 : if (!next_attempt_timer_->enabled()) {
120 0 : next_attempt_timer_->enableTimer(grid_.next_attempt_duration_);
121 0 : }
122 : // Note that in the case of immediate attempt/failure, newStream will delete this.
123 0 : return connection_attempts_.front()->newStream();
124 0 : }
125 :
126 : void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptReady(
127 : ConnectionAttemptCallbacks* attempt, RequestEncoder& encoder,
128 : Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
129 0 : absl::optional<Http::Protocol> protocol) {
130 0 : ENVOY_LOG(trace, "{} pool successfully connected to host '{}'.", describePool(attempt->pool()),
131 0 : host->hostname());
132 0 : if (!grid_.isPoolHttp3(attempt->pool())) {
133 0 : tcp_attempt_succeeded_ = true;
134 0 : maybeMarkHttp3Broken();
135 0 : }
136 :
137 0 : auto delete_this_on_return = attempt->removeFromList(connection_attempts_);
138 0 : ConnectionPool::Callbacks* callbacks = inner_callbacks_;
139 0 : inner_callbacks_ = nullptr;
140 : // If an HTTP/3 connection attempts is in progress, let it complete so that if it succeeds
141 : // it can be used for future requests. But if there is a TCP connection attempt in progress,
142 : // cancel it.
143 0 : if (grid_.isPoolHttp3(attempt->pool())) {
144 0 : cancelAllPendingAttempts(Envoy::ConnectionPool::CancelPolicy::Default);
145 0 : }
146 0 : if (connection_attempts_.empty()) {
147 0 : deleteThis();
148 0 : }
149 0 : if (callbacks != nullptr) {
150 0 : callbacks->onPoolReady(encoder, host, info, protocol);
151 0 : } else if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.avoid_zombie_streams")) {
152 0 : encoder.getStream().resetStream(StreamResetReason::LocalReset);
153 0 : }
154 0 : }
155 :
156 0 : void ConnectivityGrid::WrapperCallbacks::maybeMarkHttp3Broken() {
157 0 : if (http3_attempt_failed_ && tcp_attempt_succeeded_) {
158 0 : ENVOY_LOG(trace, "Marking HTTP/3 broken for host '{}'.", grid_.origin_.hostname_);
159 0 : grid_.markHttp3Broken();
160 0 : }
161 0 : }
162 :
163 : void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolReady(
164 : RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host,
165 0 : StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) {
166 0 : cancellable_ = nullptr; // Attempt succeeded and can no longer be cancelled.
167 0 : parent_.onConnectionAttemptReady(this, encoder, host, info, protocol);
168 0 : }
169 :
170 : void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::cancel(
171 0 : Envoy::ConnectionPool::CancelPolicy cancel_policy) {
172 0 : auto cancellable = cancellable_;
173 0 : cancellable_ = nullptr; // Prevent repeated cancellations.
174 0 : cancellable->cancel(cancel_policy);
175 0 : }
176 :
177 0 : void ConnectivityGrid::WrapperCallbacks::cancel(Envoy::ConnectionPool::CancelPolicy cancel_policy) {
178 : // If the newStream caller cancels the stream request, pass the cancellation on
179 : // to each connection attempt.
180 0 : cancelAllPendingAttempts(cancel_policy);
181 0 : deleteThis();
182 0 : }
183 :
184 : void ConnectivityGrid::WrapperCallbacks::cancelAllPendingAttempts(
185 0 : Envoy::ConnectionPool::CancelPolicy cancel_policy) {
186 0 : for (auto& attempt : connection_attempts_) {
187 0 : attempt->cancel(cancel_policy);
188 0 : }
189 0 : connection_attempts_.clear();
190 0 : }
191 :
192 0 : bool ConnectivityGrid::WrapperCallbacks::tryAnotherConnection() {
193 0 : absl::optional<PoolIterator> next_pool = grid_.nextPool(current_);
194 0 : if (!next_pool.has_value()) {
195 : // If there are no other pools to try, return false.
196 0 : return false;
197 0 : }
198 : // Create a new connection attempt for the next pool. If we reach this point
199 : // return true regardless of if newStream resulted in an immediate result or
200 : // an async call, as either way the attempt will result in success/failure
201 : // callbacks.
202 0 : current_ = next_pool.value();
203 0 : newStream();
204 0 : return true;
205 0 : }
206 :
207 : ConnectivityGrid::ConnectivityGrid(
208 : Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
209 : Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
210 : const Network::ConnectionSocket::OptionsSharedPtr& options,
211 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
212 : Upstream::ClusterConnectivityState& state, TimeSource& time_source,
213 : HttpServerPropertiesCacheSharedPtr alternate_protocols,
214 : ConnectivityOptions connectivity_options, Quic::QuicStatNames& quic_stat_names,
215 : Stats::Scope& scope, Http::PersistentQuicInfo& quic_info)
216 : : dispatcher_(dispatcher), random_generator_(random_generator), host_(host),
217 : priority_(priority), options_(options), transport_socket_options_(transport_socket_options),
218 : state_(state), next_attempt_duration_(std::chrono::milliseconds(kDefaultTimeoutMs)),
219 : time_source_(time_source), alternate_protocols_(alternate_protocols),
220 : quic_stat_names_(quic_stat_names), scope_(scope),
221 : // TODO(RyanTheOptimist): Figure out how scheme gets plumbed in here.
222 : origin_("https", getSni(transport_socket_options, host_->transportSocketFactory()),
223 : host_->address()->ip()->port()),
224 0 : quic_info_(quic_info) {
225 : // ProdClusterManagerFactory::allocateConnPool verifies the protocols are HTTP/1, HTTP/2 and
226 : // HTTP/3.
227 0 : ASSERT(connectivity_options.protocols_.size() == 3);
228 0 : ASSERT(alternate_protocols);
229 0 : std::chrono::milliseconds rtt =
230 0 : std::chrono::duration_cast<std::chrono::milliseconds>(alternate_protocols_->getSrtt(origin_));
231 0 : if (rtt.count() != 0) {
232 0 : next_attempt_duration_ = std::chrono::milliseconds(rtt.count() * 2);
233 0 : }
234 0 : }
235 :
236 0 : ConnectivityGrid::~ConnectivityGrid() {
237 : // Ignore idle callbacks while the pools are destroyed below.
238 0 : destroying_ = true;
239 : // Callbacks might have pending streams registered with the pools, so cancel and delete
240 : // the callback before deleting the pools.
241 0 : wrapped_callbacks_.clear();
242 0 : pools_.clear();
243 0 : }
244 :
245 0 : void ConnectivityGrid::deleteIsPending() {
246 0 : deferred_deleting_ = true;
247 0 : for (const auto& pool : pools_) {
248 0 : pool->deleteIsPending();
249 0 : }
250 0 : }
251 :
252 0 : absl::optional<ConnectivityGrid::PoolIterator> ConnectivityGrid::createNextPool() {
253 0 : ASSERT(!deferred_deleting_);
254 : // Pools are created by newStream, which should not be called during draining.
255 0 : ASSERT(!draining_);
256 : // Right now, only H3 and TCP are supported, so if there are 2 pools we're done.
257 0 : if (pools_.size() == 2 || draining_) {
258 0 : return absl::nullopt;
259 0 : }
260 :
261 : // HTTP/3 is hard-coded as higher priority, H2 as secondary.
262 0 : ConnectionPool::InstancePtr pool;
263 0 : if (pools_.empty()) {
264 0 : pool = Http3::allocateConnPool(
265 0 : dispatcher_, random_generator_, host_, priority_, options_, transport_socket_options_,
266 0 : state_, quic_stat_names_, *alternate_protocols_, scope_,
267 0 : makeOptRefFromPtr<Http3::PoolConnectResultCallback>(this), quic_info_);
268 0 : } else {
269 0 : pool = std::make_unique<HttpConnPoolImplMixed>(dispatcher_, random_generator_, host_, priority_,
270 0 : options_, transport_socket_options_, state_,
271 0 : origin_, alternate_protocols_);
272 0 : }
273 :
274 0 : setupPool(*pool);
275 0 : pools_.push_back(std::move(pool));
276 :
277 0 : return --pools_.end();
278 0 : }
279 :
280 0 : void ConnectivityGrid::setupPool(ConnectionPool::Instance& pool) {
281 0 : pool.addIdleCallback([this]() { onIdleReceived(); });
282 0 : }
283 :
284 0 : bool ConnectivityGrid::hasActiveConnections() const {
285 : // This is O(n) but the function is constant and there are no plans for n > 8.
286 0 : for (const auto& pool : pools_) {
287 0 : if (pool->hasActiveConnections()) {
288 0 : return true;
289 0 : }
290 0 : }
291 0 : return false;
292 0 : }
293 :
294 : ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder& decoder,
295 : ConnectionPool::Callbacks& callbacks,
296 0 : const Instance::StreamOptions& options) {
297 0 : ASSERT(!deferred_deleting_);
298 :
299 : // New streams should not be created during draining.
300 0 : ASSERT(!draining_);
301 :
302 0 : if (pools_.empty()) {
303 0 : createNextPool();
304 0 : }
305 0 : PoolIterator pool = pools_.begin();
306 0 : Instance::StreamOptions overriding_options = options;
307 0 : bool delay_tcp_attempt = true;
308 0 : if (shouldAttemptHttp3() && options.can_use_http3_) {
309 0 : if (getHttp3StatusTracker().hasHttp3FailedRecently()) {
310 0 : overriding_options.can_send_early_data_ = false;
311 0 : delay_tcp_attempt = false;
312 0 : }
313 0 : } else {
314 : // Before skipping to the next pool, make sure it has been created.
315 0 : createNextPool();
316 0 : ++pool;
317 0 : }
318 0 : auto wrapped_callback =
319 0 : std::make_unique<WrapperCallbacks>(*this, decoder, pool, callbacks, overriding_options);
320 0 : ConnectionPool::Cancellable* ret = wrapped_callback.get();
321 0 : LinkedList::moveIntoList(std::move(wrapped_callback), wrapped_callbacks_);
322 0 : if (wrapped_callbacks_.front()->newStream() == StreamCreationResult::ImmediateResult) {
323 : // If newStream succeeds, return nullptr as the caller has received their
324 : // callback and does not need a cancellable handle. At this point the
325 : // WrappedCallbacks object has also been deleted.
326 0 : return nullptr;
327 0 : }
328 0 : if (!delay_tcp_attempt) {
329 : // Immediately start TCP attempt if HTTP/3 failed recently.
330 0 : wrapped_callbacks_.front()->tryAnotherConnection();
331 0 : }
332 0 : return ret;
333 0 : }
334 :
335 0 : void ConnectivityGrid::addIdleCallback(IdleCb cb) {
336 : // Add the callback to the list of callbacks to be called when all drains are
337 : // complete.
338 0 : idle_callbacks_.emplace_back(cb);
339 0 : }
340 :
341 0 : void ConnectivityGrid::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
342 0 : if (draining_) {
343 : // A drain callback has already been set, and only needs to happen once.
344 0 : return;
345 0 : }
346 :
347 0 : if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
348 : // Note that no new pools can be created from this point on
349 : // as createNextPool fast-fails if `draining_` is true.
350 0 : draining_ = true;
351 0 : }
352 :
353 0 : for (auto& pool : pools_) {
354 0 : pool->drainConnections(drain_behavior);
355 0 : }
356 0 : }
357 :
358 0 : Upstream::HostDescriptionConstSharedPtr ConnectivityGrid::host() const { return host_; }
359 :
360 0 : bool ConnectivityGrid::maybePreconnect(float) {
361 0 : return false; // Preconnect not yet supported for the grid.
362 0 : }
363 :
364 0 : absl::optional<ConnectivityGrid::PoolIterator> ConnectivityGrid::nextPool(PoolIterator pool_it) {
365 0 : pool_it++;
366 0 : if (pool_it != pools_.end()) {
367 0 : return pool_it;
368 0 : }
369 0 : return createNextPool();
370 0 : }
371 :
372 0 : bool ConnectivityGrid::isPoolHttp3(const ConnectionPool::Instance& pool) {
373 0 : return &pool == pools_.begin()->get();
374 0 : }
375 :
376 0 : HttpServerPropertiesCache::Http3StatusTracker& ConnectivityGrid::getHttp3StatusTracker() const {
377 0 : ENVOY_BUG(host_->address()->type() == Network::Address::Type::Ip, "Address is not an IP address");
378 0 : return alternate_protocols_->getOrCreateHttp3StatusTracker(origin_);
379 0 : }
380 :
381 0 : bool ConnectivityGrid::isHttp3Broken() const { return getHttp3StatusTracker().isHttp3Broken(); }
382 :
383 0 : void ConnectivityGrid::markHttp3Broken() {
384 0 : host_->cluster().trafficStats()->upstream_http3_broken_.inc();
385 0 : getHttp3StatusTracker().markHttp3Broken();
386 0 : }
387 :
388 0 : void ConnectivityGrid::markHttp3Confirmed() { getHttp3StatusTracker().markHttp3Confirmed(); }
389 :
390 0 : bool ConnectivityGrid::isIdle() const {
391 : // This is O(n) but the function is constant and there are no plans for n > 8.
392 0 : bool idle = true;
393 0 : for (const auto& pool : pools_) {
394 0 : idle &= pool->isIdle();
395 0 : }
396 0 : return idle;
397 0 : }
398 :
399 0 : void ConnectivityGrid::onIdleReceived() {
400 : // Don't do any work under the stack of ~ConnectivityGrid()
401 0 : if (destroying_) {
402 0 : return;
403 0 : }
404 :
405 0 : if (isIdle()) {
406 0 : for (auto& callback : idle_callbacks_) {
407 0 : callback();
408 0 : }
409 0 : }
410 0 : }
411 :
412 0 : bool ConnectivityGrid::shouldAttemptHttp3() {
413 0 : if (host_->address()->type() != Network::Address::Type::Ip) {
414 0 : IS_ENVOY_BUG("Address is not an IP address");
415 0 : return false;
416 0 : }
417 0 : uint32_t port = host_->address()->ip()->port();
418 0 : OptRef<const std::vector<HttpServerPropertiesCache::AlternateProtocol>> protocols =
419 0 : alternate_protocols_->findAlternatives(origin_);
420 0 : if (!protocols.has_value()) {
421 0 : ENVOY_LOG(trace, "No alternate protocols available for host '{}', skipping HTTP/3.",
422 0 : origin_.hostname_);
423 0 : return false;
424 0 : }
425 0 : if (isHttp3Broken()) {
426 0 : ENVOY_LOG(trace, "HTTP/3 is broken to host '{}', skipping.", host_->hostname());
427 0 : return false;
428 0 : }
429 0 : for (const HttpServerPropertiesCache::AlternateProtocol& protocol : protocols.ref()) {
430 : // TODO(RyanTheOptimist): Handle alternate protocols which change hostname or port.
431 0 : if (!protocol.hostname_.empty() || protocol.port_ != port) {
432 0 : ENVOY_LOG(trace,
433 0 : "Alternate protocol for host '{}' attempts to change host or port, skipping.",
434 0 : origin_.hostname_);
435 0 : continue;
436 0 : }
437 :
438 : // TODO(RyanTheOptimist): Cache this mapping, but handle the supported versions list
439 : // changing dynamically.
440 0 : spdy::SpdyAltSvcWireFormat::AlternativeService alt_svc(protocol.alpn_, protocol.hostname_,
441 0 : protocol.port_, 0, {});
442 0 : quic::ParsedQuicVersion version = quic::SpdyUtils::ExtractQuicVersionFromAltSvcEntry(
443 0 : alt_svc, quic::CurrentSupportedVersions());
444 0 : if (version != quic::ParsedQuicVersion::Unsupported()) {
445 : // TODO(RyanTheOptimist): Pass this version down to the HTTP/3 pool.
446 0 : ENVOY_LOG(trace, "HTTP/3 advertised for host '{}'", origin_.hostname_);
447 0 : return true;
448 0 : }
449 :
450 0 : ENVOY_LOG(trace, "Alternate protocol for host '{}' has unsupported ALPN '{}', skipping.",
451 0 : origin_.hostname_, protocol.alpn_);
452 0 : }
453 :
454 0 : ENVOY_LOG(trace, "HTTP/3 is not available to host '{}', skipping.", origin_.hostname_);
455 0 : return false;
456 0 : }
457 :
458 0 : void ConnectivityGrid::onHandshakeComplete() {
459 0 : ENVOY_LOG(trace, "Marking HTTP/3 confirmed for host '{}'.", origin_.hostname_);
460 0 : markHttp3Confirmed();
461 0 : }
462 :
463 0 : void ConnectivityGrid::onZeroRttHandshakeFailed() {
464 0 : ENVOY_LOG(trace, "Marking HTTP/3 failed for host '{}'.", host_->hostname());
465 0 : getHttp3StatusTracker().markHttp3FailedRecently();
466 0 : }
467 :
468 : } // namespace Http
469 : } // namespace Envoy
|