Line data Source code
1 : #include "source/common/network/connection_impl.h"
2 :
3 : #include <atomic>
4 : #include <cstdint>
5 : #include <memory>
6 :
7 : #include "envoy/common/exception.h"
8 : #include "envoy/common/platform.h"
9 : #include "envoy/config/core/v3/base.pb.h"
10 : #include "envoy/event/scaled_range_timer_manager.h"
11 : #include "envoy/event/timer.h"
12 : #include "envoy/network/filter.h"
13 : #include "envoy/network/socket.h"
14 :
15 : #include "source/common/common/assert.h"
16 : #include "source/common/common/dump_state_utils.h"
17 : #include "source/common/common/empty_string.h"
18 : #include "source/common/common/enum_to_int.h"
19 : #include "source/common/common/scope_tracker.h"
20 : #include "source/common/network/address_impl.h"
21 : #include "source/common/network/connection_socket_impl.h"
22 : #include "source/common/network/raw_buffer_socket.h"
23 : #include "source/common/network/socket_option_factory.h"
24 : #include "source/common/network/socket_option_impl.h"
25 : #include "source/common/network/utility.h"
26 : #include "source/common/runtime/runtime_features.h"
27 :
28 : namespace Envoy {
29 : namespace Network {
30 : namespace {
31 :
32 : constexpr absl::string_view kTransportSocketConnectTimeoutTerminationDetails =
33 : "transport socket timeout was reached";
34 :
35 0 : std::ostream& operator<<(std::ostream& os, Connection::State connection_state) {
36 0 : switch (connection_state) {
37 0 : case Connection::State::Open:
38 0 : return os << "Open";
39 0 : case Connection::State::Closing:
40 0 : return os << "Closing";
41 0 : case Connection::State::Closed:
42 0 : return os << "Closed";
43 0 : }
44 0 : return os;
45 0 : }
46 :
47 : } // namespace
48 :
49 : void ConnectionImplUtility::updateBufferStats(uint64_t delta, uint64_t new_total,
50 : uint64_t& previous_total, Stats::Counter& stat_total,
51 6789 : Stats::Gauge& stat_current) {
52 6789 : if (delta) {
53 2312 : stat_total.add(delta);
54 2312 : }
55 :
56 6789 : if (new_total != previous_total) {
57 2242 : if (new_total > previous_total) {
58 1093 : stat_current.add(new_total - previous_total);
59 1167 : } else {
60 1149 : stat_current.sub(previous_total - new_total);
61 1149 : }
62 :
63 2242 : previous_total = new_total;
64 2242 : }
65 6789 : }
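// A minimal standalone sketch of the accounting above, using stand-in counter and gauge types
// (illustrative only, not Envoy's Stats API): the counter accumulates every byte that moves
// through the buffer, while the gauge mirrors the current buffered level.
namespace buffer_stats_demo {
struct DemoCounter {
  void add(uint64_t v) { value += v; }
  uint64_t value{0}; // Monotonic total.
};
struct DemoGauge {
  void add(uint64_t v) { value += v; }
  void sub(uint64_t v) { value -= v; }
  uint64_t value{0}; // Current level.
};
inline void updateDemoBufferStats(uint64_t delta, uint64_t new_total, uint64_t& previous_total,
                                  DemoCounter& total, DemoGauge& current) {
  if (delta) {
    total.add(delta); // Bytes processed since the last update.
  }
  if (new_total != previous_total) {
    // Apply the signed change in the buffered level to the gauge.
    if (new_total > previous_total) {
      current.add(new_total - previous_total);
    } else {
      current.sub(previous_total - new_total);
    }
    previous_total = new_total;
  }
}
} // namespace buffer_stats_demo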
66 :
67 : std::atomic<uint64_t> ConnectionImpl::next_global_id_;
68 :
69 : ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
70 : TransportSocketPtr&& transport_socket,
71 : StreamInfo::StreamInfo& stream_info, bool connected)
72 : : ConnectionImplBase(dispatcher, next_global_id_++),
73 : transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
74 : stream_info_(stream_info), filter_manager_(*this, *socket_),
75 : write_buffer_(dispatcher.getWatermarkFactory().createBuffer(
76 0 : [this]() -> void { this->onWriteBufferLowWatermark(); },
77 0 : [this]() -> void { this->onWriteBufferHighWatermark(); },
78 0 : []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
79 : read_buffer_(dispatcher.getWatermarkFactory().createBuffer(
80 0 : [this]() -> void { this->onReadBufferLowWatermark(); },
81 0 : [this]() -> void { this->onReadBufferHighWatermark(); },
82 0 : []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
83 : write_buffer_above_high_watermark_(false), detect_early_close_(true),
84 : enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
85 : write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
86 2370 : transport_wants_read_(false) {
87 :
88 : // Cache the result in a bool to avoid repeatedly querying the runtime feature.
89 2370 : enable_rst_detect_send_ = Runtime::runtimeFeatureEnabled(
90 2370 : "envoy.reloadable_features.detect_and_raise_rst_tcp_connection");
91 :
92 2370 : if (!connected) {
93 1328 : connecting_ = true;
94 1328 : }
95 :
96 2370 : Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;
97 :
98 : // We never ask for both early close and read at the same time. If we are reading, we want to
99 : // consume all available data.
100 2370 : socket_->ioHandle().initializeFileEvent(
101 6950 : dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
102 2370 : Event::FileReadyType::Read | Event::FileReadyType::Write);
103 :
104 2370 : transport_socket_->setTransportSocketCallbacks(*this);
105 :
106 : // TODO(soulxu): generate the connection id inside the addressProvider directly,
107 : // then we don't need a setter or any of the optional stuff.
108 2370 : socket_->connectionInfoProvider().setConnectionID(id());
109 2370 : socket_->connectionInfoProvider().setSslConnection(transport_socket_->ssl());
110 2370 : }
111 :
112 2370 : ConnectionImpl::~ConnectionImpl() {
113 2370 : ASSERT(!ioHandle().isOpen() && delayed_close_timer_ == nullptr,
114 2370 : "ConnectionImpl was unexpectedly torn down without being closed.");
115 :
116 : // In general we assume that owning code has called close() prior to the destructor being
117 : // run. This generally must be done so that callbacks run in the correct context (vs. deferred
118 : // deletion). Hence the assert above. However, call close() here anyway to be completely sure
119 : // that the fd is closed, and to make it more likely that we crash from a bad close callback.
120 2370 : close(ConnectionCloseType::NoFlush);
121 2370 : }
122 :
123 0 : void ConnectionImpl::addWriteFilter(WriteFilterSharedPtr filter) {
124 0 : filter_manager_.addWriteFilter(filter);
125 0 : }
126 :
127 0 : void ConnectionImpl::addFilter(FilterSharedPtr filter) { filter_manager_.addFilter(filter); }
128 :
129 2097 : void ConnectionImpl::addReadFilter(ReadFilterSharedPtr filter) {
130 2097 : filter_manager_.addReadFilter(filter);
131 2097 : }
132 :
133 0 : void ConnectionImpl::removeReadFilter(ReadFilterSharedPtr filter) {
134 0 : filter_manager_.removeReadFilter(filter);
135 0 : }
136 :
137 949 : bool ConnectionImpl::initializeReadFilters() { return filter_manager_.initializeReadFilters(); }
138 :
139 4366 : void ConnectionImpl::close(ConnectionCloseType type) {
140 4366 : if (!ioHandle().isOpen()) {
141 2450 : return;
142 2450 : }
143 :
144 1916 : uint64_t data_to_write = write_buffer_->length();
145 1916 : ENVOY_CONN_LOG_EVENT(debug, "connection_closing", "closing data_to_write={} type={}", *this,
146 1916 : data_to_write, enumToInt(type));
147 :
148 : // An RST is sent only if enable_rst_detect_send_ is true; otherwise AbortReset is downgraded
149 : // to a plain ConnectionCloseType::Abort.
150 1916 : if (!enable_rst_detect_send_ && type == ConnectionCloseType::AbortReset) {
151 0 : type = ConnectionCloseType::Abort;
152 0 : }
153 :
154 : // For AbortReset, Envoy sends an RST to the peer and closes the connection immediately.
155 1916 : if (type == ConnectionCloseType::AbortReset) {
156 0 : ENVOY_CONN_LOG(
157 0 : trace, "connection closing type=AbortReset, setting detected close type to LocalReset.",
158 0 : *this);
159 0 : setDetectedCloseType(DetectedCloseType::LocalReset);
160 0 : closeSocket(ConnectionEvent::LocalClose);
161 0 : return;
162 0 : }
163 :
164 1916 : const bool delayed_close_timeout_set = delayed_close_timeout_.count() > 0;
165 1916 : if (data_to_write == 0 || type == ConnectionCloseType::NoFlush ||
166 1916 : type == ConnectionCloseType::Abort || !transport_socket_->canFlushClose()) {
167 1419 : if (data_to_write > 0 && type != ConnectionCloseType::Abort) {
168 : // We aren't going to wait to flush, but try to write as much as we can if there is pending
169 : // data.
170 273 : transport_socket_->doWrite(*write_buffer_, true);
171 273 : }
172 :
173 1419 : if (type != ConnectionCloseType::FlushWriteAndDelay || !delayed_close_timeout_set) {
174 1370 : closeConnectionImmediately();
175 1370 : return;
176 1370 : }
177 : // The socket is being closed and either there is no more data to write or the data cannot be
178 : // flushed (!transport_socket_->canFlushClose()). Since a delayed close has been requested,
179 : // start the delayed close timer if it hasn't been done already by a previous close().
180 : // NOTE: Even though the delayed_close_state_ is being set to CloseAfterFlushAndWait, since
181 : // a write event is not being registered for the socket, this logic is simply setting the
182 : // timer and waiting for it to trigger to close the socket.
183 49 : if (!inDelayedClose()) {
184 49 : initializeDelayedCloseTimer();
185 49 : delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
186 : // Monitor for the peer closing the connection.
187 49 : ioHandle().enableFileEvents(enable_half_close_ ? 0 : Event::FileReadyType::Closed);
188 49 : }
189 49 : return;
190 1419 : }
191 :
192 497 : ASSERT(type == ConnectionCloseType::FlushWrite ||
193 497 : type == ConnectionCloseType::FlushWriteAndDelay);
194 :
195 : // If there is a pending delayed close, simply update the delayed close state.
196 : //
197 : // An example of this condition manifests when a downstream connection is closed early by Envoy,
198 : // such as when a route can't be matched:
199 : // In ConnectionManagerImpl::onData()
200 : // 1) Via codec_->dispatch(), a local reply with a 404 is sent to the client
201 : // a) ConnectionManagerImpl::doEndStream() issues the first connection close() via
202 : // ConnectionManagerImpl::checkForDeferredClose()
203 : // 2) A second close is issued by a subsequent call to
204 : // ConnectionManagerImpl::checkForDeferredClose() prior to returning from onData()
205 497 : if (inDelayedClose()) {
206 : // Validate that a delayed close timer is already enabled unless it was disabled via
207 : // configuration.
208 201 : ASSERT(!delayed_close_timeout_set || delayed_close_timer_ != nullptr);
209 201 : if (type == ConnectionCloseType::FlushWrite || !delayed_close_timeout_set) {
210 0 : delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
211 201 : } else {
212 201 : delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
213 201 : }
214 201 : return;
215 201 : }
216 :
217 : // NOTE: At this point, it's already been validated that the connection is not already in
218 : // delayed close processing and therefore the timer has not yet been created.
219 297 : if (delayed_close_timeout_set) {
220 297 : initializeDelayedCloseTimer();
221 297 : delayed_close_state_ = (type == ConnectionCloseType::FlushWrite)
222 297 : ? DelayedCloseState::CloseAfterFlush
223 297 : : DelayedCloseState::CloseAfterFlushAndWait;
224 274 : } else {
225 0 : delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
226 0 : }
227 :
228 296 : ioHandle().enableFileEvents(Event::FileReadyType::Write |
229 296 : (enable_half_close_ ? 0 : Event::FileReadyType::Closed));
230 296 : }
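// The tail of close() above reduces to a small decision. Here is a sketch of how the delayed
// close state is chosen from the close type and configuration (all names are illustrative):
namespace delayed_close_demo {
enum class DemoState { CloseAfterFlush, CloseAfterFlushAndWait };
inline DemoState chooseDelayedCloseState(bool flush_write_and_delay, bool timeout_configured) {
  if (!flush_write_and_delay || !timeout_configured) {
    return DemoState::CloseAfterFlush; // Close as soon as the write buffer drains.
  }
  // Also linger after the flush, waiting for peer close or the delayed close timer.
  return DemoState::CloseAfterFlushAndWait;
}
} // namespace delayed_close_demo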
231 :
232 26238 : Connection::State ConnectionImpl::state() const {
233 26238 : if (!ioHandle().isOpen()) {
234 72 : return State::Closed;
235 26166 : } else if (inDelayedClose()) {
236 240 : return State::Closing;
237 25926 : } else {
238 25926 : return State::Open;
239 25926 : }
240 26238 : }
241 :
242 1398 : void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }
243 :
244 0 : void ConnectionImpl::setTransportSocketIsReadable() {
245 0 : ASSERT(dispatcher_.isThreadSafe());
246 : // Remember that the transport requested read resumption, in case the resumption event is not
247 : // scheduled immediately or is "lost" because read was disabled.
248 0 : transport_wants_read_ = true;
249 : // Only schedule a read activation if the connection is not read disabled to avoid spurious
250 : // wakeups. When read disabled, the connection will not read from the transport, and limit
251 : // dispatch to the current contents of the read buffer if its high-watermark is triggered and
252 : // dispatch_buffered_data_ is set.
253 0 : if (read_disable_count_ == 0) {
254 0 : ioHandle().activateFileEvents(Event::FileReadyType::Read);
255 0 : }
256 0 : }
257 :
258 2115 : bool ConnectionImpl::filterChainWantsData() {
259 2115 : return read_disable_count_ == 0 ||
260 2115 : (read_disable_count_ == 1 && read_buffer_->highWatermarkTriggered());
261 2115 : }
262 :
263 55 : void ConnectionImpl::setDetectedCloseType(DetectedCloseType close_type) {
264 55 : detected_close_type_ = close_type;
265 55 : }
266 :
267 2413 : void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
268 2413 : if (!ConnectionImpl::ioHandle().isOpen()) {
269 43 : return;
270 43 : }
271 :
272 : // No need for a delayed close (if pending) now that the socket is being closed.
273 2370 : if (delayed_close_timer_) {
274 318 : delayed_close_timer_->disableTimer();
275 318 : delayed_close_timer_ = nullptr;
276 318 : }
277 :
278 2370 : ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
279 2370 : transport_socket_->closeSocket(close_type);
280 :
281 : // Drain input and output buffers.
282 2370 : updateReadBufferStats(0, 0);
283 2370 : updateWriteBufferStats(0, 0);
284 :
285 : // As the socket closes, drain any remaining data.
286 : // The data won't be written out at this point, and where there are reference
287 : // counted buffer fragments, it helps avoid lifetime issues with the
288 : // connection outlasting the subscriber.
289 2370 : write_buffer_->drain(write_buffer_->length());
290 :
291 2370 : connection_stats_.reset();
292 :
293 2370 : if (enable_rst_detect_send_ && (detected_close_type_ == DetectedCloseType::RemoteReset ||
294 2370 : detected_close_type_ == DetectedCloseType::LocalReset)) {
295 54 : const bool ok = Network::Socket::applyOptions(
296 54 : Network::SocketOptionFactory::buildZeroSoLingerOptions(), *socket_,
297 54 : envoy::config::core::v3::SocketOption::STATE_LISTENING);
298 54 : if (!ok) {
299 0 : ENVOY_LOG_EVERY_POW_2(error, "rst setting so_linger=0 failed on connection {}", id());
300 0 : }
301 54 : }
302 :
303 : // It is safe to call close() since there is an IO handle check.
304 2370 : socket_->close();
305 :
306 : // Call the base class directly as close() is called in the destructor.
307 2370 : ConnectionImpl::raiseEvent(close_type);
308 2370 : }
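// For reference, the zero-SO_LINGER option applied above has a standard POSIX effect: with
// l_onoff=1 and l_linger=0, close() aborts the connection and emits an RST instead of the
// orderly FIN handshake. A sketch of the raw equivalent (helper name is illustrative):
#include <sys/socket.h>
#include <unistd.h>
inline void demoCloseWithRst(int fd) {
  struct linger lin = {};
  lin.l_onoff = 1;  // Enable lingering...
  lin.l_linger = 0; // ...with a zero timeout, so close() resets instead of flushing.
  ::setsockopt(fd, SOL_SOCKET, SO_LINGER, &lin, sizeof(lin));
  ::close(fd);
}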
309 :
310 1954 : void ConnectionImpl::onConnected() {
311 1954 : ASSERT(!connecting_);
312 1954 : transport_socket_->onConnected();
313 1954 : }
314 :
315 1353 : void ConnectionImpl::noDelay(bool enable) {
316 : // There are cases where a connection to localhost can immediately fail (e.g., if the other end
317 : // does not have enough fds, reaches a backlog limit, etc.). Because we run with deferred error
318 : // events, the calling code may not yet know that the connection has failed. This is one call
319 : // where we go outside of libevent and hit the fd directly and this case can fail if the fd is
320 : // invalid. For this call, instead of plumbing through logic that will immediately indicate that
321 : // a connect failed, we just ignore the noDelay() call if the socket is invalid, since an error
322 : // is going to be raised shortly anyway and it makes the calling code simpler.
323 1353 : if (!ioHandle().isOpen()) {
324 0 : return;
325 0 : }
326 :
327 : // Don't set NODELAY for unix domain sockets or internal socket.
328 1353 : if (socket_->addressType() != Address::Type::Ip) {
329 0 : return;
330 0 : }
331 :
332 : // Set NODELAY
333 1353 : int new_value = enable;
334 1353 : Api::SysCallIntResult result =
335 1353 : socket_->setSocketOption(IPPROTO_TCP, TCP_NODELAY, &new_value, sizeof(new_value));
336 : #if defined(__APPLE__)
337 : if (SOCKET_FAILURE(result.return_value_) && result.errno_ == SOCKET_ERROR_INVAL) {
338 : // Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
339 : // enabled despite this result.
340 : return;
341 : }
342 : #elif defined(WIN32)
343 : if (SOCKET_FAILURE(result.return_value_) &&
344 : (result.errno_ == SOCKET_ERROR_AGAIN || result.errno_ == SOCKET_ERROR_INVAL)) {
345 : // Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
346 : // enabled despite this result.
347 : return;
348 : }
349 : #endif
350 :
351 1353 : RELEASE_ASSERT(result.return_value_ == 0,
352 1353 : fmt::format("Failed to set TCP_NODELAY with error {}, {}", result.errno_,
353 1353 : errorDetails(result.errno_)));
354 1353 : }
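// The raw-socket equivalent of the option set above, for illustration: TCP_NODELAY disables
// Nagle's algorithm so small writes are flushed to the wire immediately (helper name is
// illustrative; error handling is elided).
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
inline int demoSetTcpNoDelay(int fd, bool enable) {
  int value = enable ? 1 : 0;
  return ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
}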
355 :
356 1955 : void ConnectionImpl::onRead(uint64_t read_buffer_size) {
357 1955 : ASSERT(dispatcher_.isThreadSafe());
358 1955 : if (inDelayedClose() || !filterChainWantsData()) {
359 0 : return;
360 0 : }
361 1955 : ASSERT(ioHandle().isOpen());
362 :
363 1955 : if (read_buffer_size == 0 && !read_end_stream_) {
364 0 : return;
365 0 : }
366 :
367 1955 : if (read_end_stream_) {
368 : // read() on a raw socket will repeatedly return 0 (EOF) once EOF has
369 : // occurred, so filter out the repeats so that filters don't have
370 : // to handle them.
371 : //
372 : // I don't know of any cases where this actually happens (we should stop
373 : // reading the socket after EOF), but this check guards against any bugs
374 : // in ConnectionImpl or strangeness in the OS events (epoll, kqueue, etc)
375 : // and maintains the guarantee for filters.
376 0 : if (read_end_stream_raised_) {
377 : // No further data can be delivered after end_stream
378 0 : ASSERT(read_buffer_size == 0);
379 0 : return;
380 0 : }
381 0 : read_end_stream_raised_ = true;
382 0 : }
383 :
384 1955 : filter_manager_.onRead();
385 1955 : }
386 :
387 911 : void ConnectionImpl::enableHalfClose(bool enabled) {
388 : // This code doesn't correctly ensure that EV_CLOSE isn't set if reading is disabled
389 : // when enabling half-close. This could be fixed, but isn't needed right now, so just
390 : // ASSERT that it doesn't happen.
391 911 : ASSERT(!enabled || read_disable_count_ == 0);
392 :
393 911 : enable_half_close_ = enabled;
394 911 : }
395 :
396 324 : Connection::ReadDisableStatus ConnectionImpl::readDisable(bool disable) {
397 : // Calls to readDisable on a closed socket are considered to be an error.
398 324 : ASSERT(state() == State::Open);
399 :
400 324 : ENVOY_CONN_LOG(trace, "readDisable: disable={} disable_count={} state={} buffer_length={}", *this,
401 324 : disable, read_disable_count_, static_cast<int>(state()), read_buffer_->length());
402 :
403 : // When we disable reads, we still allow for early close notifications (the equivalent of
404 : // `EPOLLRDHUP` for an epoll backend). For backends that support it, this allows us to apply
405 : // back pressure at the kernel layer, but still get timely notification of a FIN. Note that
406 : // we are not guaranteed to get notified, so even if the remote has closed, we may not know
407 : // until we try to write. Further note that, unless half-close is enabled, we do not correctly
408 : // handle half-closed TCP connections: we assume that a remote FIN means the remote intends a
409 : // full close.
410 324 : if (disable) {
411 164 : ++read_disable_count_;
412 :
413 164 : if (state() != State::Open) {
414 : // If readDisable is called on a closed connection, do not crash.
415 0 : return ReadDisableStatus::NoTransition;
416 0 : }
417 :
418 164 : if (read_disable_count_ > 1) {
419 : // The socket has already been read disabled.
420 0 : return ReadDisableStatus::StillReadDisabled;
421 0 : }
422 :
423 : // If half-close semantics are enabled, we never want early close notifications; we
424 : // always want to read all available data, even if the other side has closed.
425 164 : if (detect_early_close_ && !enable_half_close_) {
426 9 : ioHandle().enableFileEvents(Event::FileReadyType::Write | Event::FileReadyType::Closed);
427 155 : } else {
428 155 : ioHandle().enableFileEvents(Event::FileReadyType::Write);
429 155 : }
430 :
431 164 : return ReadDisableStatus::TransitionedToReadDisabled;
432 164 : } else {
433 160 : ASSERT(read_disable_count_ != 0);
434 160 : --read_disable_count_;
435 160 : if (state() != State::Open) {
436 : // If readDisable is called on a closed connection, do not crash.
437 0 : return ReadDisableStatus::NoTransition;
438 0 : }
439 :
440 160 : auto read_disable_status = ReadDisableStatus::StillReadDisabled;
441 160 : if (read_disable_count_ == 0) {
442 : // We never ask for both early close and read at the same time. If we are reading, we want to
443 : // consume all available data.
444 160 : ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
445 160 : read_disable_status = ReadDisableStatus::TransitionedToReadEnabled;
446 160 : }
447 :
448 160 : if (filterChainWantsData() && (read_buffer_->length() > 0 || transport_wants_read_)) {
449 : // Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
450 : // buffer's high watermark has triggered.
451 5 : ASSERT(read_buffer_->length() > 0 || read_disable_count_ == 0);
452 :
453 : // If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
454 : // able to process additional bytes even if there is no data in the kernel to kick off the
455 : // filter chain. Alternately the connection may need read resumption while read disabled and
456 : // not registered for read events because the read buffer's high-watermark has triggered. To
457 : // handle these cases, directly schedule a fake read event to make sure the buffered data in
458 : // the read buffer or in transport socket internal buffers gets processed regardless and
459 : // ensure that we dispatch it via onRead.
460 5 : dispatch_buffered_data_ = true;
461 5 : ioHandle().activateFileEvents(Event::FileReadyType::Read);
462 5 : }
463 :
464 160 : return read_disable_status;
465 160 : }
466 324 : }
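// The logic above is a counted latch: only the 0 -> 1 disable edge unregisters read events and
// only the 1 -> 0 enable edge re-registers them; nested disables simply adjust the count. A
// stripped-down sketch of that contract (class name is illustrative):
namespace read_disable_demo {
class DemoReadLatch {
public:
  // Each returns true when the call should change the registered event set.
  bool disable() { return ++count_ == 1; }
  bool enable() {
    // Guards the underflow that the ASSERT above catches: enabling at zero is a caller bug.
    return count_ > 0 && --count_ == 0;
  }
private:
  uint32_t count_{0};
};
} // namespace read_disable_demo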
467 :
468 4324 : void ConnectionImpl::raiseEvent(ConnectionEvent event) {
469 4324 : ENVOY_CONN_LOG(trace, "raising connection event {}", *this, static_cast<int>(event));
470 4324 : ConnectionImplBase::raiseConnectionEvent(event);
471 : // We may have pending data in the write buffer on transport handshake
472 : // completion, which may also have completed in the context of onReadReady(),
473 : // where no check of the write buffer is made. Provide an opportunity to flush
474 : // here. If connection write is not ready, this is harmless. We should only do
475 : // this if we're still open (the above callbacks may have closed).
476 4324 : if (event == ConnectionEvent::Connected) {
477 1954 : flushWriteBuffer();
478 1954 : }
479 4324 : }
480 :
481 0 : bool ConnectionImpl::readEnabled() const {
482 : // Calls to readEnabled on a closed socket are considered to be an error.
483 0 : ASSERT(state() == State::Open);
484 0 : ASSERT(dispatcher_.isThreadSafe());
485 0 : return read_disable_count_ == 0;
486 0 : }
487 :
488 0 : void ConnectionImpl::addBytesSentCallback(BytesSentCb cb) {
489 0 : bytes_sent_callbacks_.emplace_back(cb);
490 0 : }
491 :
492 0 : void ConnectionImpl::rawWrite(Buffer::Instance& data, bool end_stream) {
493 0 : write(data, end_stream, false);
494 0 : }
495 :
496 7910 : void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {
497 7910 : write(data, end_stream, true);
498 7910 : }
499 :
500 7910 : void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
501 7910 : ASSERT(!end_stream || enable_half_close_);
502 7910 : ASSERT(dispatcher_.isThreadSafe());
503 :
504 7910 : if (write_end_stream_) {
505 : // It is an API violation to write more data after writing end_stream, but a duplicate
506 : // end_stream with no data is harmless. This catches misuse of the API that could result in data
507 : // being lost.
508 0 : ASSERT(data.length() == 0 && end_stream);
509 :
510 0 : return;
511 0 : }
512 :
513 7910 : if (through_filter_chain) {
514 : // NOTE: This is kind of a hack, but currently we don't support restart/continue on the write
515 : // path, so we just pass around the buffer passed to us in this function. If we ever
516 : // support buffer/restart/continue on the write path this needs to get more complicated.
517 7910 : current_write_buffer_ = &data;
518 7910 : current_write_end_stream_ = end_stream;
519 7910 : FilterStatus status = filter_manager_.onWrite();
520 7910 : current_write_buffer_ = nullptr;
521 :
522 7910 : if (FilterStatus::StopIteration == status) {
523 0 : return;
524 0 : }
525 7910 : }
526 :
527 7910 : write_end_stream_ = end_stream;
528 7910 : if (data.length() > 0 || end_stream) {
529 7421 : ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
530 : // TODO(mattklein123): All data currently gets moved from the source buffer to the write buffer.
531 : // This can lead to inefficient behavior if writing a bunch of small chunks. In this case, it
532 : // would likely be more efficient to copy data below a certain size. VERY IMPORTANT: If this is
533 : // ever changed, read the comment in SslSocket::doWrite() VERY carefully. That code assumes that
534 : // we never change existing write_buffer_ chain elements between calls to SSL_write(). That code
535 : // might need to change if we ever copy here.
536 7421 : write_buffer_->move(data);
537 :
538 : // Activating a write event before the socket is connected has the side-effect of tricking
539 : // onWriteReady() into thinking the socket is connected. On macOS, the underlying write may fail
540 : // with a connection error if a call to write(2) occurs before the connection is completed.
541 7421 : if (!connecting_) {
542 6739 : ioHandle().activateFileEvents(Event::FileReadyType::Write);
543 6739 : }
544 7421 : }
545 7910 : }
546 :
547 1215 : void ConnectionImpl::setBufferLimits(uint32_t limit) {
548 1215 : read_buffer_limit_ = limit;
549 :
550 : // Due to the fact that writes to the connection and flushing data from the connection are done
551 : // asynchronously, we have the option of either setting the watermarks aggressively, and regularly
552 : // enabling/disabling reads from the socket, or allowing more data, but then not triggering
553 : // based on watermarks until 2x the data is buffered in the common case. Given these are all soft
554 : // limits, we err on the side of buffering more and triggering watermark callbacks less often.
555 : //
556 : // Given the current implementation for straight up TCP proxying, the common case is reading
557 : // |limit| bytes through the socket, passing |limit| bytes to the connection, and then immediately
558 : // draining |limit| bytes to the socket. Triggering the high watermarks and then immediately
559 : // triggering the low watermarks would be expensive, but we narrowly avoid triggering high
560 : // watermark when moving |limit| bytes through the connection because the high watermark
561 : // computation checks if the size of the buffer exceeds the high watermark value.
562 1215 : if (limit > 0) {
563 957 : write_buffer_->setWatermarks(limit);
564 957 : read_buffer_->setWatermarks(limit);
565 957 : }
566 1215 : }
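// A toy model of the watermark hysteresis described above (the low watermark being half of the
// limit is an illustrative assumption here, not a statement about Envoy's buffer internals):
namespace watermark_demo {
class DemoWatermark {
public:
  explicit DemoWatermark(uint64_t high) : high_(high), low_(high / 2) {}
  // Returns +1 when the size first exceeds the high watermark, -1 when it drains back to the
  // low watermark, and 0 otherwise.
  int update(uint64_t size) {
    if (!above_ && size > high_) {
      above_ = true;
      return 1;
    }
    if (above_ && size <= low_) {
      above_ = false;
      return -1;
    }
    return 0;
  }
private:
  const uint64_t high_;
  const uint64_t low_;
  bool above_{false};
};
} // namespace watermark_demo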
567 :
568 0 : void ConnectionImpl::onReadBufferLowWatermark() {
569 0 : ENVOY_CONN_LOG(debug, "onBelowReadBufferLowWatermark", *this);
570 0 : if (state() == State::Open) {
571 0 : readDisable(false);
572 0 : }
573 0 : }
574 :
575 0 : void ConnectionImpl::onReadBufferHighWatermark() {
576 0 : ENVOY_CONN_LOG(debug, "onAboveReadBufferHighWatermark", *this);
577 0 : if (state() == State::Open) {
578 0 : readDisable(true);
579 0 : }
580 0 : }
581 :
582 0 : void ConnectionImpl::onWriteBufferLowWatermark() {
583 0 : ENVOY_CONN_LOG(debug, "onBelowWriteBufferLowWatermark", *this);
584 0 : ASSERT(write_buffer_above_high_watermark_);
585 0 : write_buffer_above_high_watermark_ = false;
586 0 : for (ConnectionCallbacks* callback : callbacks_) {
587 0 : if (callback) {
588 0 : callback->onBelowWriteBufferLowWatermark();
589 0 : }
590 0 : }
591 0 : }
592 :
593 0 : void ConnectionImpl::onWriteBufferHighWatermark() {
594 0 : ENVOY_CONN_LOG(debug, "onAboveWriteBufferHighWatermark", *this);
595 0 : ASSERT(!write_buffer_above_high_watermark_);
596 0 : write_buffer_above_high_watermark_ = true;
597 0 : for (ConnectionCallbacks* callback : callbacks_) {
598 0 : if (callback) {
599 0 : callback->onAboveWriteBufferHighWatermark();
600 0 : }
601 0 : }
602 0 : }
603 :
604 8 : void ConnectionImpl::setFailureReason(absl::string_view failure_reason) {
605 8 : if (!transport_socket_->failureReason().empty()) {
606 0 : failure_reason_ = absl::StrCat(failure_reason, ". ", transport_socket_->failureReason());
607 8 : } else {
608 8 : failure_reason_ = std::string(failure_reason);
609 8 : }
610 8 : }
611 :
612 6952 : void ConnectionImpl::onFileEvent(uint32_t events) {
613 6952 : ScopeTrackerScopeState scope(this, this->dispatcher_);
614 6952 : ENVOY_CONN_LOG(trace, "socket event: {}", *this, events);
615 :
616 6952 : if (immediate_error_event_ == ConnectionEvent::LocalClose ||
617 6953 : immediate_error_event_ == ConnectionEvent::RemoteClose) {
618 0 : if (bind_error_) {
619 0 : ENVOY_CONN_LOG(debug, "raising bind error", *this);
620 : // Update stats here, rather than on bind failure, to give the caller a chance to
621 : // setConnectionStats.
622 0 : if (connection_stats_ && connection_stats_->bind_errors_) {
623 0 : connection_stats_->bind_errors_->inc();
624 0 : }
625 0 : } else {
626 0 : ENVOY_CONN_LOG(debug, "raising immediate error", *this);
627 0 : }
628 0 : closeSocket(immediate_error_event_);
629 0 : return;
630 0 : }
631 :
632 6952 : if (events & Event::FileReadyType::Closed) {
633 : // We never ask for both early close and read at the same time. If we are reading, we want to
634 : // consume all available data.
635 68 : ASSERT(!(events & Event::FileReadyType::Read));
636 68 : ENVOY_CONN_LOG(debug, "remote early close", *this);
637 68 : closeSocket(ConnectionEvent::RemoteClose);
638 68 : return;
639 68 : }
640 :
641 6884 : if (events & Event::FileReadyType::Write) {
642 6877 : onWriteReady();
643 6877 : }
644 :
645 : // It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
646 : // In this case ignore read event processing.
647 6884 : if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
648 2377 : onReadReady();
649 2377 : }
650 6884 : }
651 :
652 2376 : void ConnectionImpl::onReadReady() {
653 2376 : ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this,
654 2376 : static_cast<int>(dispatch_buffered_data_));
655 2376 : const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
656 2376 : dispatch_buffered_data_ = false;
657 :
658 2376 : ASSERT(!connecting_);
659 :
660 : // We get here while read disabled in two ways.
661 : // 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
662 : // due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
663 : // enabled.
664 : // 2) The consumer of connection data called readDisable(true), and instead of reading from the
665 : // socket we simply need to dispatch already read data.
666 2376 : if (read_disable_count_ != 0) {
667 : // Do not clear transport_wants_read_ when returning early; the early return skips the transport
668 : // socket doRead call.
669 0 : if (latched_dispatch_buffered_data && filterChainWantsData()) {
670 0 : onRead(read_buffer_->length());
671 0 : }
672 0 : return;
673 0 : }
674 :
675 : // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
676 : // the transport socket read resumption happens as requested; onReadReady() returns early without
677 : // reading from the transport if the read buffer is above high watermark at the start of the
678 : // method.
679 2376 : transport_wants_read_ = false;
680 2376 : IoResult result = transport_socket_->doRead(*read_buffer_);
681 2376 : uint64_t new_buffer_size = read_buffer_->length();
682 2376 : updateReadBufferStats(result.bytes_processed_, new_buffer_size);
683 :
684 : // The socket is closed immediately upon receiving an RST.
685 2377 : if (enable_rst_detect_send_ && result.err_code_.has_value() &&
686 2376 : result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
687 24 : ENVOY_CONN_LOG(trace, "read: rst close from peer", *this);
688 24 : if (result.bytes_processed_ != 0) {
689 14 : onRead(new_buffer_size);
690 14 : }
691 24 : setDetectedCloseType(DetectedCloseType::RemoteReset);
692 24 : closeSocket(ConnectionEvent::RemoteClose);
693 24 : return;
694 24 : }
695 :
696 : // If this connection doesn't have half-close semantics, translate end_stream into
697 : // a connection close.
698 2353 : if ((!enable_half_close_ && result.end_stream_read_)) {
699 872 : result.end_stream_read_ = false;
700 872 : result.action_ = PostIoAction::Close;
701 872 : }
702 :
703 2352 : read_end_stream_ |= result.end_stream_read_;
704 2352 : if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
705 2352 : (latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
706 : // Skip onRead if no bytes were processed unless we explicitly want to force onRead for
707 : // buffered data. For instance, skip onRead if the connection was closed without producing
708 : // more data.
709 1941 : onRead(new_buffer_size);
710 1941 : }
711 :
712 : // The read callback may have already closed the connection.
713 2352 : if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
714 872 : ENVOY_CONN_LOG(debug, "remote close", *this);
715 872 : closeSocket(ConnectionEvent::RemoteClose);
716 872 : }
717 2352 : }
718 :
719 : absl::optional<Connection::UnixDomainSocketPeerCredentials>
720 0 : ConnectionImpl::unixSocketPeerCredentials() const {
721 : // TODO(snowp): Support non-linux platforms.
722 : #ifndef SO_PEERCRED
723 : return absl::nullopt;
724 : #else
725 0 : struct ucred ucred;
726 0 : socklen_t ucred_size = sizeof(ucred);
727 0 : int rc = socket_->getSocketOption(SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_size).return_value_;
728 0 : if (SOCKET_FAILURE(rc)) {
729 0 : return absl::nullopt;
730 0 : }
731 :
732 0 : return {{ucred.pid, ucred.uid, ucred.gid}};
733 0 : #endif
734 0 : }
735 :
736 7691 : void ConnectionImpl::onWriteReady() {
737 7691 : ENVOY_CONN_LOG(trace, "write ready", *this);
738 :
739 7691 : if (connecting_) {
740 1178 : int error;
741 1178 : socklen_t error_size = sizeof(error);
742 1178 : RELEASE_ASSERT(
743 1178 : socket_->getSocketOption(SOL_SOCKET, SO_ERROR, &error, &error_size).return_value_ == 0, "");
744 :
745 1178 : if (error == 0) {
746 1170 : ENVOY_CONN_LOG_EVENT(debug, "connection_connected", "connected", *this);
747 1170 : connecting_ = false;
748 1170 : onConnected();
749 : // It's possible that we closed during the connect callback.
750 1170 : if (state() != State::Open) {
751 0 : ENVOY_CONN_LOG_EVENT(debug, "connection_closed_callback", "close during connected callback",
752 0 : *this);
753 0 : return;
754 0 : }
755 1170 : } else {
756 8 : setFailureReason(absl::StrCat("delayed connect error: ", error));
757 8 : ENVOY_CONN_LOG_EVENT(debug, "connection_error", "{}", *this, transportFailureReason());
758 8 : closeSocket(ConnectionEvent::RemoteClose);
759 8 : return;
760 8 : }
761 1178 : }
762 :
763 7683 : IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
764 7683 : ASSERT(!result.end_stream_read_); // The interface guarantees that only read operations set this.
765 7683 : uint64_t new_buffer_size = write_buffer_->length();
766 7683 : updateWriteBufferStats(result.bytes_processed_, new_buffer_size);
767 :
768 : // The socket is closed immediately upon receiving an RST.
769 7683 : if (enable_rst_detect_send_ && result.err_code_.has_value() &&
770 7683 : result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
771 : // Discard anything in the buffer.
772 31 : ENVOY_CONN_LOG(debug, "write: rst close from peer.", *this);
773 31 : setDetectedCloseType(DetectedCloseType::RemoteReset);
774 31 : closeSocket(ConnectionEvent::RemoteClose);
775 31 : return;
776 31 : }
777 :
778 : // NOTE: If the delayed_close_timer_ is set, it must only trigger after a delayed_close_timeout_
779 : // period of inactivity from the last write event. Therefore, the timer must be reset to its
780 : // original timeout value unless the socket is going to be closed as a result of the doWrite().
781 :
782 7652 : if (result.action_ == PostIoAction::Close) {
783 : // It is possible (though unlikely) for the connection to have already been closed during the
784 : // write callback. This can happen if we manage to complete the SSL handshake in the write
785 : // callback, raise a connected event, and close the connection.
786 12 : closeSocket(ConnectionEvent::RemoteClose);
787 7640 : } else if ((inDelayedClose() && new_buffer_size == 0) || bothSidesHalfClosed()) {
788 61 : ENVOY_CONN_LOG(debug, "write flush complete", *this);
789 61 : if (delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) {
790 61 : ASSERT(delayed_close_timer_ != nullptr && delayed_close_timer_->enabled());
791 61 : if (result.bytes_processed_ > 0) {
792 39 : delayed_close_timer_->enableTimer(delayed_close_timeout_);
793 39 : }
794 61 : } else {
795 0 : ASSERT(bothSidesHalfClosed() || delayed_close_state_ == DelayedCloseState::CloseAfterFlush);
796 0 : closeConnectionImmediately();
797 0 : }
798 7579 : } else {
799 7579 : ASSERT(result.action_ == PostIoAction::KeepOpen);
800 7579 : ASSERT(!delayed_close_timer_ || delayed_close_timer_->enabled());
801 7579 : if (delayed_close_timer_ != nullptr && result.bytes_processed_ > 0) {
802 0 : delayed_close_timer_->enableTimer(delayed_close_timeout_);
803 0 : }
804 7579 : if (result.bytes_processed_ > 0) {
805 3904 : auto it = bytes_sent_callbacks_.begin();
806 3904 : while (it != bytes_sent_callbacks_.end()) {
807 0 : if ((*it)(result.bytes_processed_)) {
808 : // move to the next callback.
809 0 : it++;
810 0 : } else {
811 : // remove the current callback.
812 0 : it = bytes_sent_callbacks_.erase(it);
813 0 : }
814 :
815 : // If a callback closes the socket, stop iterating.
816 0 : if (!ioHandle().isOpen()) {
817 0 : return;
818 0 : }
819 0 : }
820 3904 : }
821 7579 : }
822 7652 : }
823 :
824 4742 : void ConnectionImpl::updateReadBufferStats(uint64_t num_read, uint64_t new_size) {
825 4742 : if (!connection_stats_) {
826 1900 : return;
827 1900 : }
828 :
829 2842 : ConnectionImplUtility::updateBufferStats(num_read, new_size, last_read_buffer_size_,
830 2842 : connection_stats_->read_total_,
831 2842 : connection_stats_->read_current_);
832 2842 : }
833 :
834 10048 : void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_size) {
835 10048 : if (!connection_stats_) {
836 6111 : return;
837 6111 : }
838 :
839 3937 : ConnectionImplUtility::updateBufferStats(num_written, new_size, last_write_buffer_size_,
840 3937 : connection_stats_->write_total_,
841 3937 : connection_stats_->write_current_);
842 3937 : }
843 :
844 9058 : bool ConnectionImpl::bothSidesHalfClosed() {
845 : // If the write_buffer_ is not empty, then the end_stream has not been sent to the transport yet.
846 9058 : return read_end_stream_ && write_end_stream_ && write_buffer_->length() == 0;
847 9058 : }
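// At the socket layer, the write-side end_stream tracked above corresponds to a TCP half-close:
// shutdown(SHUT_WR) sends a FIN while leaving the read side usable, which is what lets the two
// directions finish independently. Illustrative POSIX usage (helper name is ours):
#include <sys/socket.h>
inline int demoSendFinKeepReading(int fd) { return ::shutdown(fd, SHUT_WR); }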
848 :
849 1380 : absl::string_view ConnectionImpl::transportFailureReason() const {
850 1380 : if (!failure_reason_.empty()) {
851 8 : return failure_reason_;
852 8 : }
853 1372 : return transport_socket_->failureReason();
854 1380 : }
855 :
856 0 : absl::optional<std::chrono::milliseconds> ConnectionImpl::lastRoundTripTime() const {
857 0 : return socket_->lastRoundTripTime();
858 0 : }
859 :
860 : void ConnectionImpl::configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec,
861 0 : std::chrono::microseconds rtt) {
862 0 : return transport_socket_->configureInitialCongestionWindow(bandwidth_bits_per_sec, rtt);
863 0 : }
864 :
865 0 : absl::optional<uint64_t> ConnectionImpl::congestionWindowInBytes() const {
866 0 : return socket_->congestionWindowInBytes();
867 0 : }
868 :
869 1954 : void ConnectionImpl::flushWriteBuffer() {
870 1954 : if (state() == State::Open && write_buffer_->length() > 0) {
871 809 : onWriteReady();
872 809 : }
873 1954 : }
874 :
875 0 : void ConnectionImpl::dumpState(std::ostream& os, int indent_level) const {
876 0 : const char* spaces = spacesForLevel(indent_level);
877 0 : os << spaces << "ConnectionImpl " << this << DUMP_MEMBER(connecting_) << DUMP_MEMBER(bind_error_)
878 0 : << DUMP_MEMBER(state()) << DUMP_MEMBER(read_buffer_limit_) << "\n";
879 :
880 0 : DUMP_DETAILS(socket_);
881 0 : }
882 :
883 : ServerConnectionImpl::ServerConnectionImpl(Event::Dispatcher& dispatcher,
884 : ConnectionSocketPtr&& socket,
885 : TransportSocketPtr&& transport_socket,
886 : StreamInfo::StreamInfo& stream_info)
887 : : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info,
888 1042 : true) {}
889 :
890 : void ServerConnectionImpl::setTransportSocketConnectTimeout(std::chrono::milliseconds timeout,
891 0 : Stats::Counter& timeout_stat) {
892 0 : if (!transport_connect_pending_) {
893 0 : return;
894 0 : }
895 :
896 0 : transport_socket_timeout_stat_ = &timeout_stat;
897 0 : if (transport_socket_connect_timer_ == nullptr) {
898 0 : transport_socket_connect_timer_ =
899 0 : dispatcher_.createScaledTimer(Event::ScaledTimerType::TransportSocketConnectTimeout,
900 0 : [this] { onTransportSocketConnectTimeout(); });
901 0 : }
902 0 : transport_socket_connect_timer_->enableTimer(timeout);
903 0 : }
904 :
905 784 : void ServerConnectionImpl::raiseEvent(ConnectionEvent event) {
906 784 : switch (event) {
907 0 : case ConnectionEvent::ConnectedZeroRtt:
908 : // The transport socket is still connecting, so skip changing connect state.
909 0 : break;
910 784 : case ConnectionEvent::Connected:
911 784 : case ConnectionEvent::RemoteClose:
912 784 : case ConnectionEvent::LocalClose:
913 784 : transport_connect_pending_ = false;
914 784 : transport_socket_connect_timer_.reset();
915 784 : }
916 784 : ConnectionImpl::raiseEvent(event);
917 784 : }
918 784 : bool ServerConnectionImpl::initializeReadFilters() {
919 784 : bool initialized = ConnectionImpl::initializeReadFilters();
920 784 : if (initialized) {
921 : // Server connection starts as connected, and we must explicitly signal to
922 : // the downstream transport socket that the underlying socket is connected.
923 : // We delay this step until after the filters are initialized and can
924 : // receive the connection events.
925 784 : onConnected();
926 784 : }
927 784 : return initialized;
928 784 : }
929 :
930 0 : void ServerConnectionImpl::onTransportSocketConnectTimeout() {
931 0 : stream_info_.setConnectionTerminationDetails(kTransportSocketConnectTimeoutTerminationDetails);
932 0 : closeConnectionImmediatelyWithDetails(
933 0 : StreamInfo::LocalCloseReasons::get().TransportSocketTimeout);
934 0 : transport_socket_timeout_stat_->inc();
935 0 : setFailureReason("connect timeout");
936 0 : }
937 :
938 : ClientConnectionImpl::ClientConnectionImpl(
939 : Event::Dispatcher& dispatcher, const Address::InstanceConstSharedPtr& remote_address,
940 : const Network::Address::InstanceConstSharedPtr& source_address,
941 : Network::TransportSocketPtr&& transport_socket,
942 : const Network::ConnectionSocket::OptionsSharedPtr& options,
943 : const Network::TransportSocketOptionsConstSharedPtr& transport_options)
944 : : ClientConnectionImpl(dispatcher, std::make_unique<ClientSocketImpl>(remote_address, options),
945 : source_address, std::move(transport_socket), options,
946 1328 : transport_options) {}
947 :
948 : ClientConnectionImpl::ClientConnectionImpl(
949 : Event::Dispatcher& dispatcher, std::unique_ptr<ConnectionSocket> socket,
950 : const Address::InstanceConstSharedPtr& source_address,
951 : Network::TransportSocketPtr&& transport_socket,
952 : const Network::ConnectionSocket::OptionsSharedPtr& options,
953 : const Network::TransportSocketOptionsConstSharedPtr& transport_options)
954 : : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info_,
955 : false),
956 1328 : stream_info_(dispatcher_.timeSource(), socket_->connectionInfoProviderSharedPtr()) {
957 :
958 1328 : stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
959 :
960 1328 : if (transport_options) {
961 173 : for (const auto& object : transport_options->downstreamSharedFilterStateObjects()) {
962 : // This does not throw as all objects are distinctly named and the stream info is empty.
963 0 : stream_info_.filterState()->setData(object.name_, object.data_, object.state_type_,
964 0 : StreamInfo::FilterState::LifeSpan::Connection,
965 0 : object.stream_sharing_);
966 0 : }
967 173 : }
968 :
969 : // There are no meaningful socket options or source address semantics for
970 : // non-IP sockets, so skip.
971 1328 : if (socket_->connectionInfoProviderSharedPtr()->remoteAddress()->ip() == nullptr) {
972 0 : return;
973 0 : }
974 1328 : if (!Network::Socket::applyOptions(options, *socket_,
975 1328 : envoy::config::core::v3::SocketOption::STATE_PREBIND)) {
976 : // Set a special error state to ensure asynchronous close to give the owner of the
977 : // ConnectionImpl a chance to add callbacks and detect the "disconnect".
978 0 : immediate_error_event_ = ConnectionEvent::LocalClose;
979 : // Trigger a write event to close this connection out-of-band.
980 0 : ioHandle().activateFileEvents(Event::FileReadyType::Write);
981 0 : return;
982 0 : }
983 :
984 1328 : const Network::Address::InstanceConstSharedPtr* source = &source_address;
985 :
986 1328 : if (socket_->connectionInfoProvider().localAddress()) {
987 0 : source = &socket_->connectionInfoProvider().localAddress();
988 0 : }
989 :
990 1328 : if (*source != nullptr) {
991 0 : Api::SysCallIntResult result = socket_->bind(*source);
992 0 : if (result.return_value_ < 0) {
993 0 : setFailureReason(absl::StrCat("failed to bind to ", source->get()->asString(), ": ",
994 0 : errorDetails(result.errno_)));
995 0 : ENVOY_LOG_MISC(debug, failureReason());
996 0 : bind_error_ = true;
997 : // Set a special error state to ensure asynchronous close to give the owner of the
998 : // ConnectionImpl a chance to add callbacks and detect the "disconnect".
999 0 : immediate_error_event_ = ConnectionEvent::LocalClose;
1000 :
1001 : // Trigger a write event to close this connection out-of-band.
1002 0 : ioHandle().activateFileEvents(Event::FileReadyType::Write);
1003 0 : }
1004 0 : }
1005 1328 : }
1006 :
1007 1328 : void ClientConnectionImpl::connect() {
1008 1328 : ENVOY_CONN_LOG_EVENT(debug, "client_connection", "connecting to {}", *this,
1009 1328 : socket_->connectionInfoProvider().remoteAddress()->asString());
1010 1328 : const Api::SysCallIntResult result = transport_socket_->connect(*socket_);
1011 1328 : stream_info_.upstreamInfo()->upstreamTiming().onUpstreamConnectStart(dispatcher_.timeSource());
1012 1328 : if (result.return_value_ == 0) {
1013 : // Connected immediately; the write event will become ready and drive onConnected().
1014 0 : ASSERT(connecting_);
1015 0 : return;
1016 0 : }
1017 :
1018 1328 : ASSERT(SOCKET_FAILURE(result.return_value_));
1019 : #ifdef WIN32
1020 : // winsock2 connect returns EWOULDBLOCK if the socket is non-blocking and the connection
1021 : // cannot be completed immediately. We do not check for `EINPROGRESS` as that error is for
1022 : // blocking operations.
1023 : if (result.errno_ == SOCKET_ERROR_AGAIN) {
1024 : #else
1025 1328 : if (result.errno_ == SOCKET_ERROR_IN_PROGRESS) {
1026 1328 : #endif
1027 1328 : ASSERT(connecting_);
1028 1328 : ENVOY_CONN_LOG_EVENT(debug, "connection_in_progress", "connection in progress", *this);
1029 1328 : } else {
1030 0 : immediate_error_event_ = ConnectionEvent::RemoteClose;
1031 0 : connecting_ = false;
1032 0 : setFailureReason(absl::StrCat("immediate connect error: ", errorDetails(result.errno_)));
1033 0 : ENVOY_CONN_LOG_EVENT(debug, "connection_immediate_error", "{}", *this, failureReason());
1034 :
1035 : // Trigger a write event. This is needed on macOS and seems harmless on Linux.
1036 0 : ioHandle().activateFileEvents(Event::FileReadyType::Write);
1037 0 : }
1038 1328 : }
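// The connect flow above is the standard POSIX non-blocking dance: connect() returns
// -1/EINPROGRESS, the event loop waits for writability, and SO_ERROR yields the deferred
// result (which is exactly what onWriteReady() reads). A condensed sketch with illustrative
// helper names:
#include <cerrno>
#include <fcntl.h>
#include <sys/socket.h>
inline bool demoStartConnect(int fd, const sockaddr* addr, socklen_t len) {
  ::fcntl(fd, F_SETFL, ::fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
  if (::connect(fd, addr, len) == 0) {
    return true; // Completed immediately (e.g. loopback).
  }
  return errno == EINPROGRESS; // In progress: poll the fd for writability.
}
inline int demoFinishConnect(int fd) {
  int error = 0;
  socklen_t error_size = sizeof(error);
  ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &error_size);
  return error; // 0 on success, otherwise the deferred connect errno.
}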
1039 :
1040 1170 : void ClientConnectionImpl::onConnected() {
1041 1170 : stream_info_.upstreamInfo()->upstreamTiming().onUpstreamConnectComplete(dispatcher_.timeSource());
1042 : // There are no meaningful socket source address semantics for non-IP sockets, so skip.
1043 1170 : if (socket_->connectionInfoProviderSharedPtr()->remoteAddress()->ip()) {
1044 1170 : socket_->connectionInfoProvider().maybeSetInterfaceName(ioHandle());
1045 1170 : const auto maybe_interface_name = socket_->connectionInfoProvider().interfaceName();
1046 1170 : if (maybe_interface_name.has_value()) {
1047 0 : ENVOY_CONN_LOG_EVENT(debug, "conn_interface", "connected on local interface '{}'", *this,
1048 0 : maybe_interface_name.value());
1049 0 : }
1050 1170 : }
1051 1170 : ConnectionImpl::onConnected();
1052 1170 : }
1053 :
1054 : } // namespace Network
1055 : } // namespace Envoy
|