1
#include "source/common/network/connection_impl.h"
2

            
3
#include <atomic>
4
#include <cstdint>
5
#include <memory>
6

            
7
#include "envoy/common/exception.h"
8
#include "envoy/common/platform.h"
9
#include "envoy/config/core/v3/base.pb.h"
10
#include "envoy/event/scaled_range_timer_manager.h"
11
#include "envoy/event/timer.h"
12
#include "envoy/network/filter.h"
13
#include "envoy/network/socket.h"
14

            
15
#include "source/common/common/assert.h"
16
#include "source/common/common/dump_state_utils.h"
17
#include "source/common/common/empty_string.h"
18
#include "source/common/common/enum_to_int.h"
19
#include "source/common/common/scope_tracker.h"
20
#include "source/common/network/address_impl.h"
21
#include "source/common/network/connection_socket_impl.h"
22
#include "source/common/network/raw_buffer_socket.h"
23
#include "source/common/network/socket_option_factory.h"
24
#include "source/common/network/socket_option_impl.h"
25
#include "source/common/network/utility.h"
26
#include "source/common/runtime/runtime_features.h"
27

            
28
namespace Envoy {
29
namespace Network {
30
namespace {
31

            
32
// Human-readable termination detail string. NOTE(review): name suggests it is
// attached when a transport-socket connect timeout fires; the use site is
// outside this chunk — confirm before relying on this description.
constexpr absl::string_view kTransportSocketConnectTimeoutTerminationDetails =
    "transport socket timeout was reached";
34

            
35
1
std::ostream& operator<<(std::ostream& os, Connection::State connection_state) {
36
1
  switch (connection_state) {
37
1
  case Connection::State::Open:
38
1
    return os << "Open";
39
  case Connection::State::Closing:
40
    return os << "Closing";
41
  case Connection::State::Closed:
42
    return os << "Closed";
43
1
  }
44
  return os;
45
1
}
46

            
47
} // namespace
48

            
49
void ConnectionImplUtility::updateBufferStats(uint64_t delta, uint64_t new_total,
50
                                              uint64_t& previous_total, Stats::Counter& stat_total,
51
906764
                                              Stats::Gauge& stat_current) {
52
906764
  if (delta) {
53
487792
    stat_total.add(delta);
54
487792
  }
55

            
56
906764
  if (new_total != previous_total) {
57
129323
    if (new_total > previous_total) {
58
61229
      stat_current.add(new_total - previous_total);
59
69264
    } else {
60
68094
      stat_current.sub(previous_total - new_total);
61
68094
    }
62

            
63
129323
    previous_total = new_total;
64
129323
  }
65
906764
}
66

            
67
// Process-wide source of unique connection IDs; atomic because connections are
// created concurrently on multiple dispatcher threads.
std::atomic<uint64_t> ConnectionImpl::next_global_id_;
68

            
69
// Wires a connection over an already-created socket: installs watermark-aware
// read/write buffers, registers the socket file-event callback with the
// dispatcher, and hands transport-socket callbacks to `transport_socket_`.
// `connected == false` marks the connection as still connecting (the connect
// completion is observed later via a write event).
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket,
                               StreamInfo::StreamInfo& stream_info, bool connected)
    : ConnectionImplBase(dispatcher, next_global_id_++),
      transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
      stream_info_(stream_info), filter_manager_(*this, *socket_),
      write_buffer_(dispatcher.getWatermarkFactory().createBuffer(
          [this]() -> void { this->onWriteBufferLowWatermark(); },
          [this]() -> void { this->onWriteBufferHighWatermark(); },
          []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      read_buffer_(dispatcher.getWatermarkFactory().createBuffer(
          [this]() -> void { this->onReadBufferLowWatermark(); },
          [this]() -> void { this->onReadBufferHighWatermark(); },
          []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      write_buffer_above_high_watermark_(false), detect_early_close_(true),
      enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
      write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
      transport_wants_read_(false),
      enable_close_through_filter_manager_(Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.connection_close_through_filter_manager")) {

  // Being handed a closed socket is a caller bug; record it and leave the
  // connection unusable rather than crashing.
  if (!socket_->isOpen()) {
    IS_ENVOY_BUG("Client socket failure");
    return;
  }
  if (!connected) {
    connecting_ = true;
  }

  Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;

  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  socket_->ioHandle().initializeFileEvent(
      dispatcher_,
      [this](uint32_t events) {
        onFileEvent(events);
        return absl::OkStatus();
      },
      trigger, Event::FileReadyType::Read | Event::FileReadyType::Write);

  transport_socket_->setTransportSocketCallbacks(*this);

  // TODO(soulxu): generate the connection id inside the addressProvider directly,
  // then we don't need a setter or any of the optional stuff.
  socket_->connectionInfoProvider().setConnectionID(id());
  socket_->connectionInfoProvider().setSslConnection(transport_socket_->ssl());
}
117

            
118
108159
// Owners are expected to have close()d the connection already (asserted
// below); close() is still called defensively, and the access log entry is
// flushed exactly once.
ConnectionImpl::~ConnectionImpl() {
  ASSERT(!socket_->isOpen() && delayed_close_timer_ == nullptr,
         "ConnectionImpl destroyed with open socket and/or active timer");

  // In general we assume that owning code has called close() previously to the destructor being
  // run. This generally must be done so that callbacks run in the correct context (vs. deferred
  // deletion). Hence the assert above. However, call close() here just to be completely sure that
  // the fd is closed and make it more likely that we crash from a bad close callback.
  close(ConnectionCloseType::NoFlush);
  // Ensure that the access log is written.
  ensureAccessLogWritten();
}
130

            
131
93
// Appends a write filter; the filter manager owns the filter chain.
void ConnectionImpl::addWriteFilter(WriteFilterSharedPtr filter) {
  filter_manager_.addWriteFilter(filter);
}
134

            
135
44
// Appends a combined read/write filter to the filter manager's chain.
void ConnectionImpl::addFilter(FilterSharedPtr filter) { filter_manager_.addFilter(filter); }
136

            
137
106743
// Appends a read filter; the filter manager owns the filter chain.
void ConnectionImpl::addReadFilter(ReadFilterSharedPtr filter) {
  filter_manager_.addReadFilter(filter);
}
140

            
141
492
// Removes a previously added read filter from the filter manager's chain.
void ConnectionImpl::removeReadFilter(ReadFilterSharedPtr filter) {
  filter_manager_.removeReadFilter(filter);
}
144

            
145
52244
// Delegates read-filter initialization; returns the filter manager's result.
bool ConnectionImpl::initializeReadFilters() { return filter_manager_.initializeReadFilters(); }
146

            
147
3
// Registers an access logger with the filter manager (invoked from
// ensureAccessLogWritten() at connection end).
void ConnectionImpl::addAccessLogHandler(AccessLog::InstanceSharedPtr handler) {
  filter_manager_.addAccessLogHandler(handler);
}
150

            
151
161786
void ConnectionImpl::ensureAccessLogWritten() {
152
161786
  if (!access_log_written_) {
153
108131
    access_log_written_ = true;
154
108131
    filter_manager_.log(AccessLog::AccessLogType::TcpConnectionEnd);
155
108131
  }
156
161786
}
157

            
158
228817
// Entry point for all connection closes. AbortReset sends a TCP RST and tears
// down immediately; Abort/NoFlush go straight to closeInternal(); only the
// flushing close types (FlushWrite / FlushWriteAndDelay) are routed through
// closeThroughFilterManager() so filters may observe/delay them.
void ConnectionImpl::close(ConnectionCloseType type) {
  if (!socket_->isOpen()) {
    ENVOY_CONN_LOG_EVENT(debug, "connection_closing", "Not closing conn, socket is not open",
                         *this);
    return;
  }

  // The connection is closed by Envoy by sending RST, and the connection is closed immediately.
  if (type == ConnectionCloseType::AbortReset) {
    ENVOY_CONN_LOG(
        trace, "connection closing type=AbortReset, setting LocalReset to the detected close type.",
        *this);
    setDetectedCloseType(StreamInfo::DetectedCloseType::LocalReset);
    closeSocket(ConnectionEvent::LocalClose);
    return;
  }

  if (type == ConnectionCloseType::Abort || type == ConnectionCloseType::NoFlush) {
    closeInternal(type);
    return;
  }

  // Only FlushWrite and FlushWriteAndDelay are managed by the filter manager, since the above
  // status will abort data naturally.
  ASSERT(type == ConnectionCloseType::FlushWrite ||
         type == ConnectionCloseType::FlushWriteAndDelay);
  closeThroughFilterManager(ConnectionCloseAction{ConnectionEvent::LocalClose, false, type});
}
186

            
187
60451
// Implements the close state machine: either closes immediately (optionally
// after a best-effort flush), or transitions into one of the delayed-close
// states (CloseAfterFlush / CloseAfterFlushAndWait) and arms the delayed-close
// timer and write/closed file events so the flush can complete asynchronously.
void ConnectionImpl::closeInternal(ConnectionCloseType type) {
  if (!socket_->isOpen()) {
    return;
  }

  uint64_t data_to_write = write_buffer_->length();
  ENVOY_CONN_LOG_EVENT(debug, "connection_closing", "closing data_to_write={} type={}", *this,
                       data_to_write, enumToInt(type));

  // A zero delayed_close_timeout_ means the delayed-close timer is disabled.
  const bool delayed_close_timeout_set = delayed_close_timeout_.count() > 0;
  if (data_to_write == 0 || type == ConnectionCloseType::NoFlush ||
      type == ConnectionCloseType::Abort || !transport_socket_->canFlushClose()) {
    if (data_to_write > 0 && type != ConnectionCloseType::Abort) {
      // We aren't going to wait to flush, but try to write as much as we can if there is pending
      // data.
      transport_socket_->doWrite(*write_buffer_, true);
    }

    if (type != ConnectionCloseType::FlushWriteAndDelay || !delayed_close_timeout_set) {
      closeConnectionImmediately();
      return;
    }
    // The socket is being closed and either there is no more data to write or the data can not be
    // flushed (!transport_socket_->canFlushClose()). Since a delayed close has been requested,
    // start the delayed close timer if it hasn't been done already by a previous close().
    // NOTE: Even though the delayed_close_state_ is being set to CloseAfterFlushAndWait, since
    // a write event is not being registered for the socket, this logic is simply setting the
    // timer and waiting for it to trigger to close the socket.
    if (!inDelayedClose()) {
      initializeDelayedCloseTimer();
      delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
      // Monitor for the peer closing the connection.
      ioHandle().enableFileEvents(enable_half_close_ ? 0 : Event::FileReadyType::Closed);
    }
    return;
  }

  ASSERT(type == ConnectionCloseType::FlushWrite ||
         type == ConnectionCloseType::FlushWriteAndDelay);

  // If there is a pending delayed close, simply update the delayed close state.
  //
  // An example of this condition manifests when a downstream connection is closed early by Envoy,
  // such as when a route can't be matched:
  //   In ConnectionManagerImpl::onData()
  //     1) Via codec_->dispatch(), a local reply with a 404 is sent to the client
  //       a) ConnectionManagerImpl::doEndStream() issues the first connection close() via
  //          ConnectionManagerImpl::checkForDeferredClose()
  //     2) A second close is issued by a subsequent call to
  //        ConnectionManagerImpl::checkForDeferredClose() prior to returning from onData()
  if (inDelayedClose()) {
    // Validate that a delayed close timer is already enabled unless it was disabled via
    // configuration.
    ASSERT(!delayed_close_timeout_set || delayed_close_timer_ != nullptr);
    if (type == ConnectionCloseType::FlushWrite || !delayed_close_timeout_set) {
      delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
    } else {
      delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
    }
    return;
  }

  // NOTE: At this point, it's already been validated that the connection is not already in
  // delayed close processing and therefore the timer has not yet been created.
  if (delayed_close_timeout_set) {
    initializeDelayedCloseTimer();
    delayed_close_state_ = (type == ConnectionCloseType::FlushWrite)
                               ? DelayedCloseState::CloseAfterFlush
                               : DelayedCloseState::CloseAfterFlushAndWait;
  } else {
    delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
  }

  ioHandle().enableFileEvents(Event::FileReadyType::Write |
                              (enable_half_close_ ? 0 : Event::FileReadyType::Closed));
}
263

            
264
3
void ConnectionImpl::onBufferHighWatermarkTimeout() {
265
3
  ENVOY_CONN_LOG(debug, "buffer high watermark timeout reached", *this);
266
3
  if (!socket_->isOpen()) {
267
    return;
268
  }
269
3
  closeConnectionImmediatelyWithDetails(
270
3
      StreamInfo::LocalCloseReasons::get().BufferHighWatermarkTimeout);
271
3
}
272

            
273
166277
void ConnectionImpl::scheduleBufferHighWatermarkTimeout() {
274
166277
  if (buffer_high_watermark_timeout_.count() == 0) {
275
166260
    return;
276
166260
  }
277

            
278
17
  if (buffer_high_watermark_timer_ == nullptr) {
279
8
    buffer_high_watermark_timer_ =
280
8
        dispatcher_.createTimer([this]() -> void { onBufferHighWatermarkTimeout(); });
281
8
  }
282

            
283
17
  if (!buffer_high_watermark_timer_->enabled()) {
284
17
    ENVOY_CONN_LOG(debug, "scheduling buffer high watermark timeout", *this);
285
17
    buffer_high_watermark_timer_->enableTimer(buffer_high_watermark_timeout_);
286
17
  }
287
17
}
288

            
289
166208
void ConnectionImpl::maybeCancelBufferHighWatermarkTimeout() {
290
166208
  if (buffer_high_watermark_timer_ == nullptr || !buffer_high_watermark_timer_->enabled()) {
291
166198
    return;
292
166198
  }
293

            
294
10
  if (!write_buffer_->highWatermarkTriggered() && !read_buffer_->highWatermarkTriggered()) {
295
10
    ENVOY_CONN_LOG(debug, "cancelling buffer high watermark timeout", *this);
296
10
    buffer_high_watermark_timer_->disableTimer();
297
10
  }
298
10
}
299

            
300
5325754
// Derive the externally visible state: Closed once the socket is gone,
// Closing while a delayed close is in flight, otherwise Open.
Connection::State ConnectionImpl::state() const {
  if (!socket_->isOpen()) {
    return State::Closed;
  }
  return inDelayedClose() ? State::Closing : State::Open;
}
309

            
310
58360
// Immediate teardown: closes the socket and raises LocalClose to callbacks.
void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }
311

            
312
50494
// Called by the transport socket to request read resumption (e.g. it has
// buffered plaintext to deliver even though the fd may not be readable).
void ConnectionImpl::setTransportSocketIsReadable() {
  ASSERT(dispatcher_.isThreadSafe());
  // Remember that the transport requested read resumption, in case the resumption event is not
  // scheduled immediately or is "lost" because read was disabled.
  transport_wants_read_ = true;
  // Only schedule a read activation if the connection is not read disabled to avoid spurious
  // wakeups. When read disabled, the connection will not read from the transport, and limit
  // dispatch to the current contents of the read buffer if its high-watermark is triggered and
  // dispatch_buffered_data_ is set.
  if (read_disable_count_ == 0) {
    ioHandle().activateFileEvents(Event::FileReadyType::Read);
  }
}
325

            
326
636046
bool ConnectionImpl::filterChainWantsData() {
327
636046
  return read_disable_count_ == 0 ||
328
636046
         (read_disable_count_ == 1 && read_buffer_->highWatermarkTriggered());
329
636046
}
330

            
331
436
// Records how the close was detected (e.g. Local/RemoteReset); consulted by
// closeSocket() to decide whether to emit a TCP RST via SO_LINGER=0.
void ConnectionImpl::setDetectedCloseType(StreamInfo::DetectedCloseType close_type) {
  detected_close_type_ = close_type;
}
334

            
335
67483
// Routes a close action through the filter manager when the
// "connection_close_through_filter_manager" runtime feature is enabled,
// letting filters observe/delay the close; otherwise closes directly.
void ConnectionImpl::closeThroughFilterManager(ConnectionCloseAction close_action) {
  if (!socket_->isOpen()) {
    return;
  }

  if (!enable_close_through_filter_manager_) {
    ENVOY_CONN_LOG(trace, "connection is closing not through the filter manager", *this);
    closeConnection(close_action);
    return;
  }

  ENVOY_CONN_LOG(trace, "connection is closing through the filter manager", *this);
  filter_manager_.onConnectionClose(close_action);
}
349

            
350
108287
// Final socket teardown. Order matters: cancel the delayed-close timer, close
// the transport socket, zero buffer stats, drain the write buffer, optionally
// arm SO_LINGER=0 (so close() emits a TCP RST) for reset-type closes, close
// the fd, propagate any transport failure reason, then raise the close event.
void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
  if (!socket_->isOpen()) {
    ENVOY_CONN_LOG(trace, "closeSocket: socket is not open, returning", *this);
    return;
  }

  // No need for a delayed close (if pending) now that the socket is being closed.
  if (delayed_close_timer_) {
    delayed_close_timer_->disableTimer();
    delayed_close_timer_ = nullptr;
  }

  ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
  transport_socket_->closeSocket(close_type);

  // Drain input and output buffers.
  updateReadBufferStats(0, 0);
  updateWriteBufferStats(0, 0);

  // As the socket closes, drain any remaining data.
  // The data won't be written out at this point, and where there are reference
  // counted buffer fragments, it helps avoid lifetime issues with the
  // connection outlasting the subscriber.
  write_buffer_->drain(write_buffer_->length());

  connection_stats_.reset();

  if (detected_close_type_ == StreamInfo::DetectedCloseType::RemoteReset ||
      detected_close_type_ == StreamInfo::DetectedCloseType::LocalReset) {
#if ENVOY_PLATFORM_ENABLE_SEND_RST
    // SO_LINGER with a zero timeout makes the subsequent close() send a RST
    // instead of a FIN. Failure here is logged but non-fatal.
    const bool ok = Network::Socket::applyOptions(
        Network::SocketOptionFactory::buildZeroSoLingerOptions(), *socket_,
        envoy::config::core::v3::SocketOption::STATE_LISTENING);
    if (!ok) {
      ENVOY_LOG_EVERY_POW_2(error, "rst setting so_linger=0 failed on connection {}", id());
    }
#endif
  }

  // It is safe to call close() since there is an IO handle check.
  socket_->close();

  // Propagate transport failure reason to StreamInfo before raising close events,
  // ensuring it's available to all filters and access loggers.
  // Only set if we have a valid failure reason to avoid accessing potentially invalid state.
  absl::string_view failure_reason = transportFailureReason();
  if (!failure_reason.empty()) {
    stream_info_.setDownstreamTransportFailureReason(failure_reason);
  }

  // Call the base class directly as close() is called in the destructor.
  ConnectionImpl::raiseEvent(close_type);
}
403

            
404
75796
// Invoked once the connect completes (connecting_ already cleared by the
// caller, per the assert); notifies the transport socket so it can e.g. begin
// its handshake.
void ConnectionImpl::onConnected() {
  ASSERT(!connecting_);
  transport_socket_->onConnected();
}
408

            
409
104225
// Enables/disables TCP_NODELAY (Nagle) on the socket. Silently ignores closed
// sockets and non-IP addresses; tolerates platform-specific transient errors
// (see the __APPLE__/WIN32 branches), otherwise any failure is fatal.
void ConnectionImpl::noDelay(bool enable) {
  // There are cases where a connection to localhost can immediately fail (e.g., if the other end
  // does not have enough fds, reaches a backlog limit, etc.). Because we run with deferred error
  // events, the calling code may not yet know that the connection has failed. This is one call
  // where we go outside of libevent and hit the fd directly and this case can fail if the fd is
  // invalid. For this call instead of plumbing through logic that will immediately indicate that a
  // connect failed, we will just ignore the noDelay() call if the socket is invalid since error is
  // going to be raised shortly anyway and it makes the calling code simpler.
  if (!socket_->isOpen()) {
    return;
  }

  // Don't set NODELAY for unix domain sockets or internal socket.
  if (socket_->addressType() != Address::Type::Ip) {
    return;
  }

  // Set NODELAY
  int new_value = enable;
  Api::SysCallIntResult result =
      socket_->setSocketOption(IPPROTO_TCP, TCP_NODELAY, &new_value, sizeof(new_value));
#if defined(__APPLE__)
  if (SOCKET_FAILURE(result.return_value_) && result.errno_ == SOCKET_ERROR_INVAL) {
    // Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
    // enabled despite this result.
    return;
  }
#elif defined(WIN32)
  if (SOCKET_FAILURE(result.return_value_) &&
      (result.errno_ == SOCKET_ERROR_AGAIN || result.errno_ == SOCKET_ERROR_INVAL)) {
    // Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
    // enabled despite this result.
    return;
  }
#endif

  RELEASE_ASSERT(result.return_value_ == 0,
                 fmt::format("Failed to set TCP_NODELAY with error {}, {}", result.errno_,
                             errorDetails(result.errno_)));
}
449

            
450
525253
// Dispatches `read_buffer_size` buffered bytes (and/or end-of-stream) to the
// read filter chain, with guards against delayed close, read-disabled state,
// pending filter-manager close, and duplicate end-of-stream signals.
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
  ASSERT(dispatcher_.isThreadSafe());
  // Do not read the data from the socket if the connection is in delay closed,
  // high watermark is called, or is closing through filter manager.
  if (inDelayedClose() || !filterChainWantsData() ||
      (enable_close_through_filter_manager_ && filter_manager_.pendingClose())) {
    return;
  }
  ASSERT(socket_->isOpen());

  if (read_buffer_size == 0 && !read_end_stream_) {
    return;
  }

  if (read_end_stream_) {
    // read() on a raw socket will repeatedly return 0 (EOF) once EOF has
    // occurred, so filter out the repeats so that filters don't have
    // to handle repeats.
    //
    // I don't know of any cases where this actually happens (we should stop
    // reading the socket after EOF), but this check guards against any bugs
    // in ConnectionImpl or strangeness in the OS events (epoll, kqueue, etc)
    // and maintains the guarantee for filters.
    if (read_end_stream_raised_) {
      // No further data can be delivered after end_stream
      ASSERT(read_buffer_size == 0);
      return;
    }
    read_end_stream_raised_ = true;
  }

  filter_manager_.onRead();
}
483

            
484
24308
// Toggles half-close semantics (remote FIN no longer implies full close).
// Must not be enabled while reads are disabled (see the ASSERT rationale).
void ConnectionImpl::enableHalfClose(bool enabled) {
  // This code doesn't correctly ensure that EV_CLOSE isn't set if reading is disabled
  // when enabling half-close. This could be fixed, but isn't needed right now, so just
  // ASSERT that it doesn't happen.
  ASSERT(!enabled || read_disable_count_ == 0);

  enable_half_close_ = enabled;
}
492

            
493
222713
// Increments/decrements the read-disable count and re-registers file events
// accordingly. Returns whether this call caused a transition (to disabled or
// enabled) or left the connection still read-disabled. On re-enable, a fake
// read event may be scheduled so already-buffered data gets dispatched.
Connection::ReadDisableStatus ConnectionImpl::readDisable(bool disable) {
  // Calls to readEnabled on a closed socket are considered to be an error.
  ASSERT(state() == State::Open);

  ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this,
                 disable, read_disable_count_, static_cast<int>(state()), read_buffer_->length());

  // When we disable reads, we still allow for early close notifications (the equivalent of
  // `EPOLLRDHUP` for an epoll backend). For backends that support it, this allows us to apply
  // back pressure at the kernel layer, but still get timely notification of a FIN. Note that
  // we are not guaranteed to get notified, so even if the remote has closed, we may not know
  // until we try to write. Further note that currently we optionally don't correctly handle half
  // closed TCP connections in the sense that we assume that a remote FIN means the remote intends a
  // full close.
  if (disable) {
    ++read_disable_count_;

    if (state() != State::Open) {
      // If readDisable is called on a closed connection, do not crash.
      return ReadDisableStatus::NoTransition;
    }

    if (read_disable_count_ > 1) {
      // The socket has already been read disabled.
      return ReadDisableStatus::StillReadDisabled;
    }

    // If half-close semantics are enabled, we never want early close notifications; we
    // always want to read all available data, even if the other side has closed.
    if (detect_early_close_ && !enable_half_close_) {
      ioHandle().enableFileEvents(Event::FileReadyType::Write | Event::FileReadyType::Closed);
    } else {
      ioHandle().enableFileEvents(Event::FileReadyType::Write);
    }

    return ReadDisableStatus::TransitionedToReadDisabled;
  } else {
    ASSERT(read_disable_count_ != 0);
    --read_disable_count_;
    if (state() != State::Open) {
      // If readDisable is called on a closed connection, do not crash.
      return ReadDisableStatus::NoTransition;
    }

    auto read_disable_status = ReadDisableStatus::StillReadDisabled;
    if (read_disable_count_ == 0) {
      // We never ask for both early close and read at the same time. If we are reading, we want to
      // consume all available data.
      ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
      read_disable_status = ReadDisableStatus::TransitionedToReadEnabled;
    }

    if (filterChainWantsData() && (read_buffer_->length() > 0 || transport_wants_read_)) {
      // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
      // buffer's high watermark has triggered.
      ASSERT(read_buffer_->length() > 0 || read_disable_count_ == 0);

      // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
      // able to process additional bytes even if there is no data in the kernel to kick off the
      // filter chain. Alternately the connection may need read resumption while read disabled and
      // not registered for read events because the read buffer's high-watermark has triggered. To
      // handle these cases, directly schedule a fake read event to make sure the buffered data in
      // the read buffer or in transport socket internal buffers gets processed regardless and
      // ensure that we dispatch it via onRead.
      dispatch_buffered_data_ = true;
      ioHandle().activateFileEvents(Event::FileReadyType::Read);
    }

    return read_disable_status;
  }
}
564

            
565
184643
// Delivers a connection event to registered callbacks; on Connected, also
// flushes any data buffered during the handshake.
void ConnectionImpl::raiseEvent(ConnectionEvent event) {
  ENVOY_CONN_LOG(trace, "raising connection event {}", *this, static_cast<int>(event));
  ConnectionImplBase::raiseConnectionEvent(event);
  // We may have pending data in the write buffer on transport handshake
  // completion, which may also have completed in the context of onReadReady(),
  // where no check of the write buffer is made. Provide an opportunity to flush
  // here. If connection write is not ready, this is harmless. We should only do
  // this if we're still open (the above callbacks may have closed).
  if (event == ConnectionEvent::Connected) {
    flushWriteBuffer();
  }
}
577

            
578
11
// True when no readDisable() calls are outstanding. Only valid on an open
// connection, on the dispatcher thread.
bool ConnectionImpl::readEnabled() const {
  // Calls to readEnabled on a closed socket are considered to be an error.
  ASSERT(state() == State::Open);
  ASSERT(dispatcher_.isThreadSafe());
  return read_disable_count_ == 0;
}
584

            
585
3753
// Registers a callback to be invoked as bytes are written to the socket.
void ConnectionImpl::addBytesSentCallback(BytesSentCb cb) {
  bytes_sent_callbacks_.emplace_back(cb);
}
588

            
589
353
// Write that bypasses the write filter chain (through_filter_chain = false).
void ConnectionImpl::rawWrite(Buffer::Instance& data, bool end_stream) {
  write(data, end_stream, false);
}
592

            
593
1519972
// Standard write: routes `data` through the write filter chain.
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {
  write(data, end_stream, true);
}
596

            
597
1520325
// Core write path: optionally runs the write filter chain (which may stop
// iteration), then moves `data` into write_buffer_ and activates a write
// event unless the connect is still in progress. Writing after end_stream is
// an API violation (asserted) except for a harmless duplicate empty write.
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
  ASSERT(!end_stream || enable_half_close_);
  ASSERT(dispatcher_.isThreadSafe());

  if (write_end_stream_) {
    // It is an API violation to write more data after writing end_stream, but a duplicate
    // end_stream with no data is harmless. This catches misuse of the API that could result in data
    // being lost.
    ASSERT(data.length() == 0 && end_stream);

    return;
  }

  if (through_filter_chain) {
    // NOTE: This is kind of a hack, but currently we don't support restart/continue on the write
    //       path, so we just pass around the buffer passed to us in this function. If we ever
    //       support buffer/restart/continue on the write path this needs to get more complicated.
    current_write_buffer_ = &data;
    current_write_end_stream_ = end_stream;
    FilterStatus status = filter_manager_.onWrite();
    current_write_buffer_ = nullptr;

    if (FilterStatus::StopIteration == status) {
      return;
    }
  }

  write_end_stream_ = end_stream;
  if (data.length() > 0 || end_stream) {
    ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
    // TODO(mattklein123): All data currently gets moved from the source buffer to the write buffer.
    // This can lead to inefficient behavior if writing a bunch of small chunks. In this case, it
    // would likely be more efficient to copy data below a certain size. VERY IMPORTANT: If this is
    // ever changed, read the comment in SslSocket::doWrite() VERY carefully. That code assumes that
    // we never change existing write_buffer_ chain elements between calls to SSL_write(). That code
    // might need to change if we ever copy here.
    write_buffer_->move(data);

    // Activating a write event before the socket is connected has the side-effect of tricking
    // doWriteReady into thinking the socket is connected. On macOS, the underlying write may fail
    // with a connection error if a call to write(2) occurs before the connection is completed.
    if (!connecting_) {
      ioHandle().activateFileEvents(Event::FileReadyType::Write);
    }
  }
}
643

            
644
84341
// Sets the soft read-buffer limit and, when non-zero, installs matching high/low
// watermarks on both the read and write buffers. A limit of 0 disables watermarks.
void ConnectionImpl::setBufferLimits(uint32_t limit) {
  read_buffer_limit_ = limit;

  // Due to the fact that writes to the connection and flushing data from the connection are done
  // asynchronously, we have the option of either setting the watermarks aggressively, and regularly
  // enabling/disabling reads from the socket, or allowing more data, but then not triggering
  // based on watermarks until 2x the data is buffered in the common case. Given these are all soft
  // limits we err on the side of buffering more triggering watermark callbacks less often.
  //
  // Given the current implementation for straight up TCP proxying, the common case is reading
  // |limit| bytes through the socket, passing |limit| bytes to the connection and the immediately
  // draining |limit| bytes to the socket. Triggering the high watermarks and then immediately
  // triggering the low watermarks would be expensive, but we narrowly avoid triggering high
  // watermark when moving |limit| bytes through the connection because the high watermark
  // computation checks if the size of the buffer exceeds the high watermark value.
  if (limit > 0) {
    write_buffer_->setWatermarks(limit);
    read_buffer_->setWatermarks(limit);
  }
}
664

            
665
11
// Updates the high-watermark timeout. If the value changes, any armed timer is
// disabled, and when the connection is Open with a buffer currently above its
// high watermark the timeout is re-scheduled under the new value.
void ConnectionImpl::setBufferHighWatermarkTimeout(std::chrono::milliseconds timeout) {
  // No-op when the timeout is unchanged; avoids needlessly re-arming the timer.
  if (timeout == buffer_high_watermark_timeout_) {
    return;
  }

  buffer_high_watermark_timeout_ = timeout;

  // Cancel the previously-armed timer so it cannot fire with the stale timeout.
  if (buffer_high_watermark_timer_ != nullptr && buffer_high_watermark_timer_->enabled()) {
    buffer_high_watermark_timer_->disableTimer();
  }

  // Re-arm only if a watermark is still triggered on either buffer.
  if (state() == State::Open &&
      (write_buffer_->highWatermarkTriggered() || read_buffer_->highWatermarkTriggered())) {
    scheduleBufferHighWatermarkTimeout();
  }
}
681

            
682
48278
void ConnectionImpl::onReadBufferLowWatermark() {
683
48278
  ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this);
684
48278
  if (state() == State::Open) {
685
48234
    readDisable(false);
686
48234
    maybeCancelBufferHighWatermarkTimeout();
687
48234
  }
688
48278
}
689

            
690
48283
void ConnectionImpl::onReadBufferHighWatermark() {
691
48283
  ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this);
692
48283
  if (state() == State::Open) {
693
48283
    readDisable(true);
694
48283
    scheduleBufferHighWatermarkTimeout();
695
48283
  }
696
48283
}
697

            
698
117991
// Write buffer drained below its low watermark: clear the above-high-watermark
// latch, cancel the watermark timeout (only while Open), and notify every
// registered connection callback.
void ConnectionImpl::onWriteBufferLowWatermark() {
  ENVOY_CONN_LOG(debug, "onBelowWriteBufferLowWatermark", *this);
  // This callback must strictly alternate with onWriteBufferHighWatermark().
  ASSERT(write_buffer_above_high_watermark_);
  write_buffer_above_high_watermark_ = false;
  if (state() == State::Open) {
    maybeCancelBufferHighWatermarkTimeout();
  }
  // Entries in callbacks_ may be null; skip those. Callbacks fire even when the
  // connection is no longer Open.
  for (ConnectionCallbacks* callback : callbacks_) {
    if (callback) {
      callback->onBelowWriteBufferLowWatermark();
    }
  }
}
711

            
712
117992
// Write buffer filled past its high watermark: set the above-high-watermark
// latch, arm the watermark timeout (only while Open), and notify every
// registered connection callback.
void ConnectionImpl::onWriteBufferHighWatermark() {
  ENVOY_CONN_LOG(debug, "onAboveWriteBufferHighWatermark", *this);
  // This callback must strictly alternate with onWriteBufferLowWatermark().
  ASSERT(!write_buffer_above_high_watermark_);
  write_buffer_above_high_watermark_ = true;
  if (state() == State::Open) {
    scheduleBufferHighWatermarkTimeout();
  }
  // Entries in callbacks_ may be null; skip those. Callbacks fire even when the
  // connection is no longer Open.
  for (ConnectionCallbacks* callback : callbacks_) {
    if (callback) {
      callback->onAboveWriteBufferHighWatermark();
    }
  }
}
725

            
726
250
// Records a failure reason for this connection. When the transport socket also
// reports a failure, the two are concatenated ("<reason>. <transport reason>").
void ConnectionImpl::setFailureReason(absl::string_view failure_reason) {
  if (transport_socket_->failureReason().empty()) {
    failure_reason_ = std::string(failure_reason);
  } else {
    failure_reason_ = absl::StrCat(failure_reason, ". ", transport_socket_->failureReason());
  }
}
733

            
734
1247245
// Dispatches file-ready notifications (Write / Read / Closed bit flags) for this
// connection. Processing order is load-bearing: deferred immediate errors first,
// then remote early close, then write readiness, then read readiness (which is
// skipped if a write callback closed the socket).
void ConnectionImpl::onFileEvent(uint32_t events) {
  // Track this connection for crash dumps while user code runs on this stack.
  ScopeTrackerScopeState scope(this, this->dispatcher_);
  ENVOY_CONN_LOG(trace, "socket event: {}", *this, events);

  // A constructor-time failure (bind error, failed socket options) is surfaced
  // here asynchronously so the owner had a chance to register callbacks first.
  if (immediate_error_event_ == ConnectionEvent::LocalClose ||
      immediate_error_event_ == ConnectionEvent::RemoteClose) {
    if (bind_error_) {
      ENVOY_CONN_LOG(debug, "raising bind error", *this);
      // Update stats here, rather than on bind failure, to give the caller a chance to
      // setConnectionStats.
      if (connection_stats_ && connection_stats_->bind_errors_) {
        connection_stats_->bind_errors_->inc();
      }
    } else {
      ENVOY_CONN_LOG(debug, "raising immediate error", *this);
    }
    closeSocket(immediate_error_event_);
    return;
  }

  if (events & Event::FileReadyType::Closed) {
    // We never ask for both early close and read at the same time. If we are reading, we want to
    // consume all available data.
    ASSERT(!(events & Event::FileReadyType::Read));
    ENVOY_CONN_LOG(debug, "remote early close", *this);
    // If half-close is enabled, this is never activated.
    // If half-close is disabled, there are two scenarios where this applies:
    //    1. During the closeInternal(_) call.
    //    2. When an early close is detected while the connection is read-disabled.
    // Both situations allow the connection to bypass the filter manager's status since there will
    // be data loss even in normal cases.
    closeSocket(ConnectionEvent::RemoteClose);
    return;
  }

  if (events & Event::FileReadyType::Write) {
    onWriteReady();
  }

  // It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
  // In this case ignore read event processing.
  if (socket_->isOpen() && (events & Event::FileReadyType::Read)) {
    onReadReady();
  }
}
779

            
780
574130
// Handles read readiness: drains the transport socket into read_buffer_, runs
// the read filter chain via onRead(), and translates RST / end-of-stream
// conditions into the appropriate close path. The latched
// dispatch_buffered_data_ flag forces redelivery of already-buffered data even
// when no new bytes arrive.
void ConnectionImpl::onReadReady() {
  ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this,
                 static_cast<int>(dispatch_buffered_data_));
  // Latch-and-clear: the flag is one-shot per readiness event.
  const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
  dispatch_buffered_data_ = false;

  ASSERT(!connecting_);

  // If it is closing through the filter manager, we either need to close the socket or go
  // through the close(), so we prevent further reading from the socket when we are waiting
  // for the connection close.
  if (enable_close_through_filter_manager_ && filter_manager_.pendingClose()) {
    return;
  }

  // We get here while read disabled in two ways.
  // 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
  //    due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
  //    enabled.
  // 2) The consumer of connection data called readDisable(true), and instead of reading from the
  //    socket we simply need to dispatch already read data.
  if (read_disable_count_ != 0) {
    // Do not clear transport_wants_read_ when returning early; the early return skips the transport
    // socket doRead call.
    if (latched_dispatch_buffered_data && filterChainWantsData()) {
      onRead(read_buffer_->length());
    }
    return;
  }

  // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
  // the transport socket read resumption happens as requested; onReadReady() returns early without
  // reading from the transport if the read buffer is above high watermark at the start of the
  // method.
  transport_wants_read_ = false;
  IoResult result = transport_socket_->doRead(*read_buffer_);
  uint64_t new_buffer_size = read_buffer_->length();
  updateReadBufferStats(result.bytes_processed_, new_buffer_size);

  // The socket is closed immediately when receiving RST.
  if (result.err_code_.has_value() &&
      result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
    ENVOY_CONN_LOG(trace, "read: rst close from peer", *this);
    setDetectedCloseType(StreamInfo::DetectedCloseType::RemoteReset);
    if (result.bytes_processed_ != 0) {
      onRead(new_buffer_size);
      // In some cases, the transport socket could read data along with an RST (Reset) flag.
      // We need to ensure this data is properly propagated to the terminal filter for proper
      // handling. For more details, see #29616 and #28817.
      closeThroughFilterManager(ConnectionCloseAction{ConnectionEvent::RemoteClose, true});
    } else {
      // Otherwise no data was read, and close the socket directly.
      closeSocket(Network::ConnectionEvent::RemoteClose);
    }
    return;
  }

  // If this connection doesn't have half-close semantics, translate end_stream into
  // a connection close.
  if ((!enable_half_close_ && result.end_stream_read_)) {
    result.end_stream_read_ = false;
    result.action_ = PostIoAction::Close;
  }

  read_end_stream_ |= result.end_stream_read_;
  if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
      (latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
    // Skip onRead if no bytes were processed unless we explicitly want to force onRead for
    // buffered data. For instance, skip onRead if the connection was closed without producing
    // more data.
    onRead(new_buffer_size);
  }

  // The read callback may have already closed the connection.
  if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
    ENVOY_CONN_LOG(debug, "remote close", *this);
    // This is the typical case where a socket read triggers a connection close.
    // When half-close is disabled, the action_ will be set to close.
    // When half-close is enabled, once both directions of the connection are closed,
    // we need to ensure that the read data is properly propagated to the terminal filter.
    closeThroughFilterManager(ConnectionCloseAction{ConnectionEvent::RemoteClose, true});
  }
}
863

            
864
// Returns the peer process credentials (pid/uid/gid) of a Unix domain socket
// via the SO_PEERCRED socket option, or nullopt when the platform lacks
// SO_PEERCRED or the getsockopt call fails.
absl::optional<Connection::UnixDomainSocketPeerCredentials>
ConnectionImpl::unixSocketPeerCredentials() const {
  // TODO(snowp): Support non-linux platforms.
#ifndef SO_PEERCRED
  return absl::nullopt;
#else
  struct ucred ucred;
  socklen_t ucred_size = sizeof(ucred);
  int rc = socket_->getSocketOption(SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_size).return_value_;
  if (SOCKET_FAILURE(rc)) {
    return absl::nullopt;
  }

  return {{ucred.pid, ucred.uid, ucred.gid}};
#endif
}
880

            
881
1279550
// Handles write readiness. For a connecting client socket, first resolves the
// deferred connect result via SO_ERROR; then flushes write_buffer_ through the
// transport socket and drives the delayed-close state machine and per-write
// bytes-sent callbacks. Exact statement order is load-bearing throughout.
void ConnectionImpl::onWriteReady() {
  ENVOY_CONN_LOG(trace, "write ready", *this);

  if (connecting_) {
    // A non-blocking connect() completion is reported as write readiness; the
    // actual outcome must be read from SO_ERROR.
    int error;
    socklen_t error_size = sizeof(error);
    RELEASE_ASSERT(
        socket_->getSocketOption(SOL_SOCKET, SO_ERROR, &error, &error_size).return_value_ == 0, "");

    if (error == 0) {
      ENVOY_CONN_LOG_EVENT(debug, "connection_connected", "connected", *this);
      connecting_ = false;
      onConnected();
      // It's possible that we closed during the connect callback.
      if (state() != State::Open) {
        ENVOY_CONN_LOG_EVENT(debug, "connection_closed_callback", "close during connected callback",
                             *this);
        return;
      }
    } else {
      setFailureReason(absl::StrCat("delayed connect error: ", errorDetails(error)));
      ENVOY_CONN_LOG_EVENT(debug, "connection_error", "{}", *this, transportFailureReason());
      closeSocket(ConnectionEvent::RemoteClose);
      return;
    }
  }

  IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
  ASSERT(!result.end_stream_read_); // The interface guarantees that only read operations set this.
  uint64_t new_buffer_size = write_buffer_->length();
  updateWriteBufferStats(result.bytes_processed_, new_buffer_size);

  // The socket is closed immediately when receiving RST.
  if (result.err_code_.has_value() &&
      result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
    // Discard anything in the buffer.
    ENVOY_CONN_LOG(debug, "write: rst close from peer.", *this);
    setDetectedCloseType(StreamInfo::DetectedCloseType::RemoteReset);
    closeSocket(ConnectionEvent::RemoteClose);
    return;
  }

  // NOTE: If the delayed_close_timer_ is set, it must only trigger after a delayed_close_timeout_
  // period of inactivity from the last write event. Therefore, the timer must be reset to its
  // original timeout value unless the socket is going to be closed as a result of the doWrite().

  if (result.action_ == PostIoAction::Close) {
    // It is possible (though unlikely) for the connection to have already been closed during the
    // write callback. This can happen if we manage to complete the SSL handshake in the write
    // callback, raise a connected event, and close the connection.
    closeSocket(ConnectionEvent::RemoteClose);
  } else if ((inDelayedClose() && new_buffer_size == 0) || bothSidesHalfClosed()) {
    ENVOY_CONN_LOG(debug, "write flush complete", *this);
    if (delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) {
      ASSERT(delayed_close_timer_ != nullptr && delayed_close_timer_->enabled());
      // Only reset the inactivity timer when this event actually made progress.
      if (result.bytes_processed_ > 0) {
        delayed_close_timer_->enableTimer(delayed_close_timeout_);
      }
    } else {
      ASSERT(bothSidesHalfClosed() || delayed_close_state_ == DelayedCloseState::CloseAfterFlush);
      ENVOY_CONN_LOG(trace, "both sides are half closed or it is final close after flush state",
                     *this);
      if (delayed_close_state_ == DelayedCloseState::CloseAfterFlush) {
        // close() is already managed by the filter manager and delayed.
        // This is the final close.
        closeConnectionImmediately();
      } else if (bothSidesHalfClosed()) {
        // If half_close is enabled, the close should still go through the filter manager, since
        // the end_stream from read side is possible pending in the filter chain.
        closeThroughFilterManager(ConnectionCloseAction{ConnectionEvent::LocalClose, true});
      }
    }
  } else {
    ASSERT(result.action_ == PostIoAction::KeepOpen);
    ASSERT(!delayed_close_timer_ || delayed_close_timer_->enabled());
    if (delayed_close_timer_ != nullptr && result.bytes_processed_ > 0) {
      delayed_close_timer_->enableTimer(delayed_close_timeout_);
    }
    if (result.bytes_processed_ > 0) {
      // Callbacks returning false are removed; the erase/advance pattern keeps
      // iterators valid while mutating the list.
      auto it = bytes_sent_callbacks_.begin();
      while (it != bytes_sent_callbacks_.end()) {
        if ((*it)(result.bytes_processed_)) {
          // move to the next callback.
          it++;
        } else {
          // remove the current callback.
          it = bytes_sent_callbacks_.erase(it);
        }

        // If a callback closes the socket, stop iterating.
        if (!socket_->isOpen()) {
          return;
        }
      }
    }
  }
}
978

            
979
682201
void ConnectionImpl::updateReadBufferStats(uint64_t num_read, uint64_t new_size) {
980
682201
  if (!connection_stats_) {
981
440718
    return;
982
440718
  }
983

            
984
241483
  ConnectionImplUtility::updateBufferStats(num_read, new_size, last_read_buffer_size_,
985
241483
                                           connection_stats_->read_total_,
986
241483
                                           connection_stats_->read_current_);
987
241483
}
988

            
989
1386465
void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_size) {
990
1386465
  if (!connection_stats_) {
991
722368
    return;
992
722368
  }
993

            
994
664097
  ConnectionImplUtility::updateBufferStats(num_written, new_size, last_write_buffer_size_,
995
664097
                                           connection_stats_->write_total_,
996
664097
                                           connection_stats_->write_current_);
997
664097
}
998

            
999
1802120
bool ConnectionImpl::bothSidesHalfClosed() {
  // If the write_buffer_ is not empty, then the end_stream has not been sent to the transport
  // yet.
1802120
  return read_end_stream_ && write_end_stream_ && write_buffer_->length() == 0;
1802120
}
2
// Applies a raw socket option to the live socket, returning false on failure.
// On success the option is also recorded on the socket so it can be reapplied
// (e.g. if the socket is duplicated later).
bool ConnectionImpl::setSocketOption(Network::SocketOptionName name, absl::Span<uint8_t> value) {
  Api::SysCallIntResult result =
      SocketOptionImpl::setSocketOption(*socket_, name, value.data(), value.size());
  if (result.return_value_ != 0) {
    return false;
  }
  // Only add a sockopt if it's added successfully.
  auto sockopt = std::make_shared<SocketOptionImpl>(
      name, absl::string_view(reinterpret_cast<const char*>(value.data()), value.size()));
  socket_->addOption(sockopt);
  return true;
}
168165
// Returns the locally recorded failure reason when one exists; otherwise falls
// back to whatever the transport socket reports.
absl::string_view ConnectionImpl::transportFailureReason() const {
  if (failure_reason_.empty()) {
    return transport_socket_->failureReason();
  }
  return failure_reason_;
}
1
// Delegates to the socket for the most recently measured round-trip time, if any.
absl::optional<std::chrono::milliseconds> ConnectionImpl::lastRoundTripTime() const {
  return socket_->lastRoundTripTime();
}
// Forwards the initial congestion window hint to the transport socket, which
// owns congestion control configuration.
void ConnectionImpl::configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec,
                                                      std::chrono::microseconds rtt) {
  transport_socket_->configureInitialCongestionWindow(bandwidth_bits_per_sec, rtt);
}
2
// Delegates to the socket for the current congestion window in bytes, if known.
absl::optional<uint64_t> ConnectionImpl::congestionWindowInBytes() const {
  return socket_->congestionWindowInBytes();
}
76510
void ConnectionImpl::flushWriteBuffer() {
76510
  if (state() == State::Open && write_buffer_->length() > 0) {
33260
    onWriteReady();
33260
  }
76510
}
1
// Writes a human-readable snapshot of connection state (and the underlying
// socket) to `os` for crash/debug dumps, indented by `indent_level`.
void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const {
  const char* spaces = spacesForLevel(indent_level);
  os << spaces << "ConnectionImpl " << this << DUMP_MEMBER(connecting_) << DUMP_MEMBER(bind_error_)
     << DUMP_MEMBER(state()) << DUMP_MEMBER(read_buffer_limit_) << "\n";
  DUMP_DETAILS(socket_);
}
// Server connections wrap an already-accepted socket; the trailing `true`
// marks the connection as connected from construction.
ServerConnectionImpl::ServerConnectionImpl(Event::Dispatcher& dispatcher,
                                           ConnectionSocketPtr&& socket,
                                           TransportSocketPtr&& transport_socket,
                                           StreamInfo::StreamInfo& stream_info)
    : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info,
                     true) {}
// Arms (or re-arms) a scaled timer bounding how long the transport socket may
// take to finish its handshake. No-op once the transport connect has already
// completed. `timeout_stat` is incremented if the timer later fires.
void ServerConnectionImpl::setTransportSocketConnectTimeout(std::chrono::milliseconds timeout,
                                                            Stats::Counter& timeout_stat) {
  if (!transport_connect_pending_) {
    return;
  }
  transport_socket_timeout_stat_ = &timeout_stat;
  // Lazily create the timer on first use; subsequent calls just re-arm it.
  if (transport_socket_connect_timer_ == nullptr) {
    transport_socket_connect_timer_ =
        dispatcher_.createScaledTimer(Event::ScaledTimerType::TransportSocketConnectTimeout,
                                      [this] { onTransportSocketConnectTimeout(); });
  }
  transport_socket_connect_timer_->enableTimer(timeout);
}
23384
// Server-side event hook: any terminal event (Connected / RemoteClose /
// LocalClose) ends the transport-socket connect phase, so clear the pending
// flag and destroy the connect timer before forwarding to the base class.
void ServerConnectionImpl::raiseEvent(ConnectionEvent event) {
  switch (event) {
  case ConnectionEvent::ConnectedZeroRtt:
    // The transport socket is still connecting, so skip changing connect state.
    break;
  // Deliberate grouping: the three cases below share the same handling.
  case ConnectionEvent::Connected:
  case ConnectionEvent::RemoteClose:
  case ConnectionEvent::LocalClose:
    transport_connect_pending_ = false;
    transport_socket_connect_timer_.reset();
  }
  ConnectionImpl::raiseEvent(event);
}
22529
// Initializes the read filter chain, then signals connectedness to the
// transport socket. Returns whether initialization succeeded.
bool ServerConnectionImpl::initializeReadFilters() {
  bool initialized = ConnectionImpl::initializeReadFilters();
  if (initialized) {
    // Server connection starts as connected, and we must explicitly signal to
    // the downstream transport socket that the underlying socket is connected.
    // We delay this step until after the filters are initialized and can
    // receive the connection events.
    onConnected();
  }
  return initialized;
}
8
// Fired when the transport socket connect timer expires: records termination
// details on the stream info, closes the connection immediately, bumps the
// configured timeout counter, and records a failure reason.
void ServerConnectionImpl::onTransportSocketConnectTimeout() {
  stream_info_.setConnectionTerminationDetails(kTransportSocketConnectTimeoutTerminationDetails);
  closeConnectionImmediatelyWithDetails(
      StreamInfo::LocalCloseReasons::get().TransportSocketTimeout);
  // transport_socket_timeout_stat_ is set in setTransportSocketConnectTimeout()
  // before this timer can exist, so it is non-null here.
  transport_socket_timeout_stat_->inc();
  setFailureReason("connect timeout");
}
// Convenience constructor: builds the client socket from the remote address and
// options, then delegates to the socket-taking constructor.
ClientConnectionImpl::ClientConnectionImpl(
    Event::Dispatcher& dispatcher, const Address::InstanceConstSharedPtr& remote_address,
    const Network::Address::InstanceConstSharedPtr& source_address,
    Network::TransportSocketPtr&& transport_socket,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_options)
    : ClientConnectionImpl(dispatcher, std::make_unique<ClientSocketImpl>(remote_address, options),
                           source_address, std::move(transport_socket), options,
                           transport_options) {}
// Main client-connection constructor. Sets up stream info and filter state,
// applies prebind socket options, and binds to a source address when one is
// supplied. All failure paths are surfaced asynchronously (posted close or
// immediate_error_event_) so the owner can register callbacks first.
ClientConnectionImpl::ClientConnectionImpl(
    Event::Dispatcher& dispatcher, std::unique_ptr<ConnectionSocket> socket,
    const Address::InstanceConstSharedPtr& source_address,
    Network::TransportSocketPtr&& transport_socket,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_options)
    : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info_,
                     false),
      stream_info_(dispatcher_.timeSource(), socket_->connectionInfoProviderSharedPtr(),
                   StreamInfo::FilterState::LifeSpan::Connection) {
  if (!socket_->isOpen()) {
    setFailureReason("socket creation failure");
    // Set up the dispatcher to "close" the connection on the next loop after
    // the owner has a chance to add callbacks.
    dispatcher_.post([this]() { raiseEvent(ConnectionEvent::LocalClose); });
    return;
  }
  stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
  if (transport_options) {
    for (const auto& object : transport_options->downstreamSharedFilterStateObjects()) {
      // This does not throw as all objects are distinctly named and the stream info is empty.
      stream_info_.filterState()->setData(object.name_, object.data_, object.state_type_,
                                          StreamInfo::FilterState::LifeSpan::Connection,
                                          object.stream_sharing_);
    }
  }
  // There are no meaningful socket options or source address semantics for
  // non-IP sockets, so skip.
  if (socket_->connectionInfoProviderSharedPtr()->remoteAddress()->ip() == nullptr) {
    return;
  }
  if (!Network::Socket::applyOptions(options, *socket_,
                                     envoy::config::core::v3::SocketOption::STATE_PREBIND)) {
    // Set a special error state to ensure asynchronous close to give the owner of the
    // ConnectionImpl a chance to add callbacks and detect the "disconnect".
    immediate_error_event_ = ConnectionEvent::LocalClose;
    // Trigger a write event to close this connection out-of-band.
    ioHandle().activateFileEvents(Event::FileReadyType::Write);
    return;
  }
  // A local address already set on the socket takes precedence over the
  // caller-supplied source address.
  const Network::Address::InstanceConstSharedPtr* source = &source_address;
  if (socket_->connectionInfoProvider().localAddress()) {
    source = &socket_->connectionInfoProvider().localAddress();
  }
  if (*source != nullptr) {
    Api::SysCallIntResult result = socket_->bind(*source);
    if (result.return_value_ < 0) {
      setFailureReason(absl::StrCat("failed to bind to ", source->get()->asString(), ": ",
                                    errorDetails(result.errno_)));
      ENVOY_LOG_MISC(debug, failureReason());
      bind_error_ = true;
      // Set a special error state to ensure asynchronous close to give the owner of the
      // ConnectionImpl a chance to add callbacks and detect the "disconnect".
      immediate_error_event_ = ConnectionEvent::LocalClose;
      // Trigger a write event to close this connection out-of-band.
      ioHandle().activateFileEvents(Event::FileReadyType::Write);
    }
  }
}
53663
ClientConnectionImpl::~ClientConnectionImpl() {
  // Ensure that connection is closed and the access log is written before the StreamInfo is
  // destroyed. We need to write the access log here because the StreamInfo is owned by this class,
  // and will be destroyed before the base class destructor runs.
  close(ConnectionCloseType::NoFlush);
  ensureAccessLogWritten();
}
53634
// Initiates the non-blocking connect through the transport socket. Three
// outcomes: immediate success (rare; write readiness follows), in-progress
// (the common async path), or an immediate error, which is surfaced later via
// immediate_error_event_ and a self-triggered write event.
void ClientConnectionImpl::connect() {
  ENVOY_CONN_LOG_EVENT(debug, "client_connection", "connecting to {}", *this,
                       socket_->connectionInfoProvider().remoteAddress()->asString());
  const Api::SysCallIntResult result = transport_socket_->connect(*socket_);
  stream_info_.upstreamInfo()->upstreamTiming().onUpstreamConnectStart(dispatcher_.timeSource());
  if (result.return_value_ == 0) {
    // write will become ready.
    ASSERT(connecting_);
    return;
  }
  ASSERT(SOCKET_FAILURE(result.return_value_));
#ifdef WIN32
  // winsock2 connect returns EWOULDBLOCK if the socket is non-blocking and the connection
  // cannot be completed immediately. We do not check for `EINPROGRESS` as that error is for
  // blocking operations.
  if (result.errno_ == SOCKET_ERROR_AGAIN) {
#else
  if (result.errno_ == SOCKET_ERROR_IN_PROGRESS) {
#endif
    ASSERT(connecting_);
    ENVOY_CONN_LOG_EVENT(debug, "connection_in_progress", "connection in progress", *this);
  } else {
    immediate_error_event_ = ConnectionEvent::RemoteClose;
    connecting_ = false;
    setFailureReason(absl::StrCat(
        "immediate connect error: ", errorDetails(result.errno_),
        "|remote address:", socket_->connectionInfoProvider().remoteAddress()->asString()));
    ENVOY_CONN_LOG_EVENT(debug, "connection_immediate_error", "{}", *this, failureReason());
    // Trigger a write event. This is needed on macOS and seems harmless on Linux.
    ioHandle().activateFileEvents(Event::FileReadyType::Write);
  }
}
53267
// Called once the TCP connect completes: records upstream connect timing,
// resolves the local interface name for IP sockets (debug logging only), then
// defers to the base class.
void ClientConnectionImpl::onConnected() {
  stream_info_.upstreamInfo()->upstreamTiming().onUpstreamConnectComplete(dispatcher_.timeSource());
  // There are no meaningful socket source address semantics for non-IP sockets, so skip.
  if (socket_->connectionInfoProviderSharedPtr()->remoteAddress()->ip()) {
    socket_->connectionInfoProvider().maybeSetInterfaceName(ioHandle());
    const auto maybe_interface_name = socket_->connectionInfoProvider().interfaceName();
    if (maybe_interface_name.has_value()) {
      ENVOY_CONN_LOG_EVENT(debug, "conn_interface", "connected on local interface '{}'", *this,
                           maybe_interface_name.value());
    }
  }
  ConnectionImpl::onConnected();
}
} // namespace Network
} // namespace Envoy