1
#pragma once
2

            
3
#include <atomic>
4
#include <cstdint>
5
#include <list>
6
#include <memory>
7
#include <string>
8

            
9
#include "envoy/common/scope_tracker.h"
10
#include "envoy/network/transport_socket.h"
11

            
12
#include "source/common/buffer/watermark_buffer.h"
13
#include "source/common/event/libevent.h"
14
#include "source/common/network/connection_impl_base.h"
15
#include "source/common/stream_info/stream_info_impl.h"
16

            
17
#include "absl/types/optional.h"
18

            
19
namespace Envoy {
20
class RandomPauseFilter;
21
class TestPauseFilter;
22

            
23
namespace Network {
24

            
25
class MultiConnectionBaseImpl;
26

            
27
/**
28
 * Utility functions for the connection implementation.
29
 */
30
class ConnectionImplUtility {
31
public:
32
  /**
33
   * Update the buffer stats for a connection.
34
   * @param delta supplies the data read/written.
35
   * @param new_total supplies the final total buffer size.
36
   * @param previous_total supplies the previous final total buffer size. previous_total will be
37
   *        updated to new_total when the call is complete.
38
   * @param stat_total supplies the counter to increment with the delta.
39
   * @param stat_current supplies the gauge that should be updated with the delta of previous_total
40
   *        and new_total.
41
   */
42
  static void updateBufferStats(uint64_t delta, uint64_t new_total, uint64_t& previous_total,
43
                                Stats::Counter& stat_total, Stats::Gauge& stat_current);
44
};
45

            
46
/**
47
 * Implementation of Network::Connection, Network::FilterManagerConnection and
48
 * Envoy::ScopeTrackedObject.
49
 */
50
class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallbacks {
51
public:
52
  ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
53
                 TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info,
54
                 bool connected);
55

            
56
  ~ConnectionImpl() override;
57

            
58
  // Network::FilterManager
59
  void addWriteFilter(WriteFilterSharedPtr filter) override;
60
  void addFilter(FilterSharedPtr filter) override;
61
  void addReadFilter(ReadFilterSharedPtr filter) override;
62
  void removeReadFilter(ReadFilterSharedPtr filter) override;
63
  bool initializeReadFilters() override;
64
  void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) override;
65

            
66
77
  const ConnectionSocketPtr& getSocket() const override { return socket_; }
67

            
68
  // Network::Connection
69
  void addBytesSentCallback(BytesSentCb cb) override;
70
  void enableHalfClose(bool enabled) override;
71
98
  bool isHalfCloseEnabled() const override { return enable_half_close_; }
72
  void close(ConnectionCloseType type) final;
73
50073
  void close(ConnectionCloseType type, absl::string_view details) override {
74
50073
    if (!details.empty()) {
75
8295
      setLocalCloseReason(details);
76
8295
    }
77
50073
    close(type);
78
50073
  }
79

            
80
532
  std::string nextProtocol() const override { return transport_socket_->protocol(); }
81
  void noDelay(bool enable) override;
82
  ReadDisableStatus readDisable(bool disable) override;
83
107130
  void detectEarlyCloseWhenReadDisabled(bool value) override { detect_early_close_ = value; }
84
  bool readEnabled() const override;
85
30161
  ConnectionInfoSetter& connectionInfoSetter() override {
86
30161
    return socket_->connectionInfoProvider();
87
30161
  }
88
411172
  const ConnectionInfoProvider& connectionInfoProvider() const override {
89
411172
    return socket_->connectionInfoProvider();
90
411172
  }
91
133755
  ConnectionInfoProviderSharedPtr connectionInfoProviderSharedPtr() const override {
92
133755
    return socket_->connectionInfoProviderSharedPtr();
93
133755
  }
94
  absl::optional<UnixDomainSocketPeerCredentials> unixSocketPeerCredentials() const override;
95
144729
  Ssl::ConnectionInfoConstSharedPtr ssl() const override {
96
    // SSL info may be overwritten by a filter in the provider.
97
144729
    return socket_->connectionInfoProvider().sslConnection();
98
144729
  }
99
  State state() const override;
100
135858
  bool connecting() const override {
101
135858
    ENVOY_CONN_LOG_EVENT(debug, "connection_connecting_state", "current connecting state: {}",
102
135858
                         *this, connecting_);
103
135858
    return connecting_;
104
135858
  }
105
  void write(Buffer::Instance& data, bool end_stream) override;
106
  void setBufferLimits(uint32_t limit) override;
107
  void setBufferHighWatermarkTimeout(std::chrono::milliseconds timeout) override;
108
230584
  uint32_t bufferLimit() const override { return read_buffer_limit_; }
109
229842
  bool aboveHighWatermark() const override { return write_buffer_above_high_watermark_; }
110
44553
  const ConnectionSocket::OptionsSharedPtr& socketOptions() const override {
111
44553
    return socket_->options();
112
44553
  }
113
  bool setSocketOption(Network::SocketOptionName name, absl::Span<uint8_t> value) override;
114
91
  absl::string_view requestedServerName() const override { return socket_->requestedServerName(); }
115
3360429
  StreamInfo::StreamInfo& streamInfo() override { return stream_info_; }
116
47000
  const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
117
  absl::string_view transportFailureReason() const override;
118
4
  bool startSecureTransport() override { return transport_socket_->startSecureTransport(); }
119
  absl::optional<std::chrono::milliseconds> lastRoundTripTime() const override;
120
  void configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec,
121
                                        std::chrono::microseconds rtt) override;
122
  absl::optional<uint64_t> congestionWindowInBytes() const override;
123

            
124
  // Network::FilterManagerConnection
125
  void rawWrite(Buffer::Instance& data, bool end_stream) override;
126
67392
  void closeConnection(ConnectionCloseAction close_action) override {
127
67392
    ASSERT(close_action.isLocalClose() || close_action.isRemoteClose());
128
67392
    if (close_action.closeSocket()) {
129
      // The socket will be directly closed.
130
48403
      closeSocket(close_action.event_);
131
50268
    } else {
132
      // It will go through the normal close() process.
133
18989
      ASSERT(close_action.isLocalClose());
134
18989
      closeInternal(close_action.type_);
135
18989
    }
136
67392
  }
137

            
138
  // Network::ReadBufferSource
139
1050275
  StreamBuffer getReadBuffer() override { return {*read_buffer_, read_end_stream_}; }
140
  // Network::WriteBufferSource
141
1520103
  StreamBuffer getWriteBuffer() override {
142
1520103
    return {*current_write_buffer_, current_write_end_stream_};
143
1520103
  }
144

            
145
  // Network::TransportSocketCallbacks
146
3552679
  IoHandle& ioHandle() final { return socket_->ioHandle(); }
147
  const IoHandle& ioHandle() const override { return socket_->ioHandle(); }
148
16806
  Connection& connection() override { return *this; }
149
  void raiseEvent(ConnectionEvent event) override;
150
  // Should the read buffer be drained?
151
603021
  bool shouldDrainReadBuffer() override {
152
603021
    return read_buffer_limit_ > 0 && read_buffer_->length() >= read_buffer_limit_;
153
603021
  }
154
  // Mark read buffer ready to read in the event loop. This is used when yielding following
155
  // shouldDrainReadBuffer().
156
  // TODO(htuch): While this is the basis for also yielding to other connections to provide some
157
  // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
158
  // Reconsider how to make fairness happen.
159
  void setTransportSocketIsReadable() override;
160
  void flushWriteBuffer() override;
161
6
  TransportSocketPtr& transportSocket() { return transport_socket_; }
162

            
163
  // Obtain global next connection ID. This should only be used in tests.
164
8
  static uint64_t nextGlobalIdForTest() { return next_global_id_; }
165

            
166
  // ScopeTrackedObject
167
  void dumpState(std::ostream& os, int indent_level) const override;
168

            
169
62009
  StreamInfo::DetectedCloseType detectedCloseType() const override { return detected_close_type_; }
170

            
171
protected:
172
  // Indicates if the access log has been written. This is used to ensure that the access log is
173
  // written exactly once, even if close() is called multiple times.
174
  bool access_log_written_{false};
175

            
176
  // Write access log if it hasn't been written yet.
177
  void ensureAccessLogWritten();
178

            
179
  // A convenience function which returns true if
180
  // 1) The read disable count is zero or
181
  // 2) The read disable count is one due to the read buffer being overrun.
182
  // In either case the filter chain would like to process data from the read buffer or transport
183
  // socket. If the read count is greater than one, or equal to one when the buffer is not overrun,
184
  // then the filter chain has called readDisable, and does not want additional data.
185
  bool filterChainWantsData();
186

            
187
  // Network::ConnectionImplBase
188
  void closeConnectionImmediately() final;
189
  void closeThroughFilterManager(ConnectionCloseAction close_action);
190

            
191
  void closeSocket(ConnectionEvent close_type);
192

            
193
  void onReadBufferLowWatermark();
194
  void onReadBufferHighWatermark();
195
  void onWriteBufferLowWatermark();
196
  void onWriteBufferHighWatermark();
197

            
198
  // This is called when the underlying socket is connected, not when the
199
  // connected event is raised.
200
  virtual void onConnected();
201

            
202
  void setFailureReason(absl::string_view failure_reason);
203
2
  const std::string& failureReason() const { return failure_reason_; }
204

            
205
  // Set the detected close type for this connection.
206
  virtual void setDetectedCloseType(StreamInfo::DetectedCloseType close_type);
207

            
208
  TransportSocketPtr transport_socket_;
209
  ConnectionSocketPtr socket_;
210
  StreamInfo::StreamInfo& stream_info_;
211
  FilterManagerImpl filter_manager_;
212

            
213
  // This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
214
  // a generic pointer.
215
  // It MUST be defined after the filter_manager_ as some filters may have callbacks that
216
  // write_buffer_ invokes during its clean up.
217
  // This buffer is always allocated, never nullptr.
218
  Buffer::InstancePtr write_buffer_;
219
  // Ensure that if the consumer of the data from this connection isn't
220
  // consuming, that the connection eventually stops reading from the wire.
221
  // This buffer is always allocated, never nullptr.
222
  Buffer::InstancePtr read_buffer_;
223
  uint32_t read_buffer_limit_ = 0;
224
  bool connecting_{false};
225
  ConnectionEvent immediate_error_event_{ConnectionEvent::Connected};
226
  bool bind_error_{false};
227

            
228
private:
229
  friend class MultiConnectionBaseImpl;
230
  friend class Envoy::RandomPauseFilter;
231
  friend class Envoy::TestPauseFilter;
232

            
233
  void onFileEvent(uint32_t events);
234
  void onRead(uint64_t read_buffer_size);
235
  void onReadReady();
236
  void onWriteReady();
237
  void updateReadBufferStats(uint64_t num_read, uint64_t new_size);
238
  void updateWriteBufferStats(uint64_t num_written, uint64_t new_size);
239

            
240
  // Write data to the connection bypassing filter chain (optionally).
241
  void write(Buffer::Instance& data, bool end_stream, bool through_filter_chain);
242

            
243
  // Returns true iff end of stream has been both written and read.
244
  bool bothSidesHalfClosed();
245

            
246
  void closeInternal(ConnectionCloseType type);
247

            
248
  void onBufferHighWatermarkTimeout();
249
  void scheduleBufferHighWatermarkTimeout();
250
  void maybeCancelBufferHighWatermarkTimeout();
251

            
252
  static std::atomic<uint64_t> next_global_id_;
253

            
254
  std::list<BytesSentCb> bytes_sent_callbacks_;
255
  // Should be set with setFailureReason.
256
  std::string failure_reason_;
257
  // Tracks the number of times reads have been disabled. If N different components call
258
  // readDisabled(true) this allows the connection to only resume reads when readDisabled(false)
259
  // has been called N times.
260
  uint64_t last_read_buffer_size_{};
261
  uint64_t last_write_buffer_size_{};
262
  Buffer::Instance* current_write_buffer_{};
263
  uint32_t read_disable_count_{0};
264
  StreamInfo::DetectedCloseType detected_close_type_{StreamInfo::DetectedCloseType::Normal};
265
  std::chrono::milliseconds buffer_high_watermark_timeout_{};
266
  Event::TimerPtr buffer_high_watermark_timer_{nullptr};
267
  bool write_buffer_above_high_watermark_ : 1;
268
  bool detect_early_close_ : 1;
269
  bool enable_half_close_ : 1;
270
  bool read_end_stream_raised_ : 1;
271
  bool read_end_stream_ : 1;
272
  bool write_end_stream_ : 1;
273
  bool current_write_end_stream_ : 1;
274
  bool dispatch_buffered_data_ : 1;
275
  // True if the most recent call to the transport socket's doRead method invoked
276
  // setTransportSocketIsReadable to schedule read resumption after yielding due to
277
  // shouldDrainReadBuffer(). When true, readDisable must schedule read resumption when
278
  // read_disable_count_ == 0 to ensure that read resumption happens when remaining bytes are held
279
  // in transport socket internal buffers.
280
  bool transport_wants_read_ : 1;
281
  bool enable_close_through_filter_manager_ : 1;
282
};
283

            
284
class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
285
public:
286
  ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
287
                       TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info);
288

            
289
  // ServerConnection impl
290
  void setTransportSocketConnectTimeout(std::chrono::milliseconds timeout,
291
                                        Stats::Counter& timeout_stat) override;
292
  void raiseEvent(ConnectionEvent event) override;
293
  bool initializeReadFilters() override;
294

            
295
8621
  void setLocalCloseReason(absl::string_view reason) override {
296
8621
    ConnectionImpl::setLocalCloseReason(reason);
297
8621
    stream_info_.setDownstreamLocalCloseReason(reason);
298
8621
  }
299

            
300
91
  void setDetectedCloseType(StreamInfo::DetectedCloseType close_type) override {
301
91
    ConnectionImpl::setDetectedCloseType(close_type);
302
91
    stream_info_.setDownstreamDetectedCloseType(close_type);
303
91
  }
304

            
305
private:
306
  void onTransportSocketConnectTimeout();
307

            
308
  bool transport_connect_pending_{true};
309
  // Implements a timeout for the transport socket signaling connection. The timer is enabled by a
310
  // call to setTransportSocketConnectTimeout and is reset when the connection is established.
311
  Event::TimerPtr transport_socket_connect_timer_;
312
  Stats::Counter* transport_socket_timeout_stat_;
313
};
314

            
315
/**
316
 * libevent implementation of Network::ClientConnection.
317
 */
318
class ClientConnectionImpl : public ConnectionImpl, virtual public ClientConnection {
319
public:
320
  ClientConnectionImpl(Event::Dispatcher& dispatcher,
321
                       const Address::InstanceConstSharedPtr& remote_address,
322
                       const Address::InstanceConstSharedPtr& source_address,
323
                       Network::TransportSocketPtr&& transport_socket,
324
                       const Network::ConnectionSocket::OptionsSharedPtr& options,
325
                       const Network::TransportSocketOptionsConstSharedPtr& transport_options);
326

            
327
  ClientConnectionImpl(Event::Dispatcher& dispatcher, std::unique_ptr<ConnectionSocket> socket,
328
                       const Address::InstanceConstSharedPtr& source_address,
329
                       Network::TransportSocketPtr&& transport_socket,
330
                       const Network::ConnectionSocket::OptionsSharedPtr& options,
331
                       const Network::TransportSocketOptionsConstSharedPtr& transport_options);
332

            
333
  ~ClientConnectionImpl() override;
334

            
335
  // Network::ClientConnection
336
  void connect() override;
337

            
338
protected:
339
342
  void setDetectedCloseType(StreamInfo::DetectedCloseType close_type) override {
340
342
    ConnectionImpl::setDetectedCloseType(close_type);
341
342
    if (stream_info_.upstreamInfo() != nullptr) {
342
342
      stream_info_.upstreamInfo()->setUpstreamDetectedCloseType(close_type);
343
342
    }
344
342
  }
345

            
346
1601
  void setLocalCloseReason(absl::string_view reason) override {
347
1601
    ConnectionImpl::setLocalCloseReason(reason);
348
1601
    if (stream_info_.upstreamInfo() != nullptr) {
349
1601
      stream_info_.upstreamInfo()->setUpstreamLocalCloseReason(reason);
350
1601
    }
351
1601
  }
352

            
353
private:
354
  void onConnected() override;
355

            
356
  StreamInfo::StreamInfoImpl stream_info_;
357
};
358

            
359
} // namespace Network
360
} // namespace Envoy