1
#pragma once
2

            
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/http/conn_pool.h"
5
#include "envoy/network/connection.h"
6
#include "envoy/stats/timespan.h"
7

            
8
#include "source/common/common/linked_object.h"
9
#include "source/common/conn_pool/conn_pool_base.h"
10
#include "source/common/http/codec_client.h"
11
#include "source/common/http/http_server_properties_cache_impl.h"
12
#include "source/common/http/response_decoder_impl_base.h"
13
#include "source/common/http/utility.h"
14

            
15
#include "absl/strings/string_view.h"
16

            
17
namespace Envoy {
18
namespace Http {
19

            
20
struct HttpAttachContext : public Envoy::ConnectionPool::AttachContext {
21
  HttpAttachContext(Http::ResponseDecoder* d, Http::ConnectionPool::Callbacks* c)
22
76978
      : decoder_(d), callbacks_(c) {
23
76978
    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_response_decoder_handle")) {
24
76970
      decoder_handle_ = d->createResponseDecoderHandle();
25
76970
    }
26
76978
  }
27

            
28
  Http::ResponseDecoder* decoder_;
29
  Http::ConnectionPool::Callbacks* callbacks_;
30
  ResponseDecoderHandlePtr decoder_handle_;
31
};
32

            
33
// An implementation of Envoy::ConnectionPool::PendingStream for HTTP/1.1 and HTTP/2
34
class HttpPendingStream : public Envoy::ConnectionPool::PendingStream {
35
public:
36
  // OnPoolSuccess for HTTP requires both the decoder and callbacks. OnPoolFailure
37
  // requires only the callbacks, but passes both for consistency.
38
  HttpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, Http::ResponseDecoder& decoder,
39
                    Http::ConnectionPool::Callbacks& callbacks, bool can_send_early_data)
40
29598
      : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data),
41
29598
        context_(&decoder, &callbacks) {}
42

            
43
29247
  Envoy::ConnectionPool::AttachContext& context() override { return context_; }
44
  HttpAttachContext context_;
45
};
46

            
47
class ActiveClient;
48

            
49
/* An implementation of Envoy::ConnectionPool::ConnPoolImplBase for shared code
50
 * between HTTP/1.1 and HTTP/2
51
 *
52
 * NOTE: The connection pool does NOT do DNS resolution. It assumes it is being given a numeric IP
53
 *       address. Higher layer code should handle resolving DNS on error and creating a new pool
54
 *       bound to a different IP address.
55
 */
56
class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase,
57
                             public Http::ConnectionPool::Instance {
58
public:
59
  HttpConnPoolImplBase(
60
      Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
61
      Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
62
      const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
63
      Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
64
      std::vector<Http::Protocol> protocols, Server::OverloadManager& overload_manager);
65
  ~HttpConnPoolImplBase() override;
66

            
67
  // Event::DeferredDeletable
68
11484
  void deleteIsPending() override { deleteIsPendingImpl(); }
69

            
70
  // ConnectionPool::Instance
71
11723
  void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
72
75
  bool isIdle() const override { return isIdleImpl(); }
73
200
  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
74
200
    drainConnectionsImpl(drain_behavior);
75
200
  }
76
168446
  Upstream::HostDescriptionConstSharedPtr host() const override { return host_; }
77
  ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder,
78
                                         Http::ConnectionPool::Callbacks& callbacks,
79
                                         const Instance::StreamOptions& options) override;
80
60
  bool maybePreconnect(float ratio) override { return maybePreconnectImpl(ratio); }
81
  bool hasActiveConnections() const override;
82

            
83
  // Creates a new PendingStream and enqueues it into the queue.
84
  ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
85
                                                bool can_send_early_data) override;
86
  void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
87
                     absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
88
306
                     Envoy::ConnectionPool::AttachContext& context) override {
89
306
    auto* callbacks = typedContext<HttpAttachContext>(context).callbacks_;
90
306
    callbacks->onPoolFailure(reason, failure_reason, host_description);
91
306
  }
92
  void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
93
                   Envoy::ConnectionPool::AttachContext& context) override;
94

            
95
  virtual CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;
96
28735
  Random::RandomGenerator& randomGenerator() { return random_generator_; }
97

            
98
11597
  virtual absl::optional<HttpServerPropertiesCache::Origin>& origin() { return origin_; }
99
65
  virtual Http::HttpServerPropertiesCacheSharedPtr cache() { return nullptr; }
100

            
101
protected:
102
  friend class ActiveClient;
103

            
104
11801
  void setOrigin(absl::optional<HttpServerPropertiesCache::Origin> origin) { origin_ = origin; }
105

            
106
  Random::RandomGenerator& random_generator_;
107

            
108
private:
109
  absl::optional<HttpServerPropertiesCache::Origin> origin_;
110
};
111

            
112
// An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2
113
class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
114
public:
115
  ActiveClient(HttpConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
116
               uint32_t effective_concurrent_stream_limit,
117
               uint32_t configured_concurrent_stream_limit,
118
               OptRef<Upstream::Host::CreateConnectionData> opt_data)
119
28915
      : Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
120
28915
                                            effective_concurrent_stream_limit,
121
28915
                                            configured_concurrent_stream_limit) {
122
28915
    if (opt_data.has_value()) {
123
1002
      initialize(opt_data.value(), parent);
124
1002
      return;
125
1002
    }
126
    // The static cast makes sure we call the base class host() and not
127
    // HttpConnPoolImplBase::host which is of a different type.
128
27913
    Upstream::Host::CreateConnectionData data =
129
27913
        static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
130
27913
            parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
131
27913
    initialize(data, parent);
132
27913
  }
133

            
134
28915
  void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
135
28915
    real_host_description_ = data.host_description_;
136
28915
    codec_client_ = parent.createCodecClient(data);
137
28915
    codec_client_->addConnectionCallbacks(*this);
138
28915
    Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
139
28915
    codec_client_->setConnectionStats(
140
28915
        {traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
141
28915
         traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
142
28915
         &traffic_stats.bind_errors_, nullptr});
143
28915
  }
144

            
145
28606
  void initializeReadFilters() override { codec_client_->initializeReadFilters(); }
146
59
  absl::optional<Http::Protocol> protocol() const override { return codec_client_->protocol(); }
147
16643
  void close(Network::ConnectionCloseType type, absl::string_view details) override {
148
16643
    codec_client_->close(type, details);
149
16643
  }
150
  virtual Http::RequestEncoder& newStreamEncoder(Http::ResponseDecoder& response_decoder) PURE;
151
  virtual Http::RequestEncoder&
152
  newStreamEncoder(Http::ResponseDecoderHandlePtr response_decoder_handle) PURE;
153
57565
  void onEvent(Network::ConnectionEvent event) override {
154
    // Record request metrics only for successfully connected connections that handled requests
155
57565
    if ((event == Network::ConnectionEvent::LocalClose ||
156
57565
         event == Network::ConnectionEvent::RemoteClose) &&
157
57565
        hasHandshakeCompleted()) {
158
28621
      parent_.host()->cluster().trafficStats()->upstream_rq_per_cx_.recordValue(request_count_);
159
28621
    }
160
57565
    parent_.onConnectionEvent(*this, codec_client_->connectionFailureReason(), event);
161
57565
  }
162
112126
  uint32_t numActiveStreams() const override { return codec_client_->numActiveRequests(); }
163
341
  uint64_t id() const override { return codec_client_->id(); }
164
76258
  HttpConnPoolImplBase& parent() { return *static_cast<HttpConnPoolImplBase*>(&parent_); }
165

            
166
  Http::CodecClientPtr codec_client_;
167
  // Request tracking for HTTP protocols
168
  uint32_t request_count_{0};
169
};
170

            
171
/* An implementation of Envoy::ConnectionPool::ConnPoolImplBase for HTTP/1 and HTTP/2
172
 */
173
class FixedHttpConnPoolImpl : public HttpConnPoolImplBase {
174
public:
175
  using CreateClientFn =
176
      std::function<Envoy::ConnectionPool::ActiveClientPtr(HttpConnPoolImplBase* pool)>;
177
  using CreateCodecFn = std::function<CodecClientPtr(Upstream::Host::CreateConnectionData& data,
178
                                                     HttpConnPoolImplBase* pool)>;
179

            
180
  FixedHttpConnPoolImpl(
181
      Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
182
      Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
183
      const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
184
      Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
185
      CreateClientFn client_fn, CreateCodecFn codec_fn, std::vector<Http::Protocol> protocols,
186
      Server::OverloadManager& overload_manager,
187
      absl::optional<Http::HttpServerPropertiesCache::Origin> origin = absl::nullopt,
188
      Http::HttpServerPropertiesCacheSharedPtr cache = nullptr)
189
11801
      : HttpConnPoolImplBase(host, priority, dispatcher, options, transport_socket_options,
190
11801
                             random_generator, state, protocols, overload_manager),
191
11801
        codec_fn_(codec_fn), client_fn_(client_fn), protocol_(protocols[0]), cache_(cache) {
192
11801
    setOrigin(origin);
193
11801
    ASSERT(protocols.size() == 1);
194
11801
  }
195

            
196
28735
  CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override {
197
28735
    return codec_fn_(data, this);
198
28735
  }
199

            
200
28860
  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
201
28860
    return client_fn_(this);
202
28860
  }
203

            
204
2
  absl::string_view protocolDescription() const override {
205
2
    return Utility::getProtocolString(protocol_);
206
2
  }
207

            
208
11611
  Http::HttpServerPropertiesCacheSharedPtr cache() override { return cache_; }
209

            
210
protected:
211
  const CreateCodecFn codec_fn_;
212
  const CreateClientFn client_fn_;
213
  const Http::Protocol protocol_;
214

            
215
  Http::HttpServerPropertiesCacheSharedPtr cache_;
216
};
217

            
218
/**
219
 * Active client base for HTTP/2 and HTTP/3
220
 */
221
class MultiplexedActiveClientBase : public CodecClientCallbacks,
222
                                    public Http::ConnectionCallbacks,
223
                                    public Envoy::Http::ActiveClient {
224
public:
225
  MultiplexedActiveClientBase(HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
226
                              uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
227
                              OptRef<Upstream::Host::CreateConnectionData> data);
228
7009
  ~MultiplexedActiveClientBase() override = default;
229
  // Caps max streams per connection below 2^31 to prevent overflow.
230
  static uint32_t maxStreamsPerConnection(uint32_t max_streams_config);
231

            
232
  // ConnPoolImpl::ActiveClient
233
  bool closingWithIncompleteStream() const override;
234
  RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override;
235
  RequestEncoder& newStreamEncoder(ResponseDecoderHandlePtr response_decoder_handle) override;
236

            
237
  // CodecClientCallbacks
238
  void onStreamDestroy() override;
239
  void onStreamReset(Http::StreamResetReason reason) override;
240

            
241
  // Http::ConnectionCallbacks
242
  void onGoAway(Http::GoAwayErrorCode error_code) override;
243
  void onSettings(ReceivedSettings& settings) override;
244

            
245
private:
246
  bool closed_with_active_rq_{};
247
};
248

            
249
} // namespace Http
250

            
251
} // namespace Envoy