Line data Source code
1 : #include "source/common/http/conn_pool_base.h"
2 :
3 : #include "source/common/common/assert.h"
4 : #include "source/common/http/utility.h"
5 : #include "source/common/network/transport_socket_options_impl.h"
6 : #include "source/common/runtime/runtime_features.h"
7 : #include "source/common/stats/timespan_impl.h"
8 : #include "source/common/upstream/upstream_impl.h"
9 :
10 : namespace Envoy {
11 : namespace Http {
12 :
13 : Network::TransportSocketOptionsConstSharedPtr
14 : wrapTransportSocketOptions(Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
15 173 : std::vector<Protocol> protocols) {
16 173 : std::vector<std::string> fallbacks;
17 173 : for (auto protocol : protocols) {
18 : // If configured to do so, we override the ALPN to use for the upstream connection to match the
19 : // selected protocol.
20 173 : switch (protocol) {
21 0 : case Http::Protocol::Http10:
22 0 : PANIC("not imlemented");
23 68 : case Http::Protocol::Http11:
24 68 : fallbacks.push_back(Http::Utility::AlpnNames::get().Http11);
25 68 : break;
26 105 : case Http::Protocol::Http2:
27 105 : fallbacks.push_back(Http::Utility::AlpnNames::get().Http2);
28 105 : break;
29 0 : case Http::Protocol::Http3:
30 : // HTTP3 ALPN is set in the QUIC stack based on supported versions.
31 0 : break;
32 173 : }
33 173 : }
34 :
35 173 : if (transport_socket_options) {
36 140 : return std::make_shared<Network::AlpnDecoratingTransportSocketOptions>(
37 140 : std::move(fallbacks), transport_socket_options);
38 168 : } else {
39 33 : return std::make_shared<Network::TransportSocketOptionsImpl>(
40 33 : "", std::vector<std::string>{}, std::vector<std::string>{}, std::move(fallbacks));
41 33 : }
42 173 : }
43 :
// Constructs the shared HTTP connection pool base. Note the transport socket
// options are wrapped in the base-class initializer so every connection this
// pool creates advertises ALPN matching `protocols`.
HttpConnPoolImplBase::HttpConnPoolImplBase(
    Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
    std::vector<Http::Protocol> protocols)
    : Envoy::ConnectionPool::ConnPoolImplBase(
          host, priority, dispatcher, options,
          wrapTransportSocketOptions(transport_socket_options, protocols), state),
      random_generator_(random_generator) {
  // A pool configured with no protocols would be unusable; catch it early.
  ASSERT(!protocols.empty());
}
56 :
// Destroy all remaining client connections before pool members go away.
HttpConnPoolImplBase::~HttpConnPoolImplBase() { destructAllConnections(); }
58 :
59 : ConnectionPool::Cancellable*
60 : HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
61 : Http::ConnectionPool::Callbacks& callbacks,
62 251 : const Instance::StreamOptions& options) {
63 251 : HttpAttachContext context({&response_decoder, &callbacks});
64 251 : return newStreamImpl(context, options.can_send_early_data_);
65 251 : }
66 :
67 0 : bool HttpConnPoolImplBase::hasActiveConnections() const {
68 0 : return (hasPendingStreams() || (hasActiveStreams()));
69 0 : }
70 :
71 : ConnectionPool::Cancellable*
72 : HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
73 173 : bool can_send_early_data) {
74 173 : Http::ResponseDecoder& decoder = *typedContext<HttpAttachContext>(context).decoder_;
75 173 : Http::ConnectionPool::Callbacks& callbacks = *typedContext<HttpAttachContext>(context).callbacks_;
76 173 : ENVOY_LOG(debug,
77 173 : "queueing stream due to no available connections (ready={} busy={} connecting={})",
78 173 : ready_clients_.size(), busy_clients_.size(), connecting_clients_.size());
79 173 : Envoy::ConnectionPool::PendingStreamPtr pending_stream(
80 173 : new HttpPendingStream(*this, decoder, callbacks, can_send_early_data));
81 173 : return addPendingStream(std::move(pending_stream));
82 173 : }
83 :
84 : void HttpConnPoolImplBase::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
85 202 : Envoy::ConnectionPool::AttachContext& context) {
86 202 : ActiveClient* http_client = static_cast<ActiveClient*>(&client);
87 202 : auto& http_context = typedContext<HttpAttachContext>(context);
88 202 : Http::ResponseDecoder& response_decoder = *http_context.decoder_;
89 202 : Http::ConnectionPool::Callbacks& callbacks = *http_context.callbacks_;
90 202 : Http::RequestEncoder& new_encoder = http_client->newStreamEncoder(response_decoder);
91 202 : callbacks.onPoolReady(new_encoder, client.real_host_description_,
92 202 : http_client->codec_client_->streamInfo(),
93 202 : http_client->codec_client_->protocol());
94 202 : }
95 :
// All streams are 2^31. Client streams are half that, minus stream 0. Just to be on the safe
// side we do 2^29.
// constexpr (not runtime-initialized const) and an explicitly 64-bit shift.
static constexpr uint64_t DEFAULT_MAX_STREAMS = uint64_t(1) << 29;
99 :
100 5 : void MultiplexedActiveClientBase::onGoAway(Http::GoAwayErrorCode) {
101 5 : ENVOY_CONN_LOG(debug, "remote goaway", *codec_client_);
102 5 : parent_.host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
103 5 : if (state() != ActiveClient::State::Draining) {
104 5 : if (codec_client_->numActiveRequests() == 0) {
105 5 : codec_client_->close();
106 5 : } else {
107 0 : parent_.transitionActiveClientState(*this, ActiveClient::State::Draining);
108 0 : }
109 5 : }
110 5 : }
111 :
// Adjust the concurrent stream limit if the negotiated concurrent stream limit
// is lower than the local max configured streams.
//
// Note: if multiple streams are assigned to a connection before the settings
// are received, they may still be reset by the peer. This could be avoided by
// not considering http/2 connections connected until the SETTINGS frame is
// received, but that would result in a latency penalty instead.
void MultiplexedActiveClientBase::onSettings(ReceivedSettings& settings) {
  if (settings.maxConcurrentStreams().has_value()) {
    // Snapshot capacity before changing the limit so the delta can be computed below.
    int64_t old_unused_capacity = currentUnusedCapacity();
    // Given config limits old_unused_capacity should never exceed int32_t.
    ASSERT(std::numeric_limits<int32_t>::max() >= old_unused_capacity);
    // If a cache is configured for this origin, record the peer's advertised
    // limit so future connections can start with it.
    if (parent().cache() && parent().origin().has_value()) {
      parent().cache()->setConcurrentStreams(*parent().origin(),
                                             settings.maxConcurrentStreams().value());
    }
    // Never exceed the locally configured limit.
    concurrent_stream_limit_ =
        std::min(settings.maxConcurrentStreams().value(), configured_stream_limit_);

    // Positive delta: this connection lost capacity; negative: it gained some.
    int64_t delta = old_unused_capacity - currentUnusedCapacity();
    // Move the client between the Ready/Busy lists to reflect the new capacity.
    if (state() == ActiveClient::State::Ready && currentUnusedCapacity() <= 0) {
      parent_.transitionActiveClientState(*this, ActiveClient::State::Busy);
    } else if (state() == ActiveClient::State::Busy && currentUnusedCapacity() > 0) {
      parent_.transitionActiveClientState(*this, ActiveClient::State::Ready);
    }

    // Keep the cluster-wide stream capacity bookkeeping in sync with this client.
    if (delta > 0) {
      parent_.decrClusterStreamCapacity(delta);
      ENVOY_CONN_LOG(trace, "Decreasing stream capacity by {}", *codec_client_, delta);
    } else if (delta < 0) {
      parent_.incrClusterStreamCapacity(-delta);
      ENVOY_CONN_LOG(trace, "Increasing stream capacity by {}", *codec_client_, -delta);
    }
  }
}
147 :
148 148 : void MultiplexedActiveClientBase::onStreamDestroy() {
149 148 : parent().onStreamClosed(*this, false);
150 :
151 : // If we are destroying this stream because of a disconnect, do not check for drain here. We will
152 : // wait until the connection has been fully drained of streams and then check in the connection
153 : // event callback.
154 148 : if (!closed_with_active_rq_) {
155 108 : parent().checkForIdleAndCloseIdleConnsIfDraining();
156 108 : }
157 148 : }
158 :
// Attribute an upstream stream reset to the appropriate cluster traffic stat,
// and record when the connection itself is going away with a request in flight.
void MultiplexedActiveClientBase::onStreamReset(Http::StreamResetReason reason) {
  switch (reason) {
  // Connection-level failures: the stream died with the connection, so mark
  // this client as having closed with an active request.
  case StreamResetReason::ConnectionTermination:
  case StreamResetReason::LocalConnectionFailure:
  case StreamResetReason::RemoteConnectionFailure:
  case StreamResetReason::ConnectionTimeout:
    parent_.host()->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
    closed_with_active_rq_ = true;
    break;
  // Locally initiated resets are counted as transmit-side resets.
  case StreamResetReason::LocalReset:
  case StreamResetReason::ProtocolError:
  case StreamResetReason::OverloadManager:
    parent_.host()->cluster().trafficStats()->upstream_rq_tx_reset_.inc();
    break;
  // Peer-initiated reset: counted as a receive-side reset.
  case StreamResetReason::RemoteReset:
    parent_.host()->cluster().trafficStats()->upstream_rq_rx_reset_.inc();
    break;
  // Remaining reasons are intentionally not attributed to a stat here.
  case StreamResetReason::LocalRefusedStreamReset:
  case StreamResetReason::RemoteRefusedStreamReset:
  case StreamResetReason::Overflow:
  case StreamResetReason::ConnectError:
    break;
  }
}
183 :
184 210 : uint64_t MultiplexedActiveClientBase::maxStreamsPerConnection(uint64_t max_streams_config) {
185 210 : return (max_streams_config != 0) ? max_streams_config : DEFAULT_MAX_STREAMS;
186 210 : }
187 :
// Wires a multiplexed (HTTP/2 or HTTP/3) client into the pool. The base class
// is sized from the cluster's max-requests-per-connection config; this class
// then registers for codec callbacks and bumps the connection-total counter.
MultiplexedActiveClientBase::MultiplexedActiveClientBase(
    HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
    uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
    OptRef<Upstream::Host::CreateConnectionData> data)
    : Envoy::Http::ActiveClient(
          parent, maxStreamsPerConnection(parent.host()->cluster().maxRequestsPerConnection()),
          effective_concurrent_streams, max_configured_concurrent_streams, data) {
  codec_client_->setCodecClientCallbacks(*this);
  codec_client_->setCodecConnectionCallbacks(*this);
  cx_total.inc();
}
199 :
// True if this connection closed (or is closing) while a request was still in
// flight; set by onStreamReset() for connection-level failures.
bool MultiplexedActiveClientBase::closingWithIncompleteStream() const {
  return closed_with_active_rq_;
}
203 :
// Opens a new stream on the underlying codec; the returned encoder is owned by
// the codec client, not the caller.
RequestEncoder& MultiplexedActiveClientBase::newStreamEncoder(ResponseDecoder& response_decoder) {
  return codec_client_->newStream(response_decoder);
}
207 :
208 : } // namespace Http
209 : } // namespace Envoy
|