1
#pragma once
2

            
3
#include "envoy/common/conn_pool.h"
4
#include "envoy/event/dispatcher.h"
5
#include "envoy/network/connection.h"
6
#include "envoy/server/overload/overload_manager.h"
7
#include "envoy/stats/timespan.h"
8
#include "envoy/upstream/cluster_manager.h"
9

            
10
#include "source/common/common/debug_recursion_checker.h"
11
#include "source/common/common/dump_state_utils.h"
12
#include "source/common/common/linked_object.h"
13

            
14
#include "absl/strings/string_view.h"
15
#include "fmt/ostream.h"
16

            
17
namespace Envoy {
18
namespace ConnectionPool {
19

            
20
class ConnPoolImplBase;
21

            
22
// A placeholder struct for whatever data a given connection pool needs to
23
// successfully attach an upstream connection to a downstream connection.
24
struct AttachContext {
25
  // Add a virtual destructor to allow for the dynamic_cast ASSERT in typedContext.
26
79219
  virtual ~AttachContext() = default;
27
};
28

            
29
// ActiveClient provides a base class for connection pool clients that handles connection timings
30
// as well as managing the connection timeout.
31
class ActiveClient : public LinkedObject<ActiveClient>,
32
                     public Network::ConnectionCallbacks,
33
                     public Event::DeferredDeletable,
34
                     protected Logger::Loggable<Logger::Id::pool> {
35
public:
36
  ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
37
               uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit);
38
  ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
39
               uint32_t concurrent_stream_limit);
40
  ~ActiveClient() override;
41

            
42
31051
  virtual void releaseResources() { releaseResourcesBase(); }
43
  void releaseResourcesBase();
44

            
45
  // Network::ConnectionCallbacks
46
104529
  void onAboveWriteBufferHighWatermark() override {}
47
104529
  void onBelowWriteBufferLowWatermark() override {}
48

            
49
  // Called if the connection does not complete within the cluster's connectTimeout()
50
  void onConnectTimeout();
51

            
52
  // Called if the maximum connection duration is reached.
53
  void onConnectionDurationTimeout();
54

            
55
  // Returns the concurrent stream limit, accounting for if the total stream limit
56
  // is less than the concurrent stream limit.
57
16042
  virtual uint32_t effectiveConcurrentStreamLimit() const {
58
16042
    return std::min(remaining_streams_, concurrent_stream_limit_);
59
16042
  }
60

            
61
  // Returns the application protocol, or absl::nullopt for TCP.
62
  virtual absl::optional<Http::Protocol> protocol() const PURE;
63

            
64
268592
  virtual int64_t currentUnusedCapacity() const {
65
268592
    int64_t remaining_concurrent_streams =
66
268592
        static_cast<int64_t>(concurrent_stream_limit_) - numActiveStreams();
67

            
68
268592
    return std::min<int64_t>(remaining_streams_, remaining_concurrent_streams);
69
268592
  }
70

            
71
  // Initialize upstream read filters. Called when connected.
72
  virtual void initializeReadFilters() PURE;
73

            
74
  // Closes the underlying connection.
75
  virtual void
76
  close(Envoy::Network::ConnectionCloseType type = Envoy::Network::ConnectionCloseType::NoFlush,
77
        absl::string_view details = "") PURE;
78
  // Returns the ID of the underlying connection.
79
  virtual uint64_t id() const PURE;
80
  // Returns true if this closed with an incomplete stream, for stats tracking/ purposes.
81
  virtual bool closingWithIncompleteStream() const PURE;
82
  // Returns the number of active streams on this connection.
83
  virtual uint32_t numActiveStreams() const PURE;
84

            
85
  // Return true if it is ready to dispatch the next stream.
86
29944
  virtual bool readyForStream() const {
87
29944
    ASSERT(!supportsEarlyData());
88
29944
    return state_ == State::Ready;
89
29944
  }
90

            
91
  enum class State {
92
    Connecting,        // Connection is not yet established.
93
    ReadyForEarlyData, // Any additional early data stream can be immediately dispatched to this
94
                       // connection.
95
    Ready,             // Additional streams may be immediately dispatched to this connection.
96
    Busy,              // Connection is at its concurrent stream limit.
97
    Draining,          // No more streams can be dispatched to this connection, and it will be
98
                       // closed when all streams complete.
99
    Closed             // Connection is closed and object is queued for destruction.
100
  };
101

            
102
342181
  State state() const { return state_; }
103

            
104
114441
  void setState(State state) {
105
114441
    if (state == State::ReadyForEarlyData && !supportsEarlyData()) {
106
1
      IS_ENVOY_BUG("Unable to set state to ReadyForEarlyData in a client which does not support "
107
1
                   "early data.");
108
1
      return;
109
1
    }
110
    // If the client is transitioning to draining, update the remaining
111
    // streams and pool and cluster capacity.
112
114440
    if (state == State::Draining) {
113
207
      drain();
114
207
    }
115
114440
    state_ = state;
116
114440
  }
117

            
118
  // Sets the remaining streams to 0, and updates pool and cluster capacity.
119
  virtual void drain();
120

            
121
234611
  virtual bool hasHandshakeCompleted() const {
122
234611
    ASSERT(!supportsEarlyData());
123
234611
    return state_ != State::Connecting;
124
234611
  }
125

            
126
  ConnPoolImplBase& parent_;
127
  // The count of remaining streams allowed for this connection.
128
  // This will start out as the total number of streams per connection if capped
129
  // by configuration, or it will be set to std::numeric_limits<uint32_t>::max() to be
130
  // (functionally) unlimited.
131
  // TODO: this could be moved to an optional to make it actually unlimited.
132
  uint32_t remaining_streams_;
133
  // The will start out as the upper limit of max concurrent streams for this connection
134
  // if capped by configuration, or it will be set to std::numeric_limits<uint32_t>::max()
135
  // to be (functionally) unlimited.
136
  uint32_t configured_stream_limit_;
137
  // The max concurrent stream for this connection, it's initialized by `configured_stream_limit_`
138
  // and can be adjusted by SETTINGS frame, but the max value of it can't exceed
139
  // `configured_stream_limit_`.
140
  uint32_t concurrent_stream_limit_;
141
  Upstream::HostDescriptionConstSharedPtr real_host_description_;
142
  Stats::TimespanPtr conn_connect_ms_;
143
  Stats::TimespanPtr conn_length_;
144
  Event::TimerPtr connect_timer_;
145
  Event::TimerPtr connection_duration_timer_;
146
  bool resources_released_{false};
147
  bool timed_out_{false};
148
  // TODO(danzh) remove this once http codec exposes the handshake state for h3.
149
  bool has_handshake_completed_{false};
150

            
151
protected:
152
  // HTTP/3 subclass should override this.
153
  virtual bool supportsEarlyData() const { return false; }
154

            
155
private:
156
  State state_{State::Connecting};
157
};
158

            
159
// PendingStream is the base class tracking streams for which a connection has been created but not
160
// yet established.
161
class PendingStream : public LinkedObject<PendingStream>, public ConnectionPool::Cancellable {
162
public:
163
  PendingStream(ConnPoolImplBase& parent, bool can_send_early_data);
164
  ~PendingStream() override;
165

            
166
  // ConnectionPool::Cancellable
167
  void cancel(Envoy::ConnectionPool::CancelPolicy policy) override;
168

            
169
  // The context here returns a pointer to whatever context is provided with newStream(),
170
  // which will be passed back to the parent in onPoolReady or onPoolFailure.
171
  virtual AttachContext& context() PURE;
172

            
173
  ConnPoolImplBase& parent_;
174
  // The request can be sent as early data.
175
  bool can_send_early_data_;
176
};
177

            
178
using PendingStreamPtr = std::unique_ptr<PendingStream>;
179

            
180
using ActiveClientPtr = std::unique_ptr<ActiveClient>;
181

            
182
// Base class that handles stream queueing logic shared between connection pool implementations.
183
class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
184
public:
185
  ConnPoolImplBase(Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
186
                   Event::Dispatcher& dispatcher,
187
                   const Network::ConnectionSocket::OptionsSharedPtr& options,
188
                   const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
189
                   Upstream::ClusterConnectivityState& state,
190
                   Server::OverloadManager& overload_manager);
191
  virtual ~ConnPoolImplBase();
192

            
193
  void deleteIsPendingImpl();
194
  // By default, the connection pool will track connected and connecting stream
195
  // capacity as streams are created and destroyed. QUIC does custom stream
196
  // accounting so will override this to false.
197
94674
  virtual bool trackStreamCapacity() { return true; }
198

            
199
  // A helper function to get the specific context type from the base class context.
200
110508
  template <class T> T& typedContext(AttachContext& context) {
201
110508
    ASSERT(dynamic_cast<T*>(&context) != nullptr);
202
110508
    return *static_cast<T*>(&context);
203
110508
  }
204

            
205
  // Determines if prefetching is warranted based on the number of streams in
206
  // use, pending streams, anticipated and/or currently unused capacity, and
207
  // preconnect configuration.
208
  //
209
  // If anticipate_incoming_stream is true this assumes a call to newStream is
210
  // pending, which is true for global preconnect.
211
  static bool shouldConnect(size_t pending_streams, size_t active_streams,
212
                            int64_t connecting_and_connected_capacity, float preconnect_ratio,
213
                            bool anticipate_incoming_stream = false);
214

            
215
  // Envoy::ConnectionPool::Instance implementation helpers
216
  void addIdleCallbackImpl(Instance::IdleCb cb);
217
  // Returns true if the pool is idle.
218
  bool isIdleImpl() const;
219
  void drainConnectionsImpl(DrainBehavior drain_behavior);
220
599729
  const Upstream::HostConstSharedPtr& host() const { return host_; }
221
  // Called if this pool is likely to be picked soon, to determine if it's worth preconnecting.
222
  bool maybePreconnectImpl(float global_preconnect_ratio);
223

            
224
  // Closes and destroys all connections. This must be called in the destructor of
225
  // derived classes because the derived ActiveClient will downcast parent_ to a more
226
  // specific type of ConnPoolImplBase, but if the more specific part is already destructed
227
  // (due to bottom-up destructor ordering in c++) that access will be invalid.
228
  void destructAllConnections();
229

            
230
  // Returns a new instance of ActiveClient.
231
  virtual ActiveClientPtr instantiateActiveClient() PURE;
232

            
233
  // Gets a pointer to the list that currently owns this client.
234
  std::list<ActiveClientPtr>& owningList(ActiveClient::State state);
235

            
236
  // Removes the PendingStream from the list of streams. Called when the PendingStream is
237
  // cancelled, e.g. when the stream is reset before a connection has been established.
238
  void onPendingStreamCancel(PendingStream& stream, Envoy::ConnectionPool::CancelPolicy policy);
239

            
240
  // Fails all pending streams, calling onPoolFailure on the associated callbacks.
241
  void purgePendingStreams(const Upstream::HostDescriptionConstSharedPtr& host_description,
242
                           absl::string_view failure_reason,
243
                           ConnectionPool::PoolFailureReason pool_failure_reason);
244

            
245
  // Closes any idle connections as this pool is drained.
246
  void closeIdleConnectionsForDrainingPool();
247

            
248
  // Changes the state_ of an ActiveClient and moves to the appropriate list.
249
  void transitionActiveClientState(ActiveClient& client, ActiveClient::State new_state);
250

            
251
  void onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
252
                         Network::ConnectionEvent event);
253

            
254
  // Check if the pool has gone idle and invoke idle notification callbacks.
255
  void checkForIdleAndNotify();
256

            
257
  // See if the pool has gone idle. If we're draining, this will also close idle connections.
258
  void checkForIdleAndCloseIdleConnsIfDraining();
259

            
260
  void scheduleOnUpstreamReady();
261
  ConnectionPool::Cancellable* newStreamImpl(AttachContext& context, bool can_send_early_data);
262

            
263
  virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context,
264
                                                        bool can_send_early_data) PURE;
265

            
266
  virtual void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
267
                                    AttachContext& context);
268

            
269
  virtual void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
270
                             absl::string_view failure_reason,
271
                             ConnectionPool::PoolFailureReason pool_failure_reason,
272
                             AttachContext& context) PURE;
273
  virtual void onPoolReady(ActiveClient& client, AttachContext& context) PURE;
274
  // Called by derived classes any time a stream is completed or destroyed for any reason.
275
  void onStreamClosed(Envoy::ConnectionPool::ActiveClient& client, bool delay_attaching_stream);
276

            
277
175958
  Event::Dispatcher& dispatcher() { return dispatcher_; }
278
125896
  Upstream::ResourcePriority priority() const { return priority_; }
279
31023
  const Network::ConnectionSocket::OptionsSharedPtr& socketOptions() { return socket_options_; }
280
59818
  const Network::TransportSocketOptionsConstSharedPtr& transportSocketOptions() {
281
59818
    return transport_socket_options_;
282
59818
  }
283
310
  bool hasPendingStreams() const { return !pending_streams_.empty(); }
284

            
285
80049
  void decrClusterStreamCapacity(uint32_t delta) {
286
80049
    cluster_connectivity_state_.decrConnectingAndConnectedStreamCapacity(delta);
287
80049
    connecting_and_connected_stream_capacity_ -= delta;
288
80049
  }
289
75389
  void incrClusterStreamCapacity(uint32_t delta) {
290
75389
    cluster_connectivity_state_.incrConnectingAndConnectedStreamCapacity(delta);
291
75389
    connecting_and_connected_stream_capacity_ += delta;
292
75389
  }
293
95
  void dumpState(std::ostream& os, int indent_level = 0) const {
294
95
    const char* spaces = spacesForLevel(indent_level);
295
95
    os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
296
95
       << DUMP_MEMBER(busy_clients_.size()) << DUMP_MEMBER(connecting_clients_.size())
297
95
       << DUMP_MEMBER(connecting_stream_capacity_)
298
95
       << DUMP_MEMBER(connecting_and_connected_stream_capacity_) << DUMP_MEMBER(num_active_streams_)
299
95
       << DUMP_MEMBER(pending_streams_.size())
300
95
       << " per upstream preconnect ratio: " << perUpstreamPreconnectRatio();
301
95
  }
302

            
303
94
  friend std::ostream& operator<<(std::ostream& os, const ConnPoolImplBase& s) {
304
94
    s.dumpState(os);
305
94
    return os;
306
94
  }
307

            
308
  // Helper for use as the 2nd argument to ASSERT.
309
  std::string dumpState() const;
310

            
311
  void decrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);
312
  void incrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);
313

            
314
  // Called when an upstream is ready to serve pending streams.
315
  void onUpstreamReady();
316

            
317
  // Called when an upstream is ready to serve early data streams.
318
  void onUpstreamReadyForEarlyData(ActiveClient& client);
319

            
320
protected:
321
29730
  virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {}
322
298
  virtual void onConnectFailed(Envoy::ConnectionPool::ActiveClient&) {}
323

            
324
  enum class ConnectionResult {
325
    FailedToCreateConnection,
326
    CreatedNewConnection,
327
    ShouldNotConnect,
328
    NoConnectionRateLimited,
329
    CreatedButRateLimited,
330
    LoadShed,
331
  };
332
  // Creates up to 3 connections, based on the preconnect ratio.
333
  // Returns the ConnectionResult of the last attempt.
334
  ConnectionResult tryCreateNewConnections();
335

            
336
  // Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or
337
  // to avoid starving this pool.
338
  // Demand is determined either by perUpstreamPreconnectRatio() or global_preconnect_ratio
339
  // if this is called by maybePreconnect()
340
  ConnectionResult tryCreateNewConnection(float global_preconnect_ratio = 0);
341

            
342
  // A helper function which determines if a canceled pending connection should
343
  // be closed as excess or not.
344
  bool connectingConnectionIsExcess(const ActiveClient& client) const;
345

            
346
  // A helper function which determines if a new incoming stream should trigger
347
  // connection preconnect.
348
  bool shouldCreateNewConnection(float global_preconnect_ratio) const;
349

            
350
  float perUpstreamPreconnectRatio() const;
351

            
352
  ConnectionPool::Cancellable*
353
31837
  addPendingStream(Envoy::ConnectionPool::PendingStreamPtr&& pending_stream) {
354
31837
    LinkedList::moveIntoList(std::move(pending_stream), pending_streams_);
355
31837
    cluster_connectivity_state_.incrPendingStreams(1);
356
31837
    return pending_streams_.front().get();
357
31837
  }
358

            
359
9
  bool hasActiveStreams() const { return num_active_streams_ > 0; }
360

            
361
  const Upstream::HostConstSharedPtr host_;
362
  const Upstream::ResourcePriority priority_;
363

            
364
  Event::Dispatcher& dispatcher_;
365
  const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
366
  const Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
367

            
368
  // True if the max requests circuit breakers apply.
369
  // This will be false for the TCP pool, true otherwise.
370
46770
  virtual bool enforceMaxRequests() const { return true; }
371

            
372
  std::list<Instance::IdleCb> idle_callbacks_;
373

            
374
  // When calling purgePendingStreams, this list will be used to hold the streams we are about
375
  // to purge. We need this if one cancelled streams cancels a different pending stream
376
  std::list<PendingStreamPtr> pending_streams_to_purge_;
377

            
378
  // Clients that are ready to handle additional streams.
379
  // All entries are in state Ready.
380
  std::list<ActiveClientPtr> ready_clients_;
381

            
382
  // Clients that are not ready to handle additional streams due to being Busy or Draining.
383
  std::list<ActiveClientPtr> busy_clients_;
384

            
385
  // Clients that are not ready to handle additional streams because they are Connecting.
386
  std::list<ActiveClientPtr> connecting_clients_;
387

            
388
  // Clients that are ready to handle additional early data streams because they have 0-RTT
389
  // credentials.
390
  std::list<ActiveClientPtr> early_data_clients_;
391

            
392
  // The number of streams that can be immediately dispatched
393
  // if all Connecting connections become connected.
394
  uint32_t connecting_stream_capacity_{0};
395

            
396
private:
397
  // Drain all the clients in the given list.
398
  // Prerequisite: the given clients shouldn't be idle.
399
  void drainClients(std::list<ActiveClientPtr>& clients);
400

            
401
  void assertCapacityCountsAreCorrect();
402

            
403
  Upstream::ClusterConnectivityState& cluster_connectivity_state_;
404

            
405
  std::list<PendingStreamPtr> pending_streams_;
406

            
407
  // The number of streams that can be immediately dispatched from the current
408
  // `ready_clients_` plus `connecting_stream_capacity_`.
409
  int64_t connecting_and_connected_stream_capacity_{0};
410

            
411
  // The number of streams currently attached to clients.
412
  uint32_t num_active_streams_{0};
413

            
414
  // Whether the connection pool is currently in the process of closing
415
  // all connections so that it can be gracefully deleted.
416
  bool is_draining_for_deletion_{false};
417

            
418
  // True iff this object is in the deferred delete list.
419
  bool deferred_deleting_{false};
420

            
421
  Event::SchedulableCallbackPtr upstream_ready_cb_;
422
  Common::DebugRecursionChecker recursion_checker_;
423
  Server::LoadShedPoint* create_new_connection_load_shed_{nullptr};
424
};
425

            
426
} // namespace ConnectionPool
427
} // namespace Envoy
428

            
429
// NOLINT(namespace-envoy)
430
namespace fmt {
431
template <> struct formatter<Envoy::ConnectionPool::ConnPoolImplBase> : ostream_formatter {};
432
} // namespace fmt