LCOV - code coverage report
Current view: top level - source/common/network - connection_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 408 636 64.2 %
Date: 2024-01-05 06:35:25 Functions: 37 67 55.2 %

          Line data    Source code
       1             : #include "source/common/network/connection_impl.h"
       2             : 
       3             : #include <atomic>
       4             : #include <cstdint>
       5             : #include <memory>
       6             : 
       7             : #include "envoy/common/exception.h"
       8             : #include "envoy/common/platform.h"
       9             : #include "envoy/config/core/v3/base.pb.h"
      10             : #include "envoy/event/scaled_range_timer_manager.h"
      11             : #include "envoy/event/timer.h"
      12             : #include "envoy/network/filter.h"
      13             : #include "envoy/network/socket.h"
      14             : 
      15             : #include "source/common/common/assert.h"
      16             : #include "source/common/common/dump_state_utils.h"
      17             : #include "source/common/common/empty_string.h"
      18             : #include "source/common/common/enum_to_int.h"
      19             : #include "source/common/common/scope_tracker.h"
      20             : #include "source/common/network/address_impl.h"
      21             : #include "source/common/network/connection_socket_impl.h"
      22             : #include "source/common/network/raw_buffer_socket.h"
      23             : #include "source/common/network/socket_option_factory.h"
      24             : #include "source/common/network/socket_option_impl.h"
      25             : #include "source/common/network/utility.h"
      26             : #include "source/common/runtime/runtime_features.h"
      27             : 
      28             : namespace Envoy {
      29             : namespace Network {
      30             : namespace {
      31             : 
      32             : constexpr absl::string_view kTransportSocketConnectTimeoutTerminationDetails =
      33             :     "transport socket timeout was reached";
      34             : 
      35           0 : std::ostream& operator<<(std::ostream& os, Connection::State connection_state) {
      36           0 :   switch (connection_state) {
      37           0 :   case Connection::State::Open:
      38           0 :     return os << "Open";
      39           0 :   case Connection::State::Closing:
      40           0 :     return os << "Closing";
      41           0 :   case Connection::State::Closed:
      42           0 :     return os << "Closed";
      43           0 :   }
      44           0 :   return os;
      45           0 : }
      46             : 
      47             : } // namespace
      48             : 
      49             : void ConnectionImplUtility::updateBufferStats(uint64_t delta, uint64_t new_total,
      50             :                                               uint64_t& previous_total, Stats::Counter& stat_total,
      51        6789 :                                               Stats::Gauge& stat_current) {
      52        6789 :   if (delta) {
      53        2312 :     stat_total.add(delta);
      54        2312 :   }
      55             : 
      56        6789 :   if (new_total != previous_total) {
      57        2242 :     if (new_total > previous_total) {
      58        1093 :       stat_current.add(new_total - previous_total);
      59        1167 :     } else {
      60        1149 :       stat_current.sub(previous_total - new_total);
      61        1149 :     }
      62             : 
      63        2242 :     previous_total = new_total;
      64        2242 :   }
      65        6789 : }
      66             : 
      67             : std::atomic<uint64_t> ConnectionImpl::next_global_id_;
      68             : 
      69             : ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
      70             :                                TransportSocketPtr&& transport_socket,
      71             :                                StreamInfo::StreamInfo& stream_info, bool connected)
      72             :     : ConnectionImplBase(dispatcher, next_global_id_++),
      73             :       transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
      74             :       stream_info_(stream_info), filter_manager_(*this, *socket_),
      75             :       write_buffer_(dispatcher.getWatermarkFactory().createBuffer(
      76           0 :           [this]() -> void { this->onWriteBufferLowWatermark(); },
      77           0 :           [this]() -> void { this->onWriteBufferHighWatermark(); },
      78           0 :           []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      79             :       read_buffer_(dispatcher.getWatermarkFactory().createBuffer(
      80           0 :           [this]() -> void { this->onReadBufferLowWatermark(); },
      81           0 :           [this]() -> void { this->onReadBufferHighWatermark(); },
      82           0 :           []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      83             :       write_buffer_above_high_watermark_(false), detect_early_close_(true),
      84             :       enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
      85             :       write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
      86        2370 :       transport_wants_read_(false) {
      87             : 
      88             :   // Keep it as a bool flag to reduce the times calling runtime method..
      89        2370 :   enable_rst_detect_send_ = Runtime::runtimeFeatureEnabled(
      90        2370 :       "envoy.reloadable_features.detect_and_raise_rst_tcp_connection");
      91             : 
      92        2370 :   if (!connected) {
      93        1328 :     connecting_ = true;
      94        1328 :   }
      95             : 
      96        2370 :   Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;
      97             : 
      98             :   // We never ask for both early close and read at the same time. If we are reading, we want to
      99             :   // consume all available data.
     100        2370 :   socket_->ioHandle().initializeFileEvent(
     101        6950 :       dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
     102        2370 :       Event::FileReadyType::Read | Event::FileReadyType::Write);
     103             : 
     104        2370 :   transport_socket_->setTransportSocketCallbacks(*this);
     105             : 
     106             :   // TODO(soulxu): generate the connection id inside the addressProvider directly,
     107             :   // then we don't need a setter or any of the optional stuff.
     108        2370 :   socket_->connectionInfoProvider().setConnectionID(id());
     109        2370 :   socket_->connectionInfoProvider().setSslConnection(transport_socket_->ssl());
     110        2370 : }
     111             : 
     112        2370 : ConnectionImpl::~ConnectionImpl() {
     113        2370 :   ASSERT(!ioHandle().isOpen() && delayed_close_timer_ == nullptr,
     114        2370 :          "ConnectionImpl was unexpectedly torn down without being closed.");
     115             : 
     116             :   // In general we assume that owning code has called close() previously to the destructor being
     117             :   // run. This generally must be done so that callbacks run in the correct context (vs. deferred
     118             :   // deletion). Hence the assert above. However, call close() here just to be completely sure that
     119             :   // the fd is closed and make it more likely that we crash from a bad close callback.
     120        2370 :   close(ConnectionCloseType::NoFlush);
     121        2370 : }
     122             : 
     123           0 : void ConnectionImpl::addWriteFilter(WriteFilterSharedPtr filter) {
     124           0 :   filter_manager_.addWriteFilter(filter);
     125           0 : }
     126             : 
     127           0 : void ConnectionImpl::addFilter(FilterSharedPtr filter) { filter_manager_.addFilter(filter); }
     128             : 
     129        2097 : void ConnectionImpl::addReadFilter(ReadFilterSharedPtr filter) {
     130        2097 :   filter_manager_.addReadFilter(filter);
     131        2097 : }
     132             : 
     133           0 : void ConnectionImpl::removeReadFilter(ReadFilterSharedPtr filter) {
     134           0 :   filter_manager_.removeReadFilter(filter);
     135           0 : }
     136             : 
     137         949 : bool ConnectionImpl::initializeReadFilters() { return filter_manager_.initializeReadFilters(); }
     138             : 
     139        4366 : void ConnectionImpl::close(ConnectionCloseType type) {
     140        4366 :   if (!ioHandle().isOpen()) {
     141        2450 :     return;
     142        2450 :   }
     143             : 
     144        1916 :   uint64_t data_to_write = write_buffer_->length();
     145        1916 :   ENVOY_CONN_LOG_EVENT(debug, "connection_closing", "closing data_to_write={} type={}", *this,
     146        1916 :                        data_to_write, enumToInt(type));
     147             : 
     148             :   // RST will be sent only if enable_rst_detect_send_ is true, otherwise it is converted to normal
     149             :   // ConnectionCloseType::Abort.
     150        1916 :   if (!enable_rst_detect_send_ && type == ConnectionCloseType::AbortReset) {
     151           0 :     type = ConnectionCloseType::Abort;
     152           0 :   }
     153             : 
     154             :   // The connection is closed by Envoy by sending RST, and the connection is closed immediately.
     155        1916 :   if (type == ConnectionCloseType::AbortReset) {
     156           0 :     ENVOY_CONN_LOG(
     157           0 :         trace, "connection closing type=AbortReset, setting LocalReset to the detected close type.",
     158           0 :         *this);
     159           0 :     setDetectedCloseType(DetectedCloseType::LocalReset);
     160           0 :     closeSocket(ConnectionEvent::LocalClose);
     161           0 :     return;
     162           0 :   }
     163             : 
     164        1916 :   const bool delayed_close_timeout_set = delayed_close_timeout_.count() > 0;
     165        1916 :   if (data_to_write == 0 || type == ConnectionCloseType::NoFlush ||
     166        1916 :       type == ConnectionCloseType::Abort || !transport_socket_->canFlushClose()) {
     167        1419 :     if (data_to_write > 0 && type != ConnectionCloseType::Abort) {
     168             :       // We aren't going to wait to flush, but try to write as much as we can if there is pending
     169             :       // data.
     170         273 :       transport_socket_->doWrite(*write_buffer_, true);
     171         273 :     }
     172             : 
     173        1419 :     if (type != ConnectionCloseType::FlushWriteAndDelay || !delayed_close_timeout_set) {
     174        1370 :       closeConnectionImmediately();
     175        1370 :       return;
     176        1370 :     }
     177             :     // The socket is being closed and either there is no more data to write or the data can not be
     178             :     // flushed (!transport_socket_->canFlushClose()). Since a delayed close has been requested,
     179             :     // start the delayed close timer if it hasn't been done already by a previous close().
     180             :     // NOTE: Even though the delayed_close_state_ is being set to CloseAfterFlushAndWait, since
     181             :     // a write event is not being registered for the socket, this logic is simply setting the
     182             :     // timer and waiting for it to trigger to close the socket.
     183          49 :     if (!inDelayedClose()) {
     184          49 :       initializeDelayedCloseTimer();
     185          49 :       delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
     186             :       // Monitor for the peer closing the connection.
     187          49 :       ioHandle().enableFileEvents(enable_half_close_ ? 0 : Event::FileReadyType::Closed);
     188          49 :     }
     189          49 :     return;
     190        1419 :   }
     191             : 
     192         497 :   ASSERT(type == ConnectionCloseType::FlushWrite ||
     193         497 :          type == ConnectionCloseType::FlushWriteAndDelay);
     194             : 
     195             :   // If there is a pending delayed close, simply update the delayed close state.
     196             :   //
     197             :   // An example of this condition manifests when a downstream connection is closed early by Envoy,
     198             :   // such as when a route can't be matched:
     199             :   //   In ConnectionManagerImpl::onData()
     200             :   //     1) Via codec_->dispatch(), a local reply with a 404 is sent to the client
     201             :   //       a) ConnectionManagerImpl::doEndStream() issues the first connection close() via
     202             :   //          ConnectionManagerImpl::checkForDeferredClose()
     203             :   //     2) A second close is issued by a subsequent call to
     204             :   //        ConnectionManagerImpl::checkForDeferredClose() prior to returning from onData()
     205         497 :   if (inDelayedClose()) {
     206             :     // Validate that a delayed close timer is already enabled unless it was disabled via
     207             :     // configuration.
     208         201 :     ASSERT(!delayed_close_timeout_set || delayed_close_timer_ != nullptr);
     209         201 :     if (type == ConnectionCloseType::FlushWrite || !delayed_close_timeout_set) {
     210           0 :       delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
     211         201 :     } else {
     212         201 :       delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
     213         201 :     }
     214         201 :     return;
     215         201 :   }
     216             : 
     217             :   // NOTE: At this point, it's already been validated that the connection is not already in
     218             :   // delayed close processing and therefore the timer has not yet been created.
     219         297 :   if (delayed_close_timeout_set) {
     220         297 :     initializeDelayedCloseTimer();
     221         297 :     delayed_close_state_ = (type == ConnectionCloseType::FlushWrite)
     222         297 :                                ? DelayedCloseState::CloseAfterFlush
     223         297 :                                : DelayedCloseState::CloseAfterFlushAndWait;
     224         274 :   } else {
     225           0 :     delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
     226           0 :   }
     227             : 
     228         296 :   ioHandle().enableFileEvents(Event::FileReadyType::Write |
     229         296 :                               (enable_half_close_ ? 0 : Event::FileReadyType::Closed));
     230         296 : }
     231             : 
     232       26238 : Connection::State ConnectionImpl::state() const {
     233       26238 :   if (!ioHandle().isOpen()) {
     234          72 :     return State::Closed;
     235       26166 :   } else if (inDelayedClose()) {
     236         240 :     return State::Closing;
     237       25926 :   } else {
     238       25926 :     return State::Open;
     239       25926 :   }
     240       26238 : }
     241             : 
     242        1398 : void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }
     243             : 
     244           0 : void ConnectionImpl::setTransportSocketIsReadable() {
     245           0 :   ASSERT(dispatcher_.isThreadSafe());
     246             :   // Remember that the transport requested read resumption, in case the resumption event is not
     247             :   // scheduled immediately or is "lost" because read was disabled.
     248           0 :   transport_wants_read_ = true;
     249             :   // Only schedule a read activation if the connection is not read disabled to avoid spurious
     250             :   // wakeups. When read disabled, the connection will not read from the transport, and limit
     251             :   // dispatch to the current contents of the read buffer if its high-watermark is triggered and
     252             :   // dispatch_buffered_data_ is set.
     253           0 :   if (read_disable_count_ == 0) {
     254           0 :     ioHandle().activateFileEvents(Event::FileReadyType::Read);
     255           0 :   }
     256           0 : }
     257             : 
     258        2115 : bool ConnectionImpl::filterChainWantsData() {
     259        2115 :   return read_disable_count_ == 0 ||
     260        2115 :          (read_disable_count_ == 1 && read_buffer_->highWatermarkTriggered());
     261        2115 : }
     262             : 
     263          55 : void ConnectionImpl::setDetectedCloseType(DetectedCloseType close_type) {
     264          55 :   detected_close_type_ = close_type;
     265          55 : }
     266             : 
     267        2413 : void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
     268        2413 :   if (!ConnectionImpl::ioHandle().isOpen()) {
     269          43 :     return;
     270          43 :   }
     271             : 
     272             :   // No need for a delayed close (if pending) now that the socket is being closed.
     273        2370 :   if (delayed_close_timer_) {
     274         318 :     delayed_close_timer_->disableTimer();
     275         318 :     delayed_close_timer_ = nullptr;
     276         318 :   }
     277             : 
     278        2370 :   ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
     279        2370 :   transport_socket_->closeSocket(close_type);
     280             : 
     281             :   // Drain input and output buffers.
     282        2370 :   updateReadBufferStats(0, 0);
     283        2370 :   updateWriteBufferStats(0, 0);
     284             : 
     285             :   // As the socket closes, drain any remaining data.
     286             :   // The data won't be written out at this point, and where there are reference
     287             :   // counted buffer fragments, it helps avoid lifetime issues with the
     288             :   // connection outlasting the subscriber.
     289        2370 :   write_buffer_->drain(write_buffer_->length());
     290             : 
     291        2370 :   connection_stats_.reset();
     292             : 
     293        2370 :   if (enable_rst_detect_send_ && (detected_close_type_ == DetectedCloseType::RemoteReset ||
     294        2370 :                                   detected_close_type_ == DetectedCloseType::LocalReset)) {
     295          54 :     const bool ok = Network::Socket::applyOptions(
     296          54 :         Network::SocketOptionFactory::buildZeroSoLingerOptions(), *socket_,
     297          54 :         envoy::config::core::v3::SocketOption::STATE_LISTENING);
     298          54 :     if (!ok) {
     299           0 :       ENVOY_LOG_EVERY_POW_2(error, "rst setting so_linger=0 failed on connection {}", id());
     300           0 :     }
     301          54 :   }
     302             : 
     303             :   // It is safe to call close() since there is an IO handle check.
     304        2370 :   socket_->close();
     305             : 
     306             :   // Call the base class directly as close() is called in the destructor.
     307        2370 :   ConnectionImpl::raiseEvent(close_type);
     308        2370 : }
     309             : 
     310        1954 : void ConnectionImpl::onConnected() {
     311        1954 :   ASSERT(!connecting_);
     312        1954 :   transport_socket_->onConnected();
     313        1954 : }
     314             : 
     315        1353 : void ConnectionImpl::noDelay(bool enable) {
     316             :   // There are cases where a connection to localhost can immediately fail (e.g., if the other end
     317             :   // does not have enough fds, reaches a backlog limit, etc.). Because we run with deferred error
     318             :   // events, the calling code may not yet know that the connection has failed. This is one call
     319             :   // where we go outside of libevent and hit the fd directly and this case can fail if the fd is
     320             :   // invalid. For this call instead of plumbing through logic that will immediately indicate that a
     321             :   // connect failed, we will just ignore the noDelay() call if the socket is invalid since error is
     322             :   // going to be raised shortly anyway and it makes the calling code simpler.
     323        1353 :   if (!ioHandle().isOpen()) {
     324           0 :     return;
     325           0 :   }
     326             : 
     327             :   // Don't set NODELAY for unix domain sockets or internal socket.
     328        1353 :   if (socket_->addressType() != Address::Type::Ip) {
     329           0 :     return;
     330           0 :   }
     331             : 
     332             :   // Set NODELAY
     333        1353 :   int new_value = enable;
     334        1353 :   Api::SysCallIntResult result =
     335        1353 :       socket_->setSocketOption(IPPROTO_TCP, TCP_NODELAY, &new_value, sizeof(new_value));
     336             : #if defined(__APPLE__)
     337             :   if (SOCKET_FAILURE(result.return_value_) && result.errno_ == SOCKET_ERROR_INVAL) {
     338             :     // Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
     339             :     // enabled despite this result.
     340             :     return;
     341             :   }
     342             : #elif defined(WIN32)
     343             :   if (SOCKET_FAILURE(result.return_value_) &&
     344             :       (result.errno_ == SOCKET_ERROR_AGAIN || result.errno_ == SOCKET_ERROR_INVAL)) {
     345             :     // Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
     346             :     // enabled despite this result.
     347             :     return;
     348             :   }
     349             : #endif
     350             : 
     351        1353 :   RELEASE_ASSERT(result.return_value_ == 0,
     352        1353 :                  fmt::format("Failed to set TCP_NODELAY with error {}, {}", result.errno_,
     353        1353 :                              errorDetails(result.errno_)));
     354        1353 : }
     355             : 
     356        1955 : void ConnectionImpl::onRead(uint64_t read_buffer_size) {
     357        1955 :   ASSERT(dispatcher_.isThreadSafe());
     358        1955 :   if (inDelayedClose() || !filterChainWantsData()) {
     359           0 :     return;
     360           0 :   }
     361        1955 :   ASSERT(ioHandle().isOpen());
     362             : 
     363        1955 :   if (read_buffer_size == 0 && !read_end_stream_) {
     364           0 :     return;
     365           0 :   }
     366             : 
     367        1955 :   if (read_end_stream_) {
     368             :     // read() on a raw socket will repeatedly return 0 (EOF) once EOF has
     369             :     // occurred, so filter out the repeats so that filters don't have
     370             :     // to handle repeats.
     371             :     //
     372             :     // I don't know of any cases where this actually happens (we should stop
     373             :     // reading the socket after EOF), but this check guards against any bugs
     374             :     // in ConnectionImpl or strangeness in the OS events (epoll, kqueue, etc)
     375             :     // and maintains the guarantee for filters.
     376           0 :     if (read_end_stream_raised_) {
     377             :       // No further data can be delivered after end_stream
     378           0 :       ASSERT(read_buffer_size == 0);
     379           0 :       return;
     380           0 :     }
     381           0 :     read_end_stream_raised_ = true;
     382           0 :   }
     383             : 
     384        1955 :   filter_manager_.onRead();
     385        1955 : }
     386             : 
     387         911 : void ConnectionImpl::enableHalfClose(bool enabled) {
     388             :   // This code doesn't correctly ensure that EV_CLOSE isn't set if reading is disabled
     389             :   // when enabling half-close. This could be fixed, but isn't needed right now, so just
     390             :   // ASSERT that it doesn't happen.
     391         911 :   ASSERT(!enabled || read_disable_count_ == 0);
     392             : 
     393         911 :   enable_half_close_ = enabled;
     394         911 : }
     395             : 
     396         324 : Connection::ReadDisableStatus ConnectionImpl::readDisable(bool disable) {
     397             :   // Calls to readEnabled on a closed socket are considered to be an error.
     398         324 :   ASSERT(state() == State::Open);
     399             : 
     400         324 :   ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this,
     401         324 :                  disable, read_disable_count_, static_cast<int>(state()), read_buffer_->length());
     402             : 
     403             :   // When we disable reads, we still allow for early close notifications (the equivalent of
     404             :   // `EPOLLRDHUP` for an epoll backend). For backends that support it, this allows us to apply
     405             :   // back pressure at the kernel layer, but still get timely notification of a FIN. Note that
     406             :   // we are not guaranteed to get notified, so even if the remote has closed, we may not know
     407             :   // until we try to write. Further note that currently we optionally don't correctly handle half
     408             :   // closed TCP connections in the sense that we assume that a remote FIN means the remote intends a
     409             :   // full close.
     410         324 :   if (disable) {
     411         164 :     ++read_disable_count_;
     412             : 
     413         164 :     if (state() != State::Open) {
     414             :       // If readDisable is called on a closed connection, do not crash.
     415           0 :       return ReadDisableStatus::NoTransition;
     416           0 :     }
     417             : 
     418         164 :     if (read_disable_count_ > 1) {
     419             :       // The socket has already been read disabled.
     420           0 :       return ReadDisableStatus::StillReadDisabled;
     421           0 :     }
     422             : 
     423             :     // If half-close semantics are enabled, we never want early close notifications; we
     424             :     // always want to read all available data, even if the other side has closed.
     425         164 :     if (detect_early_close_ && !enable_half_close_) {
     426           9 :       ioHandle().enableFileEvents(Event::FileReadyType::Write | Event::FileReadyType::Closed);
     427         155 :     } else {
     428         155 :       ioHandle().enableFileEvents(Event::FileReadyType::Write);
     429         155 :     }
     430             : 
     431         164 :     return ReadDisableStatus::TransitionedToReadDisabled;
     432         164 :   } else {
     433         160 :     ASSERT(read_disable_count_ != 0);
     434         160 :     --read_disable_count_;
     435         160 :     if (state() != State::Open) {
     436             :       // If readDisable is called on a closed connection, do not crash.
     437           0 :       return ReadDisableStatus::NoTransition;
     438           0 :     }
     439             : 
     440         160 :     auto read_disable_status = ReadDisableStatus::StillReadDisabled;
     441         160 :     if (read_disable_count_ == 0) {
     442             :       // We never ask for both early close and read at the same time. If we are reading, we want to
     443             :       // consume all available data.
     444         160 :       ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
     445         160 :       read_disable_status = ReadDisableStatus::TransitionedToReadEnabled;
     446         160 :     }
     447             : 
     448         160 :     if (filterChainWantsData() && (read_buffer_->length() > 0 || transport_wants_read_)) {
     449             :       // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
     450             :       // buffer's high watermark has triggered.
     451           5 :       ASSERT(read_buffer_->length() > 0 || read_disable_count_ == 0);
     452             : 
     453             :       // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
     454             :       // able to process additional bytes even if there is no data in the kernel to kick off the
     455             :       // filter chain. Alternately the connection may need read resumption while read disabled and
     456             :       // not registered for read events because the read buffer's high-watermark has triggered. To
     457             :       // handle these cases, directly schedule a fake read event to make sure the buffered data in
     458             :       // the read buffer or in transport socket internal buffers gets processed regardless and
     459             :       // ensure that we dispatch it via onRead.
     460           5 :       dispatch_buffered_data_ = true;
     461           5 :       ioHandle().activateFileEvents(Event::FileReadyType::Read);
     462           5 :     }
     463             : 
     464         160 :     return read_disable_status;
     465         160 :   }
     466         324 : }
     467             : 
     468        4324 : void ConnectionImpl::raiseEvent(ConnectionEvent event) {
     469        4324 :   ENVOY_CONN_LOG(trace, "raising connection event {}", *this, static_cast<int>(event));
     470        4324 :   ConnectionImplBase::raiseConnectionEvent(event);
     471             :   // We may have pending data in the write buffer on transport handshake
     472             :   // completion, which may also have completed in the context of onReadReady(),
     473             :   // where no check of the write buffer is made. Provide an opportunity to flush
     474             :   // here. If connection write is not ready, this is harmless. We should only do
     475             :   // this if we're still open (the above callbacks may have closed).
     476        4324 :   if (event == ConnectionEvent::Connected) {
     477        1954 :     flushWriteBuffer();
     478        1954 :   }
     479        4324 : }
     480             : 
     481           0 : bool ConnectionImpl::readEnabled() const {
     482             :   // Calls to readEnabled on a closed socket are considered to be an error.
     483           0 :   ASSERT(state() == State::Open);
     484           0 :   ASSERT(dispatcher_.isThreadSafe());
     485           0 :   return read_disable_count_ == 0;
     486           0 : }
     487             : 
     488           0 : void ConnectionImpl::addBytesSentCallback(BytesSentCb cb) {
                         // Registers `cb` at the end of bytes_sent_callbacks_. onWriteReady() calls every registered
                         // callback with the number of bytes just written and removes those that return false.
                         // NOTE(review): `cb` is taken by value and copied again here; std::move(cb) would avoid the
                         // second copy if BytesSentCb is expensive to copy — confirm its type.
     489           0 :   bytes_sent_callbacks_.emplace_back(cb);
     490           0 : }
     491             : 
     492           0 : void ConnectionImpl::rawWrite(Buffer::Instance& data, bool end_stream) {
                         // Writes `data` while skipping the write filter chain (through_filter_chain = false).
     493           0 :   write(data, end_stream, false);
     494           0 : }
     495             : 
     496        7910 : void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {
                         // Regular write entry point: `data` is passed through the write filter chain
                         // (through_filter_chain = true) before being buffered for the socket.
     497        7910 :   write(data, end_stream, true);
     498        7910 : }
     499             : 
     500        7910 : void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
                         // Core write path shared by write() and rawWrite().
                         //
                         // data:                 bytes to send; moved (drained) into write_buffer_ when accepted.
                         // end_stream:           marks the final write; asserted to require enable_half_close_.
                         // through_filter_chain: when true, filter_manager_.onWrite() runs first and may stop the
                         //                       write by returning FilterStatus::StopIteration.
     501        7910 :   ASSERT(!end_stream || enable_half_close_);
     502        7910 :   ASSERT(dispatcher_.isThreadSafe());
     503             : 
     504        7910 :   if (write_end_stream_) {
     505             :     // It is an API violation to write more data after writing end_stream, but a duplicate
     506             :     // end_stream with no data is harmless. This catches misuse of the API that could result in data
     507             :     // being lost.
     508           0 :     ASSERT(data.length() == 0 && end_stream);
     509             : 
                           // Nothing is written once end_stream has been recorded; `data` is left untouched.
     510           0 :     return;
     511           0 :   }
     512             : 
     513        7910 :   if (through_filter_chain) {
     514             :     // NOTE: This is kind of a hack, but currently we don't support restart/continue on the write
     515             :     //       path, so we just pass around the buffer passed to us in this function. If we ever
     516             :     //       support buffer/restart/continue on the write path this needs to get more complicated.
     517        7910 :     current_write_buffer_ = &data;
     518        7910 :     current_write_end_stream_ = end_stream;
     519        7910 :     FilterStatus status = filter_manager_.onWrite();
                           // The pointer is only valid for the duration of the filter call above.
     520        7910 :     current_write_buffer_ = nullptr;
     521             : 
     522        7910 :     if (FilterStatus::StopIteration == status) {
                             // A filter stopped iteration: nothing is moved into write_buffer_ by this call.
     523           0 :       return;
     524           0 :     }
     525        7910 :   }
     526             : 
                         // Recorded even when `data` is empty, so later calls hit the write_end_stream_ guard above.
     527        7910 :   write_end_stream_ = end_stream;
     528        7910 :   if (data.length() > 0 || end_stream) {
     529        7421 :     ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
     530             :     // TODO(mattklein123): All data currently gets moved from the source buffer to the write buffer.
     531             :     // This can lead to inefficient behavior if writing a bunch of small chunks. In this case, it
     532             :     // would likely be more efficient to copy data below a certain size. VERY IMPORTANT: If this is
     533             :     // ever changed, read the comment in SslSocket::doWrite() VERY carefully. That code assumes that
     534             :     // we never change existing write_buffer_ chain elements between calls to SSL_write(). That code
     535             :     // might need to change if we ever copy here.
     536        7421 :     write_buffer_->move(data);
     537             : 
     538             :     // Activating a write event before the socket is connected has the side-effect of tricking
     539             :     // doWriteReady into thinking the socket is connected. On macOS, the underlying write may fail
     540             :     // with a connection error if a call to write(2) occurs before the connection is completed.
     541        7421 :     if (!connecting_) {
     542        6739 :       ioHandle().activateFileEvents(Event::FileReadyType::Write);
     543        6739 :     }
     544        7421 :   }
     545        7910 : }
     546             : 
     547        1215 : void ConnectionImpl::setBufferLimits(uint32_t limit) {
                         // Records `limit` as the read buffer limit and, when it is non-zero, applies it as the
                         // watermark of both write_buffer_ and read_buffer_. A limit of 0 leaves the buffers'
                         // current watermarks untouched.
     548        1215 :   read_buffer_limit_ = limit;
     549             : 
     550             :   // Due to the fact that writes to the connection and flushing data from the connection are done
     551             :   // asynchronously, we have the option of either setting the watermarks aggressively, and regularly
     552             :   // enabling/disabling reads from the socket, or allowing more data, but then not triggering
     553             :   // based on watermarks until 2x the data is buffered in the common case. Given these are all soft
     554             :   // limits we err on the side of buffering more and triggering watermark callbacks less often.
     555             :   //
     556             :   // Given the current implementation for straight up TCP proxying, the common case is reading
     557             :   // |limit| bytes through the socket, passing |limit| bytes to the connection and then immediately
     558             :   // draining |limit| bytes to the socket. Triggering the high watermarks and then immediately
     559             :   // triggering the low watermarks would be expensive, but we narrowly avoid triggering high
     560             :   // watermark when moving |limit| bytes through the connection because the high watermark
     561             :   // computation checks if the size of the buffer exceeds the high watermark value.
     562        1215 :   if (limit > 0) {
     563         957 :     write_buffer_->setWatermarks(limit);
     564         957 :     read_buffer_->setWatermarks(limit);
     565         957 :   }
     566        1215 : }
     567             : 
     568           0 : void ConnectionImpl::onReadBufferLowWatermark() {
                         // Read buffer low-watermark callback: re-enables reads via readDisable(false), but only
                         // while the connection is still open. Counterpart of onReadBufferHighWatermark().
     569           0 :   ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this);
     570           0 :   if (state() == State::Open) {
     571           0 :     readDisable(false);
     572           0 :   }
     573           0 : }
     574             : 
     575           0 : void ConnectionImpl::onReadBufferHighWatermark() {
                         // Read buffer high-watermark callback: pauses reads via readDisable(true), but only while
                         // the connection is still open. Counterpart of onReadBufferLowWatermark().
     576           0 :   ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this);
     577           0 :   if (state() == State::Open) {
     578           0 :     readDisable(true);
     579           0 :   }
     580           0 : }
     581             : 
     582           0 : void ConnectionImpl::onWriteBufferLowWatermark() {
                         // Write buffer low-watermark callback: clears write_buffer_above_high_watermark_ and tells
                         // every registered connection callback that the buffer is below its low watermark.
     583           0 :   ENVOY_CONN_LOG(debug, "onBelowWriteBufferLowWatermark", *this);
                         // Low-watermark notifications are expected only after a high-watermark one.
     584           0 :   ASSERT(write_buffer_above_high_watermark_);
     585           0 :   write_buffer_above_high_watermark_ = false;
     586           0 :   for (ConnectionCallbacks* callback : callbacks_) {
                           // callbacks_ may hold null entries (presumably removed callbacks — confirm); skip them.
     587           0 :     if (callback) {
     588           0 :       callback->onBelowWriteBufferLowWatermark();
     589           0 :     }
     590           0 :   }
     591           0 : }
     592             : 
     593           0 : void ConnectionImpl::onWriteBufferHighWatermark() {
                         // Write buffer high-watermark callback: sets write_buffer_above_high_watermark_ and tells
                         // every registered connection callback that the buffer is above its high watermark.
     594           0 :   ENVOY_CONN_LOG(debug, "onAboveWriteBufferHighWatermark", *this);
                         // A second high-watermark notification without an intervening low one is unexpected.
     595           0 :   ASSERT(!write_buffer_above_high_watermark_);
     596           0 :   write_buffer_above_high_watermark_ = true;
     597           0 :   for (ConnectionCallbacks* callback : callbacks_) {
                           // callbacks_ may hold null entries (presumably removed callbacks — confirm); skip them.
     598           0 :     if (callback) {
     599           0 :       callback->onAboveWriteBufferHighWatermark();
     600           0 :     }
     601           0 :   }
     602           0 : }
     603             : 
     604           8 : void ConnectionImpl::setFailureReason(absl::string_view failure_reason) {
                         // Stores `failure_reason` in failure_reason_. When the transport socket already reports a
                         // non-empty failure reason of its own, that text is appended after ". " so both are kept.
     605           8 :   if (!transport_socket_->failureReason().empty()) {
     606           0 :     failure_reason_ = absl::StrCat(failure_reason, ". ", transport_socket_->failureReason());
     607           8 :   } else {
     608           8 :     failure_reason_ = std::string(failure_reason);
     609           8 :   }
     610           8 : }
     611             : 
     612        6952 : void ConnectionImpl::onFileEvent(uint32_t events) {
                         // Socket readiness handler. `events` is tested against the Event::FileReadyType flags
                         // Closed, Write and Read. Handling order:
                         //   1. a pending immediate error (set at construction time) closes the socket;
                         //   2. Closed closes the socket as a remote close;
                         //   3. Write runs onWriteReady();
                         //   4. Read runs onReadReady(), provided the io handle is still open after step 3.
     613        6952 :   ScopeTrackerScopeState scope(this, this->dispatcher_);
     614        6952 :   ENVOY_CONN_LOG(trace, "socket event: {}", *this, events);
     615             : 
     616        6952 :   if (immediate_error_event_ == ConnectionEvent::LocalClose ||
     617        6953 :       immediate_error_event_ == ConnectionEvent::RemoteClose) {
     618           0 :     if (bind_error_) {
     619           0 :       ENVOY_CONN_LOG(debug, "raising bind error", *this);
     620             :       // Update stats here, rather than on bind failure, to give the caller a chance to
     621             :       // setConnectionStats.
     622           0 :       if (connection_stats_ && connection_stats_->bind_errors_) {
     623           0 :         connection_stats_->bind_errors_->inc();
     624           0 :       }
     625           0 :     } else {
     626           0 :       ENVOY_CONN_LOG(debug, "raising immediate error", *this);
     627           0 :     }
                           // The stored event decides whether this is reported as a local or a remote close.
     628           0 :     closeSocket(immediate_error_event_);
     629           0 :     return;
     630           0 :   }
     631             : 
     632        6952 :   if (events & Event::FileReadyType::Closed) {
     633             :     // We never ask for both early close and read at the same time. If we are reading, we want to
     634             :     // consume all available data.
     635          68 :     ASSERT(!(events & Event::FileReadyType::Read));
     636          68 :     ENVOY_CONN_LOG(debug, "remote early close", *this);
     637          68 :     closeSocket(ConnectionEvent::RemoteClose);
     638          68 :     return;
     639          68 :   }
     640             : 
     641        6884 :   if (events & Event::FileReadyType::Write) {
     642        6877 :     onWriteReady();
     643        6877 :   }
     644             : 
     645             :   // It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
     646             :   // In this case ignore read event processing.
     647        6884 :   if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
     648        2377 :     onReadReady();
     649        2377 :   }
     650        6884 : }
     651             : 
     652        2376 : void ConnectionImpl::onReadReady() {
                         // Handles a Read event: reads from the transport socket into read_buffer_, dispatches the
                         // data through onRead(), and closes the socket on reset, on end-of-stream without
                         // half-close support, when the transport requests it, or when both sides are half closed.
                         // While reads are disabled it only re-dispatches already buffered data, if requested.
     653        2376 :   ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this,
     654        2376 :                  static_cast<int>(dispatch_buffered_data_));
                         // Latch and clear the flag up front so the request is consumed exactly once by this call.
     655        2376 :   const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
     656        2376 :   dispatch_buffered_data_ = false;
     657             : 
     658        2376 :   ASSERT(!connecting_);
     659             : 
     660             :   // We get here while read disabled in two ways.
     661             :   // 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
     662             :   //    due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
     663             :   //    enabled.
     664             :   // 2) The consumer of connection data called readDisable(true), and instead of reading from the
     665             :   //    socket we simply need to dispatch already read data.
     666        2376 :   if (read_disable_count_ != 0) {
     667             :     // Do not clear transport_wants_read_ when returning early; the early return skips the transport
     668             :     // socket doRead call.
     669           0 :     if (latched_dispatch_buffered_data && filterChainWantsData()) {
     670           0 :       onRead(read_buffer_->length());
     671           0 :     }
     672           0 :     return;
     673           0 :   }
     674             : 
     675             :   // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
     676             :   // the transport socket read resumption happens as requested; onReadReady() returns early without
     677             :   // reading from the transport if the read buffer is above high watermark at the start of the
     678             :   // method.
     679        2376 :   transport_wants_read_ = false;
     680        2376 :   IoResult result = transport_socket_->doRead(*read_buffer_);
                         // Buffer length right after the read; this is the size handed to onRead() below.
     681        2376 :   uint64_t new_buffer_size = read_buffer_->length();
     682        2376 :   updateReadBufferStats(result.bytes_processed_, new_buffer_size);
     683             : 
     684             :   // The socket is closed immediately when receiving RST.
     685        2377 :   if (enable_rst_detect_send_ && result.err_code_.has_value() &&
     686        2376 :       result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
     687          24 :     ENVOY_CONN_LOG(trace, "read: rst close from peer", *this);
                           // Bytes read in the same call as the reset are still dispatched before closing.
     688          24 :     if (result.bytes_processed_ != 0) {
     689          14 :       onRead(new_buffer_size);
     690          14 :     }
     691          24 :     setDetectedCloseType(DetectedCloseType::RemoteReset);
     692          24 :     closeSocket(ConnectionEvent::RemoteClose);
     693          24 :     return;
     694          24 :   }
     695             : 
     696             :   // If this connection doesn't have half-close semantics, translate end_stream into
     697             :   // a connection close.
     698        2353 :   if ((!enable_half_close_ && result.end_stream_read_)) {
     699         872 :     result.end_stream_read_ = false;
     700         872 :     result.action_ = PostIoAction::Close;
     701         872 :   }
     702             : 
                         // Sticky: once end_stream has been read it stays recorded for bothSidesHalfClosed().
     703        2352 :   read_end_stream_ |= result.end_stream_read_;
     704        2352 :   if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
     705        2352 :       (latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
     706             :     // Skip onRead if no bytes were processed unless we explicitly want to force onRead for
     707             :     // buffered data. For instance, skip onRead if the connection was closed without producing
     708             :     // more data.
     709        1941 :     onRead(new_buffer_size);
     710        1941 :   }
     711             : 
     712             :   // The read callback may have already closed the connection.
     713        2352 :   if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
     714         872 :     ENVOY_CONN_LOG(debug, "remote close", *this);
     715         872 :     closeSocket(ConnectionEvent::RemoteClose);
     716         872 :   }
     717        2352 : }
     718             : 
     719             : absl::optional<Connection::UnixDomainSocketPeerCredentials>
     720           0 : ConnectionImpl::unixSocketPeerCredentials() const {
                         // Returns the peer's pid, uid and gid as reported by the SO_PEERCRED socket option.
                         // Yields absl::nullopt when SO_PEERCRED is not defined at compile time or when querying the
                         // option fails.
     721             :   // TODO(snowp): Support non-linux platforms.
     722             : #ifndef SO_PEERCRED
     723             :   return absl::nullopt;
     724             : #else
     725           0 :   struct ucred ucred;
     726           0 :   socklen_t ucred_size = sizeof(ucred);
     727           0 :   int rc = socket_->getSocketOption(SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_size).return_value_;
     728           0 :   if (SOCKET_FAILURE(rc)) {
     729           0 :     return absl::nullopt;
     730           0 :   }
     731             : 
     732           0 :   return {{ucred.pid, ucred.uid, ucred.gid}};
     733           0 : #endif
     734           0 : }
     735             : 
     736        7691 : void ConnectionImpl::onWriteReady() {
                         // Handles socket writability in three stages:
                         //   1. if a connect is still pending, read SO_ERROR to learn its outcome and either finish
                         //      the connect (onConnected()) or close with a "delayed connect error";
                         //   2. flush write_buffer_ through the transport socket and update the write stats;
                         //   3. act on the result: close on reset or on PostIoAction::Close, advance a delayed
                         //      close once the buffer is empty, or keep the connection open and notify the
                         //      bytes-sent callbacks.
     737        7691 :   ENVOY_CONN_LOG(trace, "write ready", *this);
     738             : 
     739        7691 :   if (connecting_) {
     740        1178 :     int error;
     741        1178 :     socklen_t error_size = sizeof(error);
                           // Failing to query SO_ERROR is treated as fatal (RELEASE_ASSERT).
     742        1178 :     RELEASE_ASSERT(
     743        1178 :         socket_->getSocketOption(SOL_SOCKET, SO_ERROR, &error, &error_size).return_value_ == 0, "");
     744             : 
     745        1178 :     if (error == 0) {
     746        1170 :       ENVOY_CONN_LOG_EVENT(debug, "connection_connected", "connected", *this);
     747        1170 :       connecting_ = false;
     748        1170 :       onConnected();
     749             :       // It's possible that we closed during the connect callback.
     750        1170 :       if (state() != State::Open) {
     751           0 :         ENVOY_CONN_LOG_EVENT(debug, "connection_closed_callback", "close during connected callback",
     752           0 :                              *this);
     753           0 :         return;
     754           0 :       }
     755        1170 :     } else {
                             // Non-zero SO_ERROR: the connect failed; record the error number and close.
     756           8 :       setFailureReason(absl::StrCat("delayed connect error: ", error));
     757           8 :       ENVOY_CONN_LOG_EVENT(debug, "connection_error", "{}", *this, transportFailureReason());
     758           8 :       closeSocket(ConnectionEvent::RemoteClose);
     759           8 :       return;
     760           8 :     }
     761        1178 :   }
     762             : 
     763        7683 :   IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
     764        7683 :   ASSERT(!result.end_stream_read_); // The interface guarantees that only read operations set this.
                         // Bytes still buffered after the write attempt; zero means the flush completed.
     765        7683 :   uint64_t new_buffer_size = write_buffer_->length();
     766        7683 :   updateWriteBufferStats(result.bytes_processed_, new_buffer_size);
     767             : 
     768             :   // The socket is closed immediately when receiving RST.
     769        7683 :   if (enable_rst_detect_send_ && result.err_code_.has_value() &&
     770        7683 :       result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
     771             :     // Discard anything in the buffer.
     772          31 :     ENVOY_CONN_LOG(debug, "write: rst close from peer.", *this);
     773          31 :     setDetectedCloseType(DetectedCloseType::RemoteReset);
     774          31 :     closeSocket(ConnectionEvent::RemoteClose);
     775          31 :     return;
     776          31 :   }
     777             : 
     778             :   // NOTE: If the delayed_close_timer_ is set, it must only trigger after a delayed_close_timeout_
     779             :   // period of inactivity from the last write event. Therefore, the timer must be reset to its
     780             :   // original timeout value unless the socket is going to be closed as a result of the doWrite().
     781             : 
     782        7652 :   if (result.action_ == PostIoAction::Close) {
     783             :     // It is possible (though unlikely) for the connection to have already been closed during the
     784             :     // write callback. This can happen if we manage to complete the SSL handshake in the write
     785             :     // callback, raise a connected event, and close the connection.
     786          12 :     closeSocket(ConnectionEvent::RemoteClose);
     787        7640 :   } else if ((inDelayedClose() && new_buffer_size == 0) || bothSidesHalfClosed()) {
     788          61 :     ENVOY_CONN_LOG(debug, "write flush complete", *this);
     789          61 :     if (delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) {
                             // Stay open and leave the close to delayed_close_timer_; the timer is re-armed only
                             // when this write actually made progress.
     790          61 :       ASSERT(delayed_close_timer_ != nullptr && delayed_close_timer_->enabled());
     791          61 :       if (result.bytes_processed_ > 0) {
     792          39 :         delayed_close_timer_->enableTimer(delayed_close_timeout_);
     793          39 :       }
     794          61 :     } else {
                             // Either both directions are finished or a flush-only delayed close completed.
     795           0 :       ASSERT(bothSidesHalfClosed() || delayed_close_state_ == DelayedCloseState::CloseAfterFlush);
     796           0 :       closeConnectionImmediately();
     797           0 :     }
     798        7579 :   } else {
     799        7579 :     ASSERT(result.action_ == PostIoAction::KeepOpen);
     800        7579 :     ASSERT(!delayed_close_timer_ || delayed_close_timer_->enabled());
                           // Write progress counts as activity, so restart the delayed-close timeout.
     801        7579 :     if (delayed_close_timer_ != nullptr && result.bytes_processed_ > 0) {
     802           0 :       delayed_close_timer_->enableTimer(delayed_close_timeout_);
     803           0 :     }
     804        7579 :     if (result.bytes_processed_ > 0) {
                             // Notify bytes-sent callbacks; a callback that returns false is unregistered.
     805        3904 :       auto it = bytes_sent_callbacks_.begin();
     806        3904 :       while (it != bytes_sent_callbacks_.end()) {
     807           0 :         if ((*it)(result.bytes_processed_)) {
     808             :           // move to the next callback.
     809           0 :           it++;
     810           0 :         } else {
     811             :           // remove the current callback.
     812           0 :           it = bytes_sent_callbacks_.erase(it);
     813           0 :         }
     814             : 
     815             :         // If a callback closes the socket, stop iterating.
     816           0 :         if (!ioHandle().isOpen()) {
     817           0 :           return;
     818           0 :         }
     819           0 :       }
     820        3904 :     }
     821        7579 :   }
     822        7652 : }
     823             : 
     824        4742 : void ConnectionImpl::updateReadBufferStats(uint64_t num_read, uint64_t new_size) {
                         // Reports a read of `num_read` bytes and the resulting buffer size `new_size` to the read
                         // stats (read_total_ / read_current_). Does nothing when no connection stats are set.
     825        4742 :   if (!connection_stats_) {
     826        1900 :     return;
     827        1900 :   }
     828             : 
                         // last_read_buffer_size_ is passed along as the previously reported size (presumably the
                         // helper updates it to `new_size` — confirm in ConnectionImplUtility).
     829        2842 :   ConnectionImplUtility::updateBufferStats(num_read, new_size, last_read_buffer_size_,
     830        2842 :                                            connection_stats_->read_total_,
     831        2842 :                                            connection_stats_->read_current_);
     832        2842 : }
     833             : 
     834       10048 : void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_size) {
                         // Reports a write of `num_written` bytes and the resulting buffer size `new_size` to the
                         // write stats (write_total_ / write_current_). Does nothing when no connection stats are
                         // set.
     835       10048 :   if (!connection_stats_) {
     836        6111 :     return;
     837        6111 :   }
     838             : 
                         // last_write_buffer_size_ is passed along as the previously reported size (presumably the
                         // helper updates it to `new_size` — confirm in ConnectionImplUtility).
     839        3937 :   ConnectionImplUtility::updateBufferStats(num_written, new_size, last_write_buffer_size_,
     840        3937 :                                            connection_stats_->write_total_,
     841        3937 :                                            connection_stats_->write_current_);
     842        3937 : }
     843             : 
     844        9058 : bool ConnectionImpl::bothSidesHalfClosed() {
                         // True once end_stream has been read, end_stream has been written by the caller, and the
                         // write buffer has been fully drained.
     845             :   // If the write_buffer_ is not empty, then the end_stream has not been sent to the transport yet.
     846        9058 :   return read_end_stream_ && write_end_stream_ && write_buffer_->length() == 0;
     847        9058 : }
     848             : 
     849        1380 : absl::string_view ConnectionImpl::transportFailureReason() const {
                         // Prefers the reason recorded by setFailureReason(); when none was recorded, falls back to
                         // the transport socket's own failure reason. The returned view refers to storage owned by
                         // this connection or, in the fallback case, to whatever the transport socket returns.
     850        1380 :   if (!failure_reason_.empty()) {
     851           8 :     return failure_reason_;
     852           8 :   }
     853        1372 :   return transport_socket_->failureReason();
     854        1380 : }
     855             : 
     856           0 : absl::optional<std::chrono::milliseconds> ConnectionImpl::lastRoundTripTime() const {
                         // Pure delegation: returns whatever the underlying socket reports, possibly no value.
     857           0 :   return socket_->lastRoundTripTime();
     858           0 : }
     859             : 
     860             : void ConnectionImpl::configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec,
     861           0 :                                                       std::chrono::microseconds rtt) {
                         // Pure delegation: forwards the bandwidth and RTT estimates to the transport socket.
     862           0 :   return transport_socket_->configureInitialCongestionWindow(bandwidth_bits_per_sec, rtt);
     863           0 : }
     864             : 
     865           0 : absl::optional<uint64_t> ConnectionImpl::congestionWindowInBytes() const {
                         // Pure delegation: returns whatever the underlying socket reports, possibly no value.
     866           0 :   return socket_->congestionWindowInBytes();
     867           0 : }
     868             : 
     869        1954 : void ConnectionImpl::flushWriteBuffer() {
                         // Runs the write path synchronously (a direct onWriteReady() call rather than a file
                         // event), but only when the connection is open and there is buffered data to write.
     870        1954 :   if (state() == State::Open && write_buffer_->length() > 0) {
     871         809 :     onWriteReady();
     872         809 :   }
     873        1954 : }
     874             : 
     875           0 : void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const {
                         // Writes a one-line summary of this connection to `os`, prefixed with the indentation for
                         // `indent_level`: the object address, connecting_, bind_error_, state() and
                         // read_buffer_limit_. The details of socket_ are dumped afterwards.
     876           0 :   const char* spaces = spacesForLevel(indent_level);
     877           0 :   os << spaces << "ConnectionImpl " << this << DUMP_MEMBER(connecting_) << DUMP_MEMBER(bind_error_)
     878           0 :      << DUMP_MEMBER(state()) << DUMP_MEMBER(read_buffer_limit_) << "\n";
     879             : 
     880           0 :   DUMP_DETAILS(socket_);
     881           0 : }
     882             : 
     883             : ServerConnectionImpl::ServerConnectionImpl(Event::Dispatcher& dispatcher,
     884             :                                            ConnectionSocketPtr&& socket,
     885             :                                            TransportSocketPtr&& transport_socket,
     886             :                                            StreamInfo::StreamInfo& stream_info)
                           // Takes ownership of `socket` and `transport_socket` and forwards everything to
                           // ConnectionImpl. NOTE(review): the trailing `true` is presumably the base class's
                           // "already connected" flag (ClientConnectionImpl passes false) — confirm against the
                           // ConnectionImpl constructor.
     887             :     : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info,
     888        1042 :                      true) {}
     889             : 
     890             : void ServerConnectionImpl::setTransportSocketConnectTimeout(std::chrono::milliseconds timeout,
     891           0 :                                                             Stats::Counter& timeout_stat) {
                         // Arms a timer that calls onTransportSocketConnectTimeout() after `timeout`. The timer is
                         // created on first use and re-armed on later calls. Does nothing once the transport connect
                         // is no longer pending.
                         // The address of `timeout_stat` is stored and dereferenced when the timer fires, so the
                         // counter must outlive the timer.
     892           0 :   if (!transport_connect_pending_) {
     893           0 :     return;
     894           0 :   }
     895             : 
     896           0 :   transport_socket_timeout_stat_ = &timeout_stat;
     897           0 :   if (transport_socket_connect_timer_ == nullptr) {
     898           0 :     transport_socket_connect_timer_ =
     899           0 :         dispatcher_.createScaledTimer(Event::ScaledTimerType::TransportSocketConnectTimeout,
     900           0 :                                       [this] { onTransportSocketConnectTimeout(); });
     901           0 :   }
     902           0 :   transport_socket_connect_timer_->enableTimer(timeout);
     903           0 : }
     904             : 
     905         784 : void ServerConnectionImpl::raiseEvent(ConnectionEvent event) {
                         // Updates the transport-connect bookkeeping for `event`, then forwards the event to
                         // ConnectionImpl::raiseEvent().
     906         784 :   switch (event) {
     907           0 :   case ConnectionEvent::ConnectedZeroRtt:
     908             :     // The transport socket is still connecting, so skip changing connect state.
     909           0 :     break;
     910         784 :   case ConnectionEvent::Connected:
     911         784 :   case ConnectionEvent::RemoteClose:
     912         784 :   case ConnectionEvent::LocalClose:
                           // The transport connect has finished one way or another: clear the pending flag and
                           // drop the connect-timeout timer.
     913         784 :     transport_connect_pending_ = false;
     914         784 :     transport_socket_connect_timer_.reset();
     915         784 :   }
     916         784 :   ConnectionImpl::raiseEvent(event);
     917         784 : }
     918         784 : bool ServerConnectionImpl::initializeReadFilters() {
                         // Initializes the read filters through the base class and, only when that succeeds, calls
                         // onConnected(). Returns the base class result unchanged.
     919         784 :   bool initialized = ConnectionImpl::initializeReadFilters();
     920         784 :   if (initialized) {
     921             :     // Server connection starts as connected, and we must explicitly signal to
     922             :     // the downstream transport socket that the underlying socket is connected.
     923             :     // We delay this step until after the filters are initialized and can
     924             :     // receive the connection events.
     925         784 :     onConnected();
     926         784 :   }
     927         784 :   return initialized;
     928         784 : }
     929             : 
     930           0 : void ServerConnectionImpl::onTransportSocketConnectTimeout() {
                         // Connect-timeout timer callback: records the termination details on the stream info,
                         // closes the connection immediately with the TransportSocketTimeout local-close reason,
                         // increments the timeout counter and records "connect timeout" as the failure reason.
                         // transport_socket_timeout_stat_ is assigned in setTransportSocketConnectTimeout() before
                         // the timer is armed, so it is set by the time this runs.
                         // NOTE(review): the failure reason is set only after the close call, so anything notified
                         // during the close presumably does not see "connect timeout" yet — confirm this ordering
                         // is intended.
     931           0 :   stream_info_.setConnectionTerminationDetails(kTransportSocketConnectTimeoutTerminationDetails);
     932           0 :   closeConnectionImmediatelyWithDetails(
     933           0 :       StreamInfo::LocalCloseReasons::get().TransportSocketTimeout);
     934           0 :   transport_socket_timeout_stat_->inc();
     935           0 :   setFailureReason("connect timeout");
     936           0 : }
     937             : 
     938             : ClientConnectionImpl::ClientConnectionImpl(
     939             :     Event::Dispatcher& dispatcher, const Address::InstanceConstSharedPtr& remote_address,
     940             :     const Network::Address::InstanceConstSharedPtr& source_address,
     941             :     Network::TransportSocketPtr&& transport_socket,
     942             :     const Network::ConnectionSocket::OptionsSharedPtr& options,
     943             :     const Network::TransportSocketOptionsConstSharedPtr& transport_options)
                           // Convenience overload: creates a ClientSocketImpl for `remote_address` (with `options`)
                           // and delegates to the constructor that takes an already created socket.
     944             :     : ClientConnectionImpl(dispatcher, std::make_unique<ClientSocketImpl>(remote_address, options),
     945             :                            source_address, std::move(transport_socket), options,
     946        1328 :                            transport_options) {}
     947             : 
     948             : ClientConnectionImpl::ClientConnectionImpl(
     949             :     Event::Dispatcher& dispatcher, std::unique_ptr<ConnectionSocket> socket,
     950             :     const Address::InstanceConstSharedPtr& source_address,
     951             :     Network::TransportSocketPtr&& transport_socket,
     952             :     const Network::ConnectionSocket::OptionsSharedPtr& options,
     953             :     const Network::TransportSocketOptionsConstSharedPtr& transport_options)
                           // NOTE(review): the base ConnectionImpl is handed a reference to the stream_info_ member,
                           // which is only constructed after the base class; presumably the base merely stores the
                           // reference during construction — confirm.
     954             :     : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info_,
     955             :                      false),
     956        1328 :       stream_info_(dispatcher_.timeSource(), socket_->connectionInfoProviderSharedPtr()) {
                         // Body overview:
                         //   1. attach fresh upstream info and copy shared downstream filter state into stream_info_;
                         //   2. for IP remote addresses only: apply the STATE_PREBIND socket options, then bind to
                         //      the socket's existing local address if it has one, otherwise to `source_address`
                         //      when that is non-null.
                         // A failure in step 2 is not reported here. It is stored in immediate_error_event_ and a
                         // Write event is activated so that onFileEvent() performs the close later.
     957             : 
     958        1328 :   stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
     959             : 
     960        1328 :   if (transport_options) {
     961         173 :     for (const auto& object : transport_options->downstreamSharedFilterStateObjects()) {
     962             :       // This does not throw as all objects are distinctly named and the stream info is empty.
     963           0 :       stream_info_.filterState()->setData(object.name_, object.data_, object.state_type_,
     964           0 :                                           StreamInfo::FilterState::LifeSpan::Connection,
     965           0 :                                           object.stream_sharing_);
     966           0 :     }
     967         173 :   }
     968             : 
     969             :   // There are no meaningful socket options or source address semantics for
     970             :   // non-IP sockets, so skip.
     971        1328 :   if (socket_->connectionInfoProviderSharedPtr()->remoteAddress()->ip() == nullptr) {
     972           0 :     return;
     973           0 :   }
     974        1328 :   if (!Network::Socket::applyOptions(options, *socket_,
     975        1328 :                                      envoy::config::core::v3::SocketOption::STATE_PREBIND)) {
     976             :     // Set a special error state to ensure asynchronous close to give the owner of the
     977             :     // ConnectionImpl a chance to add callbacks and detect the "disconnect".
     978           0 :     immediate_error_event_ = ConnectionEvent::LocalClose;
     979             :     // Trigger a write event to close this connection out-of-band.
     980           0 :     ioHandle().activateFileEvents(Event::FileReadyType::Write);
     981           0 :     return;
     982           0 :   }
     983             : 
                         // Pointer to the shared_ptr to bind to; a local address already set on the socket takes
                         // precedence over the `source_address` argument.
     984        1328 :   const Network::Address::InstanceConstSharedPtr* source = &source_address;
     985             : 
     986        1328 :   if (socket_->connectionInfoProvider().localAddress()) {
     987           0 :     source = &socket_->connectionInfoProvider().localAddress();
     988           0 :   }
     989             : 
     990        1328 :   if (*source != nullptr) {
     991           0 :     Api::SysCallIntResult result = socket_->bind(*source);
     992           0 :     if (result.return_value_ < 0) {
     993           0 :       setFailureReason(absl::StrCat("failed to bind to ", source->get()->asString(), ": ",
     994           0 :                                     errorDetails(result.errno_)));
     995           0 :       ENVOY_LOG_MISC(debug, failureReason());
                             // bind_error_ makes onFileEvent() log "raising bind error" and bump the bind_errors_ stat.
     996           0 :       bind_error_ = true;
     997             :       // Set a special error state to ensure asynchronous close to give the owner of the
     998             :       // ConnectionImpl a chance to add callbacks and detect the "disconnect".
     999           0 :       immediate_error_event_ = ConnectionEvent::LocalClose;
    1000             : 
    1001             :       // Trigger a write event to close this connection out-of-band.
    1002           0 :       ioHandle().activateFileEvents(Event::FileReadyType::Write);
    1003           0 :     }
    1004           0 :   }
    1005        1328 : }
    1006             : 
     1007        1328 : void ClientConnectionImpl::connect() { // Issues the connect through the transport socket and classifies the result.
     1008        1328 :   ENVOY_CONN_LOG_EVENT(debug, "client_connection", "connecting to {}", *this,
     1009        1328 :                        socket_->connectionInfoProvider().remoteAddress()->asString());
     1010        1328 :   const Api::SysCallIntResult result = transport_socket_->connect(*socket_);
     1011        1328 :   stream_info_.upstreamInfo()->upstreamTiming().onUpstreamConnectStart(dispatcher_.timeSource()); // NOTE(review): stamped after the connect call was issued.
     1012        1328 :   if (result.return_value_ == 0) {
     1013             :     // The connect call returned 0, so there is no error to classify; write will become ready.
     1014           0 :     ASSERT(connecting_);
     1015           0 :     return;
     1016           0 :   }
     1017             : 
     1018        1328 :   ASSERT(SOCKET_FAILURE(result.return_value_)); // Any non-zero result is expected to be a socket failure.
     1019             : #ifdef WIN32
     1020             :   // winsock2 connect returns EWOULDBLOCK if the socket is non-blocking and the connection
     1021             :   // cannot be completed immediately. We do not check for `EINPROGRESS` as that error is for
     1022             :   // blocking operations.
     1023             :   if (result.errno_ == SOCKET_ERROR_AGAIN) {
     1024             : #else
     1025        1328 :   if (result.errno_ == SOCKET_ERROR_IN_PROGRESS) {
     1026        1328 : #endif
     1027        1328 :     ASSERT(connecting_); // The connection is still pending, so the connecting flag must be set.
     1028        1328 :     ENVOY_CONN_LOG_EVENT(debug, "connection_in_progress", "connection in progress", *this);
     1029        1328 :   } else {
     1030           0 :     immediate_error_event_ = ConnectionEvent::RemoteClose; // Any other errno is recorded as an immediate error.
     1031           0 :     connecting_ = false;
     1032           0 :     setFailureReason(absl::StrCat("immediate connect error: ", errorDetails(result.errno_)));
     1033           0 :     ENVOY_CONN_LOG_EVENT(debug, "connection_immediate_error", "{}", *this, failureReason());
     1034             : 
     1035             :     // Trigger a write event. This is needed on macOS and seems harmless on Linux.
     1036           0 :     ioHandle().activateFileEvents(Event::FileReadyType::Write);
     1037           0 :   }
     1038        1328 : }
    1039             : 
     1040        1170 : void ClientConnectionImpl::onConnected() { // Records connect completion, logs the local interface if known, then runs ConnectionImpl::onConnected().
     1041        1170 :   stream_info_.upstreamInfo()->upstreamTiming().onUpstreamConnectComplete(dispatcher_.timeSource());
     1042             :   // There are no meaningful socket source address semantics for non-IP sockets, so skip.
     1043        1170 :   if (socket_->connectionInfoProviderSharedPtr()->remoteAddress()->ip()) {
     1044        1170 :     socket_->connectionInfoProvider().maybeSetInterfaceName(ioHandle());
     1045        1170 :     const auto maybe_interface_name = socket_->connectionInfoProvider().interfaceName();
     1046        1170 :     if (maybe_interface_name.has_value()) { // Only log when an interface name is actually available.
     1047           0 :       ENVOY_CONN_LOG_EVENT(debug, "conn_interface", "connected on local interface '{}'", *this,
     1048           0 :                            maybe_interface_name.value());
     1049           0 :     }
     1050        1170 :   }
     1051        1170 :   ConnectionImpl::onConnected(); // Continue with the ConnectionImpl implementation.
     1052        1170 : }
    1053             : 
    1054             : } // namespace Network
    1055             : } // namespace Envoy

Generated by: LCOV version 1.15