Line data Source code
1 : #pragma once
2 :
3 : #include "envoy/event/dispatcher.h"
4 : #include "envoy/http/conn_pool.h"
5 : #include "envoy/network/connection.h"
6 : #include "envoy/stats/timespan.h"
7 :
8 : #include "source/common/common/linked_object.h"
9 : #include "source/common/conn_pool/conn_pool_base.h"
10 : #include "source/common/http/codec_client.h"
11 : #include "source/common/http/http_server_properties_cache_impl.h"
12 : #include "source/common/http/utility.h"
13 :
14 : #include "absl/strings/string_view.h"
15 :
16 : namespace Envoy {
17 : namespace Http {
18 :
19 : struct HttpAttachContext : public Envoy::ConnectionPool::AttachContext {
20 : HttpAttachContext(Http::ResponseDecoder* d, Http::ConnectionPool::Callbacks* c)
21 424 : : decoder_(d), callbacks_(c) {}
22 : Http::ResponseDecoder* decoder_;
23 : Http::ConnectionPool::Callbacks* callbacks_;
24 : };
25 :
26 : // An implementation of Envoy::ConnectionPool::PendingStream for HTTP/1.1 and HTTP/2
27 : class HttpPendingStream : public Envoy::ConnectionPool::PendingStream {
28 : public:
29 : // OnPoolSuccess for HTTP requires both the decoder and callbacks. OnPoolFailure
30 : // requires only the callbacks, but passes both for consistency.
31 : HttpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, Http::ResponseDecoder& decoder,
32 : Http::ConnectionPool::Callbacks& callbacks, bool can_send_early_data)
33 : : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data),
34 173 : context_(&decoder, &callbacks) {}
35 :
36 130 : Envoy::ConnectionPool::AttachContext& context() override { return context_; }
37 : HttpAttachContext context_;
38 : };
39 :
40 : class ActiveClient;
41 :
42 : /* An implementation of Envoy::ConnectionPool::ConnPoolImplBase for shared code
43 : * between HTTP/1.1 and HTTP/2
44 : *
45 : * NOTE: The connection pool does NOT do DNS resolution. It assumes it is being given a numeric IP
46 : * address. Higher layer code should handle resolving DNS on error and creating a new pool
47 : * bound to a different IP address.
48 : */
49 : class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
50 : public Http::ConnectionPool::Instance {
51 : public:
52 : HttpConnPoolImplBase(
53 : Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
54 : Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
55 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
56 : Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
57 : std::vector<Http::Protocol> protocols);
58 : ~HttpConnPoolImplBase() override;
59 :
60 : // Event::DeferredDeletable
61 173 : void deleteIsPending() override { deleteIsPendingImpl(); }
62 :
63 : // ConnectionPool::Instance
64 173 : void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
65 0 : bool isIdle() const override { return isIdleImpl(); }
66 0 : void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
67 0 : drainConnectionsImpl(drain_behavior);
68 0 : }
69 1058 : Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }
70 : ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder,
71 : Http::ConnectionPool::Callbacks& callbacks,
72 : const Instance::StreamOptions& options) override;
73 0 : bool maybePreconnect(float ratio) override { return maybePreconnectImpl(ratio); }
74 : bool hasActiveConnections() const override;
75 :
76 : // Creates a new PendingStream and enqueues it into the queue.
77 : ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
78 : bool can_send_early_data) override;
79 : void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
80 : absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
81 6 : Envoy::ConnectionPool::AttachContext& context) override {
82 6 : auto* callbacks = typedContext<HttpAttachContext>(context).callbacks_;
83 6 : callbacks->onPoolFailure(reason, failure_reason, host_description);
84 6 : }
85 : void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
86 : Envoy::ConnectionPool::AttachContext& context) override;
87 :
88 : virtual CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;
89 173 : Random::RandomGenerator& randomGenerator() { return random_generator_; }
90 :
91 143 : virtual absl::optional<HttpServerPropertiesCache::Origin>& origin() { return origin_; }
92 0 : virtual Http::HttpServerPropertiesCacheSharedPtr cache() { return nullptr; }
93 :
94 : protected:
95 : friend class ActiveClient;
96 :
97 173 : void setOrigin(absl::optional<HttpServerPropertiesCache::Origin> origin) { origin_ = origin; }
98 :
99 : Random::RandomGenerator& random_generator_;
100 :
101 : private:
102 : absl::optional<HttpServerPropertiesCache::Origin> origin_;
103 : };
104 :
105 : // An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2
106 : class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
107 : public:
108 : ActiveClient(HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
109 : uint64_t effective_concurrent_stream_limit,
110 : uint64_t configured_concurrent_stream_limit,
111 : OptRef<Upstream::Host::CreateConnectionData> opt_data)
112 : : Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
113 : effective_concurrent_stream_limit,
114 173 : configured_concurrent_stream_limit) {
115 173 : if (opt_data.has_value()) {
116 0 : initialize(opt_data.value(), parent);
117 0 : return;
118 0 : }
119 : // The static cast makes sure we call the base class host() and not
120 : // HttpConnPoolImplBase::host which is of a different type.
121 173 : Upstream::Host::CreateConnectionData data =
122 173 : static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
123 173 : parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
124 173 : initialize(data, parent);
125 173 : }
126 :
127 173 : void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
128 173 : real_host_description_ = data.host_description_;
129 173 : codec_client_ = parent.createCodecClient(data);
130 173 : codec_client_->addConnectionCallbacks(*this);
131 173 : Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
132 173 : codec_client_->setConnectionStats(
133 173 : {traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
134 173 : traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
135 173 : &traffic_stats.bind_errors_, nullptr});
136 173 : }
137 :
138 165 : void initializeReadFilters() override { codec_client_->initializeReadFilters(); }
139 0 : absl::optional<Http::Protocol> protocol() const override { return codec_client_->protocol(); }
140 0 : void close() override { codec_client_->close(); }
141 : virtual Http::RequestEncoder& newStreamEncoder(Http::ResponseDecoder& response_decoder) PURE;
142 338 : void onEvent(Network::ConnectionEvent event) override {
143 338 : parent_.onConnectionEvent(*this, codec_client_->connectionFailureReason(), event);
144 338 : }
145 962 : uint32_t numActiveStreams() const override { return codec_client_->numActiveRequests(); }
146 0 : uint64_t id() const override { return codec_client_->id(); }
147 365 : HttpConnPoolImplBase& parent() { return *static_cast<HttpConnPoolImplBase*>(&parent_); }
148 :
149 : Http::CodecClientPtr codec_client_;
150 : };
151 :
152 : /* An implementation of Envoy::ConnectionPool::ConnPoolImplBase for HTTP/1 and HTTP/2
153 : */
154 : class FixedHttpConnPoolImpl : public HttpConnPoolImplBase {
155 : public:
156 : using CreateClientFn =
157 : std::function<Envoy::ConnectionPool::ActiveClientPtr(HttpConnPoolImplBase* pool)>;
158 : using CreateCodecFn = std::function<CodecClientPtr(Upstream::Host::CreateConnectionData& data,
159 : HttpConnPoolImplBase* pool)>;
160 :
161 : FixedHttpConnPoolImpl(
162 : Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
163 : Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
164 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
165 : Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
166 : CreateClientFn client_fn, CreateCodecFn codec_fn, std::vector<Http::Protocol> protocols,
167 : absl::optional<Http::HttpServerPropertiesCache::Origin> origin = absl::nullopt,
168 : Http::HttpServerPropertiesCacheSharedPtr cache = nullptr)
169 : : HttpConnPoolImplBase(host, priority, dispatcher, options, transport_socket_options,
170 : random_generator, state, protocols),
171 173 : codec_fn_(codec_fn), client_fn_(client_fn), protocol_(protocols[0]), cache_(cache) {
172 173 : setOrigin(origin);
173 173 : ASSERT(protocols.size() == 1);
174 173 : }
175 :
176 173 : CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override {
177 173 : return codec_fn_(data, this);
178 173 : }
179 :
180 173 : Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
181 173 : return client_fn_(this);
182 173 : }
183 :
184 0 : absl::string_view protocolDescription() const override {
185 0 : return Utility::getProtocolString(protocol_);
186 0 : }
187 :
188 143 : Http::HttpServerPropertiesCacheSharedPtr cache() override { return cache_; }
189 :
190 : protected:
191 : const CreateCodecFn codec_fn_;
192 : const CreateClientFn client_fn_;
193 : const Http::Protocol protocol_;
194 :
195 : Http::HttpServerPropertiesCacheSharedPtr cache_;
196 : };
197 :
198 : /**
199 : * Active client base for HTTP/2 and HTTP/3
200 : */
201 : class MultiplexedActiveClientBase : public CodecClientCallbacks,
202 : public Http::ConnectionCallbacks,
203 : public Envoy::Http::ActiveClient {
204 : public:
205 : MultiplexedActiveClientBase(HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
206 : uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
207 : OptRef<Upstream::Host::CreateConnectionData> data);
208 105 : ~MultiplexedActiveClientBase() override = default;
209 : // Caps max streams per connection below 2^31 to prevent overflow.
210 : static uint64_t maxStreamsPerConnection(uint64_t max_streams_config);
211 :
212 : // ConnPoolImpl::ActiveClient
213 : bool closingWithIncompleteStream() const override;
214 : RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override;
215 :
216 : // CodecClientCallbacks
217 : void onStreamDestroy() override;
218 : void onStreamReset(Http::StreamResetReason reason) override;
219 :
220 : // Http::ConnectionCallbacks
221 : void onGoAway(Http::GoAwayErrorCode error_code) override;
222 : void onSettings(ReceivedSettings& settings) override;
223 :
224 : private:
225 : bool closed_with_active_rq_{};
226 : };
227 :
228 : } // namespace Http
229 :
230 : } // namespace Envoy
|