1
#include "source/common/http/conn_pool_base.h"
2

            
3
#include "source/common/common/assert.h"
4
#include "source/common/http/utility.h"
5
#include "source/common/network/transport_socket_options_impl.h"
6
#include "source/common/runtime/runtime_features.h"
7
#include "source/common/stats/timespan_impl.h"
8
#include "source/common/upstream/upstream_impl.h"
9

            
10
namespace Envoy {
11
namespace Http {
12

            
13
// Wraps (or creates) transport socket options so that the ALPN advertised on
// the upstream connection matches the protocols this pool may speak.
//
// @param transport_socket_options existing options to decorate; may be null,
//        in which case fresh options carrying only the ALPN fallbacks are made.
// @param protocols the HTTP protocols selected for the upstream connection.
// @return transport socket options carrying the ALPN fallback list derived
//         from `protocols`.
Network::TransportSocketOptionsConstSharedPtr
wrapTransportSocketOptions(Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
                           std::vector<Protocol> protocols) {
  std::vector<std::string> fallbacks;
  // At most one ALPN entry per protocol (HTTP/3 contributes none).
  fallbacks.reserve(protocols.size());
  for (auto protocol : protocols) {
    // If configured to do so, we override the ALPN to use for the upstream connection to match the
    // selected protocol.
    switch (protocol) {
    case Http::Protocol::Http10:
      // HTTP/1.0 upstream pools do not negotiate ALPN; reaching here is a bug.
      PANIC("not implemented");
    case Http::Protocol::Http11:
      fallbacks.push_back(Http::Utility::AlpnNames::get().Http11);
      break;
    case Http::Protocol::Http2:
      fallbacks.push_back(Http::Utility::AlpnNames::get().Http2);
      break;
    case Http::Protocol::Http3:
      // HTTP3 ALPN is set in the QUIC stack based on supported versions.
      break;
    }
  }

  if (transport_socket_options) {
    // Decorate the caller-provided options with the computed ALPN fallbacks.
    return std::make_shared<Network::AlpnDecoratingTransportSocketOptions>(
        std::move(fallbacks), transport_socket_options);
  } else {
    // No existing options: build minimal options carrying only the fallbacks.
    return std::make_shared<Network::TransportSocketOptionsImpl>(
        "", std::vector<std::string>{}, std::vector<std::string>{}, std::move(fallbacks));
  }
}
43

            
44
// Constructs the HTTP connection pool base. The transport socket options are
// wrapped via wrapTransportSocketOptions() so the upstream ALPN matches the
// supplied `protocols`; all other arguments are forwarded to the generic
// ConnPoolImplBase. `protocols` must be non-empty.
HttpConnPoolImplBase::HttpConnPoolImplBase(
    Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
    std::vector<Http::Protocol> protocols, Server::OverloadManager& overload_manager)
    : Envoy::ConnectionPool::ConnPoolImplBase(
          host, priority, dispatcher, options,
          wrapTransportSocketOptions(transport_socket_options, protocols), state, overload_manager),
      random_generator_(random_generator) {
  ASSERT(!protocols.empty());
}
56

            
57
11868
HttpConnPoolImplBase::~HttpConnPoolImplBase() { destructAllConnections(); }
58

            
59
// Starts a new stream: packages the decoder/callbacks pair into an HTTP attach
// context and defers to the shared newStreamImpl().
ConnectionPool::Cancellable*
HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
                                Http::ConnectionPool::Callbacks& callbacks,
                                const Instance::StreamOptions& options) {
  HttpAttachContext http_context({&response_decoder, &callbacks});
  const bool early_data_allowed = options.can_send_early_data_;
  return newStreamImpl(http_context, early_data_allowed);
}
66

            
67
11
bool HttpConnPoolImplBase::hasActiveConnections() const {
68
11
  return (hasPendingStreams() || (hasActiveStreams()));
69
11
}
70

            
71
// Queues a stream when no connection can serve it right now; the stream is
// attached once a connection becomes available.
ConnectionPool::Cancellable*
HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                                       bool can_send_early_data) {
  auto& http_context = typedContext<HttpAttachContext>(context);
  ENVOY_LOG(debug,
            "queueing stream due to no available connections (ready={} busy={} connecting={})",
            ready_clients_.size(), busy_clients_.size(), connecting_clients_.size());
  Envoy::ConnectionPool::PendingStreamPtr pending = std::make_unique<HttpPendingStream>(
      *this, *http_context.decoder_, *http_context.callbacks_, can_send_early_data);
  return addPendingStream(std::move(pending));
}
83

            
84
// Called when a connection is ready to carry the stream described by
// `context`: creates the stream encoder on the selected client and hands it
// to the caller through callbacks.onPoolReady().
void HttpConnPoolImplBase::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
                                       Envoy::ConnectionPool::AttachContext& context) {
  ActiveClient* http_client = static_cast<ActiveClient*>(&client);
  auto& http_context = typedContext<HttpAttachContext>(context);
  // This decoder might have already died if ConnectivityGrid is in use and TCP
  // win over QUIC.
  Http::ResponseDecoder& response_decoder = *http_context.decoder_;
  Http::ConnectionPool::Callbacks& callbacks = *http_context.callbacks_;

  // Track this request on the connection
  http_client->request_count_++;

  Http::RequestEncoder* new_encoder = nullptr;
  if (http_context.decoder_handle_ == nullptr) {
    // No decoder handle in the context: wire the raw decoder reference.
    new_encoder = &http_client->newStreamEncoder(response_decoder);
  } else {
    // A handle is available: transfer its ownership into the new stream.
    // NOTE(review): presumably the handle path exists to tolerate the decoder
    // dying (see comment above) — confirm against CodecClient.
    new_encoder = &http_client->newStreamEncoder(std::move(http_context.decoder_handle_));
  }
  callbacks.onPoolReady(*new_encoder, client.real_host_description_,
                        http_client->codec_client_->streamInfo(),
                        http_client->codec_client_->protocol());
}
106

            
107
// All stream IDs fit in 2^31. Client-initiated streams get half of that ID
// space, minus stream 0. Just to be on the safe side we cap at 2^29.
constexpr uint32_t DEFAULT_MAX_STREAMS = 1U << 29;
110

            
111
889
// Handles a GOAWAY from the peer: the connection must not take new streams.
// If nothing is in flight it is closed immediately, otherwise it transitions
// to Draining so active streams can finish.
void MultiplexedActiveClientBase::onGoAway(Http::GoAwayErrorCode) {
  ENVOY_CONN_LOG(debug, "remote goaway", *codec_client_);
  parent_.host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
  if (state() == ActiveClient::State::Draining) {
    // Already draining; nothing further to do.
    return;
  }
  if (codec_client_->numActiveRequests() == 0) {
    codec_client_->close();
  } else {
    parent_.transitionActiveClientState(*this, ActiveClient::State::Draining);
  }
}
122

            
123
// Adjust the concurrent stream limit if the negotiated concurrent stream limit
// is lower than the local max configured streams.
//
// Note: if multiple streams are assigned to a connection before the settings
// are received, they may still be reset by the peer. This could be avoided by
// not considering http/2 connections connected until the SETTINGS frame is
// received, but that would result in a latency penalty instead.
void MultiplexedActiveClientBase::onSettings(ReceivedSettings& settings) {
  if (settings.maxConcurrentStreams().has_value()) {
    int64_t old_unused_capacity = currentUnusedCapacity();
    // Given config limits old_unused_capacity should never exceed int32_t.
    ASSERT(std::numeric_limits<int32_t>::max() >= old_unused_capacity);
    // Record the peer's advertised limit for this origin in the cache, when a
    // cache and origin are available.
    if (parent().cache() && parent().origin().has_value()) {
      parent().cache()->setConcurrentStreams(*parent().origin(),
                                             settings.maxConcurrentStreams().value());
    }
    // Effective limit is the smaller of what the peer allows and what we were
    // configured with.
    concurrent_stream_limit_ =
        std::min(settings.maxConcurrentStreams().value(), configured_stream_limit_);

    // delta > 0 means unused capacity shrank; delta < 0 means it grew.
    int64_t delta = old_unused_capacity - currentUnusedCapacity();
    // Re-evaluate Ready/Busy since the new limit may change whether this
    // client can accept additional streams.
    if (state() == ActiveClient::State::Ready && currentUnusedCapacity() <= 0) {
      parent_.transitionActiveClientState(*this, ActiveClient::State::Busy);
    } else if (state() == ActiveClient::State::Busy && currentUnusedCapacity() > 0) {
      parent_.transitionActiveClientState(*this, ActiveClient::State::Ready);
    }

    // Keep the cluster-wide stream capacity accounting in sync with the change.
    if (delta > 0) {
      parent_.decrClusterStreamCapacity(delta);
      ENVOY_CONN_LOG(trace, "Decreasing stream capacity by {}", *codec_client_, delta);
    } else if (delta < 0) {
      parent_.incrClusterStreamCapacity(-delta);
      ENVOY_CONN_LOG(trace, "Increasing stream capacity by {}", *codec_client_, -delta);
    }
  }
}
158

            
159
21208
void MultiplexedActiveClientBase::onStreamDestroy() {
160
21208
  parent().onStreamClosed(*this, false);
161

            
162
  // If we are destroying this stream because of a disconnect, do not check for drain here. We will
163
  // wait until the connection has been fully drained of streams and then check in the connection
164
  // event callback.
165
21208
  if (!closed_with_active_rq_) {
166
19595
    parent().checkForIdleAndCloseIdleConnsIfDraining();
167
19595
  }
168
21208
}
169

            
170
8285
// Bumps the appropriate upstream reset counter for `reason` and records when
// the connection went away while a request was still active.
void MultiplexedActiveClientBase::onStreamReset(Http::StreamResetReason reason) {
  switch (reason) {
  case StreamResetReason::ConnectionTermination:
  case StreamResetReason::LocalConnectionFailure:
  case StreamResetReason::RemoteConnectionFailure:
  case StreamResetReason::ConnectionTimeout:
    // Connection-level failures: count an ejection and remember that a request
    // was in flight (consulted by closingWithIncompleteStream()).
    parent_.host()->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
    closed_with_active_rq_ = true;
    break;
  case StreamResetReason::LocalReset:
  case StreamResetReason::ProtocolError:
  case StreamResetReason::OverloadManager:
    // Resets originating on our side are counted as tx resets.
    parent_.host()->cluster().trafficStats()->upstream_rq_tx_reset_.inc();
    break;
  case StreamResetReason::RemoteReset:
    // Resets originating from the peer are counted as rx resets.
    parent_.host()->cluster().trafficStats()->upstream_rq_rx_reset_.inc();
    break;
  case StreamResetReason::LocalRefusedStreamReset:
  case StreamResetReason::RemoteRefusedStreamReset:
  case StreamResetReason::Overflow:
  case StreamResetReason::ConnectError:
  case StreamResetReason::Http1PrematureUpstreamHalfClose:
    // Deliberately not counted here.
    break;
  }
}
195

            
196
13145
uint32_t MultiplexedActiveClientBase::maxStreamsPerConnection(uint32_t max_streams_config) {
197
13145
  return (max_streams_config != 0) ? max_streams_config : DEFAULT_MAX_STREAMS;
198
13145
}
199

            
200
// Constructs a multiplexed (HTTP/2 / HTTP/3) active client. The lifetime
// stream cap is derived from the cluster's maxRequestsPerConnection() via
// maxStreamsPerConnection(); `cx_total` is incremented to count the new
// connection.
MultiplexedActiveClientBase::MultiplexedActiveClientBase(
    HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
    uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
    OptRef<Upstream::Host::CreateConnectionData> data)
    : Envoy::Http::ActiveClient(
          parent, maxStreamsPerConnection(parent.host()->cluster().maxRequestsPerConnection()),
          effective_concurrent_streams, max_configured_concurrent_streams, data) {
  // Register for codec-level events (e.g. GOAWAY, settings, stream resets) on
  // this client.
  codec_client_->setCodecClientCallbacks(*this);
  codec_client_->setCodecConnectionCallbacks(*this);
  cx_total.inc();
}
211

            
212
7009
// True when this client was torn down while a request was still outstanding
// (set by onStreamReset() for connection-level failures).
bool MultiplexedActiveClientBase::closingWithIncompleteStream() const {
  return closed_with_active_rq_;
}
215

            
216
3
// Opens a new stream on the underlying codec, wiring responses to
// `response_decoder`.
RequestEncoder& MultiplexedActiveClientBase::newStreamEncoder(ResponseDecoder& response_decoder) {
  auto& encoder = codec_client_->newStream(response_decoder);
  return encoder;
}
219

            
220
// Opens a new stream on the underlying codec, transferring ownership of the
// response decoder handle into it.
RequestEncoder&
MultiplexedActiveClientBase::newStreamEncoder(ResponseDecoderHandlePtr response_decoder_handle) {
  auto& encoder = codec_client_->newStream(std::move(response_decoder_handle));
  return encoder;
}
224

            
225
} // namespace Http
226
} // namespace Envoy